# Source code for perago.task

from __future__ import annotations

import importlib
import inspect
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, get_type_hints

from pydantic import BaseModel, ValidationError

from perago.errors import TaskDefinitionError
from perago.models import TaskControls, WorkspaceSpec


# Registry of task definitions keyed by the defining module's name
# (``fn.__module__``). The ``task`` decorator appends to it at decoration
# time and ``load_module_task`` reads it after importing the module.
_REGISTERED_TASKS: dict[str, list["TaskDefinition"]] = {}


@dataclass(frozen=True)
class TaskDefinition:
    """
    Immutable, validated contract for the single task of a Perago module.

    Instances are produced by the :func:`task` decorator once it has
    validated the function signature, the Pydantic contract models, the
    workspace declaration, and the runtime controls. Task authors usually
    obtain one from :func:`load_module_task` or from the decorated
    function's ``__perago_task__`` attribute instead of constructing it
    themselves.

    Parameters
    ----------
    name : str
        Conductor task definition name generated by Perago.
    owner_email : str
        Owner email written to the generated Conductor TaskDef.
    fn : collections.abc.Callable
        Synchronous Python function implementing the task body.
    params_model : type[pydantic.BaseModel]
        Pydantic model that validates the ``params`` input object.
    output_model : type[pydantic.BaseModel]
        Pydantic model that validates the task result object.
    description : str or None, default=None
        Optional human-readable description copied into the TaskDef.
    workspace : WorkspaceSpec or None, default=None
        Workspace declaration for workspace tasks; ``None`` for
        workspace-free tasks.
    controls : TaskControls, optional
        Retry, timeout, execution limit, and publish budget controls.
        A fresh ``TaskControls()`` is created when omitted.

    Attributes
    ----------
    has_workspace : bool
        Whether the task expects an injected attempt-local workspace path.

    See Also
    --------
    task : Declare and validate a task definition.
    load_module_task : Import a module and retrieve its single task
        definition.

    Notes
    -----
    Perago supports one task definition per Python module. Modules that
    register zero or multiple tasks are rejected by
    :func:`load_module_task`.

    Examples
    --------
    >>> definition = load_module_task("app.workers.features_build")
    >>> definition.name
    'features.build'
    """

    # Required fields first; the optional ones below all carry defaults.
    name: str
    owner_email: str
    fn: Callable[..., BaseModel]
    params_model: type[BaseModel]
    output_model: type[BaseModel]
    description: str | None = None
    workspace: WorkspaceSpec | None = None
    controls: TaskControls = field(default_factory=TaskControls)

    @property
    def has_workspace(self) -> bool:
        """Return ``True`` when a workspace declaration is attached."""
        declared = self.workspace
        return declared is not None
def task(
    *,
    name: str,
    owner_email: str,
    description: str | None = None,
    workspace: WorkspaceSpec | None = None,
    controls: TaskControls | None = None,
    **unsupported: object,
) -> Callable[[Callable[..., BaseModel]], Callable[..., BaseModel]]:
    """
    Mark a function as the single Perago task exported by its module.

    Task metadata and the decorated function's type hints are validated
    when the decorator is applied, i.e. at import time. A workspace task
    must have the exact signature
    ``(workspace: pathlib.Path, params: ParamsModel) -> OutputModel`` and
    must pass ``workspace=WorkspaceSpec(...)``. A workspace-free task must
    have the exact signature ``(params: ParamsModel) -> OutputModel`` and
    must not pass a workspace declaration.

    Parameters
    ----------
    name : str
        Required Conductor task name. Path separators are rejected.
    owner_email : str
        Required owner email copied into the generated TaskDef.
    description : str or None, default=None
        Optional TaskDef description.
    workspace : WorkspaceSpec or None, default=None
        Workspace declaration for workspace tasks. Leave as ``None`` for
        workspace-free tasks.
    controls : TaskControls or None, default=None
        Optional retry, timeout, execution limit, and publish budget
        controls. ``None`` means a default ``TaskControls()`` is used.
    **unsupported : object
        Any extra decorator fields. They are always rejected so that the
        task function signature stays the only business contract source.

    Returns
    -------
    collections.abc.Callable
        Decorator that hands back the original function after attaching
        the validated definition as ``__perago_task__`` and recording it
        in the module-level task registry.

    Raises
    ------
    TaskDefinitionError
        If metadata, controls, workspace declaration, function shape, or
        type hints violate the Perago task contract.

    See Also
    --------
    TaskDefinition : Validated task contract created by this decorator.
    load_module_task : Load the decorated task from a module target.
    WorkspaceSpec : Declare the workspace prefix and guardrails for
        workspace tasks.

    Examples
    --------
    >>> from pathlib import Path
    >>> from pydantic import BaseModel
    >>> class Params(BaseModel):
    ...     source: str
    >>> class Output(BaseModel):
    ...     rows: int
    >>> @task(name="features.build", owner_email="data@example.com", workspace=WorkspaceSpec())
    ... def build_features(workspace: Path, params: Params) -> Output:
    ...     return Output(rows=0)
    """
    # Fail fast, before any function is decorated, on unknown keywords.
    if unsupported:
        names = ", ".join(sorted(unsupported))
        raise TaskDefinitionError(f"unsupported task decorator fields: {names}")

    def decorate(fn: Callable[..., BaseModel]) -> Callable[..., BaseModel]:
        try:
            # Default controls are built per decoration, inside the ``try``
            # so a pydantic ValidationError is translated like any other.
            effective_controls = TaskControls() if controls is None else controls
            definition = _build_task_definition(
                fn=fn,
                name=name,
                owner_email=owner_email,
                description=description,
                workspace=workspace,
                controls=effective_controls,
            )
        except ValidationError as exc:
            raise TaskDefinitionError(str(exc)) from exc

        # Record under the defining module so the loader can look it up.
        module_tasks = _REGISTERED_TASKS.setdefault(fn.__module__, [])
        module_tasks.append(definition)
        setattr(fn, "__perago_task__", definition)
        return fn

    return decorate
def load_module_task(module_target: str) -> TaskDefinition:
    """
    Import ``module_target`` and return the one Perago task it declares.

    This is the shared loader used by ``perago check``, ``perago extract``,
    and worker startup. The target must be a Python import path such as
    ``"app.workers.features_build"``. File paths, object paths, and modules
    that register zero or multiple tasks are rejected.

    Parameters
    ----------
    module_target : str
        Python module import path that should contain exactly one
        :func:`task`-decorated function.

    Returns
    -------
    TaskDefinition
        Validated task definition registered by the imported module.

    Raises
    ------
    TaskDefinitionError
        If ``module_target`` is not a module import path, cannot provide
        exactly one Perago task, or imports a task with an invalid
        contract.

    See Also
    --------
    task : Decorator that registers the task definition on import.
    TaskDefinition : Validated contract returned by this loader.

    Examples
    --------
    >>> definition = load_module_task("app.workers.features_build")
    >>> definition.name
    'features.build'
    """
    _validate_module_target(module_target)
    module = importlib.import_module(module_target)
    module_name = module.__name__

    # Keep only definitions whose function lives in this module, collapsing
    # repeated registrations of the same object (dicts preserve insertion
    # order, so the first occurrence wins).
    distinct: dict[int, TaskDefinition] = {}
    for candidate in _REGISTERED_TASKS.get(module_name, []):
        if candidate.fn.__module__ != module_name:
            continue
        distinct.setdefault(id(candidate), candidate)

    found = list(distinct.values())
    if len(found) == 1:
        return found[0]
    if not found:
        raise TaskDefinitionError(f"{module_target} does not declare a Perago task")
    raise TaskDefinitionError(f"{module_target} declares more than one Perago task")
def _build_task_definition(
    *,
    fn: Callable[..., BaseModel],
    name: str,
    owner_email: str,
    description: str | None,
    workspace: WorkspaceSpec | None,
    controls: TaskControls,
) -> TaskDefinition:
    """
    Validate decorator metadata and the function contract, then build the definition.

    Parameters
    ----------
    fn : collections.abc.Callable
        Candidate task function.
    name, owner_email : str
        Required metadata; must be non-blank strings.
    description : str or None
        Optional description, stored as given.
    workspace : WorkspaceSpec or None
        Workspace declaration, or ``None`` for a workspace-free task.
    controls : TaskControls
        Runtime controls; ``publish_budget`` is only allowed together
        with a workspace.

    Returns
    -------
    TaskDefinition
        Definition carrying the validated metadata, function, and the
        Pydantic models taken from the ``params`` and return annotations.

    Raises
    ------
    TaskDefinitionError
        If any metadata value, the function signature, or its type hints
        violate the task contract.
    """
    _validate_required_metadata(name=name, owner_email=owner_email)
    if workspace is not None and not isinstance(workspace, WorkspaceSpec):
        raise TaskDefinitionError("workspace must be a WorkspaceSpec")
    if not isinstance(controls, TaskControls):
        raise TaskDefinitionError("controls must be a TaskControls")
    if workspace is None and controls.publish_budget is not None:
        raise TaskDefinitionError("publish_budget requires workspace=WorkspaceSpec(...)")
    # Async generator functions are not coroutine functions, yet they are
    # just as asynchronous, so both are rejected.
    if inspect.iscoroutinefunction(fn) or inspect.isasyncgenfunction(fn):
        raise TaskDefinitionError("task function must be a synchronous function")

    # inspect.signature raises TypeError/ValueError for objects it cannot
    # introspect; surface that as a contract error like every other failure.
    try:
        signature = inspect.signature(fn)
    except (TypeError, ValueError) as exc:
        raise TaskDefinitionError(f"failed to inspect task function signature: {exc}") from exc

    parameters = list(signature.parameters.values())
    if any(parameter.kind is not inspect.Parameter.POSITIONAL_OR_KEYWORD for parameter in parameters):
        raise TaskDefinitionError("task function must not use *args, **kwargs, or keyword-only fields")
    if any(parameter.default is not inspect.Parameter.empty for parameter in parameters):
        raise TaskDefinitionError("task function parameters must not declare defaults")

    # Resolving hints evaluates arbitrary annotation expressions, so any
    # exception type is possible here.
    try:
        hints = get_type_hints(fn)
    except Exception as exc:  # noqa: BLE001
        raise TaskDefinitionError(f"failed to resolve task type hints: {exc}") from exc

    if len(parameters) == 2:
        _validate_workspace_signature(parameters, hints, workspace)
    elif len(parameters) == 1:
        _validate_workspace_free_signature(parameters, workspace)
    else:
        raise TaskDefinitionError(
            "task function must be exactly (workspace: Path, params: ParamsModel) or (params: ParamsModel)"
        )

    params_model = hints.get("params")
    output_model = hints.get("return")
    if not _is_pydantic_model(params_model):
        raise TaskDefinitionError("params must be annotated as a Pydantic BaseModel subclass")
    if not _is_pydantic_model(output_model):
        raise TaskDefinitionError("return value must be annotated as a Pydantic BaseModel subclass")

    return TaskDefinition(
        name=name,
        owner_email=owner_email,
        description=description,
        workspace=workspace,
        controls=controls,
        fn=fn,
        params_model=params_model,
        output_model=output_model,
    )


def _validate_required_metadata(*, name: str, owner_email: str) -> None:
    """
    Reject missing, blank, or non-string ``name`` / ``owner_email`` values.

    Raises
    ------
    TaskDefinitionError
        If either value is not a string or is blank, or if ``name``
        contains ``/`` or a backslash.
    """
    # Type checks come first so that a non-string value yields a contract
    # error instead of an AttributeError from ``.strip()``.
    if not isinstance(name, str):
        raise TaskDefinitionError("task name must be a string")
    if not name.strip():
        raise TaskDefinitionError("task name is required")
    if "/" in name or "\\" in name:
        raise TaskDefinitionError("task name must not contain path separators")
    if not isinstance(owner_email, str):
        raise TaskDefinitionError("owner_email must be a string")
    if not owner_email.strip():
        raise TaskDefinitionError("owner_email is required")


def _validate_workspace_signature(
    parameters: list[inspect.Parameter],
    hints: dict[str, Any],
    workspace: WorkspaceSpec | None,
) -> None:
    """
    Check the two-parameter ``(workspace: Path, params: ...)`` task shape.

    Raises
    ------
    TaskDefinitionError
        If the parameters are not named ``workspace`` and ``params`` in
        that order, if ``workspace`` is not annotated as exactly
        ``pathlib.Path``, or if no workspace declaration was passed.
    """
    if parameters[0].name != "workspace" or parameters[1].name != "params":
        raise TaskDefinitionError("workspace task parameters must be named workspace and params")
    if hints.get("workspace") is not Path:
        raise TaskDefinitionError("workspace must be annotated as pathlib.Path")
    if workspace is None:
        raise TaskDefinitionError("workspace task functions require workspace=WorkspaceSpec(...)")


def _validate_workspace_free_signature(
    parameters: list[inspect.Parameter],
    workspace: WorkspaceSpec | None,
) -> None:
    """
    Check the one-parameter ``(params: ...)`` task shape.

    Raises
    ------
    TaskDefinitionError
        If the parameter is not named ``params`` or a workspace
        declaration was passed.
    """
    if parameters[0].name != "params":
        raise TaskDefinitionError("workspace-free task parameter must be named params")
    if workspace is not None:
        raise TaskDefinitionError("workspace-free task functions must not declare workspace=WorkspaceSpec(...)")


def _is_pydantic_model(value: object) -> bool:
    """Return ``True`` only when ``value`` is a class derived from ``BaseModel``."""
    if not inspect.isclass(value):
        return False
    # NOTE(review): on some Python versions parameterized generics such as
    # ``list[int]`` pass ``inspect.isclass`` yet make ``issubclass`` raise
    # TypeError; treat those as "not a model" instead of crashing.
    try:
        return issubclass(value, BaseModel)
    except TypeError:
        return False


def _validate_module_target(module_target: str) -> None:
    """
    Ensure ``module_target`` is a dotted Python import path.

    Raises
    ------
    TaskDefinitionError
        If the target is not a string, is empty, contains ``/``, a
        backslash, or ``:``, ends with ``.py``, or has a dot-separated
        segment that is not a valid identifier.
    """
    # The isinstance test must come first: the remaining checks assume a
    # string and would otherwise raise TypeError/AttributeError.
    looks_invalid = (
        not isinstance(module_target, str)
        or not module_target
        or any(marker in module_target for marker in ("/", "\\", ":"))
        or module_target.endswith(".py")
        or any(not segment.isidentifier() for segment in module_target.split("."))
    )
    if looks_invalid:
        raise TaskDefinitionError("module target must be a Python import path, not a file path or object path")