Controls and TaskDef#
TaskControls 是任务作者能影响 Conductor TaskDef 执行控制字段的唯一入口。业务 input/output、workspace schema、TaskDef key 列表和 JSON Schema 都由 Perago 从函数签名与 Pydantic model 生成,不在 @task(...) 里重复声明。
最小形状#
没有特殊控制需求时,可以省略 controls。Perago 会使用 TaskControls() 的默认值:
from pathlib import Path
from pydantic import BaseModel, Field
from perago import WorkspaceSpec, task
class BuildFeaturesParams(BaseModel):
feature_set: str
min_rows: int = Field(ge=1)
class BuildFeaturesOutput(BaseModel):
row_count: int = Field(ge=0)
feature_count: int = Field(ge=0)
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(prefix="/audio/render"),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
...
需要覆盖 retry、timeout 或 execution limit 时,把 TaskControls 作为 decorator metadata 传入:
from perago import ExecutionLimits, RetryPolicy, TaskControls, TimeoutPolicy
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(prefix="/audio/render"),
controls=TaskControls(
retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
timeout=TimeoutPolicy(response_seconds=900),
limits=ExecutionLimits(concurrent_exec_limit=2),
),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
...
Required/optional/generated 字段边界:
required:
name和owner_email必须由任务作者显式提供。optional:
description和controls可以省略;controls省略时使用TaskControls()。conditional:
workspace=WorkspaceSpec(...)只在 workspace task 上 required,workspace-free task 上 forbidden。generated:
inputKeys、outputKeys、inputSchema、outputSchema、responseTimeoutSeconds的最终值和业务 schema 由 Perago 生成。forbidden: 任务作者不能在 decorator 里提供
params、output、inputTemplate或 Conductor schema 字段。
控制字段映射#
perago extract 会把 TaskControls 展开到 Conductor TaskDef 顶层字段。值为 None 的 execution limit 字段会从生成的 JSON 中省略。
Perago 字段 |
Conductor TaskDef 字段 |
默认值 |
说明 |
|---|---|---|---|
|
|
无 |
任务名,必须显式提供。 |
|
|
无 |
Conductor owner email,必须显式提供。 |
|
|
|
为 |
|
|
|
允许 |
|
|
|
可选 |
|
|
|
非负整数。 |
|
|
|
非负整数。 |
|
|
|
非负整数。 |
|
|
|
非负整数。 |
|
|
|
可选 |
|
|
|
非负整数。 |
|
|
|
没有 publish budget 时使用。 |
|
|
|
非负整数。 |
|
|
|
为 |
|
|
|
必须和 |
|
|
|
必须和 |
|
derives |
|
只允许 workspace task 使用,不直接写入 TaskDef。 |
所有 control model 都使用 Pydantic 校验并拒绝未知字段。校验失败会在模块导入、perago check 或 perago extract 阶段暴露为 task definition 错误。
生成的 TaskDef 形状#
workspace task 的 TaskDef input/output key 固定为:
inputKeys = ["workspace", "params"]
outputKeys = ["workspace", "result"]
workspace-free task 的 TaskDef input/output key 固定为:
inputKeys = ["params"]
outputKeys = ["result"]
workspace schema 来自 Perago 的 WorkspaceInput / WorkspaceOutput,params 和 result schema 来自任务函数的 Pydantic 类型注解。perago extract 还会把嵌套 schema inline,删除 Pydantic title,并把 object schema 关闭为 additionalProperties: false。
Perago 不生成 Conductor inputTemplate。Pydantic 字段默认值会保留在 JSON Schema 里,但不会被复制到 TaskDef 顶层的 input template 中。
Guardrail 也不会写入 TaskDef。require_file、require_dir、require_glob 和 forbid_glob 是 Perago runtime metadata,只影响 workspace 准备前后的本地检查。
Publish budget#
PublishBudget 用来把 workspace publication 的本地运行时预算折算成 Conductor responseTimeoutSeconds:
from perago import PublishBudget, TaskControls, TimeoutPolicy
controls = TaskControls(
timeout=TimeoutPolicy(response_seconds=999),
publish_budget=PublishBudget(
observed_merge_p99_seconds=20,
safety_margin_seconds=10,
lakefs_merge_timeout_seconds=45,
conductor_completion_timeout_seconds=15,
worker_shutdown_grace_seconds=30,
heartbeat_interval_seconds=10,
),
)
上面的 responseTimeoutSeconds 会生成成 100,而不是 999:
45 + 15 + 30 + 10 = 100
PublishBudget 本身不会写入 TaskDef JSON。它同时给 runtime 提供 LakeFS merge request timeout 和向 Conductor 回写 completion 的 request timeout。lakefs_merge_timeout_seconds 必须覆盖 observed_merge_p99_seconds + safety_margin_seconds,否则校验失败。
publish_budget 只适用于 workspace task。workspace-free task 没有 LakeFS publication 阶段,配置 TaskControls(publish_budget=...) 会被拒绝。
常见拒绝形状#
只配置 rate limit 的一半会失败:
TaskControls(
limits=ExecutionLimits(rate_limit_frequency_in_seconds=60),
)
workspace-free task 配置 publish budget 会失败:
@task(
name="metadata.validate",
owner_email="data@example.com",
controls=TaskControls(publish_budget=budget),
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
...
把 Conductor 生成字段塞回 decorator 也会失败:
@task(
name="features.build",
owner_email="data@example.com",
params=BuildFeaturesParams,
output=BuildFeaturesOutput,
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
...
这些字段的真源分别是函数签名、Pydantic model 和 TaskControls。任务作者只维护这些 Python 类型和 metadata,再用 perago check 验证,用 perago extract 生成 TaskDef JSON。