Conductor Runtime#
Perago 默认 process runtime 由一个 Conductor broker process 拉取单个 task name 的 attempt,再派发给本地 executor process 执行 task body,并由 broker 把 COMPLETED、FAILED 或 FAILED_WITH_TERMINAL_ERROR 写回 Conductor。Conductor 负责调度和重试;Perago 负责把 Conductor task snapshot 映射成强类型 attempt,并在 workspace 发布前确认 attempt 仍然是当前可写入的 attempt。
启动前检查#
perago start 连接 Conductor 前必须已经有完整的 runtime service config:
配置 |
Required |
说明 |
|---|---|---|
|
required for |
Orkes/Conductor API endpoint。 |
LakeFS endpoint 和 credentials |
required for |
workspace task 和当前 worker runtime 统一要求完整 LakeFS config。 |
generated TaskDef |
required before worker starts |
必须已经注册到 Conductor; |
如果 Conductor 中没有同名 TaskDef,启动会失败:
error: Conductor TaskDef 'features.build' is not registered; run perago extract and register it before start
正确顺序是先本地校验,再生成 TaskDef JSON,再通过部署流程注册到 Conductor,最后启动 worker:
perago check app.workers.features_build
perago extract app.workers.features_build --output generated/features.build.json
perago start app.workers.features_build -j 2 --execution-mode process
Poll loop#
当前默认 process runtime 中,supervisor 启动一个 perago-conductor-broker 和 N 个 perago-executor-000N。broker 内嵌 SDK TaskRunner(thread_count=N),只 poll 当前 module 中定义的单个 task name,并用 PERAGO_WORKER_ID_PREFIX + "Broker" 作为 Conductor 可见 worker id。perago start --execution-mode ... 与 PERAGO_EXECUTION_MODE 的解析优先级是 CLI 参数高于环境变量,默认值为 process。
一次 broker dispatch 的顺序是:
SDK
TaskRunnerpoll 当前 task name,并按 SDK 策略处理空 poll、错误退避和 lease tracking。broker adapter 把 SDK
Task转成ConductorTaskAttempt,为这次实际执行生成 execution id,并写入 assignment queue。executor 执行 workspace task 或 workspace-free task。
workspace task 的 attempt fence 通过 broker IPC 重新读取 fresh attempt。
executor 把
RuntimeTaskResult写回 completion queue。broker adapter 把 completion 映射成 SDK
TaskResult。SDK
TaskRunner调用 result update,并按 SDK 策略处理 update-v2 fallback。
process mode 不会在一个 Python module 内路由多个 task。并发来自 broker SDK runner 的 thread_count=N 和 N 个独立 executor process;broker thread 只负责把 SDK Task 派发到 assignment queue 并等待 completion。
显式 thread runtime 使用 SDK TaskRunner 和 PeragoThreadWorker。-j N 会传给 SDK worker 的 thread_count,lease_extend_enabled=True,并且 register_task_def=False、register_schema=False。在这个模式下,SDK thread pool 负责 poll、LeaseManager 追踪和 result update;Perago adapter 只把 SDK Task 转成 ConductorTaskAttempt,执行现有 task body/workspace 流程,再把 RuntimeTaskResult 转回 SDK TaskResult。thread mode 的 Conductor 可见 worker id 当前由 PERAGO_WORKER_ID_PREFIX + "Broker" 派生。
process broker 由 PeragoProcessDispatchWorker 与 run_conductor_process_broker(...) 组成。它同样满足 SDK worker contract:thread_count=N、lease_extend_enabled=True、register_task_def=False、register_schema=False,并用 broker worker id 作为 Conductor 可见 identity。和 thread worker 不同,它不会在 SDK worker thread 内执行 task body;execute(...) 会把 SDK Task 转成 ConductorTaskAttempt,生成本次 execution id,放入 broker-to-executor assignment queue,等待 executor 返回同一个 task_id 和同一个 execution_id 的 RuntimeTaskResult completion,再映射成 SDK TaskResult。SDK TaskRunner 仍负责 broker 侧 poll、LeaseManager tracking 和 result update。
process executor 的本地执行循环是 run_process_executor_loop(...)。executor 只消费 ProcessTaskAssignment,复用现有 execute_polled_task() 跑 workspace 或 workspace-free task,再把同一 task_id 和 execution_id 的 ProcessTaskCompletion 写回 completion queue;它不 poll Conductor,也不 update Conductor result。workspace task 的 attempt-fence reload 会写入 attempt_fence_request_queue,由 broker 调 Conductor get_task 后通过对应 executor 的 response queue 返回 fresh attempt snapshot。
execution id 的作用域是“一次 executor 实际执行 assignment”。它不是 Conductor task id,也不是 workflow step identity;broker 使用它拒绝旧 completion 或重复派发残留 completion,LakeFS runtime 使用它隔离 staging branch 和本机 attempt workspace。
Attempt snapshot#
Conductor task 会被映射成 Perago attempt snapshot:
字段 |
Required |
用途 |
|---|---|---|
|
required |
区分 workflow run。 |
|
required |
当前 Conductor task attempt id,也是重新读取 attempt 的 key。 |
|
required |
attempt fence 的一部分;重试 attempt 不能复用旧 snapshot。 |
|
required |
Conductor TaskDef name。 |
|
required |
workflow 中的 reference task name。 |
|
required |
Conductor task sequence。 |
|
optional |
缺失时按 |
|
required |
当前 Conductor task 状态。 |
|
required |
Perago task input payload;必须是 mapping。 |
|
optional |
用于 metadata 和 publish-state 追踪。 |
|
optional |
SDK task 的 lease timeout snapshot;后续 SDK broker/runner adapter 用它接入 LeaseManager 追踪和日志排查。 |
workspace task 在发布前会重新读取当前 task_id 的 Conductor task,并调用 attempt fence。默认 process 模式下,这个重新读取动作由 broker-owned RPC 完成,executor 不持有 Conductor client;thread 模式下由同进程 runner client 完成。只有 fresh snapshot 同时满足以下条件时,才允许继续进入 stage 或 publish:
status == "IN_PROGRESS"。workflow_instance_id与已 poll 到的 attempt 一致。task_id与已 poll 到的 attempt 一致。retry_count与已 poll 到的 attempt 一致。
Perago 当前在两个位置检查 attempt fence:执行 task body 后、stage workspace 前检查一次;stage workspace 后、publish workspace 前再检查一次。任一检查失败都会返回普通 FAILED,并清理 attempt-local workspace;如果 staging 已经创建,还会尝试清理 staging branch。
Result update#
Perago 内部先生成 RuntimeTaskResult,再转换为 Conductor SDK 的 TaskResult:
Perago status |
Conductor 字段 |
典型来源 |
|---|---|---|
|
|
task body 成功,workspace task 还包含已发布的 workspace output。 |
|
|
bad input、post guardrail、stale attempt、task body exception、publish failure。 |
|
|
pre guardrail failure。 |
COMPLETED 必须带 output,且不能带 failure reason。失败状态必须带 reasonForIncompletion,且不能带 output。worker id 会写入 Conductor result,便于从 Conductor 结果反查 worker 日志目录。
workspace task 如果配置了 PublishBudget,TaskDef 会使用派生出的 responseTimeoutSeconds,让 SDK runner 的 LeaseManager 按 publication 预算续租。LakeFS merge request timeout 仍由 LakeFS runtime 使用 lakefs_merge_timeout_seconds 约束。没有 publish budget 时使用普通 task timeout。
ConductorTaskAttempt.response_timeout_seconds 来自 SDK task snapshot 本身。显式 thread runtime 和默认 process broker runtime 都交给 SDK TaskRunner 处理 LeaseManager 续租。
输入输出边界#
Conductor 传入的 input_data 必须匹配 Perago task 类型:
Task 类型 |
Required input shape |
Completed output shape |
|---|---|---|
workspace task |
顶层只能有 |
顶层包含 |
workspace-free task |
顶层只能有 |
顶层只包含 |
Conductor 不保存 attempt-local workspace 路径,也不参与 workspace 文件同步。workspace 路径、LakeFS download/stage/merge 和 staging cleanup 都由 Perago worker runtime 在本机执行。
故障边界#
Conductor runtime 页面只覆盖与 Conductor 交互有关的边界:
TaskDef 缺失会阻止
perago start启动 worker。poll 失败和 result update 失败会记录日志并退避重试,不会让 supervisor 立即退出。
result update 失败发生在 task 已经本地执行之后;对于 workspace task,publish 可能已经完成,因此排查时要同时看 Conductor task 状态、worker JSONL 日志和 LakeFS publication metadata。
attempt fence 是 client-side soft fence。它降低旧 attempt 继续发布的风险,但不是 exactly-once 证明。
如果 LakeFS publish 已成功但 Conductor result update 未完成,Perago 不会在下一次启动时补发 completion,也不会从 LakeFS metadata 恢复旧 attempt 状态;最终由 Conductor timeout/fail/retry 处理。