from __future__ import annotations
import importlib
import inspect
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, get_type_hints
from pydantic import BaseModel, ValidationError
from perago.errors import TaskDefinitionError
from perago.models import TaskControls, WorkspaceSpec
_REGISTERED_TASKS: dict[str, list["TaskDefinition"]] = {}
[文档]
@dataclass(frozen=True)
class TaskDefinition:
    """
    Immutable, validated contract for the one task a Perago module declares.

    Instances are produced by the :func:`task` decorator once the function
    signature, the Pydantic ``params``/return models, the workspace
    declaration, and the runtime controls have passed validation. The
    decorator attaches the instance to the function as ``__perago_task__``,
    and :func:`load_module_task` hands it back to callers. Task authors are
    not expected to build one by hand.

    Parameters
    ----------
    name : str
        Conductor task definition name.
    owner_email : str
        Owner email emitted in the generated Conductor TaskDef.
    fn : collections.abc.Callable
        Synchronous function that implements the task body.
    params_model : type[pydantic.BaseModel]
        Model that the ``params`` input object is validated against.
    output_model : type[pydantic.BaseModel]
        Model that the task result object is validated against.
    description : str or None, default=None
        Optional human-readable description copied into the TaskDef.
    workspace : WorkspaceSpec or None, default=None
        Workspace declaration, or ``None`` for a workspace-free task.
    controls : TaskControls, optional
        Retry, timeout, execution-limit, and publish-budget controls. When
        omitted, a default-constructed ``TaskControls`` is used.

    Attributes
    ----------
    has_workspace : bool
        ``True`` when ``workspace`` is set, meaning the task expects an
        injected attempt-local workspace path.

    See Also
    --------
    task : Declare and validate a task definition.
    load_module_task : Import a module and retrieve its single task definition.

    Notes
    -----
    Exactly one task definition is allowed per Python module.
    :func:`load_module_task` rejects modules that register none or several.

    Examples
    --------
    >>> definition = load_module_task("app.workers.features_build")
    >>> definition.name
    'features.build'
    """

    name: str
    owner_email: str
    fn: Callable[..., BaseModel]
    params_model: type[BaseModel]
    output_model: type[BaseModel]
    description: str | None = None
    workspace: WorkspaceSpec | None = None
    # A factory (not a shared instance) so each definition gets its own controls object.
    controls: TaskControls = field(default_factory=TaskControls)

    @property
    def has_workspace(self) -> bool:
        """Return ``True`` when a workspace declaration is attached."""
        declared = self.workspace
        return declared is not None
[文档]
def task(
    *,
    name: str,
    owner_email: str,
    description: str | None = None,
    workspace: WorkspaceSpec | None = None,
    controls: TaskControls | None = None,
    **unsupported: object,
) -> Callable[[Callable[..., BaseModel]], Callable[..., BaseModel]]:
    """
    Declare the single Perago task exported by a Python module.

    Validation happens at import time, when the decorator is applied. Two
    function shapes are accepted:

    * workspace task: ``(workspace: pathlib.Path, params: ParamsModel) ->
      OutputModel``, which also requires ``workspace=WorkspaceSpec(...)``;
    * workspace-free task: ``(params: ParamsModel) -> OutputModel``, which
      must not pass a workspace declaration.

    Parameters
    ----------
    name : str
        Conductor task name (required). Names containing ``/`` or ``\\``
        are rejected.
    owner_email : str
        Owner email copied into the generated TaskDef (required).
    description : str or None, default=None
        Optional TaskDef description.
    workspace : WorkspaceSpec or None, default=None
        Workspace declaration for workspace tasks; keep ``None`` for
        workspace-free tasks.
    controls : TaskControls or None, default=None
        Retry, timeout, execution-limit, and publish-budget controls.
        ``None`` means a default-constructed ``TaskControls``.
    **unsupported : object
        Any other keyword. Its presence is an error, keeping the function
        signature as the only source of the business contract.

    Returns
    -------
    collections.abc.Callable
        A decorator that registers the validated definition, stores it on the
        function as ``__perago_task__``, and returns the function unchanged.

    Raises
    ------
    TaskDefinitionError
        When metadata, controls, the workspace declaration, the function
        shape, or its type hints break the Perago task contract. Extra
        keywords are reported immediately; everything else is reported when
        the decorator is applied.

    See Also
    --------
    TaskDefinition : Validated task contract created by this decorator.
    load_module_task : Load the decorated task from a module target.
    WorkspaceSpec : Declare the workspace prefix and guardrails for workspace tasks.

    Examples
    --------
    >>> from pathlib import Path
    >>> from pydantic import BaseModel
    >>> class Params(BaseModel):
    ...     source: str
    >>> class Output(BaseModel):
    ...     rows: int
    >>> @task(name="features.build", owner_email="data@example.com", workspace=WorkspaceSpec())
    ... def build_features(workspace: Path, params: Params) -> Output:
    ...     return Output(rows=0)
    """
    if unsupported:
        raise TaskDefinitionError(
            "unsupported task decorator fields: " + ", ".join(sorted(unsupported))
        )

    def attach_definition(fn: Callable[..., BaseModel]) -> Callable[..., BaseModel]:
        # Default controls are constructed inside the ``try`` so that a
        # pydantic ValidationError from either path becomes TaskDefinitionError.
        try:
            definition = _build_task_definition(
                fn=fn,
                name=name,
                owner_email=owner_email,
                description=description,
                workspace=workspace,
                controls=TaskControls() if controls is None else controls,
            )
        except ValidationError as exc:
            raise TaskDefinitionError(str(exc)) from exc
        module_tasks = _REGISTERED_TASKS.setdefault(fn.__module__, [])
        module_tasks.append(definition)
        fn.__perago_task__ = definition
        return fn

    return attach_definition
[文档]
def load_module_task(module_target: str) -> TaskDefinition:
    """
    Import ``module_target`` and return the one Perago task it declares.

    This is the common loader behind ``perago check``, ``perago extract``, and
    worker startup. ``module_target`` must be a dotted Python import path such
    as ``"app.workers.features_build"``. File paths, ``module:object`` paths,
    and modules that register no task or several tasks are rejected.

    Parameters
    ----------
    module_target : str
        Dotted import path of a module that should contain exactly one
        :func:`task`-decorated function.

    Returns
    -------
    TaskDefinition
        The validated definition registered while the module was imported.

    Raises
    ------
    TaskDefinitionError
        If ``module_target`` is not an import path, if the module registers
        zero or more than one task, or if importing it raises a task-contract
        error.

    See Also
    --------
    task : Decorator that registers the task definition on import.
    TaskDefinition : Validated contract returned by this loader.

    Examples
    --------
    >>> definition = load_module_task("app.workers.features_build")
    >>> definition.name
    'features.build'
    """
    _validate_module_target(module_target)
    module = importlib.import_module(module_target)
    module_name = module.__name__

    # Keep definitions whose function still reports this module, dropping any
    # repeated entry for the same definition object; dict insertion order
    # preserves registration order.
    by_identity: dict[int, TaskDefinition] = {}
    for candidate in _REGISTERED_TASKS.get(module_name, []):
        if candidate.fn.__module__ == module_name:
            by_identity.setdefault(id(candidate), candidate)
    unique_tasks = list(by_identity.values())

    if not unique_tasks:
        raise TaskDefinitionError(f"{module_target} does not declare a Perago task")
    if len(unique_tasks) > 1:
        raise TaskDefinitionError(f"{module_target} declares more than one Perago task")
    return unique_tasks[0]
def _build_task_definition(
    *,
    fn: Callable[..., BaseModel],
    name: str,
    owner_email: str,
    description: str | None,
    workspace: WorkspaceSpec | None,
    controls: TaskControls,
) -> TaskDefinition:
    """
    Validate a decorated task function plus its metadata and build its definition.

    Parameters
    ----------
    fn : collections.abc.Callable
        Function passed to the :func:`task` decorator.
    name, owner_email, description, workspace, controls
        The decorator arguments; ``controls`` has already been defaulted by
        the caller.

    Returns
    -------
    TaskDefinition
        Definition carrying the resolved ``params`` and return-value models.

    Raises
    ------
    TaskDefinitionError
        If metadata, controls, the workspace declaration, the function shape
        (async, generator, variadic, defaulted, or wrong arity/names), or the
        type hints violate the Perago task contract.
    """
    _validate_required_metadata(name=name, owner_email=owner_email)
    if workspace is not None and not isinstance(workspace, WorkspaceSpec):
        raise TaskDefinitionError("workspace must be a WorkspaceSpec")
    if not isinstance(controls, TaskControls):
        raise TaskDefinitionError("controls must be a TaskControls")
    if workspace is None and controls.publish_budget is not None:
        raise TaskDefinitionError("publish_budget requires workspace=WorkspaceSpec(...)")
    # ``iscoroutinefunction`` is False for ``async def`` bodies that contain
    # ``yield`` (async generators), so those need an explicit check as well.
    if inspect.iscoroutinefunction(fn) or inspect.isasyncgenfunction(fn):
        raise TaskDefinitionError("task function must be a synchronous function")
    # Calling a generator function returns a generator object, never the
    # declared output model, even if the return annotation claims otherwise.
    if inspect.isgeneratorfunction(fn):
        raise TaskDefinitionError("task function must return its output model, not yield values")
    # Some callables (e.g. certain builtins) cannot be introspected; report
    # that through the task-contract error instead of a raw TypeError/ValueError.
    try:
        signature = inspect.signature(fn)
    except (TypeError, ValueError) as exc:
        raise TaskDefinitionError(f"failed to inspect task function signature: {exc}") from exc
    parameters = list(signature.parameters.values())
    if any(parameter.kind is not inspect.Parameter.POSITIONAL_OR_KEYWORD for parameter in parameters):
        raise TaskDefinitionError("task function must not use *args, **kwargs, or keyword-only fields")
    if any(parameter.default is not inspect.Parameter.empty for parameter in parameters):
        raise TaskDefinitionError("task function parameters must not declare defaults")
    # Annotations are evaluated in the task module's namespace and may raise
    # arbitrary errors (unresolvable forward references, bad expressions).
    try:
        hints = get_type_hints(fn)
    except Exception as exc:  # noqa: BLE001
        raise TaskDefinitionError(f"failed to resolve task type hints: {exc}") from exc
    if len(parameters) == 2:
        _validate_workspace_signature(parameters, hints, workspace)
    elif len(parameters) == 1:
        _validate_workspace_free_signature(parameters, workspace)
    else:
        raise TaskDefinitionError(
            "task function must be exactly (workspace: Path, params: ParamsModel) or (params: ParamsModel)"
        )
    params_model = hints.get("params")
    output_model = hints.get("return")
    if not _is_pydantic_model(params_model):
        raise TaskDefinitionError("params must be annotated as a Pydantic BaseModel subclass")
    if not _is_pydantic_model(output_model):
        raise TaskDefinitionError("return value must be annotated as a Pydantic BaseModel subclass")
    return TaskDefinition(
        name=name,
        owner_email=owner_email,
        description=description,
        workspace=workspace,
        controls=controls,
        fn=fn,
        params_model=params_model,
        output_model=output_model,
    )
def _validate_required_metadata(*, name: str, owner_email: str) -> None:
if not name.strip():
raise TaskDefinitionError("task name is required")
if "/" in name or "\\" in name:
raise TaskDefinitionError("task name must not contain path separators")
if not owner_email.strip():
raise TaskDefinitionError("owner_email is required")
def _validate_workspace_signature(
parameters: list[inspect.Parameter],
hints: dict[str, Any],
workspace: WorkspaceSpec | None,
) -> None:
if parameters[0].name != "workspace" or parameters[1].name != "params":
raise TaskDefinitionError("workspace task parameters must be named workspace and params")
if hints.get("workspace") is not Path:
raise TaskDefinitionError("workspace must be annotated as pathlib.Path")
if workspace is None:
raise TaskDefinitionError("workspace task functions require workspace=WorkspaceSpec(...)")
def _validate_workspace_free_signature(
parameters: list[inspect.Parameter],
workspace: WorkspaceSpec | None,
) -> None:
if parameters[0].name != "params":
raise TaskDefinitionError("workspace-free task parameter must be named params")
if workspace is not None:
raise TaskDefinitionError("workspace-free task functions must not declare workspace=WorkspaceSpec(...)")
def _is_pydantic_model(value: object) -> bool:
return inspect.isclass(value) and issubclass(value, BaseModel)
def _validate_module_target(module_target: str) -> None:
if (
"/" in module_target
or "\\" in module_target
or ":" in module_target
or module_target.endswith(".py")
or not module_target
or any(not segment.isidentifier() for segment in module_target.split("."))
):
raise TaskDefinitionError("module target must be a Python import path, not a file path or object path")