Temporal Activity取消的最佳实践

背景

在Temporal工作流开发中,我们遇到了一个典型的取消延迟问题:当Workflow发送Cancel请求后,Activity并未立即终止,而是继续执行了多个周期后才退出。以下是核心场景复现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Workflow
func MyWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 3 * time.Minute,
WaitForCancellation: true,
}
// ...忽略部分代码
// 添加一个定时器以在20秒后取消活动
selector.AddFuture(workflow.NewTimer(ctx, 20*time.Second), func(f workflow.Future) {
cancel()
logger.Info("ActivityToBeCanceled canceled")
})
// 执行活动
future := workflow.ExecuteActivity(activityCtx, a.ActivityToBeCanceled)

// ...忽略部分代码
selector.Select(ctx)
logger.Info("Workflow Execution complete.")

return nil
}


// Activity
type Activities struct{}

func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
logger := activity.GetLogger(ctx)
for {
select {
case <-time.After(10 * time.Second):
logger.Info("heartbeating...")
activity.RecordHeartbeat(ctx, "") //记录心跳
if ctx.Err() != nil {
logger.Info("context err is not nil", ctx.Err())
continue
}
// do soming...
case <-ctx.Done():
logger.Info("context is cancelled")
return "I am canceled by Done", nil
}
}
}

整体流程是:工作流启动长时间运行的Activity,同时设置20秒后自动取消该Activity。Activity会响应取消信号并正常结束执行。参考官方demo

执行的日志如下(精简后):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
16:25:44 - Workflow - 工作流启动: "cancel workflow started"
16:25:44 - Workflow - 设置定时器: "NewTimer ... Duration 20s"
16:25:44 - Workflow - 执行活动: "ExecuteActivity ... ActivityType ActivityToBeCanceled"
16:25:44 - Activity - 活动开始: "activity started, to cancel the Workflow Execution..."
16:25:44 - Activity - 活动心跳: "heartbeating..."
16:25:44 - Activity - 活动工作: "do some work..."
16:25:47 - Activity - 工作完成: "work done"
16:26:00 - Activity - 活动心跳: "heartbeating..."
16:26:00 - Activity - 活动工作: "do some work..."
16:26:03 - Activity - 工作完成: "work done"
16:26:04 - Workflow - 取消请求: "RequestCancelActivity"
16:26:04 - Workflow - 活动取消: "ActivityToBeCanceled canceled"
16:26:04 - Workflow - 工作流完成并退出: "Workflow Execution complete."
16:26:14 - Activity - 活动心跳: "heartbeating..."
16:26:14 - Activity - 活动工作: "do some work..."
16:26:17 - Activity - 工作完成: "work done"
16:26:31 - Activity - 活动心跳: "heartbeating..."
16:26:31 - Activity - 活动工作: "do some work..."
16:26:34 - Activity - 工作完成: "work done"
16:26:44 - Activity - 心跳错误: "RecordActivityHeartbeat with error ... workflow execution already completed"
16:26:44 - Activity - 上下文取消,活动退出: "context is cancelled"

异常现象:工作流在16:26:04发送取消请求,但Activity直到16:26:44(40秒后)才响应取消。期间Activity继续执行了两次完整的工作周期。Why?O.o

查阅资料

这里显示了查阅过程,想看结论请直接跳转问题根因分析

官方论坛

遇事不决先谷歌

先在Temporal官方论坛搜索相关问题:activity cancel delay

官方论坛可以解决95%的问题,上面的成员会很友好很耐心地回答你的问题。YYDS

image-20250407175753324

关键:通过查阅资料发现,Temporal在Activity心跳上报机制上,做了限流

https://community.temporal.io/t/problems-cancelling-a-running-activity-from-parent-workflow/2169/9

image-20250407180722529

Temporal老板说了几点很重要的信息:

  1. 心跳批处理优化:为了避免过多调用服务,心跳信号并不会立即发送,而是**等到达到心跳间隔的80%**才会发送到服务端。
  2. 加速取消通知:如果你希望加快活动收到取消/工作流完成通知的速度,应该将心跳间隔从默认值(例如1分钟)降低到更小的值。
  3. 取消后清理:如果需要在当前活动取消后执行清理工作,可以将ActivityOptions.WaitForCancellation设置为true,这样系统会通过阻塞活动的Future来等待活动取消完成。

这些信息对于理解Temporal中活动取消机制的工作原理以及如何优化取消响应速度非常重要。

上面代码中,我们的心跳间隔设置是3分钟:

1
2
3
4
5
6
7
8
9
10
// Workflow
func MyWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 3 * time.Minute,
WaitForCancellation: true,
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 省略...
}

上报心跳的时候会一并检查Activity是否被取消,也就是说,文中设置3分钟的心跳超时,心跳间隔会设置在2.4分钟(144秒)才会检测一次。

源码

在官方SDK中,我们可以找到Heartbeat的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// go.temporal.io › sdk@v1.26.0 › internal › internal_task_handlers.go
func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Payloads, skipBatching bool) error {
i.Lock()
defer i.Unlock()

if i.hbBatchEndTimer != nil && !skipBatching {
// If we have started batching window, keep track of last reported progress.
i.lastDetailsToReport = &details
return nil
}

isActivityCanceled, err := i.internalHeartBeat(ctx, details)

// If the activity is canceled, the activity can ignore the cancellation and do its work
// and complete. Our cancellation is co-operative, so we will try to heartbeat.
if (err == nil || isActivityCanceled) && !skipBatching {
// We have successfully sent heartbeat, start next batching window.
i.lastDetailsToReport = nil

// Create timer to fire before the threshold to report.
i.hbBatchEndTimer = time.NewTimer(i.heartbeatThrottleInterval)

go func() {
select {
case <-i.hbBatchEndTimer.C:
// We are close to deadline.
case <-i.workerStopChannel:
// Activity worker is close to stop. This does the same steps as batch timer ends.
case <-i.closeCh:
// We got closed.
return
}

// We close the batch and report the progress.
var detailsToReport **commonpb.Payloads

i.Lock()
detailsToReport = i.lastDetailsToReport
i.hbBatchEndTimer.Stop()
i.hbBatchEndTimer = nil
i.Unlock()

if detailsToReport != nil {
// TODO: there is a potential race condition here as the lock is released here and
// locked again in the Hearbeat() method. This possible that a heartbeat call from
// user activity grabs the lock first and calls internalHeartBeat before this
// batching goroutine, which means some activity progress will be lost.
_ = i.Heartbeat(ctx, *detailsToReport, false)
}
}()
}

return err
}

Activity Heartbeat 是 Temporal 工作流中的一个关键机制,主要用于长时间运行的活动。根据代码分析:

  1. 心跳机制目的:

    • 允许长时间运行的活动向 Temporal 服务器报告其进度

    • 检测活动是否被取消

    • 防止超时导致的活动失败

  2. 关键实现细节:

    • 心跳批处理:通过 heartbeatThrottleInterval 控制发送频率,避免过于频繁的心跳信号

    • 最新进度跟踪:使用 lastDetailsToReport 记录最新进度

    • 协作式取消:即使活动被取消,系统也会尝试发送心跳

  3. 心跳限流处理流程:

    • 如果批处理窗口开启(skipBatching=false),只记录进度不立即发送

    • 批处理窗口(heartbeatThrottleInterval)结束或工作线程停止时,发送最新进度报告

问题根因分析

关键机制解析

通过查阅Temporal官方文档和社区讨论,结合源码分析,我们发现以下核心机制:

  1. 心跳批处理机制
    • 心跳上报默认采用80% HeartbeatTimeout的间隔进行批处理
    • 示例中的HeartbeatTimeout=3m,实际检测间隔为:3m * 0.8 = 144秒
    • 活动只有在心跳时才会检测取消信号
  2. 服务端通知延迟
    • 取消信号需要等待下一个心跳窗口才能传递到Activity
    • 即使工作流已关闭,活动实例仍可能继续运行直到检测到取消
  3. SDK实现细节
1
2
3
4
5
6
7
8
9
// SDK内部处理逻辑(节选)
func (i *temporalInvoker) Heartbeat(ctx context.Context, ...) error {
if i.hbBatchEndTimer != nil { // 批处理窗口存在时
i.lastDetailsToReport = &details // 仅记录最新状态
return nil // 不立即发送心跳
}
// 实际发送心跳后启动新的批处理窗口
i.hbBatchEndTimer = time.NewTimer(i.heartbeatThrottleInterval)
}

问题诊断

时间轴 事件
16:25:44 活动启动
16:26:04 工作流发送取消请求
16:26:14-16:26:44 活动继续执行两轮周期(10秒/轮)
16:26:44 检测到取消信号

由于心跳间隔设置过大(3分钟),实际检测窗口为144秒,导致取消信号延迟传递。

优化心跳配置

1
2
3
4
5
6
// 推荐配置示例
ao := workflow.ActivityOptions{
HeartbeatTimeout: 10 * time.Second, // <= 建议10-30秒
StartToCloseTimeout: 30 * time.Minute,
WaitForCancellation: true, // 确保清理完成
}

配置原则

  • 心跳超时 ≤ 活动单次操作耗时的2倍
  • 对于需要快速响应的场景,设置HeartbeatTimeout=10s(实际检测间隔8秒)

总结

问题本质:Temporal的心跳机制通过批处理优化服务端负载,导致取消信号检测存在固有延迟。当HeartbeatTimeout设置过大时,会显著放大这种延迟效应。

实践建议:根据业务场景设置合理的心跳间隔(建议小于1分钟),平衡响应速度与系统负载

Tips: 还可以等Temporal修复这个问题(^-^)

image-20250407184432582