perago.run_workspace_task_attempt

perago.run_workspace_task_attempt(task, input_data, attempt, workspace_root, *, download_workspace, load_current_attempt, stage_workspace, publish_workspace, cleanup_staging, complete_noop_workspace=None, owner_worker_id=None, execution_id=None, failure_reason_max_length)

Run one workspace task attempt.

This function is the testable execution core used by the Conductor worker runtime. It validates the shape of the Conductor input, prepares an attempt-local workspace, downloads the declared workspace input, invokes the task body, and then proceeds according to the task's workspace access mode. Read-only tasks complete with the input workspace reference. For writable tasks, an unchanged local workspace completes as a no-op without staging; otherwise the attempt fence is checked before and after staging, the staged workspace is published, and local and staging resources are cleaned up.
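The control flow described above can be sketched in plain Python. This is a minimal, hypothetical illustration, not perago's implementation: SketchResult, run_attempt_sketch, the boolean writable flag, and the changed and check_fence callbacks are all assumptions, the "FAILED" status string is assumed, and input validation, downloading, and local workspace cleanup are omitted.

```python
import contextlib
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class SketchResult:
    # Hypothetical stand-in for RuntimeTaskResult.
    status: str
    output: dict = field(default_factory=dict)
    reason: str = ""


def run_attempt_sketch(
    writable: bool,
    input_ref: str,
    body: Callable[[], Any],
    changed: Callable[[], bool],
    check_fence: Callable[[], None],
    stage: Callable[[], str],
    publish: Callable[[str], str],
    cleanup: Callable[[str], None],
    complete_noop: Optional[Callable[[str], str]] = None,
) -> SketchResult:
    staged: Optional[str] = None
    try:
        value = body()
        if not writable:
            # Read-only: complete with the input workspace reference.
            return SketchResult("COMPLETED", {"workspace": input_ref, "result": value})
        if not changed():
            if complete_noop is None:
                # A writable no-op without a reconciler fails closed.
                raise RuntimeError("writable no-op but no complete_noop callback")
            ref = complete_noop(input_ref)
            return SketchResult("COMPLETED", {"workspace": ref, "result": value})
        check_fence()  # still the current attempt before staging?
        staged = stage()
        check_fence()  # still the current attempt after staging?
        ref = publish(staged)
        return SketchResult("COMPLETED", {"workspace": ref, "result": value})
    except Exception as exc:
        # Errors inside the attempt become a failed result, not a raise.
        return SketchResult("FAILED", reason=str(exc))
    finally:
        if staged is not None:
            # Best-effort cleanup; a failure here never replaces the result.
            with contextlib.suppress(Exception):
                cleanup(staged)
```

Because cleanup runs in the finally clause and swallows its own errors, the value returned from the try or except branch is what the caller sees, which mirrors the best-effort cleanup described in the Notes.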

Parameters:
task : TaskDefinition

Loaded workspace task definition. Workspace-free task definitions are rejected.

input_data : mapping of str to Any

Conductor task input. Workspace attempts must contain exactly the keys "workspace" and "params".

attempt : object

Conductor task attempt object. It must expose the attributes consumed by perago.assert_current_attempt_snapshot() and by the workspace directory helpers.

workspace_root : pathlib.Path

Root directory under which the attempt-local workspace is prepared.

download_workspace : callable

Callback that materializes WorkspaceInput into the local workspace directory.

load_current_attempt : callable

Callback that reloads the latest Conductor attempt state for attempt fence checks.

stage_workspace : callable

Callback that stages local workspace changes and returns a StagedWorkspace.

publish_workspace : callable

Callback that publishes a staged workspace and returns the published workspace reference.

cleanup_staging : callable

Callback that removes or abandons the staging branch once the workspace has been staged, whether the attempt then completes or fails.

complete_noop_workspace : callable or None, default=None

Callback that validates or reconciles the target branch for writable tasks whose local workspace did not change. If omitted and a writable no-op is reached, the attempt fails closed.

owner_worker_id : str or None, default=None

Worker ID written into the local workspace owner marker, used for active-owner tracking and supervisor garbage collection (GC).

execution_id : str or None, default=None

Execution-scoped ID used to isolate the local attempt workspace and the LakeFS staging branch names. A new ID is generated when omitted.

failure_reason_max_length : int

Maximum number of characters written to reasonForIncompletion for failed attempts.
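The last two parameters scope and report an attempt. The helpers below sketch one plausible way to derive execution-scoped names and to cap a failure reason. Both functions, the directory layout, the "perago-staging-" branch prefix, and the "..." truncation marker are hypothetical choices for illustration; this page does not document perago's actual naming scheme or truncation style.

```python
import re
import uuid
from pathlib import Path
from typing import Optional


def scoped_attempt_names(
    workspace_root: Path, task_id: str, execution_id: Optional[str] = None
) -> tuple[str, Path, str]:
    """Return (execution_id, local_dir, staging_branch) for one attempt.

    The layout and the "perago-staging-" prefix are hypothetical.
    """
    exec_id = execution_id or uuid.uuid4().hex  # new ID when omitted
    local_dir = workspace_root / task_id / exec_id
    # Restrict branch names to letters, digits, "_" and "-".
    safe = re.sub(r"[^A-Za-z0-9_-]", "-", f"{task_id}-{exec_id}")
    return exec_id, local_dir, f"perago-staging-{safe}"


def truncate_failure_reason(reason: str, max_length: int, marker: str = "...") -> str:
    """Cap reason at max_length characters, ending with marker when cut."""
    if max_length < 0:
        raise ValueError("max_length must be non-negative")
    if len(reason) <= max_length:
        return reason
    if max_length <= len(marker):
        return reason[:max_length]
    return reason[: max_length - len(marker)] + marker
```

Embedding the execution ID in both the local directory and the branch name is what lets two executions of the same task avoid colliding, which is the isolation the execution_id description refers to.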

Returns:
RuntimeTaskResult

A COMPLETED result whose output contains the workspace reference and the task result when every phase succeeds; otherwise, a failed result produced by perago.result_for_exception().

Raises:
TaskInputError

If task is not a workspace task. Exceptions raised after execution enters the attempt's try block are converted into a failed RuntimeTaskResult instead of being propagated.
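The input_data requirement (exactly "workspace" and "params") can be checked with a strict key-set comparison. This is a hypothetical sketch: split_workspace_input is an illustrative name, and TaskInputError here is a local stand-in rather than an import from perago. This page does not say where the real function performs the check, or whether a failure is raised or converted into a failed RuntimeTaskResult.

```python
from typing import Any, Mapping


class TaskInputError(ValueError):
    """Local stand-in for perago's TaskInputError (base class assumed)."""


REQUIRED_KEYS = frozenset({"workspace", "params"})


def split_workspace_input(input_data: Mapping[str, Any]) -> tuple[Any, Any]:
    """Return (workspace, params), rejecting any other key set."""
    keys = set(input_data)
    if keys != REQUIRED_KEYS:
        missing = sorted(REQUIRED_KEYS - keys)
        extra = sorted(keys - REQUIRED_KEYS)
        raise TaskInputError(
            "workspace input must contain exactly 'workspace' and 'params' "
            f"(missing={missing}, extra={extra})"
        )
    return input_data["workspace"], input_data["params"]
```

Comparing whole key sets, rather than only testing for the two required keys, also rejects unexpected extra keys, which matches the "exactly" wording.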

Return type:

RuntimeTaskResult

See also

invoke_workspace_task_body

Validate and invoke only the task body phase.

build_workspace_task_output

Build the completed output payload.

result_for_exception

Convert execution exceptions to runtime results.

Notes

Cleanup is best effort. A staging cleanup failure is logged and does not replace the result of the completed or failed attempt.
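One way to get this behavior is to run cleanup in a finally block and log, rather than raise, any cleanup error. The helper name with_best_effort_cleanup and the logger name are assumptions for illustration and are not part of perago's documented API.

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("perago_sketch")  # hypothetical logger name


def with_best_effort_cleanup(run: Callable[[], T], cleanup: Callable[[], None]) -> T:
    """Run `run`, then `cleanup`; a cleanup error is logged, never raised."""
    try:
        return run()
    finally:
        try:
            cleanup()
        except Exception:
            # Logged with traceback; the return value or exception
            # from `run` still reaches the caller unchanged.
            logger.exception("staging cleanup failed; keeping attempt result")
```

Because the cleanup error is handled inside the finally clause, Python does not replace the pending return value or the in-flight exception from run.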

Examples

>>> from pathlib import Path
>>> from perago import run_workspace_task_attempt
>>> task_def = load_module_task("app.workers.features_build")
>>> result = run_workspace_task_attempt(
...     task_def,
...     {"workspace": {...}, "params": {...}},
...     attempt,
...     Path("/tmp/perago/workspaces"),
...     download_workspace=lakefs.download_workspace,
...     load_current_attempt=conductor.load_current_attempt,
...     stage_workspace=lakefs.stage_workspace,
...     publish_workspace=lakefs.publish_workspace,
...     cleanup_staging=lakefs.cleanup_staging,
...     failure_reason_max_length=1000,  # required keyword; illustrative value
... )
>>> result.status
'COMPLETED'