perago.invoke_workspace_free_task

perago.invoke_workspace_free_task(task, input_data)

Invoke a workspace-free task and validate its output wrapper.

The helper is the body-level execution path for tasks that do not declare a WorkspaceSpec. It accepts only the Conductor params wrapper, calls the task function with the validated params model, and returns the validated output payload.

Parameters:
task : TaskDefinition

Loaded workspace-free task definition.

input_data : mapping of str to Any

Conductor task input containing exactly "params".

Returns:
dict of str to Any

Completed workspace-free output wrapper containing only "result".

Raises:
TaskInputError

If task is a workspace task or if the input wrapper shape is invalid.

pydantic.ValidationError

If params or task result validation fails.

Return type:

dict[str, Any]

See also

run_workspace_free_task_attempt

Attempt wrapper that converts exceptions to runtime results.

build_workspace_free_task_output

Validate and wrap a raw task result.

Examples

>>> task_def = load_module_task("app.workers.metadata_validate")
>>> invoke_workspace_free_task(
...     task_def,
...     {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
... )
{'result': {'valid': True, 'reason': None}}
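
The flow described above (reject workspace tasks, require a wrapper containing exactly "params", validate the params, call the task function, validate and wrap the result) can be pictured with a small stand-in. This is only a sketch under stated assumptions, not perago's implementation: `SketchTask`, `Params`, `Result`, `toy_validate`, and `sketch_invoke` are hypothetical names, plain dataclasses stand in for the pydantic models, and the `TaskInputError` class defined here is a local placeholder rather than the library's own.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable, Mapping, Optional


class TaskInputError(Exception):
    """Local placeholder for the error raised on a bad task or wrapper."""


@dataclass(frozen=True)
class Params:
    # Hypothetical params model; a real task would use a pydantic model.
    song_id: str
    min_duration_seconds: int


@dataclass(frozen=True)
class Result:
    # Hypothetical result model matching the example output above.
    valid: bool
    reason: Optional[str]


@dataclass(frozen=True)
class SketchTask:
    # Hypothetical, heavily simplified stand-in for a TaskDefinition.
    func: Callable[[Params], dict]
    has_workspace: bool = False


def toy_validate(params: Params) -> dict:
    # Illustrative task body only; the real task logic is not shown in the docs.
    ok = params.min_duration_seconds > 0
    return {"valid": ok, "reason": None if ok else "non-positive duration"}


def sketch_invoke(task: SketchTask, input_data: Mapping[str, Any]) -> dict:
    # Workspace tasks are rejected before anything else runs.
    if task.has_workspace:
        raise TaskInputError("task declares a workspace")
    # The input wrapper must contain exactly the "params" key.
    if set(input_data) != {"params"}:
        raise TaskInputError('input must contain exactly "params"')
    # Build the params model (raises TypeError on unknown or missing fields).
    params = Params(**input_data["params"])
    # Call the task, check the raw result against the result model, then wrap it.
    result = Result(**task.func(params))
    return {"result": asdict(result)}


task = SketchTask(func=toy_validate)
out = sketch_invoke(
    task,
    {"params": {"song_id": "song-000123", "min_duration_seconds": 30}},
)
print(out)  # {'result': {'valid': True, 'reason': None}}
```

In this sketch an extra top-level key such as "workspace" fails the wrapper check before the task function is ever called, which mirrors the documented rule that the input contains exactly "params".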