Controls and TaskDef#

TaskControls 是任务作者能影响 Conductor TaskDef 执行控制字段的唯一入口。业务 input/output、workspace schema、TaskDef key 列表和 JSON Schema 都由 Perago 从函数签名与 Pydantic model 生成,不在 @task(...) 里重复声明。

最小示例#

没有特殊控制需求时,可以省略 controls。Perago 会使用 TaskControls() 的默认值:

from pathlib import Path

from pydantic import BaseModel, Field

from perago import WorkspaceSpec, task


class BuildFeaturesParams(BaseModel):
    feature_set: str
    min_rows: int = Field(ge=1)


class BuildFeaturesOutput(BaseModel):
    row_count: int = Field(ge=0)
    feature_count: int = Field(ge=0)


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

需要覆盖 retry、timeout 或 execution limit 时,把 TaskControls 作为 decorator metadata 传入:

from perago import ExecutionLimits, RetryPolicy, TaskControls, TimeoutPolicy


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
    controls=TaskControls(
        retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
        timeout=TimeoutPolicy(response_seconds=900),
        limits=ExecutionLimits(concurrent_exec_limit=2),
    ),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

Required/optional/generated 字段边界:

  • required: nameowner_email 必须由任务作者显式提供。

  • optional: descriptioncontrols 可以省略;controls 省略时使用 TaskControls()

  • conditional: workspace=WorkspaceSpec(...) 只在 workspace task 上 required,workspace-free task 上 forbidden。

  • generated: inputKeysoutputKeysinputSchemaoutputSchemaresponseTimeoutSeconds 的最终值和业务 schema 由 Perago 生成。

  • forbidden: 任务作者不能在 decorator 里提供 paramsoutputinputTemplate 或 Conductor schema 字段。

控制字段映射#

perago extract 会把 TaskControls 展开到 Conductor TaskDef 顶层字段。值为 None 的 execution limit 字段会从生成的 JSON 中省略。

Perago 字段

Conductor TaskDef 字段

默认值

说明

name

name

任务名,必须显式提供。

owner_email

ownerEmail

Conductor owner email,必须显式提供。

description

description

None

None 时省略。

controls.retry.count

retryCount

3

允许 0..10

controls.retry.logic

retryLogic

"FIXED"

可选 "FIXED""EXPONENTIAL_BACKOFF""LINEAR_BACKOFF"

controls.retry.delay_seconds

retryDelaySeconds

60

非负整数。

controls.retry.max_delay_seconds

maxRetryDelaySeconds

0

非负整数。

controls.retry.jitter_ms

backoffJitterMs

0

非负整数。

controls.timeout.total_seconds

totalTimeoutSeconds

0

非负整数。

controls.timeout.policy

timeoutPolicy

"TIME_OUT_WF"

可选 "RETRY""TIME_OUT_WF""ALERT_ONLY"

controls.timeout.seconds

timeoutSeconds

0

非负整数。

controls.timeout.response_seconds

responseTimeoutSeconds

600

没有 publish budget 时使用。

controls.timeout.poll_seconds

pollTimeoutSeconds

0

非负整数。

controls.limits.concurrent_exec_limit

concurrentExecLimit

None

None 时省略。

controls.limits.rate_limit_frequency_in_seconds

rateLimitFrequencyInSeconds

None

必须和 rate_limit_per_frequency 成对配置。

controls.limits.rate_limit_per_frequency

rateLimitPerFrequency

None

必须和 rate_limit_frequency_in_seconds 成对配置。

controls.publish_budget

publication runtime budget

None

只允许 workspace task 使用;read-only workspace task 上会被忽略并发出 warning。

所有 control model 都使用 Pydantic 校验并拒绝未知字段。校验失败会在模块导入、perago checkperago extract 阶段暴露为 task definition 错误。

生成的 TaskDef 结构#

workspace task 的 TaskDef input/output key 固定为:

inputKeys = ["workspace", "params"]
outputKeys = ["workspace", "result"]

workspace-free task 的 TaskDef input/output key 固定为:

inputKeys = ["params"]
outputKeys = ["result"]

workspace schema 来自 Perago 的 WorkspaceInput / WorkspaceOutputparamsresult schema 来自任务函数的 Pydantic 类型注解。perago extract 还会把嵌套 schema inline,删除 Pydantic title 和从 BaseModel class docstring 自动生成的 object-level description,并把 object schema 关闭为 additionalProperties: false

不要在 task 的 params / result model 中使用 RootModel,也不要依赖 ConfigDict。Perago 期望 task contract 是普通 BaseModel object model:RootModel 会被 perago checkperago extractperago start 直接拒绝;配置了 ConfigDict 的 task model 会报 warning。Perago 当前不保证 ConfigDict model 的 TaskDef schema 或运行时行为。

Perago 不生成 Conductor inputTemplate。Pydantic 字段默认值会保留在 JSON Schema 里,但不会被复制到 TaskDef 顶层的 input template 中。

Guardrail 也不会写入 TaskDef。require_filerequire_dirrequire_globforbid_glob 是 Perago runtime metadata,只影响 workspace 准备前后的本地检查。

下面是 tests/fixtures/app/workers/features_build.py 通过 build_taskdef(load_module_task("app.workers.features_build")) 生成的真实 TaskDef 输出:

{
  "name": "features.build",
  "ownerEmail": "data@example.com",
  "description": "Build feature parquet files.",
  "retryCount": 4,
  "retryLogic": "FIXED",
  "retryDelaySeconds": 30,
  "maxRetryDelaySeconds": 0,
  "backoffJitterMs": 0,
  "totalTimeoutSeconds": 0,
  "timeoutPolicy": "TIME_OUT_WF",
  "timeoutSeconds": 0,
  "responseTimeoutSeconds": 900,
  "pollTimeoutSeconds": 0,
  "concurrentExecLimit": 2,
  "inputKeys": [
    "workspace",
    "params"
  ],
  "outputKeys": [
    "workspace",
    "result"
  ],
  "inputSchema": {
    "name": "features.build.input",
    "version": 1,
    "type": "JSON",
    "data": {
      "type": "object",
      "properties": {
        "workspace": {
          "additionalProperties": false,
          "properties": {
            "repository": {
              "minLength": 1,
              "type": "string"
            },
            "branch": {
              "minLength": 1,
              "type": "string"
            },
            "ref_type": {
              "const": "commit",
              "type": "string"
            },
            "ref": {
              "minLength": 1,
              "type": "string"
            }
          },
          "required": [
            "repository",
            "branch",
            "ref_type",
            "ref"
          ],
          "type": "object"
        },
        "params": {
          "properties": {
            "feature_set": {
              "type": "string"
            },
            "min_rows": {
              "minimum": 1,
              "type": "integer"
            }
          },
          "required": [
            "feature_set",
            "min_rows"
          ],
          "type": "object",
          "additionalProperties": false
        }
      },
      "required": [
        "workspace",
        "params"
      ],
      "additionalProperties": false
    }
  },
  "outputSchema": {
    "name": "features.build.output",
    "version": 1,
    "type": "JSON",
    "data": {
      "type": "object",
      "properties": {
        "workspace": {
          "additionalProperties": false,
          "properties": {
            "repository": {
              "minLength": 1,
              "type": "string"
            },
            "branch": {
              "minLength": 1,
              "type": "string"
            },
            "ref_type": {
              "const": "commit",
              "type": "string"
            },
            "ref": {
              "minLength": 1,
              "type": "string"
            }
          },
          "required": [
            "repository",
            "branch",
            "ref_type",
            "ref"
          ],
          "type": "object"
        },
        "result": {
          "properties": {
            "row_count": {
              "minimum": 0,
              "type": "integer"
            },
            "feature_count": {
              "minimum": 0,
              "type": "integer"
            }
          },
          "required": [
            "row_count",
            "feature_count"
          ],
          "type": "object",
          "additionalProperties": false
        }
      },
      "required": [
        "workspace",
        "result"
      ],
      "additionalProperties": false
    }
  }
}

WorkspaceSpec.prefixWorkspaceSpec.read_only、pre/post guardrails、LakeFS endpoint、credentials、attempt branch、publish fence 和 publish_budget 原始字段不会写入 TaskDef。

Publish budget#

PublishBudget 用来声明 workspace publication 的本地运行时预算:

from perago import PublishBudget, TaskControls, TimeoutPolicy


controls = TaskControls(
    timeout=TimeoutPolicy(response_seconds=999),
    publish_budget=PublishBudget(
        observed_merge_p99_seconds=20,
        safety_margin_seconds=10,
        lakefs_merge_timeout_seconds=45,
        conductor_completion_timeout_seconds=15,
        worker_shutdown_grace_seconds=30,
        heartbeat_interval_seconds=10,
    ),
)

上面的 PublishBudget.response_timeout_seconds 派生值是 100

45 + 15 + 30 + 10 = 100

responseTimeoutSeconds 仍然来自 TimeoutPolicy.response_seconds,所以上面的 TaskDef 会生成 999。如果 TimeoutPolicy.response_seconds 小于 PublishBudget.response_timeout_seconds,TaskDef 生成会发出 warning,但不会用 publish budget 覆盖 task timeout。PublishBudget 本身不会写入 TaskDef JSON。它给 runtime 提供 LakeFS merge request timeout,并保留 Conductor completion 阶段预算。SDK TaskRunner owns completion result update;Perago 当前不把 conductor_completion_timeout_seconds 写成 SDK 内部 HTTP request timeout。lakefs_merge_timeout_seconds 必须覆盖 observed_merge_p99_seconds + safety_margin_seconds,否则校验失败。

publish_budget 只适用于 workspace task。workspace-free task 没有 LakeFS publication 阶段,配置 TaskControls(publish_budget=...) 会被拒绝。read-only workspace task 仍然是 workspace task,但没有 publication 阶段;如果配置了 publish_budgetperago checkperago extractperago start 应在启动/校验阶段 warning 一次,并忽略该预算,不应在每次 task execution 中刷 warning。

常见拒绝场景#

只配置 rate limit 的一半会失败:

TaskControls(
    limits=ExecutionLimits(rate_limit_frequency_in_seconds=60),
)

workspace-free task 配置 publish budget 会失败:

@task(
    name="metadata.validate",
    owner_email="data@example.com",
    controls=TaskControls(publish_budget=budget),
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
    ...

把 Conductor 生成字段塞回 decorator 也会失败:

@task(
    name="features.build",
    owner_email="data@example.com",
    params=BuildFeaturesParams,
    output=BuildFeaturesOutput,
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

这些字段的真源分别是函数签名、Pydantic model 和 TaskControls。任务作者只维护这些 Python 类型和 metadata,再用 perago check 验证,用 perago extract 生成 TaskDef JSON。