Perago

Perago 是一个内部任务运行时上下文，用于让 typed Python workers 在版本化 workspace 上执行 Conductor tasks。

它把任务契约、Conductor TaskDef、LakeFS workspace 输入输出、attempt-local workspace、发布事务和 guardrail 收敛到同一组边界里。任务作者主要关心函数签名、Pydantic 契约、workspace 注入和 guardrail；运行时维护者主要关心 worker process、Conductor poll/result、LakeFS 同步和 publication fence。

最小 workspace task

from pathlib import Path

from pydantic import BaseModel

from perago import WorkspaceSpec, require_file, task


class Params(BaseModel):
    """Input parameters for ``features.build``, validated by Pydantic."""

    # Identifier of the data source; not read by build_features in this example.
    source: str


class Output(BaseModel):
    """Result returned by ``features.build``."""

    # Number of lines in input/data.csv (a header line, if present, is counted too).
    rows: int


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(
        prefix="/",
        # Pre-execution guardrail; presumably fails the attempt when the file
        # is missing from the workspace (TODO confirm require_file semantics).
        pre=[require_file("input/data.csv")],
    ),
)
def build_features(workspace: Path, params: Params) -> Output:
    """Count the lines of ``input/data.csv`` in the injected workspace.

    Args:
        workspace: Local workspace directory injected by the runtime.
        params: Validated task parameters (unused in this example).

    Returns:
        An ``Output`` whose ``rows`` is the number of lines in the file.
    """
    input_path = workspace / "input" / "data.csv"
    # Close the handle deterministically rather than leaking it until garbage
    # collection (which also emits a ResourceWarning); decode explicitly
    # instead of depending on the platform's locale encoding.
    with input_path.open(encoding="utf-8") as f:
        return Output(rows=sum(1 for _ in f))

最小 workspace-free task

from pydantic import BaseModel

from perago import task


class Params(BaseModel):
    """Input parameters for ``numbers.double``."""

    # The integer to be doubled.
    value: int


class Output(BaseModel):
    """Result returned by ``numbers.double``."""

    # Twice the input value.
    doubled: int


@task(
    name="numbers.double",
    owner_email="data@example.com",
)
def double(params: Params) -> Output:
    """Return ``params.value`` multiplied by two.

    This task declares neither a ``workspace`` parameter nor a
    ``WorkspaceSpec``, so it is a workspace-free task.
    """
    twice = params.value * 2
    return Output(doubled=twice)