func(a *Activities)ActivityToBeCanceled(ctx context.Context)(string, error) { logger := activity.GetLogger(ctx) for { select { case <-time.After(10 * time.Second): logger.Info("heartbeating...") activity.RecordHeartbeat(ctx, "") //记录心跳 if ctx.Err() != nil { logger.Info("context err is not nil", ctx.Err()) continue } // do soming... case <-ctx.Done(): logger.Info("context is cancelled") return"I am canceled by Done", nil } } }
if i.hbBatchEndTimer != nil && !skipBatching { // If we have started batching window, keep track of last reported progress. i.lastDetailsToReport = &details returnnil }
// If the activity is canceled, the activity can ignore the cancellation and do its work // and complete. Our cancellation is co-operative, so we will try to heartbeat. if (err == nil || isActivityCanceled) && !skipBatching { // We have successfully sent heartbeat, start next batching window. i.lastDetailsToReport = nil
// Create timer to fire before the threshold to report. i.hbBatchEndTimer = time.NewTimer(i.heartbeatThrottleInterval)
gofunc() { select { case <-i.hbBatchEndTimer.C: // We are close to deadline. case <-i.workerStopChannel: // Activity worker is close to stop. This does the same steps as batch timer ends. case <-i.closeCh: // We got closed. return }
// We close the batch and report the progress. var detailsToReport **commonpb.Payloads
if detailsToReport != nil { // TODO: there is a potential race condition here as the lock is released here and // locked again in the Hearbeat() method. This possible that a heartbeat call from // user activity grabs the lock first and calls internalHeartBeat before this // batching goroutine, which means some activity progress will be lost. _ = i.Heartbeat(ctx, *detailsToReport, false) } }() }