Perago#
Perago 是一个内部任务运行时上下文,用于让 typed Python workers 在版本化 workspace 上执行 Conductor tasks。
它把任务契约、Conductor TaskDef、LakeFS workspace 输入输出、attempt-local workspace、发布事务和 guardrail 收敛到同一组边界里。任务作者主要关心函数签名、Pydantic 契约、workspace 注入和 guardrail;运行时维护者主要关心 worker process、Conductor poll/result、LakeFS 同步和 publication fence。
最小 workspace task#
from pathlib import Path
from pydantic import BaseModel
from perago import WorkspaceSpec, require_file, task
class Params(BaseModel):
source: str
class Output(BaseModel):
rows: int
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(
prefix="/",
pre=[require_file("input/data.csv")],
)
)
def build_features(workspace: Path, params: Params) -> Output:
input_path = workspace / "input" / "data.csv"
return Output(rows=sum(1 for _ in input_path.open()))
最小 workspace-free task#
from pydantic import BaseModel
from perago import task
class Params(BaseModel):
value: int
class Output(BaseModel):
doubled: int
@task(
name="numbers.double",
owner_email="data@example.com",
)
def double(params: Params) -> Output:
return Output(doubled=params.value * 2)