Getting Started#
本页介绍 Perago task module 的最小上手路径:先看完整 @task 示例、三个核心命令和生成的 TaskDef,再按需要理解 workspace、controls、guardrail 和 workspace-free task。
完整 workspace task#
Workspace task 适合需要读写 LakeFS workspace 的 Conductor task。函数签名是 (workspace: Path, params: ParamsModel) -> OutputModel。
from pathlib import Path
from pydantic import BaseModel, Field
from perago import (
ExecutionLimits,
RetryPolicy,
TaskControls,
TimeoutPolicy,
WorkspaceSpec,
forbid_glob,
require_dir,
require_glob,
task,
)
class BuildFeaturesParams(BaseModel):
feature_set: str
min_rows: int = Field(ge=1)
class BuildFeaturesOutput(BaseModel):
row_count: int = Field(ge=0)
feature_count: int = Field(ge=0)
@task(
name="features.build",
description="Build feature parquet files.",
owner_email="data@example.com",
workspace=WorkspaceSpec(
prefix="/audio/render",
pre=[
require_dir("raw"),
require_glob("raw/**/*.parquet", min_count=1),
],
post=[
require_dir("features"),
require_glob("features/**/*.parquet", min_count=1),
forbid_glob("**/*.tmp"),
],
),
controls=TaskControls(
retry=RetryPolicy(count=4, logic="FIXED", delay_seconds=30),
timeout=TimeoutPolicy(response_seconds=900),
limits=ExecutionLimits(concurrent_exec_limit=2),
),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
features = workspace / "features"
features.mkdir(exist_ok=True)
(features / f"{params.feature_set}.parquet").write_text("ok", encoding="utf-8")
return BuildFeaturesOutput(row_count=100, feature_count=24)
任务作者需要维护的是 name、owner_email、可选 description、workspace 声明、controls、Pydantic params/output 和函数体。inputKeys、outputKeys、JSON Schema、retry/timeout 字段和 execution limit 字段由 Perago 生成。
深入阅读:Workspace Task 和 Examples。
三个核心命令#
perago check app.workers.features_build
perago extract app.workers.features_build --output generated/features.build.json
perago start app.workers.features_build -j 2
perago check 会导入 module、校验 task contract、加载 runtime config,并确认 TaskDef 可以生成。它不连接 Conductor 或 LakeFS。
perago extract 使用同一套校验,把 generated Conductor TaskDef 写到指定 .json 文件。它不会注册 TaskDef。
perago start 是长运行 worker 入口。启动前需要 CONDUCTOR_SERVER_URL、LakeFS endpoint、LakeFS access key、LakeFS secret key 已配置,并且 Conductor 中已经注册同名 TaskDef。
本仓库 fixture 示例在 tests/fixtures 下,本地验证 fixture 时用:
PYTHONPATH=tests/fixtures uv run perago check app.workers.features_build
PYTHONPATH=tests/fixtures uv run perago extract app.workers.features_build --output /tmp/features.build.json
深入阅读:Runtime 深入,以及 CLI、Runtime Configuration、Conductor Runtime。
生成的 TaskDef#
上面的 task 会生成类似下面的 Conductor TaskDef。下面的示例保留 task 作者最常核对的核心结构:
{
"name": "features.build",
"ownerEmail": "data@example.com",
"description": "Build feature parquet files.",
"retryCount": 4,
"retryLogic": "FIXED",
"retryDelaySeconds": 30,
"maxRetryDelaySeconds": 0,
"backoffJitterMs": 0,
"totalTimeoutSeconds": 0,
"timeoutPolicy": "TIME_OUT_WF",
"timeoutSeconds": 0,
"responseTimeoutSeconds": 900,
"pollTimeoutSeconds": 0,
"concurrentExecLimit": 2,
"inputKeys": ["workspace", "params"],
"outputKeys": ["workspace", "result"],
"inputSchema": {
"name": "features.build.input",
"version": 1,
"type": "JSON",
"data": {
"type": "object",
"required": ["workspace", "params"],
"additionalProperties": false,
"properties": {
"workspace": {
"type": "object",
"required": ["repository", "branch", "ref_type", "ref"],
"properties": {
"repository": {"type": "string"},
"branch": {"type": "string"},
"ref_type": {"const": "commit", "type": "string"},
"ref": {"type": "string"}
}
},
"params": {
"type": "object",
"required": ["feature_set", "min_rows"],
"additionalProperties": false,
"properties": {
"feature_set": {"type": "string"},
"min_rows": {"type": "integer", "minimum": 1}
}
}
}
}
},
"outputSchema": {
"name": "features.build.output",
"version": 1,
"type": "JSON",
"data": {
"type": "object",
"required": ["workspace", "result"],
"additionalProperties": false
}
}
}
WorkspaceSpec.prefix、pre / post guardrail、LakeFS endpoint、credentials、attempt branch、publish fence 和 publish_budget 原始字段不会写入 TaskDef。
深入阅读:Controls and TaskDef、Conductor TaskDef 和 Input/Output Contract。
Control 参数#
TaskControls 是 task 作者影响 Conductor 执行控制字段的唯一入口。没有特殊控制需求时可以省略 controls,默认等价于 TaskControls()。
Perago 参数 |
TaskDef 字段 |
默认值 |
说明 |
|---|---|---|---|
|
|
|
允许 |
|
|
|
可选 |
|
|
|
初始 retry delay。 |
|
|
|
最大 retry delay。 |
|
|
|
backoff jitter。 |
|
|
|
可选 |
|
|
|
Conductor task timeout。 |
|
|
|
没有 publish budget 时使用。 |
|
|
|
Conductor poll timeout。 |
|
|
|
Conductor total timeout。 |
|
|
omitted |
为 |
|
|
omitted |
必须和 |
|
|
omitted |
必须和 |
|
derives |
|
只允许 workspace task 使用,不直接写入 TaskDef。 |
PublishBudget 用 workspace publication 的运行时预算派生 responseTimeoutSeconds:
from perago import PublishBudget, TaskControls, TimeoutPolicy
controls = TaskControls(
timeout=TimeoutPolicy(response_seconds=999),
publish_budget=PublishBudget(
observed_merge_p99_seconds=20,
safety_margin_seconds=10,
lakefs_merge_timeout_seconds=45,
conductor_completion_timeout_seconds=15,
worker_shutdown_grace_seconds=30,
heartbeat_interval_seconds=10,
),
)
上例的 responseTimeoutSeconds 取 45 + 15 + 30 + 10 = 100。lakefs_merge_timeout_seconds 必须覆盖 observed_merge_p99_seconds + safety_margin_seconds。
深入阅读:Controls and TaskDef 和 Publish Budget。
Workspace 和 guardrail#
WorkspaceSpec(prefix=...) 决定从 LakeFS repository 的哪个 object prefix 投影到本地 attempt workspace。"/" 表示 repository root,其他值会归一化成相对 prefix。
pre guardrail 在 task body 运行前检查下载后的 workspace,post guardrail 在 task body 运行后、publication 前检查输出。常用 guardrail:
函数 |
用途 |
|---|---|
|
要求文件存在。 |
|
要求目录存在。 |
|
要求 glob 至少匹配指定数量。 |
|
禁止 glob 匹配任何路径。 |
Guardrail 失败会阻止 task body 或 publication 继续执行;guardrail 本身不写入 Conductor TaskDef。
Workspace-free task#
不需要 LakeFS workspace 的 task 使用 (params: ParamsModel) -> OutputModel 签名,并且不能声明 workspace=WorkspaceSpec(...)。
from pydantic import BaseModel, Field
from perago import task
class ValidateMetadataParams(BaseModel):
song_id: str
min_duration_seconds: int = Field(ge=1)
class ValidateMetadataOutput(BaseModel):
valid: bool
reason: str | None = None
@task(
name="metadata.validate",
description="Validate song metadata.",
owner_email="data@example.com",
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
return ValidateMetadataOutput(valid=True)
Workspace-free TaskDef 的 inputKeys 是 ["params"],outputKeys 是 ["result"]。它可以使用 retry、timeout 和 execution limit controls,但不能配置 publish_budget。
常见边界#
一个 module 只能声明一个 Perago task。
params和返回值必须是 Pydantic model。workspace task 的第一个参数必须是
workspace: Path。业务参数必须收敛到单个
paramsmodel,不能拆成多个函数参数。decorator 不能接收
inputKeys、outputKeys、inputSchema、outputSchema、params或output这类生成字段。
深入阅读:Task Module、Task Contract 和 Troubleshooting。
继续阅读#
LakeFS publication 失败语义和 fence 模型见 LakeFS 发布协议。继续深入时,可从 Development 进入 runtime、reference、architecture 和 API 维护资料。