perago.invoke_workspace_free_task
- perago.invoke_workspace_free_task(task, input_data)
Invoke a workspace-free task and validate its output wrapper.
The helper is the body-level execution path for tasks that do not declare a
WorkspaceSpec. It accepts only the Conductor "params" wrapper, calls the task function with the validated params model, and returns the validated output payload.
- Parameters:
- task : TaskDefinition
Loaded workspace-free task definition.
- input_data : mapping of str to Any
Conductor task input containing exactly "params".
- Returns:
- dict of str to Any
Completed workspace-free output wrapper containing only "result".
- Raises:
- TaskInputError
If task is a workspace task or if the input wrapper shape is invalid.
- pydantic.ValidationError
If params or task result validation fails.
See also
- run_workspace_free_task_attempt
Attempt wrapper that converts exceptions to runtime results.
- build_workspace_free_task_output
Validate and wrap a raw task result.
Examples
>>> task_def = load_module_task("app.workers.metadata_validate")
>>> invoke_workspace_free_task(
...     task_def,
...     {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
... )
{'result': {'valid': True, 'reason': None}}