Guardrails#

Guardrail 是 workspace task 的本地文件形状检查。它检查 Perago 为当前 attempt 准备好的本地 workspace root,确认输入文件已经存在,或确认业务函数产出了承诺的输出文件。

Guardrail 不是 Pydantic 数据校验、不是 Conductor TaskDef schema,也不是跨 repository 的 LakeFS 查询。它只看 WorkspaceSpec(prefix=...) 暴露给业务函数的那棵本地目录树。

最小形状#

from pathlib import Path

from pydantic import BaseModel

from perago import (
    WorkspaceSpec,
    forbid_glob,
    require_dir,
    require_file,
    require_glob,
    task,
)


class BuildFeaturesParams(BaseModel):
    feature_set: str


class BuildFeaturesOutput(BaseModel):
    row_count: int
    feature_count: int


@task(
    name="features.build",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(
        prefix="/audio/render",
        pre=[
            require_dir("raw"),
            require_file("manifest.json"),
            require_glob("raw/**/*.parquet", min_count=1),
        ],
        post=[
            require_dir("features"),
            require_glob("features/**/*.parquet", min_count=1),
            forbid_glob("**/*.tmp"),
        ],
    ),
)
def build_features(
    workspace: Path,
    params: BuildFeaturesParams,
) -> BuildFeaturesOutput:
    output_dir = workspace / "features"
    output_dir.mkdir(exist_ok=True)
    (output_dir / f"{params.feature_set}.parquet").write_text("ok", encoding="utf-8")
    return BuildFeaturesOutput(row_count=100, feature_count=24)

Required/optional/generated 字段边界:

  • required: WorkspaceSpec(pre=...)WorkspaceSpec(post=...) 中的每个 guardrail 都必须使用 workspace-relative path 或 glob pattern。

  • optional: prepost 都可省略;省略表示该阶段没有文件形状检查。

  • generated: guardrail 不生成 Conductor input/output 字段,不写入 TaskDef JSON Schema,也不改变业务函数签名。

  • forbidden: task 作者不能 import 或构造内部 guardrail model;只使用 require_filerequire_dirrequire_globforbid_glob

支持的检查#

require_file(path) 要求 workspace / path 是文件。它适合声明 manifest、单个输入文件或必须产出的标记文件。

require_dir(path) 要求 workspace / path 是目录。它适合声明输入目录或输出目录存在。

require_glob(pattern, min_count=1, max_count=None) 统计 workspace.glob(pattern) 的匹配数量。默认至少需要 1 个匹配;max_countNone 时不限制上限。

forbid_glob(pattern) 要求 workspace.glob(pattern) 没有任何匹配。它适合禁止临时文件、调试文件或不应发布的中间产物。

require_filerequire_dir 不接受 count bound。require_globmin_countmax_count 必须满足 min_count <= max_count

Path 规则#

Guardrail path 是 workspace 逻辑路径,不是进程当前工作目录路径。字符串形式必须使用 / 分隔,并且必须留在 workspace root 内。

合法形状:

from pathlib import Path, PureWindowsPath

require_file("manifest.json")
require_file(Path("raw") / "manifest.json")
require_glob(Path("raw") / "**" / "*.parquet", min_count=1)
require_file(PureWindowsPath("raw") / "windows-authored.json")

这些输入会规范化为 POSIX workspace path:

manifest.json
raw/manifest.json
raw/**/*.parquet
raw/windows-authored.json

非法形状会在 module import、perago checkWorkspaceSpec(...) 构造时失败:

require_file("/raw/manifest.json")     # 不能以 / 开头
require_file("../raw/manifest.json")   # 不能逃出 workspace root
require_file("raw/../manifest.json")   # 不能包含 .. segment
require_file(r"raw\manifest.json")     # 字符串不能使用反斜杠

如果任务声明 WorkspaceSpec(prefix="audio/render"),那么 require_file("manifest.json") 检查的是 LakeFS ref 中的 audio/render/manifest.json。guardrail 不会检查 repository 根目录下的 manifest.json

执行顺序#

Workspace task attempt 的 guardrail 顺序固定:

  1. Perago 下载 Conductor input 中的 workspace ref。

  2. Perago 将 WorkspaceSpec(prefix=...) 映射到本地 attempt workspace。

  3. pre guardrails 检查本地 workspace root。

  4. pre 检查通过后,Perago 调用业务函数。

  5. 业务函数返回值通过 output Pydantic model 校验。

  6. post guardrails 检查同一个本地 workspace root。

  7. post 检查通过后,Perago 才会 stage、发布 workspace output 并报告成功结果。

Pre guardrail 失败表示上游输入 workspace 不满足当前 task 的输入文件契约。Perago 不调用业务函数,不发布 workspace output,并把异常映射为 terminal failure:

{
  "status": "FAILED_WITH_TERMINAL_ERROR",
  "reasonForIncompletion": "pre guardrail require_glob('raw/**/*.parquet') matched 0 files; min_count=1"
}

Post guardrail 失败表示业务函数返回成功,但没有产出承诺的 workspace 文件形状。Perago 不上传或发布这个 attempt 的 workspace,并把异常映射为普通失败,由 Conductor retry 策略决定是否重试:

{
  "status": "FAILED",
  "reasonForIncompletion": "post guardrail require_glob('features/**/*.parquet') matched 0 files; min_count=1"
}

与 TaskDef 的关系#

Guardrail 是 Perago runtime metadata,不是 Conductor TaskDef schema 的一部分。生成的 TaskDef 仍只描述 workspaceparamsresult

这意味着:

  • prepost 不会出现在 inputKeysoutputKeysinputSchemaoutputSchema 中。

  • 修改 guardrail 会改变 Perago runtime 校验行为,但不会给 Conductor 增加新的输入字段。

  • perago check 可以在本地提前发现非法 path、非法 count bound 和非法 workspace 声明。

常见拒绝形状#

# require_file 和 require_dir 不接受数量边界。
require_file("manifest.json", min_count=1)


# min_count 不能大于 max_count。
require_glob("raw/**/*.parquet", min_count=10, max_count=1)


# workspace-free task 不能声明 WorkspaceSpec,因此也不能声明 guardrail。
@task(
    name="features.summarize",
    owner_email="data@example.com",
    workspace=WorkspaceSpec(pre=[require_file("manifest.json")]),
)
def summarize(params: BuildFeaturesParams) -> BuildFeaturesOutput:
    ...

完整正例可参考仓库测试夹具 tests/fixtures/app/workers/features_build.py