Examples#
本页提供可运行的 task module 示例,覆盖 workspace task、workspace-free task 和带 publish budget 的 workspace task。
Workspace task#
tests/fixtures/app/workers/features_build.py 是完整 workspace task。它展示了 single-task module、Pydantic params/output、workspace prefix、pre/post guardrails 和 TaskDef controls 如何组合。
from pathlib import Path
from pydantic import BaseModel, Field
from perago import (
ExecutionLimits,
RetryPolicy,
TaskControls,
TimeoutPolicy,
WorkspaceSpec,
forbid_glob,
require_dir,
require_glob,
task,
)
class BuildFeaturesParams(BaseModel):
feature_set: str
min_rows: int = Field(ge=1)
class BuildFeaturesOutput(BaseModel):
row_count: int = Field(ge=0)
feature_count: int = Field(ge=0)
@task(
name="features.build",
description="Build feature parquet files.",
owner_email="data@example.com",
workspace=WorkspaceSpec(
prefix="/audio/render",
pre=[
require_dir("raw"),
require_glob("raw/**/*.parquet", min_count=1),
],
post=[
require_dir("features"),
require_glob("features/**/*.parquet", min_count=1),
forbid_glob("**/*.tmp"),
],
),
controls=TaskControls(
retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
timeout=TimeoutPolicy(response_seconds=900),
limits=ExecutionLimits(concurrent_exec_limit=2),
),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
features = workspace / "features"
features.mkdir(exist_ok=True)
(features / f"{params.feature_set}.parquet").write_text("ok", encoding="utf-8")
return BuildFeaturesOutput(row_count=100, feature_count=24)
字段边界:
Required:
name、owner_email、params类型注解、返回类型注解,以及 runtime input 中的workspace和params。Optional:
description、WorkspaceSpec.prefix、pre/post guardrails、TaskControls中的 retry/timeout/limits。Generated: 业务函数的
workspace: Path参数、TaskDef schema、成功输出中的workspaceref。Forbidden: 业务函数直接接收 LakeFS ref、把业务字段展开成多个函数参数、在 decorator 中重复声明 params/output schema。
Workspace-free task#
tests/fixtures/app/workers/metadata_validate.py 是完整 workspace-free task。它只声明 task metadata 和 typed params -> result contract,不声明 WorkspaceSpec。
from pydantic import BaseModel, Field
from perago import task
class ValidateMetadataParams(BaseModel):
song_id: str
min_duration_seconds: int = Field(ge=1)
class ValidateMetadataOutput(BaseModel):
valid: bool
reason: str | None = None
@task(
name="metadata.validate",
description="Validate song metadata.",
owner_email="data@example.com",
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
return ValidateMetadataOutput(valid=True)
字段边界:
Required:
name、owner_email、params类型注解、返回类型注解,以及 runtime input 中的顶层params。Optional:
description和不涉及 publication 的TaskControls。Generated: TaskDef schema 和成功输出中的
result。Forbidden: 顶层
workspaceinput、WorkspaceSpec、publish_budget、多个业务参数或 keyword-only contract 参数。
Workspace task with publish budget#
这个正例展示 workspace task 如何声明 publication budget。PublishBudget 不会作为业务 input,也不会写入 TaskDef 的独立字段;它会派生 generated responseTimeoutSeconds,并在 runtime 中约束 LakeFS merge request timeout。Conductor completion 字段只是 responseTimeoutSeconds 中的阶段预留,当前不作为 SDK TaskRunner 内部 HTTP request timeout 生效。
from pathlib import Path
from pydantic import BaseModel
from perago import PublishBudget, TaskControls, WorkspaceSpec, task
class RenderParams(BaseModel):
stem: str
class RenderOutput(BaseModel):
file_count: int
@task(
name="audio.render",
owner_email="audio@example.com",
workspace=WorkspaceSpec(prefix="/audio/render"),
controls=TaskControls(
publish_budget=PublishBudget(
observed_merge_p99_seconds=20,
safety_margin_seconds=10,
lakefs_merge_timeout_seconds=45,
conductor_completion_timeout_seconds=15,
worker_shutdown_grace_seconds=30,
heartbeat_interval_seconds=10,
),
),
)
def render_audio(workspace: Path, params: RenderParams) -> RenderOutput:
output_dir = workspace / "rendered"
output_dir.mkdir(exist_ok=True)
(output_dir / f"{params.stem}.wav").write_bytes(b"")
return RenderOutput(file_count=1)
字段边界:
Required:
PublishBudget的六个时间字段都必须显式配置,且lakefs_merge_timeout_seconds必须覆盖observed_merge_p99_seconds + safety_margin_seconds。Generated:
responseTimeoutSeconds = 45 + 15 + 30 + 10 = 100。Forbidden: workspace-free task 不能配置
publish_budget。
本地验证#
对一个 module 做任务声明层面的检查:
PYTHONPATH=tests/fixtures perago check app.workers.features_build
PYTHONPATH=tests/fixtures perago check app.workers.metadata_validate
生成 Conductor TaskDef JSON:
PYTHONPATH=tests/fixtures perago extract app.workers.features_build --output /tmp/features.build.json
PYTHONPATH=tests/fixtures perago extract app.workers.metadata_validate --output /tmp/metadata.validate.json
perago check 和 perago extract 都以 Python import path 指向单个 module。不要传文件路径,也不要把多个 task 放进同一个 module。
反例索引#
这些 fixture 是文档规则的可执行反例。它们用于测试 @task(...) import-time validation 和 CLI 诊断。
Fixture |
被拒绝的写法 |
对应规则 |
|---|---|---|
|
workspace 参数名写成 |
workspace task 第一个参数必须名为 |
|
task function 是 |
worker body 必须是同步函数 |
|
|
contract 参数不能声明默认值 |
|
增加 keyword-only 参数 |
contract 只允许固定 positional-or-keyword 参数 |
|
|
params model 必须来自函数签名注解 |
|
缺少返回类型注解 |
output model 必须来自返回类型注解 |
|
使用 |
不支持 |
|
guardrail path 以 |
guardrail path 必须相对 workspace root |
|
prefix 逃出 workspace root |
|
|
|
controls model 拒绝额外字段 |
|
task name 包含 path separator |
task name 不能是路径 |
|
一个 module 定义两个 task |
Perago module 只能定义一个 task worker |
|
module 没有 task |
CLI 目标必须声明一个 Perago task |
反例仅用于测试和定位错误。遇到相似错误时,优先运行 perago check <module>,然后回到对应规则页确认字段边界和函数签名。