from __future__ import annotations
import json
from copy import deepcopy
from pathlib import Path
from typing import Any
from pydantic import BaseModel
from perago.models import WorkspaceInput, WorkspaceOutput
from perago.task import TaskDefinition
# Maps each Conductor TaskDef field name to the attribute path that
# ``_control_fields`` walks, starting from ``task.controls``, to read its value.
# A one-element path is an attribute directly on the controls object; a
# two-element path goes through a nested group first.
# Insertion order matters: it is the order in which the control fields are
# emitted into the generated TaskDef.
CONTROL_FIELD_MAP = {
    # Retry behaviour.
    "retryCount": ("retry", "count"),
    "retryLogic": ("retry", "logic"),
    "retryDelaySeconds": ("retry", "delay_seconds"),
    "maxRetryDelaySeconds": ("retry", "max_delay_seconds"),
    "backoffJitterMs": ("retry", "jitter_ms"),
    # Timeouts.
    "totalTimeoutSeconds": ("timeout", "total_seconds"),
    "timeoutPolicy": ("timeout", "policy"),
    "timeoutSeconds": ("timeout", "seconds"),
    "responseTimeoutSeconds": ("response_timeout_seconds",),
    "pollTimeoutSeconds": ("timeout", "poll_seconds"),
    # Execution limits.
    "concurrentExecLimit": ("limits", "concurrent_exec_limit"),
    "rateLimitFrequencyInSeconds": ("limits", "rate_limit_frequency_in_seconds"),
    "rateLimitPerFrequency": ("limits", "rate_limit_per_frequency"),
}
# [docs]
def build_taskdef(task: TaskDefinition) -> dict[str, Any]:
    """
    Convert one Perago task into its Conductor TaskDef mapping.

    This is the library counterpart of ``perago extract``: a validated
    :class:`perago.TaskDefinition` goes in, and the JSON-compatible mapping
    that is registered with Conductor comes out. Input and output keys follow
    the task function signature, JSON Schema is derived from the task's
    Pydantic models, and :class:`perago.TaskControls` supply the retry,
    timeout, response timeout, and execution limit fields.

    Parameters
    ----------
    task : TaskDefinition
        Validated task definition, either returned by
        :func:`perago.load_module_task` or attached to a decorated function
        as ``__perago_task__``.

    Returns
    -------
    dict of str to Any
        JSON-compatible Conductor TaskDef mapping. A workspace task has the
        input keys ``workspace`` and ``params`` and the output keys
        ``workspace`` and ``result``; a workspace-free task has only
        ``params`` and ``result``. ``description`` is present only when the
        task defines one.

    See Also
    --------
    write_taskdef : Write the generated TaskDef mapping to a JSON file.

    Notes
    -----
    Workspace guardrails, workspace prefixes, LakeFS connection settings, and
    publish budget internals are not serialized into the TaskDef. A publish
    budget only affects the generated ``responseTimeoutSeconds`` value.

    Examples
    --------
    >>> task_def = build_taskdef(load_module_task("app.workers.features_build"))
    >>> task_def["name"]
    'features.build'
    """
    input_properties: dict[str, Any] = {}
    output_properties: dict[str, Any] = {}
    if task.has_workspace:
        input_properties["workspace"] = schema_for_model(WorkspaceInput)
        output_properties["workspace"] = schema_for_model(WorkspaceOutput)
    input_properties["params"] = schema_for_model(task.params_model)
    output_properties["result"] = schema_for_model(task.output_model)

    # Every declared property is required, so the key lists simply mirror the
    # property mappings (dicts keep insertion order).
    input_keys: list[str] = list(input_properties)
    output_keys: list[str] = list(output_properties)

    def schema_envelope(name: str, properties: dict[str, Any], required: list[str]) -> dict[str, Any]:
        # Wrapper that carries the closed object schema under "data".
        return {
            "name": name,
            "version": 1,
            "type": "JSON",
            "data": _object_schema(properties, required),
        }

    taskdef: dict[str, Any] = {"name": task.name, "ownerEmail": task.owner_email}
    taskdef.update(_control_fields(task))
    taskdef["inputKeys"] = input_keys
    taskdef["outputKeys"] = output_keys
    taskdef["inputSchema"] = schema_envelope(f"{task.name}.input", input_properties, input_keys)
    taskdef["outputSchema"] = schema_envelope(f"{task.name}.output", output_properties, output_keys)

    # The description is optional and goes last, only when the task has one.
    if task.description is not None:
        taskdef["description"] = task.description
    return taskdef
# [docs]
def write_taskdef(task: TaskDefinition, output: Path) -> Path:
    """
    Serialize a task's Conductor TaskDef and save it as a JSON file.

    Missing parent directories are created. The file is written as UTF-8 with
    two-space indentation, keys in generation order, and a trailing newline,
    so the generated TaskDef can be reviewed before it is registered with
    Conductor.

    Parameters
    ----------
    task : TaskDefinition
        Validated task definition to serialize.
    output : pathlib.Path
        Destination JSON file path. The path must end with ``.json`` and must
        not point to an existing directory.

    Returns
    -------
    pathlib.Path
        ``output``, returned once the JSON file has been written.

    Raises
    ------
    ValueError
        If ``output`` does not end with ``.json`` or points to a directory.

    See Also
    --------
    build_taskdef : Build the TaskDef mapping without writing a file.

    Examples
    --------
    >>> task_def = load_module_task("app.workers.metadata_validate")
    >>> write_taskdef(task_def, Path("generated/metadata.validate.json"))
    PosixPath('generated/metadata.validate.json')
    """
    has_json_suffix = output.suffix == ".json"
    if not has_json_suffix:
        raise ValueError("output must be a JSON file path, for example generated/features.build.json")
    # ``is_dir`` is already False for a path that does not exist.
    if output.is_dir():
        raise ValueError("output must be a JSON file path, not a directory")

    output.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(build_taskdef(task), indent=2, sort_keys=False)
    output.write_text(f"{serialized}\n", encoding="utf-8")
    return output
def schema_for_model(model: type[BaseModel]) -> dict[str, Any]:
    """
    Produce the self-contained JSON Schema used for ``model`` in a TaskDef.

    The schema from ``model.model_json_schema()`` is normalized in three
    steps: local ``$defs`` references are inlined, ``title`` annotations are
    stripped, and object schemas are closed against additional properties.

    Parameters
    ----------
    model : type[BaseModel]
        Pydantic model class to describe.

    Returns
    -------
    dict of str to Any
        The normalized schema, as a new mapping.
    """
    normalized = _inline_refs(model.model_json_schema())
    # Both passes mutate the freshly built copy in place.
    for normalize in (_strip_titles, _close_object_schemas):
        normalize(normalized)
    return normalized
def _control_fields(task: TaskDefinition) -> dict[str, Any]:
    """
    Collect the Conductor control fields configured on ``task.controls``.

    Each entry of ``CONTROL_FIELD_MAP`` is resolved by following its attribute
    path from ``task.controls``. Fields whose resolved value is ``None`` are
    left out, and the remaining ones keep the order of the map.

    Parameters
    ----------
    task : TaskDefinition
        Task whose ``controls`` object is read.

    Returns
    -------
    dict of str to Any
        Conductor field name mapped to its configured value.
    """

    def resolve(attribute_path: tuple[str, ...]) -> object:
        # Follow the attribute chain, e.g. controls.retry.count.
        target: object = task.controls
        for attribute in attribute_path:
            target = getattr(target, attribute)
        return target

    resolved = ((name, resolve(path)) for name, path in CONTROL_FIELD_MAP.items())
    return {name: value for name, value in resolved if value is not None}
def _object_schema(properties: dict[str, Any], required: list[str]) -> dict[str, Any]:
    """
    Wrap ``properties`` and ``required`` in a closed JSON object schema.

    Parameters
    ----------
    properties : dict of str to Any
        Property name mapped to its schema. Stored by reference, not copied.
    required : list of str
        Names of the required properties. Stored by reference, not copied.

    Returns
    -------
    dict of str to Any
        Object schema with ``additionalProperties`` set to ``False``.
    """
    closed: dict[str, Any] = dict(type="object", properties=properties, required=required)
    closed["additionalProperties"] = False
    return closed
def _inline_refs(schema: dict[str, Any]) -> dict[str, Any]:
    """
    Return a copy of ``schema`` with local ``#/$defs/...`` references inlined.

    The top-level ``$defs`` table is dropped from the copy, and every node of
    the form ``{"$ref": "#/$defs/<name>"}`` is replaced by a copy of that
    definition. Keys that sit next to ``$ref`` are kept and override
    same-named keys of the inlined definition. References that do not start
    with ``#/$defs/`` are left untouched. The input mapping is not modified.

    Parameters
    ----------
    schema : dict of str to Any
        JSON Schema mapping, optionally carrying a top-level ``$defs`` table.

    Returns
    -------
    dict of str to Any
        New schema without ``$defs`` and without local ``$defs`` references.

    Raises
    ------
    ValueError
        If a definition refers back to itself, directly or through other
        definitions. Such a schema cannot be inlined; without this check the
        expansion would recurse until the interpreter raised an opaque
        ``RecursionError``.
    KeyError
        If a reference names a definition that is missing from ``$defs``.
    """
    copied = deepcopy(schema)
    defs = copied.pop("$defs", {})
    prefix = "#/$defs/"

    def visit(value: Any, expanding: tuple[str, ...]) -> Any:
        # ``expanding`` holds the definitions being inlined on the current
        # path; meeting one of them again means the schema is recursive.
        if isinstance(value, dict):
            ref = value.get("$ref")
            if isinstance(ref, str) and ref.startswith(prefix):
                name = ref.removeprefix(prefix)
                if name in expanding:
                    chain = " -> ".join((*expanding[expanding.index(name):], name))
                    raise ValueError(f"cannot inline recursive schema reference: {chain}")
                replacement = deepcopy(defs[name])
                # Siblings are resolved in the context of the referencing
                # node, so a sibling may itself reference ``name``.
                siblings = {key: visit(item, expanding) for key, item in value.items() if key != "$ref"}
                replacement.update(siblings)
                return visit(replacement, (*expanding, name))
            return {key: visit(item, expanding) for key, item in value.items()}
        if isinstance(value, list):
            return [visit(item, expanding) for item in value]
        return value

    return visit(copied, ())
def _close_object_schemas(schema: Any) -> None:
    """
    Forbid additional properties on every object schema, in place.

    Walks nested dicts and lists. Each dict whose ``type`` equals
    ``"object"`` gets ``additionalProperties: False`` unless it already has
    an ``additionalProperties`` entry, which is left as it is. Values that
    are neither dicts nor lists are ignored.

    Parameters
    ----------
    schema : Any
        Schema node to process; dicts are mutated in place.
    """
    if isinstance(schema, list):
        children = schema
    elif isinstance(schema, dict):
        if schema.get("type") == "object" and "additionalProperties" not in schema:
            schema["additionalProperties"] = False
        children = schema.values()
    else:
        return
    for child in children:
        _close_object_schemas(child)
def _strip_titles(schema: Any, *, in_properties: bool = False) -> None:
    """
    Remove ``title`` annotations from a JSON Schema tree, in place.

    A ``properties`` mapping is keyed by property names rather than schema
    keywords, so a property that happens to be called ``title`` is kept. The
    values of such a mapping are ordinary schemas again and do lose their
    ``title``, including a property that is itself named ``properties``.

    Parameters
    ----------
    schema : Any
        Schema node to process. Dicts are mutated in place, lists are walked,
        and any other value is ignored.
    in_properties : bool, default False
        True when ``schema`` is the value of a ``properties`` keyword, i.e. a
        mapping from property name to schema.
    """
    # NOTE(review): data-valued keywords such as ``default`` are walked like
    # any other key, so a ``title`` key inside such a value is removed too.
    # Confirm whether that is intended.
    if isinstance(schema, dict):
        if not in_properties:
            schema.pop("title", None)
        for key, value in schema.items():
            # Only a schema's own "properties" keyword introduces a name
            # mapping. Inside such a mapping every value is a schema, even
            # when the property name itself is "properties".
            child_is_mapping = key == "properties" and not in_properties
            _strip_titles(value, in_properties=child_is_mapping)
    elif isinstance(schema, list):
        for value in schema:
            _strip_titles(value, in_properties=in_properties)