Workspace Task#
workspace task 是需要读取版本化 workspace 的 Perago worker。它把 Conductor input 中的 workspace ref 下载到本地 attempt workspace,把业务函数的 workspace: Path 指向这个本地目录;可写 workspace task 在产生变更后才把输出发布回 LakeFS。
最小示例#
一个 workspace task module 只定义一个 task worker。@task(...) 声明 task metadata 和 WorkspaceSpec(...);函数签名声明唯一的业务 contract。
from pathlib import Path
from pydantic import BaseModel
from perago import WorkspaceSpec, require_file, task
class BuildFeaturesParams(BaseModel):
source: str
class BuildFeaturesOutput(BaseModel):
rows: int
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(
prefix="/",
pre=[require_file("input/data.csv")],
),
)
def build_features(
workspace: Path,
params: BuildFeaturesParams,
) -> BuildFeaturesOutput:
input_path = workspace / "input" / "data.csv"
return BuildFeaturesOutput(rows=sum(1 for _ in input_path.open()))
Required/generated 字段边界:
input required: Conductor input 必须提供
workspace和params。input generated:
workspace: Path由 Perago 注入,不由业务调用方传入函数。output generated: 函数返回值序列化为 Conductor output 的
result;Perago 生成 outputworkspaceref,ref 可能是 input ref,也可能是成功发布后的新 ref。task metadata required:
@task(...)必须声明name和owner_email。task metadata optional:
description、guardrail 和 controls 可按任务需要声明;WorkspaceSpec的prefix参数可省略为默认值"/"。
函数签名规则#
workspace task 的函数签名固定为两个 positional-or-keyword 参数:
def task_fn(workspace: Path, params: ParamsModel) -> OutputModel:
...
Perago 在 module import 时校验这个签名,perago check 复用同一套校验并给出 CLI 诊断。合法 workspace task 必须满足:
第一个参数名是
workspace,类型注解是pathlib.Path。第二个参数名是
params,类型注解是 PydanticBaseModel子类。返回类型注解是 Pydantic
BaseModel子类。不使用默认参数、keyword-only 参数、
*args、**kwargs或未标注类型的 contract 字段。@task(...)必须声明workspace=WorkspaceSpec(...)。
@task(...) 不重复声明 params 或 output schema。Pydantic params/output models 是 contract 真源,也是 TaskDef schema 生成来源。
WorkspaceSpec prefix#
WorkspaceSpec(prefix=...) 定义业务函数看到的 workspace 根目录对应 LakeFS ref 中的哪个对象前缀。它属于 task metadata,workflow input 只携带 workspace ref。
如果 task 声明:
WorkspaceSpec(prefix="/audio/render")
那么业务代码里的:
workspace / "raw" / "clip.parquet"
对应 LakeFS ref 下的:
audio/render/raw/clip.parquet
prefix 规则:
"/"表示仓库 ref 根目录。"/audio/render"会规范化为"audio/render"。"audio/render"合法。""、"../raw"、"audio/../raw"非法。字符串 prefix 使用
/分隔,不能使用反斜杠路径。
读取和写入 workspace#
业务函数只操作本地 Path。不要在 task body 内直接拼 LakeFS 对象路径、分支名或 commit ref。
def build_features(workspace: Path, params: BuildFeaturesParams) -> BuildFeaturesOutput:
features = workspace / "features"
features.mkdir(exist_ok=True)
(features / f"{params.source}.txt").write_text("ok", encoding="utf-8")
return BuildFeaturesOutput(rows=1)
运行时负责:
按 Conductor input 的 workspace ref 下载
WorkspaceSpec(prefix=...)指向的内容。在本地 attempt workspace 中执行函数。
检查 post guardrails。
根据
WorkspaceSpec(read_only=...)和 workspace diff 决定是否进入 LakeFS publication。成功后报告 generated output
workspace和result。
Read-only workspace task#
只需要读取 LakeFS workspace 的节点仍然应该声明 workspace task,而不是伪装成 workspace-free task。使用 read_only=True 表示这个 task 不产生 workspace publication:
@task(
name="metadata.inspect",
owner_email="data@example.com",
workspace=WorkspaceSpec(prefix="/audio/render", read_only=True),
)
def inspect_metadata(workspace: Path, params: InspectParams) -> InspectOutput:
manifest = workspace / "manifest.json"
return InspectOutput(found=manifest.exists())
read_only=True 的成功 output 保留 input workspace ref。runtime 不检查 target branch HEAD、不创建 staging branch、不提交 LakeFS commit,也不发布新 ref;因此它也不进入 Perago 为可写 LakeFS 路径设置的 attempt fence。最终 COMPLETED result 按普通 Conductor worker completion 回写。它不是 OS-level readonly mount;如果函数误写了本机 attempt workspace,这些写入会随 cleanup 丢弃。
默认 read_only=False。可写 workspace task 执行后如果没有任何 workspace diff,Perago 不会创建 empty commit;runtime 会按 publish fence 检查 target branch 状态,并保持 output ref 与 target branch 可见 head 一致。
常见拒绝场景#
下面这些 module 会在 import-time validation、perago check 或 worker 启动时失败:
# 参数名错误。
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(prefix="/"),
)
def build_features(path: Path, input: BuildFeaturesParams) -> BuildFeaturesOutput:
...
# 业务字段不能展开到函数签名。
@task(
name="features.build",
owner_email="data@example.com",
workspace=WorkspaceSpec(prefix="/"),
)
def build_features(
workspace: Path,
source: str,
) -> BuildFeaturesOutput:
...
# workspace task 必须声明 WorkspaceSpec。
@task(name="features.build", owner_email="data@example.com")
def build_features(
workspace: Path,
params: BuildFeaturesParams,
) -> BuildFeaturesOutput:
...
同一个 Python module 只能定义一个 task worker。perago check app.workers.features_build、perago extract app.workers.features_build --output generated/features.build.json 和 perago start app.workers.features_build -j 4 都以这个 single-task module 为目标。
可运行参考#
仓库测试夹具中的 tests/fixtures/app/workers/features_build.py 是完整 workspace task 参考。它展示了 WorkspaceSpec(prefix="/audio/render")、pre/post guardrails 和 TaskControls(...) 如何放在同一个 task declaration 中。