Worker Processes#
Perago 的 perago start 是前台 supervisor 进程。supervisor 不直接执行 task body;默认 process 模式会为单个 task module 启动一个 Conductor broker process 和 N 个 executor process。每个 executor process 拥有稳定的 slot、独立的 PERAGO_WORKER_ID 和独立的日志文件。
这个页面说明本机进程模型。Conductor poll、LakeFS workspace 下载与发布的详细生命周期在后续 runtime 页面展开。
进程模型#
perago start app.workers.features_build -j 2 --execution-mode process 会形成以下结构:
supervisor process
├── perago-conductor-broker
├── perago-executor-0001
└── perago-executor-0002
-j 在默认 process execution mode 下是 executor process count,默认 1,最小值为 1。非法值会在 CLI 参数层或 worker_child_specs() 中被拒绝:
worker process count must be at least 1
Perago 当前已支持解析 execution mode 公共接口:perago start --execution-mode ... 优先于 PERAGO_EXECUTION_MODE,再退回默认 process。process 模式是默认重负载模型;thread 模式是显式轻量路径,使用 SDK TaskRunner(thread_count=N) 在同一进程内执行 task body。
默认 process 模式的 task body 并发单位仍是独立 executor process,不是同一进程内的线程池或 asyncio worker pool。broker process 会导入同一个 single-task module,并用同一个 task name 去 poll Conductor;executor process 只消费 broker 派发的 assignment。
process broker 的 adapter、SDK runner 启动函数和 supervisor 进程树已经落地。run_conductor_process_broker(...) 会把 PeragoProcessDispatchWorker 包进 SDK TaskRunner(thread_count=N),让 broker 负责 poll、续租和 update result,同时把 task body 执行派发到 executor assignment queue。每次派发都会生成 execution id;executor completion 必须带回同一个 task_id 和同一个 execution_id 的 RuntimeTaskResult,broker adapter 会 fail closed 处理无法匹配的 completion。
executor 侧的本地执行循环是 run_process_executor_loop(...)。它只消费 broker 派发的 ProcessTaskAssignment,执行 Perago task runtime,然后把 ProcessTaskCompletion 写回 broker completion queue;它不直接 poll Conductor,也不直接 update result。workspace attempt fence reload 会通过 ProcessAttemptFenceRequest 发给 broker,broker 调 Conductor get_task 并把 ProcessAttemptFenceResponse 返回到该 executor 的 response queue。
显式 thread 模式不会创建 executor child process:
supervisor process
└── perago runner threads
这个模式使用 PERAGO_WORKER_ID_PREFIX + "Broker" 作为 Conductor 可见 worker id。它已经接入 SDK poll、LeaseManager 和 result update;默认 process 模式也使用 broker identity 作为 Conductor 可见 worker id。
Worker id#
worker id 由 PERAGO_WORKER_ID_PREFIX 和 child slot 组成。slot 从 1 开始,按四位十进制补零:
PERAGO_WORKER_ID_PREFIX=prodAFeaturesBuild
PERAGO_WORKER_ID=prodAFeaturesBuild0001
PERAGO_WORKER_ID=prodAFeaturesBuild0002
如果没有显式配置 PERAGO_WORKER_ID_PREFIX,Perago 会从 module target 删除非字母数字字符作为默认前缀:
app.workers.features_build -> appworkersfeaturesbuild
supervisor 会把每个 executor 的 PERAGO_WORKER_ID 写入 child environment。executor process 启动后,prepare_worker_runtime() 会通过 resolve_worker_id() 读取这个值,并把它用于本地日志路径和运行时日志字段。broker process 使用 PERAGO_WORKER_ID_PREFIX + "Broker" 作为 Conductor poll/update identity。
不要在常规部署中手动设置 PERAGO_WORKER_ID。它是 supervisor 生成的进程身份,不是 task attempt id、workflow id、logical task key 或 LakeFS branch 名。
Child process 启动步骤#
每个 child process 的启动顺序是:
把 child environment 合并到
os.environ。导入 single-task module,并解析唯一的
@task(...)定义。准备 worker runtime。
检查 runtime config 已存在。
broker 绑定 Conductor SDK runner 并进入 poll/update loop。
executor 绑定 LakeFS workspace runtime,并进入 assignment queue loop。
启动时 supervisor 会先在 PERAGO_WORKSPACE_ROOT 下获取 .perago-supervisor.lock。锁文件写入当前 supervisor pid,用来禁止两个 supervisor 共享同一个 workspace root;如果已有锁的 pid 还活着,新的 supervisor 会拒绝启动;如果 pid 已不存在,旧锁会被清理。拿到锁后,supervisor 会 sweep 一次上次 supervisor/host crash 遗留的 orphan attempt workspace。随后 supervisor 会启动后台 workspace GC loop,按 PERAGO_WORKSPACE_GC_INTERVAL 周期扫描超过 PERAGO_WORKSPACE_GC_TTL 且不属于活跃 executor owner 的 attempt workspace。准备 worker runtime 只做本进程身份和日志初始化:
步骤 |
结果 |
|---|---|
配置 worker 日志 |
在 |
日志初始化发生在 broker 或 executor child process 内。workspace root 加锁、workspace sweep、周期 GC、dead executor targeted GC 和 shutdown 后最终 sweep 由 supervisor 负责。supervisor 正常退出时会释放自己持有的 root 锁。
外部服务前置条件#
perago start 在启动 supervisor 前会先做一轮服务前置校验:
条件 |
失败边界 |
|---|---|
|
缺失时报 |
LakeFS endpoint、access key id、secret access key 已完整配置 |
缺失时报 |
task module 可导入并能生成 TaskDef |
失败时按 task definition 或 schema 错误退出。 |
Conductor 已注册同名 TaskDef |
缺失时报 |
broker child process 内也会再次检查 Conductor config;executor child process 只检查 LakeFS config,因为默认 process 模式下只有 broker 持有 Conductor client。这个重复检查是进程边界内的防护,不能替代启动前的发布流程。
重启和停止#
supervisor 会持续监控 broker 和每个 executor process。如果 broker 退出,supervisor 会停止当前 process runtime set;如果某个 executor 退出且 supervisor 尚未收到停止信号,它会按递增 backoff 重启同一 slot:
1s, 2s, 4s, 8s, 16s, 30s, 30s, ...
重启后,该 slot 的 worker id 不变。例如 slot 2 退出后,替换进程仍使用 prodAFeaturesBuild0002。这让日志目录和 Conductor worker id 按 slot 保持稳定。
收到 SIGINT 或 SIGTERM 时,supervisor 会进入 drain:
broker 停止继续 poll 新 Conductor task。
supervisor 向 assignment queue 写入 executor stop sentinel。
idle executor 从 queue poll 中退出。
正在执行 assignment 的 executor 不会被信号打断;它会尽量跑完当前 task、staging cleanup、本机 workspace cleanup 和 completion enqueue。
supervisor 等待 child 自然退出,然后做一次最终 workspace GC sweep。
executor child 自己收到 SIGTERM 或 SIGINT 时只设置停止标志,不会在 signal handler 中 sys.exit()、抛异常、清理 workspace、调用 LakeFS 或调用 Conductor。当前 assignment 的 finally 仍有机会执行。
默认情况下,Perago 不调用 process.kill()。如果确实需要 supervisor 在 drain deadline 后强制结束子进程,可以显式配置:
PERAGO_SHUTDOWN_FORCE_KILL_AFTER=30s
该值未配置时,最终强制退出交给 systemd、Kubernetes 或容器 runtime。配置后,超过 deadline 仍存活的 child 会被 kill(),并记录 worker id、pid、phase、deadline 等字段;异常死亡后,supervisor 会对该 dead executor 的本机 attempt workspace 运行 targeted GC。
运行时边界#
worker process 只对一个 task module 负责。Perago 不支持在同一文件中定义多个 task,再通过命令行参数选择其中一个运行;也不支持 app registry 形状的多 task worker。
process count 只增加同一个 task name 的 executor 并发能力;Conductor poll/update 仍集中在 broker。它不会改变 TaskDef、workspace prefix、Pydantic contract 或 publish budget。要运行另一个 task,需要启动另一个 perago start <module_target> supervisor。