perago.execution 源代码

from __future__ import annotations

import os
from collections.abc import Callable, Mapping
from dataclasses import dataclass
from hashlib import sha256
from pathlib import Path
from typing import Any
from uuid import uuid4

from loguru import logger
from pydantic import BaseModel

from perago.attempt import assert_current_attempt_snapshot
from perago.errors import (
    GuardrailViolation,
    PostGuardrailViolation,
    PreGuardrailViolation,
    TaskInputError,
)
from perago.guards import check_guardrails
from perago.models import WorkspaceInput, WorkspaceSpec
from perago.result import RuntimeTaskResult, completed_result, result_for_exception
from perago.task import TaskDefinition
from perago.workspace import (
    ATTEMPT_WORKSPACE_MARKER,
    cleanup_attempt_workspace_safely,
    new_workspace_owner,
    prepare_attempt_workspace,
    register_active_workspace_owner,
    unregister_active_workspace_owner,
)


DownloadWorkspace = Callable[[WorkspaceInput, WorkspaceSpec, Path], None]
LoadCurrentAttempt = Callable[[object], object]
StageWorkspace = Callable[[Path, WorkspaceInput, WorkspaceSpec, object], "StagedWorkspace"]
PublishWorkspace = Callable[["StagedWorkspace", WorkspaceInput, WorkspaceSpec, object], str]
CleanupStaging = Callable[["StagedWorkspace"], None]
CompleteNoOpWorkspace = Callable[[WorkspaceInput, WorkspaceSpec, object], str]
WorkspaceSnapshot = tuple[tuple[str, str, str], ...]


[文档] @dataclass(frozen=True) class StagedWorkspace: """ Workspace staging reference returned before publication. ``StagedWorkspace`` is the complete LakeFS staging reference passed from a workspace staging callback to publication and cleanup callbacks. It carries repository, staging branch, and staging commit so cleanup can be driven by the staged reference itself instead of worker-local mutable state. Parameters ---------- repository : str LakeFS repository that owns the staging branch. branch : str LakeFS staging branch that contains the attempted workspace changes. commit : str Commit reference produced by staging the local attempt workspace. Attributes ---------- repository : str LakeFS repository that owns the staging branch. branch : str LakeFS staging branch that contains the attempted workspace changes. commit : str Commit reference produced by staging the local attempt workspace. See Also -------- run_workspace_task_attempt : Runtime flow that consumes staged workspace references. Notes ----- The dataclass is frozen. Cleanup receives the same object even if publish fails or a later attempt fence rejects the attempt. Examples -------- >>> StagedWorkspace(repository="songs", branch="perago/staging/wf/task", commit="abc123").branch 'perago/staging/wf/task' """ repository: str branch: str commit: str
@dataclass(frozen=True) class TaskExecutionContext: attempt: object execution_id: str def __getattr__(self, name: str) -> Any: return getattr(self.attempt, name)
[文档] def run_workspace_task_attempt( task: TaskDefinition, input_data: Mapping[str, Any], attempt: object, workspace_root: Path, *, download_workspace: DownloadWorkspace, load_current_attempt: LoadCurrentAttempt, stage_workspace: StageWorkspace, publish_workspace: PublishWorkspace, cleanup_staging: CleanupStaging, complete_noop_workspace: CompleteNoOpWorkspace | None = None, owner_worker_id: str | None = None, execution_id: str | None = None, failure_reason_max_length: int, ) -> RuntimeTaskResult: """ Run one workspace task attempt. This function is the testable execution core used by the Conductor worker runtime. It validates the Conductor input shape, prepares an attempt-local workspace, downloads the declared workspace input, invokes the task body, and then follows the task's workspace access mode. Read-only tasks complete with the input ref. Writable tasks either complete a no-op without staging or check the attempt fence before and after staging, publish the staged workspace, and clean local and staging resources. Parameters ---------- task : TaskDefinition Loaded workspace task definition. Workspace-free task definitions are rejected. input_data : mapping of str to Any Conductor task input. Workspace attempts must contain exactly ``"workspace"`` and ``"params"``. attempt : object Conductor task attempt object. It must expose the attributes consumed by :func:`perago.assert_current_attempt_snapshot` and workspace directory helpers. workspace_root : pathlib.Path Root directory under which the attempt-local workspace is prepared. download_workspace : callable Callback that materializes ``WorkspaceInput`` into the local workspace directory. load_current_attempt : callable Callback that reloads the latest Conductor attempt state for attempt fence checks. stage_workspace : callable Callback that stages local workspace changes and returns a :class:`StagedWorkspace`. publish_workspace : callable Callback that publishes a staged workspace and returns the published workspace reference. cleanup_staging : callable Callback that removes or abandons the staging branch after the attempt completes or fails after staging. complete_noop_workspace : callable or None, default=None Callback that validates or reconciles the target branch for writable tasks whose local workspace did not change. If omitted and a writable no-op is reached, the attempt fails closed. owner_worker_id : str or None, default=None Worker id written into the local workspace owner marker for active owner tracking and supervisor GC. execution_id : str or None, default=None Execution-scoped id used to isolate local attempt workspace and LakeFS staging branch names. A new id is generated when omitted. failure_reason_max_length : int Maximum number of characters written to ``reasonForIncompletion`` for failed attempts. Returns ------- RuntimeTaskResult ``COMPLETED`` result containing ``workspace`` and ``result`` output when every phase succeeds; otherwise a failed result produced by :func:`perago.result_for_exception`. Raises ------ TaskInputError If ``task`` is not a workspace task. Exceptions raised after execution enters the attempt ``try`` block are converted to ``RuntimeTaskResult`` instead of being raised. See Also -------- invoke_workspace_task_body : Validate and invoke only the task body phase. build_workspace_task_output : Build the completed output payload. result_for_exception : Convert execution exceptions to runtime results. Notes ----- Cleanup is best effort. A staging cleanup failure is logged and does not replace the result of the completed or failed attempt. Examples -------- >>> from pathlib import Path >>> task_def = load_module_task("app.workers.features_build") >>> result = run_workspace_task_attempt( # doctest: +SKIP ... task_def, ... {"workspace": {...}, "params": {...}}, ... attempt, ... Path("/tmp/perago/workspaces"), ... download_workspace=lakefs.download_workspace, ... load_current_attempt=conductor.load_current_attempt, ... stage_workspace=lakefs.stage_workspace, ... publish_workspace=lakefs.publish_workspace, ... cleanup_staging=lakefs.cleanup_staging, ... ) >>> result.status # doctest: +SKIP 'COMPLETED' """ if not task.has_workspace: raise TaskInputError("run_workspace_task_attempt only supports workspace tasks") workspace = task.workspace if workspace is None: raise TaskInputError("workspace task definition is missing WorkspaceSpec") workspace_dir: Path | None = None staged: StagedWorkspace | None = None execution = TaskExecutionContext( attempt=attempt, execution_id=execution_id or getattr(attempt, "execution_id", uuid4().hex), ) owner = new_workspace_owner(owner_worker_id or os.environ.get("PERAGO_WORKER_ID", f"pid-{os.getpid()}")) register_active_workspace_owner(owner) try: if set(input_data) != {"workspace", "params"}: raise TaskInputError("workspace task input must contain only workspace and params") workspace_input = WorkspaceInput.model_validate(input_data["workspace"]) workspace_dir = prepare_attempt_workspace(workspace_root, execution, owner) download_workspace(workspace_input, workspace, workspace_dir) initial_snapshot = None if workspace.read_only else _snapshot_workspace(workspace_dir) body_output = invoke_workspace_task_body(task, input_data, workspace_dir) if workspace.read_only: # Read-only completion has no LakeFS side effect to fence. The final # TaskResult update follows Conductor's normal task completion # contract; Perago's attempt fence is reserved for writable # workspace publication and no-op branch relocation. output_workspace = workspace_input.published_output(workspace_input.ref) return completed_result( { "workspace": output_workspace.model_dump(mode="json"), **body_output, } ) workspace_changed = _snapshot_workspace(workspace_dir) != initial_snapshot assert_current_attempt_snapshot(attempt, load_current_attempt(attempt)) if not workspace_changed: published_ref = _complete_noop_workspace( complete_noop_workspace, workspace_input, workspace, execution, ) output_workspace = workspace_input.published_output(published_ref) return completed_result( { "workspace": output_workspace.model_dump(mode="json"), **body_output, } ) staged = stage_workspace(workspace_dir, workspace_input, workspace, execution) assert_current_attempt_snapshot(attempt, load_current_attempt(attempt)) published_ref = publish_workspace(staged, workspace_input, workspace, execution) output_workspace = workspace_input.published_output(published_ref) return completed_result( { "workspace": output_workspace.model_dump(mode="json"), **body_output, } ) except Exception as exc: return result_for_exception(exc, max_length=failure_reason_max_length) finally: if staged is not None: _cleanup_staging_safely(staged, cleanup_staging) if workspace_dir is not None: cleanup_attempt_workspace_safely(workspace_dir, attempt) unregister_active_workspace_owner(owner)
[文档] def run_workspace_free_task_attempt( task: TaskDefinition, input_data: Mapping[str, Any], *, failure_reason_max_length: int, ) -> RuntimeTaskResult: """ Run one workspace-free task attempt. Workspace-free attempts only validate the ``params`` wrapper, invoke the task callable, validate the output model, and convert failures to the runtime result contract. Parameters ---------- task : TaskDefinition Loaded workspace-free task definition. Workspace task definitions are rejected. input_data : mapping of str to Any Conductor task input. Workspace-free attempts must contain exactly ``"params"``. failure_reason_max_length : int Maximum number of characters written to ``reasonForIncompletion`` for failed attempts. Returns ------- RuntimeTaskResult ``COMPLETED`` result containing the validated ``result`` payload, or a failed result produced from the raised exception. Raises ------ TaskInputError If ``task`` is a workspace task. Input and output validation failures after execution enters the attempt ``try`` block are converted to ``RuntimeTaskResult``. See Also -------- invoke_workspace_free_task : Invoke and validate a workspace-free task. build_workspace_free_task_output : Build the completed output payload. result_for_exception : Convert execution exceptions to runtime results. Examples -------- >>> task_def = load_module_task("app.workers.metadata_validate") >>> result = run_workspace_free_task_attempt( ... task_def, ... {"params": {"song_id": "song-000123", "min_duration_seconds": 30}}, ... ) >>> result.status 'COMPLETED' """ if task.has_workspace: raise TaskInputError("run_workspace_free_task_attempt only supports workspace-free tasks") try: return completed_result(invoke_workspace_free_task(task, input_data)) except Exception as exc: return result_for_exception(exc, max_length=failure_reason_max_length)
[文档] def invoke_workspace_task_body( task: TaskDefinition, input_data: Mapping[str, Any], workspace_dir: Path, ) -> dict[str, Any]: """ Invoke a workspace task body against a prepared local workspace. The helper performs the task-body portion of a workspace attempt: it checks the Conductor input wrapper, validates ``WorkspaceInput`` and params, applies pre guardrails, calls the task function, validates the declared output model, and applies post guardrails. Parameters ---------- task : TaskDefinition Loaded workspace task definition. input_data : mapping of str to Any Conductor task input containing exactly ``"workspace"`` and ``"params"``. workspace_dir : pathlib.Path Attempt-local workspace directory already populated from the workspace input. Returns ------- dict of str to Any Body output wrapper containing only the validated ``"result"`` field. Raises ------ TaskInputError If ``task`` is not a workspace task or if the input wrapper shape is invalid. PreGuardrailViolation If a pre-execution workspace guardrail fails. PostGuardrailViolation If a post-execution workspace guardrail fails. pydantic.ValidationError If workspace input, params, or task result validation fails. See Also -------- run_workspace_task_attempt : Full workspace attempt execution flow. build_workspace_task_output : Add a published workspace ref to a validated result. check_guardrails : Evaluate workspace guardrail declarations. Examples -------- >>> from pathlib import Path >>> task_def = load_module_task("app.workers.features_build") >>> output = invoke_workspace_task_body( # doctest: +SKIP ... task_def, ... {"workspace": {...}, "params": {"feature_set": "default", "min_rows": 100}}, ... Path("/tmp/perago/workspaces/attempt"), ... ) >>> sorted(output) ['result'] """ if not task.has_workspace: raise TaskInputError("invoke_workspace_task_body only supports workspace tasks") if set(input_data) != {"workspace", "params"}: raise TaskInputError("workspace task input must contain only workspace and params") WorkspaceInput.model_validate(input_data["workspace"]) params = task.params_model.model_validate(input_data["params"], extra="forbid") workspace = task.workspace if workspace is None: raise TaskInputError("workspace task definition is missing WorkspaceSpec") _check_phase_guardrails(workspace_dir, workspace.pre, "pre", PreGuardrailViolation) raw_result = task.fn(workspace_dir, params) result = _validate_result(task, raw_result) _check_phase_guardrails(workspace_dir, workspace.post, "post", PostGuardrailViolation) return {"result": result.model_dump(mode="json")}
[文档] def invoke_workspace_free_task(task: TaskDefinition, input_data: Mapping[str, Any]) -> dict[str, Any]: """ Invoke a workspace-free task and validate its output wrapper. The helper is the body-level execution path for tasks that do not declare a ``WorkspaceSpec``. It accepts only the Conductor ``params`` wrapper, calls the task function with the validated params model, and returns the validated output payload. Parameters ---------- task : TaskDefinition Loaded workspace-free task definition. input_data : mapping of str to Any Conductor task input containing exactly ``"params"``. Returns ------- dict of str to Any Completed workspace-free output wrapper containing only ``"result"``. Raises ------ TaskInputError If ``task`` is a workspace task or if the input wrapper shape is invalid. pydantic.ValidationError If params or task result validation fails. See Also -------- run_workspace_free_task_attempt : Attempt wrapper that converts exceptions to runtime results. build_workspace_free_task_output : Validate and wrap a raw task result. Examples -------- >>> task_def = load_module_task("app.workers.metadata_validate") >>> invoke_workspace_free_task( ... task_def, ... {"params": {"song_id": "song-000123", "min_duration_seconds": 30}}, ... ) {'result': {'valid': True, 'reason': None}} """ if task.has_workspace: raise TaskInputError("invoke_workspace_free_task only supports workspace-free tasks") if set(input_data) != {"params"}: raise TaskInputError("workspace-free task input must contain only params") params = task.params_model.model_validate(input_data["params"], extra="forbid") raw_result = task.fn(params) return build_workspace_free_task_output(task, raw_result)
[文档] def build_workspace_free_task_output(task: TaskDefinition, raw_result: object) -> dict[str, Any]: """ Validate and wrap a workspace-free task result. The helper applies the task output model outside the invocation path so tests and runtime integrations can reuse the exact same output contract. Parameters ---------- task : TaskDefinition Loaded workspace-free task definition whose output model validates the result. raw_result : object Value returned by the task callable. Pydantic model instances are dumped before being revalidated with ``extra="forbid"``; mappings and other supported values are validated directly by the output model. Returns ------- dict of str to Any Output wrapper containing only ``"result"``. Raises ------ TaskInputError If ``task`` is a workspace task. pydantic.ValidationError If ``raw_result`` does not satisfy the task output model or contains extra fields. See Also -------- invoke_workspace_free_task : Invoke a workspace-free task and build this output wrapper. build_workspace_task_output : Build the corresponding workspace task output wrapper. Examples -------- >>> task_def = load_module_task("app.workers.metadata_validate") >>> build_workspace_free_task_output(task_def, {"valid": False, "reason": "missing"}) {'result': {'valid': False, 'reason': 'missing'}} """ if task.has_workspace: raise TaskInputError("workspace-free output can only be built for workspace-free tasks") result = _validate_result(task, raw_result) return {"result": result.model_dump(mode="json")}
[文档] def build_workspace_task_output( task: TaskDefinition, input_workspace: WorkspaceInput | Mapping[str, Any], published_ref: str, raw_result: object, ) -> dict[str, Any]: """ Validate and wrap a completed workspace task output. ``build_workspace_task_output`` combines a published workspace reference with a task body result. It is useful for tests and runtime integrations that already performed download, body execution, staging, and publication. Parameters ---------- task : TaskDefinition Loaded workspace task definition whose output model validates ``raw_result``. input_workspace : WorkspaceInput or mapping of str to Any Original workspace input used for the attempt. The repository, branch, and ref type are preserved in the output. published_ref : str Reference returned by workspace publication. raw_result : object Value returned by the task callable. It is validated by the task output model with extra fields forbidden. Returns ------- dict of str to Any Output wrapper containing ``"workspace"`` with the published ref and ``"result"`` with the validated task output. Raises ------ TaskInputError If ``task`` is not a workspace task. pydantic.ValidationError If ``input_workspace`` or ``raw_result`` is invalid. See Also -------- run_workspace_task_attempt : Full workspace attempt execution flow. WorkspaceInput.published_output : Derive the published workspace output model. build_workspace_free_task_output : Build the workspace-free output wrapper. Examples -------- >>> task_def = load_module_task("app.workers.features_build") >>> output = build_workspace_task_output( # doctest: +SKIP ... task_def, ... {"repository": "song-000123", "branch": "main", "ref_type": "commit", "ref": "..."}, ... "published-ref", ... {"row_count": 100, "feature_count": 24}, ... ) >>> sorted(output) ['result', 'workspace'] """ if not task.has_workspace: raise TaskInputError("workspace output can only be built for workspace tasks") workspace_input = WorkspaceInput.model_validate(input_workspace) workspace_output = workspace_input.published_output(published_ref) result = _validate_result(task, raw_result) return { "workspace": workspace_output.model_dump(mode="json"), "result": result.model_dump(mode="json"), }
def _check_phase_guardrails( workspace_dir: Path, guardrails: list[Any], phase: str, error: type[GuardrailViolation], ) -> None: try: check_guardrails(workspace_dir, guardrails, phase) except GuardrailViolation as exc: raise error(str(exc)) from exc def _cleanup_staging_safely(staged: StagedWorkspace, cleanup_staging: CleanupStaging) -> None: try: cleanup_staging(staged) except Exception as exc: # noqa: BLE001 logger.bind( staging_branch=staged.branch, staging_commit=staged.commit, ).opt(exception=exc).error("failed to clean staging workspace") def _complete_noop_workspace( complete_noop_workspace: CompleteNoOpWorkspace | None, workspace_input: WorkspaceInput, workspace: WorkspaceSpec, attempt: object, ) -> str: if complete_noop_workspace is None: raise TaskInputError( "complete_noop_workspace callback is required for writable no-op workspace completion" ) return complete_noop_workspace(workspace_input, workspace, attempt) def _snapshot_workspace(workspace_dir: Path) -> WorkspaceSnapshot: entries: list[tuple[str, str, str]] = [] for local_path in sorted(workspace_dir.rglob("*")): relative_path = local_path.relative_to(workspace_dir) if relative_path.name == ATTEMPT_WORKSPACE_MARKER: continue workspace_path = relative_path.as_posix() if local_path.is_symlink(): entries.append((workspace_path, "symlink", os.readlink(local_path))) continue if not local_path.is_file(): continue entries.append((workspace_path, "file", _file_digest(local_path))) return tuple(entries) def _file_digest(path: Path) -> str: digest = sha256() with path.open("rb") as file: for chunk in iter(lambda: file.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def _validate_result(task: TaskDefinition, raw_result: object) -> BaseModel: if isinstance(raw_result, BaseModel): return task.output_model.model_validate(raw_result.model_dump(), extra="forbid") return task.output_model.model_validate(raw_result, extra="forbid")