Workspace Task#

workspace task 是需要读取版本化 workspace 的 Perago worker。它把 Conductor input 中的 workspace ref 下载到本地 attempt workspace,把业务函数的 workspace: Path 指向这个本地目录;可写 workspace task 在产生变更后才把输出发布回 LakeFS。

最小示例#

一个 workspace task module 只定义一个 task worker。@task(...) 声明 task metadata 和 WorkspaceSpec(...);函数签名声明唯一的业务 contract。

from pathlib import Path

from pydantic import BaseModel

from perago import WorkspaceSpec, require_file, task


class BuildFeaturesParams(BaseModel):
    source: str


class BuildFeaturesOutput(BaseModel):
    rows: int


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(
        prefix="/",
        pre=[require_file("input/data.csv")],
    ),
)
def build_features(
    workspace: Path,
    params: BuildFeaturesParams,
) -> BuildFeaturesOutput:
    input_path = workspace / "input" / "data.csv"
    return BuildFeaturesOutput(rows=sum(1 for _ in input_path.open()))

Required/generated 字段边界:

  • input required: Conductor input 必须提供 workspaceparams

  • input generated: workspace: Path 由 Perago 注入,不由业务调用方传入函数。

  • output generated: 函数返回值序列化为 Conductor output 的 result;Perago 生成 output workspace ref,ref 可能是 input ref,也可能是成功发布后的新 ref。

  • task metadata required: @task(...) 必须声明 nameowner_email

  • task metadata optional: description、guardrail 和 controls 可按任务需要声明;WorkspaceSpecprefix 参数可省略为默认值 "/"

函数签名规则#

workspace task 的函数签名固定为两个 positional-or-keyword 参数:

def task_fn(workspace: Path, params: ParamsModel) -> OutputModel:
    ...

Perago 在 module import 时校验这个签名,perago check 复用同一套校验并给出 CLI 诊断。合法 workspace task 必须满足:

  • 第一个参数名是 workspace,类型注解是 pathlib.Path

  • 第二个参数名是 params,类型注解是 Pydantic BaseModel 子类。

  • 返回类型注解是 Pydantic BaseModel 子类。

  • 不使用默认参数、keyword-only 参数、*args**kwargs 或未标注类型的 contract 字段。

  • @task(...) 必须声明 workspace=WorkspaceSpec(...)

@task(...) 不重复声明 params 或 output schema。Pydantic params/output models 是 contract 真源,也是 TaskDef schema 生成来源。

WorkspaceSpec prefix#

WorkspaceSpec(prefix=...) 定义业务函数看到的 workspace 根目录对应 LakeFS ref 中的哪个对象前缀。它属于 task metadata,workflow input 只携带 workspace ref。

如果 task 声明:

WorkspaceSpec(prefix="/audio/render")

那么业务代码里的:

workspace / "raw" / "clip.parquet"

对应 LakeFS ref 下的:

audio/render/raw/clip.parquet

prefix 规则:

  • "/" 表示仓库 ref 根目录。

  • "/audio/render" 会规范化为 "audio/render"

  • "audio/render" 合法。

  • """../raw""audio/../raw" 非法。

  • 字符串 prefix 使用 / 分隔,不能使用反斜杠路径。

读取和写入 workspace#

业务函数只操作本地 Path。不要在 task body 内直接拼 LakeFS 对象路径、分支名或 commit ref。

def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
    features = workspace / "features"
    features.mkdir(exist_ok=True)
    (features / f"{params.source}.txt").write_text("ok", encoding="utf-8")
    return BuildFeaturesOutput(rows=1)

运行时负责:

  • 按 Conductor input 的 workspace ref 下载 WorkspaceSpec(prefix=...) 指向的内容。

  • 在本地 attempt workspace 中执行函数。

  • 检查 post guardrails。

  • 根据 WorkspaceSpec(read_only=...) 和 workspace diff 决定是否进入 LakeFS publication。

  • 成功后报告 generated output workspaceresult

Read-only workspace task#

只需要读取 LakeFS workspace 的节点仍然应该声明 workspace task,而不是伪装成 workspace-free task。使用 read_only=True 表示这个 task 不产生 workspace publication:

@task(
    name="metadata.inspect",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/audio/render", read_only=True),
)
def inspect_metadata(workspace: Path, params: InspectParams) -> InspectOutput:
    manifest = workspace / "manifest.json"
    return InspectOutput(found=manifest.exists())

read_only=True 的成功 output 保留 input workspace ref。runtime 不检查 target branch HEAD、不创建 staging branch、不提交 LakeFS commit,也不发布新 ref;因此它也不进入 Perago 为可写 LakeFS 路径设置的 attempt fence。最终 COMPLETED result 按普通 Conductor worker completion 回写。它不是 OS-level readonly mount;如果函数误写了本机 attempt workspace,这些写入会随 cleanup 丢弃。

默认 read_only=False。可写 workspace task 执行后如果没有任何 workspace diff,Perago 不会创建 empty commit;runtime 会按 publish fence 检查 target branch 状态,并保持 output ref 与 target branch 可见 head 一致。

常见拒绝场景#

下面这些 module 会在 import-time validation、perago check 或 worker 启动时失败:

# 参数名错误。
@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/"),
)
def build_features(path: Path, input: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...


# 业务字段不能展开到函数签名。
@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(prefix="/"),
)
def build_features(
    workspace: Path,
    source: str,
) -> BuildFeaturesOutput:
    ...


# workspace task 必须声明 WorkspaceSpec。
@task(name="features.build", owner_email="data@example.com")
def build_features(
    workspace: Path,
    params: BuildFeaturesParams,
) -> BuildFeaturesOutput:
    ...

同一个 Python module 只能定义一个 task worker。perago check app.workers.features_buildperago extract app.workers.features_build --output generated/features.build.jsonperago start app.workers.features_build -j 4 都以这个 single-task module 为目标。

可运行参考#

仓库测试夹具中的 tests/fixtures/app/workers/features_build.py 是完整 workspace task 参考。它展示了 WorkspaceSpec(prefix="/audio/render")、pre/post guardrails 和 TaskControls(...) 如何放在同一个 task declaration 中。