Controls and TaskDef
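TaskControls is the only way task authors can influence the execution-control fields of a Conductor TaskDef. Business input/output, the workspace schema, the TaskDef key lists and the JSON Schema are all generated by Perago from the function signature and the Pydantic models, and are not redeclared in @task(...).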
Minimal example
When you have no special control requirements, you can omit controls. Perago then uses the defaults of TaskControls():
from pathlib import Path
from pydantic import BaseModel, Field
from perago import WorkspaceSpec, task
class BuildFeaturesParams(BaseModel):
    feature_set: str
    min_rows: int = Field(ge=1)
class BuildFeaturesOutput(BaseModel):
    row_count: int = Field(ge=0)
    feature_count: int = Field(ge=0)
@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...
To override retry, timeout or execution limits, pass TaskControls as decorator metadata:
from perago import ExecutionLimits, RetryPolicy, TaskControls, TimeoutPolicy
@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render"),
    controls=TaskControls(
        retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
        timeout=TimeoutPolicy(response_seconds=900),
        limits=ExecutionLimits(concurrent_exec_limit=2),
    ),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...
Field boundaries (required / optional / conditional / generated / forbidden):
required: name and owner_email must be provided explicitly by the task author.
optional: description and controls may be omitted; when controls is omitted, TaskControls() is used.
conditional: workspace=WorkspaceSpec(...) is required on workspace tasks and forbidden on workspace-free tasks.
generated: inputKeys, outputKeys, inputSchema, outputSchema, the final value of responseTimeoutSeconds, and the business schemas are generated by Perago.
forbidden: task authors cannot pass params, output, inputTemplate, or Conductor schema fields to the decorator.
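The boundary rules above can be expressed as a small check over the decorator's keyword arguments. This is a hypothetical sketch in plain Python, not Perago's actual validation code: check_task_kwargs, the FORBIDDEN set and the error strings are illustrative, and inputSchema/outputSchema are used as examples of "Conductor schema fields".

```python
# Hypothetical sketch of the @task(...) keyword-argument boundaries.
# Not Perago's implementation; names and messages are illustrative.
REQUIRED = {"name", "owner_email"}
# Generated by Perago; inputSchema/outputSchema stand in for "Conductor schema fields".
FORBIDDEN = {"params", "output", "inputTemplate", "inputSchema", "outputSchema"}


def check_task_kwargs(kwargs: dict, is_workspace_task: bool) -> list:
    """Return boundary violations in a stable order (empty list means OK)."""
    provided = set(kwargs)
    errors = [f"missing required field: {key}" for key in sorted(REQUIRED - provided)]
    errors += [f"forbidden field: {key}" for key in sorted(FORBIDDEN & provided)]
    # workspace is conditional: required on workspace tasks, forbidden otherwise.
    if is_workspace_task and "workspace" not in provided:
        errors.append("workspace task requires workspace=WorkspaceSpec(...)")
    if not is_workspace_task and "workspace" in provided:
        errors.append("workspace-free task must not set workspace")
    return errors


# Roughly the rejected decorator call from the "common rejection scenarios" section.
print(check_task_kwargs(
    {"name": "features.build", "owner_email": "data@example.com",
     "params": object(), "output": object()},
    is_workspace_task=True,
))
```

Keeping optional fields (description, controls) out of every set means they never produce an error, which mirrors "may be omitted".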
Control field mapping
perago extract expands TaskControls into top-level fields of the Conductor TaskDef. Execution limit fields whose value is None are omitted from the generated JSON.
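As a rough illustration of the flattening and None-omission rule, here is a stdlib-only sketch. LimitsSketch and limits_to_taskdef are hypothetical names, and rate_limit_per_frequency is an assumed field name (this page only names concurrent_exec_limit and rate_limit_frequency_in_seconds); the camelCase targets follow Conductor's TaskDef field names.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class LimitsSketch:
    # Hypothetical stand-in for ExecutionLimits; rate_limit_per_frequency is assumed.
    concurrent_exec_limit: Optional[int] = None
    rate_limit_per_frequency: Optional[int] = None
    rate_limit_frequency_in_seconds: Optional[int] = None


def to_camel(name: str) -> str:
    """snake_case -> camelCase, e.g. concurrent_exec_limit -> concurrentExecLimit."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def limits_to_taskdef(limits: LimitsSketch) -> dict:
    """Flatten limits into top-level TaskDef keys, omitting None values."""
    taskdef = {}
    for f in fields(limits):
        value = getattr(limits, f.name)
        if value is None:
            continue  # None-valued execution limits are left out of the JSON
        taskdef[to_camel(f.name)] = value
    return taskdef


print(limits_to_taskdef(LimitsSketch(concurrent_exec_limit=2)))
```

With only concurrent_exec_limit=2 set, the sketch emits just concurrentExecLimit, which is consistent with the generated TaskDef shown later on this page, where no rate-limit keys appear.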
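| Perago field | Conductor TaskDef field | Default | Description |
|---|---|---|---|
| name | name | (none) | Task name; must be provided explicitly. |
| owner_email | ownerEmail | (none) | Conductor owner email; must be provided explicitly. |
| … | … | … | When … |
| … | … | … | Allows … |
| … | … | … | Can be … |
| … | … | … | Non-negative integer. |
| … | … | … | Non-negative integer. |
| … | … | … | Non-negative integer. |
| … | … | … | Non-negative integer. |
| … | … | … | Can be … |
| … | … | … | Non-negative integer. |
| … | … | … | Used when there is no publish budget. |
| … | … | … | Non-negative integer. |
| … | … | … | When … |
| … | … | … | Must be set together with … |
| … | … | … | Must be set together with … |
| publish_budget | publication runtime budget (not written to TaskDef) | … | Only allowed on workspace tasks; ignored with a warning on read-only workspace tasks. |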
All control models are validated with Pydantic and reject unknown fields. Validation failures surface as task definition errors at module import time, during perago check, or during perago extract.
Generated TaskDef structure
For a workspace task, the TaskDef input/output keys are fixed as:
inputKeys = ["workspace", "params"]
outputKeys = ["workspace", "result"]
For a workspace-free task, the TaskDef input/output keys are fixed as:
inputKeys = ["params"]
outputKeys = ["result"]
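One way to picture how the key lists follow from the function signature is the sketch below. It assumes, for illustration only, that a task counts as a workspace task exactly when its signature has a parameter named workspace; taskdef_keys is a hypothetical helper, not a Perago API.

```python
import inspect


def taskdef_keys(fn):
    """Return (inputKeys, outputKeys) for a task function.

    Assumption for this sketch: a task is a workspace task exactly when its
    signature has a parameter named "workspace".
    """
    if "workspace" in inspect.signature(fn).parameters:
        return ["workspace", "params"], ["workspace", "result"]
    return ["params"], ["result"]


# Illustrative stubs shaped like the tasks on this page.
def build_features(workspace, params): ...
def validate_metadata(params): ...


print(taskdef_keys(build_features))
print(taskdef_keys(validate_metadata))
```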
The workspace schema comes from Perago's WorkspaceInput / WorkspaceOutput; the params and result schemas come from the Pydantic type annotations of the task function. perago extract also inlines nested schemas, removes Pydantic titles and the object-level descriptions that Pydantic generates automatically from BaseModel class docstrings, and closes object schemas with additionalProperties: false.
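The schema post-processing described above can be approximated with a recursive walk over a Pydantic v2 style JSON Schema. This is a minimal sketch, not Perago's implementation. It assumes local references of the form #/$defs/Name and no recursive models, drops every object-level description rather than only docstring-derived ones, and only closes objects that declare properties, so dict-typed fields keep their own additionalProperties.

```python
from typing import Any

DEFS_PREFIX = "#/$defs/"  # Pydantic v2 local reference format


def normalize_schema(schema: dict) -> dict:
    """Inline $refs, drop titles and object descriptions, close object schemas."""
    defs = schema.get("$defs", {})

    def walk(node: Any) -> Any:
        if isinstance(node, list):
            return [walk(item) for item in node]
        if not isinstance(node, dict):
            return node
        if "$ref" in node:
            ref = node["$ref"]
            if not ref.startswith(DEFS_PREFIX):
                raise ValueError(f"unsupported $ref: {ref}")
            # Inline the target; sibling keys next to $ref are dropped here.
            return walk(defs[ref[len(DEFS_PREFIX):]])
        out = {}
        for key, value in node.items():
            if key in ("$defs", "title"):
                continue
            if key == "properties":
                # Property names are data, not keywords: never filter them.
                out[key] = {name: walk(sub) for name, sub in value.items()}
            else:
                out[key] = walk(value)
        if out.get("type") == "object":
            out.pop("description", None)
            if "properties" in out:
                out.setdefault("additionalProperties", False)
        return out

    return walk(schema)


raw = {
    "$defs": {
        "Inner": {
            "title": "Inner",
            "description": "Docstring-derived description.",
            "type": "object",
            "properties": {"n": {"title": "N", "type": "integer", "minimum": 1}},
            "required": ["n"],
        }
    },
    "title": "Outer",
    "type": "object",
    "properties": {
        "inner": {"$ref": "#/$defs/Inner"},
        "title": {"title": "Title", "type": "string"},  # a field literally named "title"
    },
    "required": ["inner", "title"],
}
clean = normalize_schema(raw)
```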
Do not use RootModel in a task's params / result models, and do not rely on ConfigDict. Perago expects a task contract to be a plain BaseModel object model: perago check, perago extract and perago start reject RootModel outright, and a task model that sets ConfigDict triggers a warning. Perago currently makes no guarantees about the TaskDef schema or runtime behavior of ConfigDict models.
Perago does not generate a Conductor inputTemplate. Pydantic field defaults are kept in the JSON Schema, but they are not copied into an input template at the top level of the TaskDef.
Guardrails are not written to the TaskDef either. require_file, require_dir, require_glob and forbid_glob are Perago runtime metadata and only affect the local checks performed before and after workspace preparation.
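To make the guardrail kinds concrete, here is a hypothetical local checker. The four keyword names match the guardrail names above, but their semantics here (file exists, directory exists, at least one glob match, no glob match) are an assumption rather than documented Perago behavior, and check_guardrails is not a Perago API.

```python
from pathlib import Path


def check_guardrails(
    root: Path,
    *,
    require_file=(),
    require_dir=(),
    require_glob=(),
    forbid_glob=(),
) -> list:
    """Hypothetical local guardrail check; returns violations (empty = OK)."""
    problems = []
    for rel in require_file:
        if not (root / rel).is_file():
            problems.append(f"require_file: {rel} is missing")
    for rel in require_dir:
        if not (root / rel).is_dir():
            problems.append(f"require_dir: {rel} is missing")
    for pattern in require_glob:
        # Path.glob returns a generator; one match is enough.
        if next(root.glob(pattern), None) is None:
            problems.append(f"require_glob: no match for {pattern}")
    for pattern in forbid_glob:
        hits = sorted(p.relative_to(root).as_posix() for p in root.glob(pattern))
        if hits:
            problems.append(f"forbid_glob: {pattern} matched {hits}")
    return problems
```

Because these checks only look at the local directory, nothing about them needs to appear in the TaskDef JSON.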
The following is the actual TaskDef output that build_taskdef(load_module_task("app.workers.features_build")) generates for tests/fixtures/app/workers/features_build.py:
{
  "name": "features.build",
  "ownerEmail": "data@example.com",
  "description": "Build feature parquet files.",
  "retryCount": 4,
  "retryLogic": "FIXED",
  "retryDelaySeconds": 30,
  "maxRetryDelaySeconds": 0,
  "backoffJitterMs": 0,
  "totalTimeoutSeconds": 0,
  "timeoutPolicy": "TIME_OUT_WF",
  "timeoutSeconds": 0,
  "responseTimeoutSeconds": 900,
  "pollTimeoutSeconds": 0,
  "concurrentExecLimit": 2,
  "inputKeys": [
    "workspace",
    "params"
  ],
  "outputKeys": [
    "workspace",
    "result"
  ],
  "inputSchema": {
    "name": "features.build.input",
    "version": 1,
    "type": "JSON",
    "data": {
      "type": "object",
      "properties": {
        "workspace": {
          "additionalProperties": false,
          "properties": {
            "repository": {
              "minLength": 1,
              "type": "string"
            },
            "branch": {
              "minLength": 1,
              "type": "string"
            },
            "ref_type": {
              "const": "commit",
              "type": "string"
            },
            "ref": {
              "minLength": 1,
              "type": "string"
            }
          },
          "required": [
            "repository",
            "branch",
            "ref_type",
            "ref"
          ],
          "type": "object"
        },
        "params": {
          "properties": {
            "feature_set": {
              "type": "string"
            },
            "min_rows": {
              "minimum": 1,
              "type": "integer"
            }
          },
          "required": [
            "feature_set",
            "min_rows"
          ],
          "type": "object",
          "additionalProperties": false
        }
      },
      "required": [
        "workspace",
        "params"
      ],
      "additionalProperties": false
    }
  },
  "outputSchema": {
    "name": "features.build.output",
    "version": 1,
    "type": "JSON",
    "data": {
      "type": "object",
      "properties": {
        "workspace": {
          "additionalProperties": false,
          "properties": {
            "repository": {
              "minLength": 1,
              "type": "string"
            },
            "branch": {
              "minLength": 1,
              "type": "string"
            },
            "ref_type": {
              "const": "commit",
              "type": "string"
            },
            "ref": {
              "minLength": 1,
              "type": "string"
            }
          },
          "required": [
            "repository",
            "branch",
            "ref_type",
            "ref"
          ],
          "type": "object"
        },
        "result": {
          "properties": {
            "row_count": {
              "minimum": 0,
              "type": "integer"
            },
            "feature_count": {
              "minimum": 0,
              "type": "integer"
            }
          },
          "required": [
            "row_count",
            "feature_count"
          ],
          "type": "object",
          "additionalProperties": false
        }
      },
      "required": [
        "workspace",
        "result"
      ],
      "additionalProperties": false
    }
  }
}
WorkspaceSpec.prefix, WorkspaceSpec.read_only, pre/post guardrails, the LakeFS endpoint, credentials, the attempt branch, the publish fence and the raw publish_budget fields are not written to the TaskDef.
Publish budget
PublishBudget declares the local runtime budget for workspace publication:
from perago import PublishBudget, TaskControls, TimeoutPolicy
controls = TaskControls(
    timeout=TimeoutPolicy(response_seconds=999),
    publish_budget=PublishBudget(
        observed_merge_p99_seconds=20,
        safety_margin_seconds=10,
        lakefs_merge_timeout_seconds=45,
        conductor_completion_timeout_seconds=15,
        worker_shutdown_grace_seconds=30,
        heartbeat_interval_seconds=10,
    ),
)
For the example above, the derived value of PublishBudget.response_timeout_seconds is 100:
45 + 15 + 30 + 10 = 100
responseTimeoutSeconds still comes from TimeoutPolicy.response_seconds, so the TaskDef generated for the controls above contains 999. If TimeoutPolicy.response_seconds is smaller than PublishBudget.response_timeout_seconds, TaskDef generation emits a warning but does not override the task timeout with the publish budget. PublishBudget itself is not written to the TaskDef JSON. It supplies the runtime with the LakeFS merge request timeout and reserves a budget for the Conductor completion phase. The SDK's TaskRunner owns the completion result update; Perago currently does not turn conductor_completion_timeout_seconds into the SDK's internal HTTP request timeout. lakefs_merge_timeout_seconds must cover observed_merge_p99_seconds + safety_margin_seconds; otherwise validation fails.
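The two documented checks (the merge-timeout floor, and the warning-only comparison against TimeoutPolicy.response_seconds) can be sketched as follows. PublishBudgetSketch and response_timeout_for_taskdef are hypothetical. The formula for the derived response_timeout_seconds is also an assumption: it is read here as the sum of the last four fields, which reproduces 45 + 15 + 30 + 10 = 100 from the example, but the page does not spell out the formula. "Cover" is read as greater than or equal to.

```python
import warnings
from dataclasses import dataclass


@dataclass(frozen=True)
class PublishBudgetSketch:
    """Hypothetical stand-in for PublishBudget (field names from this page)."""

    observed_merge_p99_seconds: int
    safety_margin_seconds: int
    lakefs_merge_timeout_seconds: int
    conductor_completion_timeout_seconds: int
    worker_shutdown_grace_seconds: int
    heartbeat_interval_seconds: int

    def __post_init__(self) -> None:
        # Documented rule: the merge timeout must cover p99 + safety margin.
        floor = self.observed_merge_p99_seconds + self.safety_margin_seconds
        if self.lakefs_merge_timeout_seconds < floor:
            raise ValueError(f"lakefs_merge_timeout_seconds must be >= {floor}")

    @property
    def response_timeout_seconds(self) -> int:
        # ASSUMED formula: sum of the last four fields (45 + 15 + 30 + 10 = 100).
        return (
            self.lakefs_merge_timeout_seconds
            + self.conductor_completion_timeout_seconds
            + self.worker_shutdown_grace_seconds
            + self.heartbeat_interval_seconds
        )


def response_timeout_for_taskdef(response_seconds: int, budget: PublishBudgetSketch) -> int:
    """The TaskDef value always comes from TimeoutPolicy; a short value only warns."""
    if response_seconds < budget.response_timeout_seconds:
        warnings.warn(
            f"response_seconds={response_seconds} is below the publish budget "
            f"({budget.response_timeout_seconds}s); keeping the task timeout"
        )
    return response_seconds


budget = PublishBudgetSketch(20, 10, 45, 15, 30, 10)
print(budget.response_timeout_seconds, response_timeout_for_taskdef(999, budget))
```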
publish_budget applies only to workspace tasks. A workspace-free task has no LakeFS publication stage, so configuring TaskControls(publish_budget=...) on it is rejected. A read-only workspace task is still a workspace task but has no publication stage; if publish_budget is configured on it, perago check, perago extract and perago start should warn once during startup/validation and ignore the budget, rather than emitting a warning on every task execution.
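The per-task-kind handling can be sketched as a function that runs once during check/extract/start, which is what keeps the read-only warning from repeating per execution. effective_publish_budget is a hypothetical helper, not a Perago API; it returns the budget the runtime should use, or None when the budget is ignored.

```python
import warnings
from typing import Any, Optional


def effective_publish_budget(budget: Any, *, has_workspace: bool, read_only: bool) -> Optional[Any]:
    """Resolve publish_budget once at check/extract/start time (hypothetical helper)."""
    if budget is None:
        return None
    if not has_workspace:
        # Workspace-free tasks have no LakeFS publication stage: reject.
        raise ValueError("publish_budget is only allowed on workspace tasks")
    if read_only:
        # Read-only workspace tasks: warn once here, then ignore the budget.
        warnings.warn("publish_budget ignored: read-only workspace task has no publication stage")
        return None
    return budget
```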
Common rejection scenarios
Configuring only half of a rate limit fails:
TaskControls(
    limits=ExecutionLimits(rate_limit_frequency_in_seconds=60),
)
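A stdlib dataclass can mimic both this pairing rule and the rejection of unknown fields: dataclasses raise TypeError on unexpected keyword arguments, loosely like a Pydantic model that forbids extra fields. ExecutionLimitsSketch is hypothetical, and rate_limit_per_frequency is an assumed name for the other half of the rate-limit pair.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ExecutionLimitsSketch:
    """Hypothetical stand-in for ExecutionLimits; rate_limit_per_frequency is assumed."""

    concurrent_exec_limit: Optional[int] = None
    rate_limit_per_frequency: Optional[int] = None
    rate_limit_frequency_in_seconds: Optional[int] = None

    def __post_init__(self) -> None:
        # Both halves of the rate limit must be set together, or neither.
        half_set = (self.rate_limit_per_frequency is None) != (
            self.rate_limit_frequency_in_seconds is None
        )
        if half_set:
            raise ValueError(
                "rate_limit_per_frequency and rate_limit_frequency_in_seconds "
                "must be configured together"
            )


ExecutionLimitsSketch(rate_limit_per_frequency=10, rate_limit_frequency_in_seconds=60)  # accepted
for bad in ({"rate_limit_frequency_in_seconds": 60}, {"rate_limit_frequncy": 60}):
    try:
        ExecutionLimitsSketch(**bad)
    except (ValueError, TypeError) as exc:
        print(type(exc).__name__, exc)
```

The second bad input is a deliberately misspelled field name, which shows the unknown-field rejection path.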
Configuring a publish budget on a workspace-free task fails:
@task(
    name="metadata.validate",
    owner_email="data@example.com",
    controls=TaskControls(publish_budget=budget),
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
    ...
Putting Conductor-generated fields back into the decorator also fails:
@task(
    name="features.build",
    owner_email="data@example.com",
    params=BuildFeaturesParams,
    output=BuildFeaturesOutput,
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...
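The sources of truth for these fields are the function signature, the Pydantic models and TaskControls, respectively. Task authors maintain only these Python types and metadata, validate them with perago check, and generate the TaskDef JSON with perago extract.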