Controls and TaskDef#

TaskControls 是任务作者能影响 Conductor TaskDef 执行控制字段的唯一入口。业务 input/output、workspace schema、TaskDef key 列表和 JSON Schema 都由 Perago 从函数签名与 Pydantic model 生成,不在 @task(...) 里重复声明。

最小形状#

没有特殊控制需求时,可以省略 controls。Perago 会使用 TaskControls() 的默认值:

from pathlib import Path

from pydantic import BaseModel, Field

from perago import WorkspaceSpec, task


class BuildFeaturesParams(BaseModel):
    feature_set: str
    min_rows: int = Field(ge=1)


class BuildFeaturesOutput(BaseModel):
    row_count: int = Field(ge=0)
    feature_count: int = Field(ge=0)


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

需要覆盖 retry、timeout 或 execution limit 时,把 TaskControls 作为 decorator metadata 传入:

from perago import ExecutionLimits, RetryPolicy, TaskControls, TimeoutPolicy


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
    controls=TaskControls(
        retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
        timeout=TimeoutPolicy(response_seconds=900),
        limits=ExecutionLimits(concurrent_exec_limit=2),
    ),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

Required/optional/generated 字段边界:

  • required: nameowner_email 必须由任务作者显式提供。

  • optional: descriptioncontrols 可以省略;controls 省略时使用 TaskControls()

  • conditional: workspace=WorkspaceSpec(...) 只在 workspace task 上 required,workspace-free task 上 forbidden。

  • generated: inputKeysoutputKeysinputSchemaoutputSchemaresponseTimeoutSeconds 的最终值和业务 schema 由 Perago 生成。

  • forbidden: 任务作者不能在 decorator 里提供 paramsoutputinputTemplate 或 Conductor schema 字段。

控制字段映射#

perago extract 会把 TaskControls 展开到 Conductor TaskDef 顶层字段。值为 None 的 execution limit 字段会从生成的 JSON 中省略。

Perago 字段

Conductor TaskDef 字段

默认值

说明

name

name

任务名,必须显式提供。

owner_email

ownerEmail

Conductor owner email,必须显式提供。

description

description

None

None 时省略。

controls.retry.count

retryCount

3

允许 0..10

controls.retry.logic

retryLogic

"FIXED"

可选 "FIXED""EXPONENTIAL_BACKOFF""LINEAR_BACKOFF"

controls.retry.delay_seconds

retryDelaySeconds

60

非负整数。

controls.retry.max_delay_seconds

maxRetryDelaySeconds

0

非负整数。

controls.retry.jitter_ms

backoffJitterMs

0

非负整数。

controls.timeout.total_seconds

totalTimeoutSeconds

0

非负整数。

controls.timeout.policy

timeoutPolicy

"TIME_OUT_WF"

可选 "RETRY""TIME_OUT_WF""ALERT_ONLY"

controls.timeout.seconds

timeoutSeconds

0

非负整数。

controls.timeout.response_seconds

responseTimeoutSeconds

600

没有 publish budget 时使用。

controls.timeout.poll_seconds

pollTimeoutSeconds

0

非负整数。

controls.limits.concurrent_exec_limit

concurrentExecLimit

None

None 时省略。

controls.limits.rate_limit_frequency_in_seconds

rateLimitFrequencyInSeconds

None

必须和 rate_limit_per_frequency 成对配置。

controls.limits.rate_limit_per_frequency

rateLimitPerFrequency

None

必须和 rate_limit_frequency_in_seconds 成对配置。

controls.publish_budget

derives responseTimeoutSeconds

None

只允许 workspace task 使用,不直接写入 TaskDef。

所有 control model 都使用 Pydantic 校验并拒绝未知字段。校验失败会在模块导入、perago checkperago extract 阶段暴露为 task definition 错误。

生成的 TaskDef 形状#

workspace task 的 TaskDef input/output key 固定为:

inputKeys = ["workspace", "params"]
outputKeys = ["workspace", "result"]

workspace-free task 的 TaskDef input/output key 固定为:

inputKeys = ["params"]
outputKeys = ["result"]

workspace schema 来自 Perago 的 WorkspaceInput / WorkspaceOutputparamsresult schema 来自任务函数的 Pydantic 类型注解。perago extract 还会把嵌套 schema inline,删除 Pydantic title,并把 object schema 关闭为 additionalProperties: false

Perago 不生成 Conductor inputTemplate。Pydantic 字段默认值会保留在 JSON Schema 里,但不会被复制到 TaskDef 顶层的 input template 中。

Guardrail 也不会写入 TaskDef。require_filerequire_dirrequire_globforbid_glob 是 Perago runtime metadata,只影响 workspace 准备前后的本地检查。

Publish budget#

PublishBudget 用来把 workspace publication 的本地运行时预算折算成 Conductor responseTimeoutSeconds

from perago import PublishBudget, TaskControls, TimeoutPolicy


controls = TaskControls(
    timeout=TimeoutPolicy(response_seconds=999),
    publish_budget=PublishBudget(
        observed_merge_p99_seconds=20,
        safety_margin_seconds=10,
        lakefs_merge_timeout_seconds=45,
        conductor_completion_timeout_seconds=15,
        worker_shutdown_grace_seconds=30,
        heartbeat_interval_seconds=10,
    ),
)

上面的 responseTimeoutSeconds 会生成成 100,而不是 999

45 + 15 + 30 + 10 = 100

PublishBudget 本身不会写入 TaskDef JSON。它同时给 runtime 提供 LakeFS merge request timeout 和向 Conductor 回写 completion 的 request timeout。lakefs_merge_timeout_seconds 必须覆盖 observed_merge_p99_seconds + safety_margin_seconds,否则校验失败。

publish_budget 只适用于 workspace task。workspace-free task 没有 LakeFS publication 阶段,配置 TaskControls(publish_budget=...) 会被拒绝。

常见拒绝形状#

只配置 rate limit 的一半会失败:

TaskControls(
    limits=ExecutionLimits(rate_limit_frequency_in_seconds=60),
)

workspace-free task 配置 publish budget 会失败:

@task(
    name="metadata.validate",
    owner_email="data@example.com",
    controls=TaskControls(publish_budget=budget),
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
    ...

把 Conductor 生成字段塞回 decorator 也会失败:

@task(
    name="features.build",
    owner_email="data@example.com",
    params=BuildFeaturesParams,
    output=BuildFeaturesOutput,
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

这些字段的真源分别是函数签名、Pydantic model 和 TaskControls。任务作者只维护这些 Python 类型和 metadata,再用 perago check 验证,用 perago extract 生成 TaskDef JSON。