perago.run_workspace_free_task_attempt

perago.run_workspace_free_task_attempt(task, input_data, *, failure_reason_max_length)

Run one workspace-free task attempt.

Workspace-free attempts only validate the params wrapper, invoke the task callable, validate the output model, and convert failures to the runtime result contract.
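The four steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `SketchResult`, `sketch_attempt`, and `validate_metadata` are hypothetical stand-ins, the `"FAILED"` status string and the plain-slice truncation are assumptions, and the output check is reduced to a mapping test where a real task would validate against its output model.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Mapping


@dataclass
class SketchResult:
    """Hypothetical stand-in for RuntimeTaskResult."""

    status: str
    output: dict = field(default_factory=dict)
    reason: str = ""


def sketch_attempt(
    func: Callable[..., Any],
    input_data: Mapping[str, Any],
    *,
    failure_reason_max_length: int,
) -> SketchResult:
    try:
        # 1. Validate the params wrapper: the only allowed key is "params".
        if set(input_data) != {"params"}:
            raise ValueError('input must contain exactly "params"')
        # 2. Invoke the task callable with the unpacked params.
        raw = func(**input_data["params"])
        # 3. Validate the output (simplified here to "must be a mapping").
        if not isinstance(raw, Mapping):
            raise TypeError("task output must be a mapping")
        return SketchResult("COMPLETED", output=dict(raw))
    except Exception as exc:
        # 4. Convert the failure to a result and cap the reason length.
        #    "FAILED" is an assumed status name.
        reason = str(exc)[:failure_reason_max_length]
        return SketchResult("FAILED", reason=reason)


def validate_metadata(song_id: str, min_duration_seconds: int) -> dict:
    # Hypothetical task callable used only for this illustration.
    return {"song_id": song_id, "valid": min_duration_seconds >= 0}


ok = sketch_attempt(
    validate_metadata,
    {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
    failure_reason_max_length=200,
)
bad = sketch_attempt(
    validate_metadata,
    {"params": {}, "extra": 1},  # an extra key violates the wrapper rule
    failure_reason_max_length=10,
)
```

Here `ok.status` is `'COMPLETED'`, while `bad` carries the assumed failed status with its reason cut to ten characters.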

Parameters:
task : TaskDefinition

Loaded workspace-free task definition. Workspace task definitions are rejected.

input_data : mapping of str to Any

Conductor task input. For workspace-free attempts, the mapping must contain exactly one key, "params".

failure_reason_max_length : int

Maximum number of characters written to reasonForIncompletion for failed attempts.

Returns:
RuntimeTaskResult

COMPLETED result containing the validated result payload, or a failed result produced from the raised exception.

Raises:
TaskInputError

If task is a workspace task. Input and output validation failures that occur after execution enters the attempt's try block are not raised; they are converted to a failed RuntimeTaskResult.
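The boundary between raised and converted failures might look like the sketch below. It rests on assumptions: `SketchTaskInputError` and `sketch_run` are hypothetical stand-ins, the `is_workspace_task` flag replaces whatever check the real task definition supports, and results are shown as plain tuples with an assumed `"FAILED"` status.

```python
from typing import Any, Callable, Mapping, Tuple


class SketchTaskInputError(Exception):
    """Hypothetical stand-in for TaskInputError."""


def sketch_run(
    is_workspace_task: bool,
    func: Callable[..., Any],
    input_data: Mapping[str, Any],
) -> Tuple[str, Any]:
    # This check sits before the try block, so the error reaches the caller.
    if is_workspace_task:
        raise SketchTaskInputError("workspace task definitions are rejected")
    try:
        return ("COMPLETED", func(**input_data["params"]))
    except Exception as exc:
        # Anything failing inside the try block is returned, not raised.
        return ("FAILED", str(exc))


def needs_song_id(song_id: str) -> dict:
    # Hypothetical task callable used only for this illustration.
    return {"song_id": song_id}


# A missing "params" key fails inside the try block and becomes a result.
converted = sketch_run(False, needs_song_id, {})

# A workspace task is rejected up front, so the caller must catch it.
try:
    sketch_run(True, needs_song_id, {"params": {"song_id": "song-000123"}})
    rejected = False
except SketchTaskInputError:
    rejected = True
```

A caller therefore only needs an `except` clause for the workspace-task rejection; every other failure arrives as a result to inspect.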

Return type:

RuntimeTaskResult

See also

invoke_workspace_free_task

Invoke and validate a workspace-free task.

build_workspace_free_task_output

Build the completed output payload.

result_for_exception

Convert execution exceptions to runtime results.

Examples

>>> task_def = load_module_task("app.workers.metadata_validate")
>>> result = run_workspace_free_task_attempt(
...     task_def,
...     {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
...     failure_reason_max_length=500,  # required keyword-only argument; 500 is an illustrative value
... )
>>> result.status
'COMPLETED'