perago.run_workspace_task_attempt
- perago.run_workspace_task_attempt(task, input_data, attempt, workspace_root, *, download_workspace, load_current_attempt, stage_workspace, publish_workspace, cleanup_staging, complete_noop_workspace=None, owner_worker_id=None, execution_id=None, failure_reason_max_length)
Run one workspace task attempt.
This function is the testable execution core used by the Conductor worker runtime. It validates the Conductor input shape, prepares an attempt-local workspace, downloads the declared workspace input, invokes the task body, and then follows the task's workspace access mode. Read-only tasks complete with the input ref. Writable tasks whose local workspace did not change complete as a no-op without staging. All other writable tasks check the attempt fence before and after staging, publish the staged workspace, and then clean up local and staging resources.
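The phase ordering described above can be sketched with plain callables. This is an illustrative outline under stated assumptions, not perago's implementation: `sketch_attempt_flow`, the `access` strings, the `changed` flag, the `check_fence` callable, and the returned placeholder refs are all hypothetical names introduced for this example.

```python
from typing import Callable, List


def sketch_attempt_flow(
    access: str,
    changed: bool,
    *,
    check_fence: Callable[[], None],
    log: List[str],
) -> str:
    """Record the order of phases for one attempt (hypothetical outline)."""
    # Phases shared by every access mode.
    log.append("validate_input")
    log.append("prepare_workspace")
    log.append("download")
    log.append("invoke_body")

    if access == "read_only":
        # Read-only tasks complete with the input ref; nothing is staged.
        return "input-ref"

    if not changed:
        # Writable no-op: completes without staging.
        log.append("complete_noop")
        return "noop-ref"

    check_fence()  # fence check before staging
    log.append("stage")
    check_fence()  # fence check after staging
    log.append("publish")
    log.append("cleanup")
    return "published-ref"


phases: List[str] = []
fence_checks: List[int] = []
ref = sketch_attempt_flow(
    "writable",
    True,
    check_fence=lambda: fence_checks.append(1),
    log=phases,
)
print(ref, phases, len(fence_checks))
```

Passing the fence check in as a callable keeps the outline testable without a Conductor server, which mirrors how the real function takes `load_current_attempt` as a callback.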
- Parameters:
- task : TaskDefinition
Loaded workspace task definition. Workspace-free task definitions are rejected.
- input_data : mapping of str to Any
Conductor task input. Workspace attempts must contain exactly "workspace" and "params".
- attempt : object
Conductor task attempt object. It must expose the attributes consumed by perago.assert_current_attempt_snapshot() and workspace directory helpers.
- workspace_root : pathlib.Path
Root directory under which the attempt-local workspace is prepared.
- download_workspace : callable
Callback that materializes WorkspaceInput into the local workspace directory.
- load_current_attempt : callable
Callback that reloads the latest Conductor attempt state for attempt fence checks.
- stage_workspace : callable
Callback that stages local workspace changes and returns a StagedWorkspace.
- publish_workspace : callable
Callback that publishes a staged workspace and returns the published workspace reference.
- cleanup_staging : callable
Callback that removes or abandons the staging branch once the attempt has completed, or once it has failed after staging.
- complete_noop_workspace : callable or None, default=None
Callback that validates or reconciles the target branch for writable tasks whose local workspace did not change. If omitted and a writable no-op is reached, the attempt fails closed.
- owner_worker_id : str or None, default=None
Worker id written into the local workspace owner marker for active owner tracking and supervisor GC.
- execution_id : str or None, default=None
Execution-scoped id used to isolate local attempt workspace and LakeFS staging branch names. A new id is generated when omitted.
- failure_reason_max_length : int
Maximum number of characters written to reasonForIncompletion for failed attempts.
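The `execution_id` and `failure_reason_max_length` parameters can be illustrated with two small helpers. This is a minimal sketch, assuming a UUID-based default id and simple character clipping. The documentation does not say how perago generates the id or shortens the reason, so `resolve_execution_id` and `truncate_reason` are hypothetical names and behaviors.

```python
import uuid
from typing import Optional


def resolve_execution_id(execution_id: Optional[str] = None) -> str:
    """Return the caller's id, or generate one when omitted.

    Using uuid4().hex is an assumption made for this sketch.
    """
    return execution_id if execution_id is not None else uuid.uuid4().hex


def truncate_reason(reason: str, max_length: int) -> str:
    """Clip a failure reason to at most max_length characters."""
    if max_length < 0:
        raise ValueError("max_length must be non-negative")
    return reason[:max_length]


print(resolve_execution_id("exec-123"))
print(truncate_reason("download failed: connection reset by peer", 15))
```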
- Returns:
- RuntimeTaskResult
COMPLETED result containing workspace and result output when every phase succeeds; otherwise a failed result produced by perago.result_for_exception().
- Raises:
- TaskInputError
If task is not a workspace task. Exceptions raised after execution enters the attempt try block are converted to RuntimeTaskResult instead of being raised.
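The split between raised and converted exceptions can be sketched as follows. This is a hedged illustration of the general pattern, not perago's code: `SketchInputError`, `SketchResult`, `run_sketch`, and the `"FAILED"` status string are hypothetical stand-ins for `TaskInputError`, `RuntimeTaskResult`, and the real function.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class SketchInputError(Exception):
    """Stand-in for TaskInputError (hypothetical)."""


@dataclass
class SketchResult:
    """Stand-in for RuntimeTaskResult (hypothetical fields)."""

    status: str
    reason: Optional[str] = None


def run_sketch(is_workspace_task: bool, body: Callable[[], None]) -> SketchResult:
    if not is_workspace_task:
        # Checked before the try block, so this propagates to the caller.
        raise SketchInputError("not a workspace task")
    try:
        body()
    except Exception as exc:
        # Raised inside the try block: converted into a failed result.
        return SketchResult(status="FAILED", reason=str(exc))
    return SketchResult(status="COMPLETED")


def failing_body() -> None:
    raise RuntimeError("stage failed")


print(run_sketch(True, failing_body))
print(run_sketch(True, lambda: None))
```

With this shape a caller only needs an `except` clause for the pre-flight input error. Every later failure arrives as a result object.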
- Parameters:
task (TaskDefinition)
attempt (object)
workspace_root (Path)
download_workspace (Callable[[WorkspaceInput, WorkspaceSpec, Path], None])
stage_workspace (Callable[[Path, WorkspaceInput, WorkspaceSpec, object], StagedWorkspace])
publish_workspace (Callable[[StagedWorkspace, WorkspaceInput, WorkspaceSpec, object], str])
cleanup_staging (Callable[[StagedWorkspace], None])
complete_noop_workspace (Callable[[WorkspaceInput, WorkspaceSpec, object], str] | None)
owner_worker_id (str | None)
execution_id (str | None)
failure_reason_max_length (int)
- Return type:
RuntimeTaskResult
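For unit tests, the callback signatures listed above can be satisfied with in-memory fakes. The sketch below is an assumption-laden illustration: the argument names are invented (only the argument types and order come from the signatures above), `Any` and a plain dict stand in for `WorkspaceInput`, `WorkspaceSpec`, and `StagedWorkspace`, and the returned refs are placeholders. `load_current_attempt` is left out because its signature is not listed.

```python
from pathlib import Path
from typing import Any, Callable, Dict, List

calls: List[str] = []  # records which fake ran, in order


def fake_download_workspace(workspace_input: Any, spec: Any, local_dir: Path) -> None:
    """Pretend to materialize the workspace input into local_dir."""
    calls.append("download")


def fake_stage_workspace(
    local_dir: Path, workspace_input: Any, spec: Any, attempt: object
) -> Dict[str, str]:
    """Return a dict as a stand-in for StagedWorkspace."""
    calls.append("stage")
    return {"staging": "placeholder-staging"}


def fake_publish_workspace(
    staged: Any, workspace_input: Any, spec: Any, attempt: object
) -> str:
    """Return a placeholder published workspace reference."""
    calls.append("publish")
    return "placeholder-published-ref"


def fake_cleanup_staging(staged: Any) -> None:
    """Pretend to remove the staging branch."""
    calls.append("cleanup")


def fake_complete_noop_workspace(workspace_input: Any, spec: Any, attempt: object) -> str:
    """Return a placeholder reference for a writable no-op."""
    calls.append("noop")
    return "placeholder-noop-ref"


# Keyword arguments that could be splatted into a call under test.
fake_callbacks: Dict[str, Callable[..., Any]] = {
    "download_workspace": fake_download_workspace,
    "stage_workspace": fake_stage_workspace,
    "publish_workspace": fake_publish_workspace,
    "cleanup_staging": fake_cleanup_staging,
    "complete_noop_workspace": fake_complete_noop_workspace,
}
```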
See also
invoke_workspace_task_body: Validate and invoke only the task body phase.
build_workspace_task_output: Build the completed output payload.
result_for_exception: Convert execution exceptions to runtime results.
Notes
Cleanup is best effort. A staging cleanup failure is logged and does not replace the result of the completed or failed attempt.
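The best-effort behavior described in this note follows a common pattern, sketched here under stated assumptions: `finish_with_cleanup` and the logger name are hypothetical, and the actual log message and level used by perago are not documented here.

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("cleanup_sketch")  # hypothetical logger name


def finish_with_cleanup(result: T, cleanup: Callable[[], None]) -> T:
    """Run cleanup, log any failure, and return the original result unchanged."""
    try:
        cleanup()
    except Exception:
        # The failure is recorded but never replaces the attempt result.
        logger.exception("staging cleanup failed; keeping the attempt result")
    return result


def broken_cleanup() -> None:
    raise RuntimeError("staging branch could not be deleted")


print(finish_with_cleanup("COMPLETED", broken_cleanup))
```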
Examples
>>> from pathlib import Path
>>> task_def = load_module_task("app.workers.features_build")
>>> result = run_workspace_task_attempt(
...     task_def,
...     {"workspace": {...}, "params": {...}},
...     attempt,
...     Path("/tmp/perago/workspaces"),
...     download_workspace=lakefs.download_workspace,
...     load_current_attempt=conductor.load_current_attempt,
...     stage_workspace=lakefs.stage_workspace,
...     publish_workspace=lakefs.publish_workspace,
...     cleanup_staging=lakefs.cleanup_staging,
... )
>>> result.status
'COMPLETED'