Getting Started#

本页给新任务作者一条最短本地路径:写一个 single-task module,运行 perago check,再生成 Conductor TaskDef。这里不启动 worker;perago start 需要真实 Conductor 和 LakeFS 配置,通常属于部署或运行时维护流程。

前置条件#

Perago 要求 Python 3.10 或更新版本。仓库本地开发默认使用 uv

uv sync

以下命令都从仓库根目录运行。示例 module 来自 tests/fixtures/app/workers/,因此本地验证 fixture 时需要设置 PYTHONPATH=tests/fixtures

写一个 workspace task#

Workspace task 适合需要读写版本化 workspace 的 Conductor task。函数签名必须是 (workspace: Path, params: ParamsModel) -> OutputModel,并且 decorator 必须提供 nameowner_email

from pathlib import Path

from pydantic import BaseModel, Field

from perago import WorkspaceSpec, require_dir, require_glob, task


class BuildFeaturesParams(BaseModel):
    feature_set: str
    min_rows: int = Field(ge=1)


class BuildFeaturesOutput(BaseModel):
    row_count: int = Field(ge=0)
    feature_count: int = Field(ge=0)


@task(
    name="features.build",
    description="Build feature parquet files.",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(
        prefix="/audio/render",
        pre=[
            require_dir("raw"),
            require_glob("raw/**/*.parquet", min_count=1),
        ],
    ),
)
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    features = workspace / "features"
    features.mkdir(exist_ok=True)
    (features / f"{params.feature_set}.parquet").write_text("ok", encoding="utf-8")
    return BuildFeaturesOutput(row_count=100, feature_count=24)

Required: nameowner_emailworkspace: Pathparams Pydantic model、返回 Pydantic model。Optional: description、workspace prefix、pre/post workspace checks 和 task controls。Generated: Conductor input 中的 workspace 本机路径注入、TaskDef schema、成功输出中的 workspace ref。

写一个 workspace-free task#

Workspace-free task 适合只处理 typed input/output、无需 LakeFS workspace 的 task。函数签名必须是 (params: ParamsModel) -> OutputModel,不能声明 WorkspaceSpec

from pydantic import BaseModel, Field

from perago import task


class ValidateMetadataParams(BaseModel):
    song_id: str
    min_duration_seconds: int = Field(ge=1)


class ValidateMetadataOutput(BaseModel):
    valid: bool
    reason: str | None = None


@task(
    name="metadata.validate",
    description="Validate song metadata.",
    owner_email="data@example.com",
)
def validate_metadata(params: ValidateMetadataParams) -> ValidateMetadataOutput:
    return ValidateMetadataOutput(valid=True)

Required: nameowner_emailparams Pydantic model 和返回 Pydantic model。Optional: description 和不涉及 publication 的 TaskControls。Forbidden: workspace 参数、WorkspaceSpecpublish_budget、多个业务参数和 keyword-only 参数。

本地检查#

perago check 会加载 runtime config、导入 module、校验 task contract,并确认 TaskDef 可以生成。它不连接 Conductor 或 LakeFS:

PYTHONPATH=tests/fixtures uv run perago check app.workers.features_build
PYTHONPATH=tests/fixtures uv run perago check app.workers.metadata_validate

成功输出会包含任务名、本机 workspace/log 目录、worker id 前缀,以及 Conductor 和 LakeFS 是否已配置。conductor: not configuredlakefs: not configuredcheck 不是失败条件。

生成 TaskDef#

perago extract 使用同一套校验,并把 generated Conductor TaskDef 写到 JSON 文件。它不会注册 TaskDef:

PYTHONPATH=tests/fixtures uv run perago extract app.workers.features_build --output /tmp/features.build.json
PYTHONPATH=tests/fixtures uv run perago extract app.workers.metadata_validate --output /tmp/metadata.validate.json

输出路径必须以 .json 结尾。部署流程或 Conductor 管理工具负责把生成的 TaskDef 注册到 Conductor。

启动 worker 的边界#

perago start <module_target> -j N 是长运行入口。启动前它会要求:

  • CONDUCTOR_SERVER_URL 已配置。

  • LakeFS endpoint、access key id 和 secret access key 已完整配置。

  • Conductor 中已经注册同名 TaskDef。

开发 task body、params/output schema 或 workspace checks 时,先停在 checkextract。需要理解启动后的 poll、workspace download、stage、publish 和 cleanup 生命周期时,再阅读 Runtime

下一步#

新任务作者继续读 Task Authoring。需要核对精确 JSON shape、TaskDef 字段或错误分类时,读 Reference。运行时维护者从 Runtime 开始;需要理解 publication fence 和事务取舍时,读 Architecture