perago.run_workspace_free_task_attempt
- perago.run_workspace_free_task_attempt(task, input_data, *, failure_reason_max_length)
Run one workspace-free task attempt.
Workspace-free attempts only validate the params wrapper, invoke the task callable, validate the output model, and convert failures to the runtime result contract.
- Parameters:
- task : TaskDefinition
Loaded workspace-free task definition. Workspace task definitions are rejected.
- input_data : mapping of str to Any
Conductor task input. Workspace-free attempts must contain exactly "params".
- failure_reason_max_length : int
Maximum number of characters written to reasonForIncompletion for failed attempts.
- Returns:
- RuntimeTaskResult
COMPLETED result containing the validated result payload, or a failed result produced from the raised exception.
- Raises:
- TaskInputError
If task is a workspace task. Input and output validation failures that occur after execution enters the attempt's try block are not raised; they are converted to a RuntimeTaskResult.
- Parameters:
task (TaskDefinition)
failure_reason_max_length (int)
- Return type:
RuntimeTaskResult
See also
invoke_workspace_free_task : Invoke and validate a workspace-free task.
build_workspace_free_task_output : Build the completed output payload.
result_for_exception : Convert execution exceptions to runtime results.
Examples
>>> task_def = load_module_task("app.workers.metadata_validate")
>>> result = run_workspace_free_task_attempt(
...     task_def,
...     {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
... )
>>> result.status
'COMPLETED'
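
The attempt flow described above (check the params wrapper, invoke the callable, check the output, convert failures) can be illustrated with a minimal standalone sketch. It is not perago's implementation: SketchResult, sketch_attempt, and validate_metadata are hypothetical stand-ins, the "FAILED" status string is an assumption, and the output check is reduced to a mapping test instead of a real output model. The workspace-task rejection that raises before the try block is omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Mapping, Optional


class TaskInputError(Exception):
    """Local stand-in for the input error named in this reference."""


@dataclass
class SketchResult:
    """Hypothetical, simplified stand-in for RuntimeTaskResult."""
    status: str
    output: dict = field(default_factory=dict)
    reason: Optional[str] = None


def sketch_attempt(
    task_fn: Callable[[Mapping[str, Any]], Any],
    input_data: Mapping[str, Any],
    *,
    failure_reason_max_length: int,
) -> SketchResult:
    try:
        # 1. The input must contain exactly the "params" key.
        if set(input_data) != {"params"}:
            raise TaskInputError('input must contain exactly "params"')
        # 2. Invoke the task callable with the unwrapped params.
        output = task_fn(input_data["params"])
        # 3. Simplified output validation (assumption: a mapping is enough).
        if not isinstance(output, Mapping):
            raise TypeError("task output must be a mapping")
        return SketchResult("COMPLETED", {"result": dict(output)})
    except Exception as exc:
        # 4. Convert any failure to a result, truncating the reason text.
        reason = f"{type(exc).__name__}: {exc}"[:failure_reason_max_length]
        return SketchResult("FAILED", reason=reason)  # status name assumed


def validate_metadata(params: Mapping[str, Any]) -> dict:
    """Hypothetical task callable used only for this illustration."""
    return {"song_id": params["song_id"], "valid": True}


ok = sketch_attempt(
    validate_metadata,
    {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
    failure_reason_max_length=200,
)
bad = sketch_attempt(
    validate_metadata,
    {"params": {}, "extra": 1},  # extra key violates the "exactly params" rule
    failure_reason_max_length=20,
)
```

In this sketch, ok carries the validated payload under the "result" key, while bad never reaches the callable and holds a reason string cut to at most 20 characters, mirroring the role of failure_reason_max_length.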