# Source code for perago.worker_runtime
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from perago.config import RuntimeConfig, resolve_worker_id
from perago.runtime_logging import configure_worker_logging
# [docs]
@dataclass(frozen=True)
class WorkerRuntime:
    """
    Identity and logging state prepared for a single worker process.

    A ``WorkerRuntime`` is handed back once a child process has resolved its
    worker id and installed its per-worker JSONL log sink. The value lives
    only inside the running process: it is never serialized into Conductor
    task input, task output, or generated TaskDefs.

    Parameters
    ----------
    worker_id : str
        Stable id of the current worker process. Children managed by the
        supervisor are given this value through ``PERAGO_WORKER_ID``.
    log_file : pathlib.Path
        JSONL log file that was configured for this process.
    swept_workspaces : list of pathlib.Path
        Kept only for compatibility. Worker startup no longer sweeps global
        workspace directories.

    Attributes
    ----------
    worker_id : str
        Stable id of the current worker process.
    log_file : pathlib.Path
        JSONL log file that was configured for this process.
    swept_workspaces : list of pathlib.Path
        Kept only for compatibility. Worker startup no longer sweeps global
        workspace directories.

    See Also
    --------
    prepare_worker_runtime : Builds this value for a worker child process.
    RuntimeConfig : Local roots and logging policy used to prepare the worker.

    Notes
    -----
    The dataclass is frozen, so its fields cannot be rebound after
    construction. ``prepare_worker_runtime`` always supplies an empty
    ``swept_workspaces`` list.

    Examples
    --------
    >>> WorkerRuntime(
    ...     worker_id="featuresBuild0001",
    ...     log_file=Path("/tmp/perago/logs/worker.jsonl"),
    ...     swept_workspaces=[],
    ... ).worker_id
    'featuresBuild0001'
    """

    # Field order is part of the constructor interface; do not reorder.
    worker_id: str
    log_file: Path
    swept_workspaces: list[Path]
# [docs]
def prepare_worker_runtime(
    *,
    config: RuntimeConfig,
    module_target: str,
    env: dict[str, str],
) -> WorkerRuntime:
    """
    Set up the local runtime state of one worker process.

    A worker process calls this before it starts polling Conductor. The
    function resolves the worker id and then configures Loguru so that
    serialized JSONL logs are written under the configured log root.

    Parameters
    ----------
    config : RuntimeConfig
        Worker-local runtime configuration. This function reads its
        ``log_root``, ``log_file_max_size`` and ``log_retention`` values.
    module_target : str
        Python import path of the single task module this worker serves. It
        is passed to worker id resolution and to the logging setup.
    env : dict of str to str
        Environment mapping visible to the worker process.
        ``PERAGO_WORKER_ID`` is used when present.

    Returns
    -------
    WorkerRuntime
        The resolved worker id, the configured log file path, and an empty
        ``swept_workspaces`` list.

    See Also
    --------
    WorkerRuntime : Prepared runtime value returned by this function.
    load_runtime_config : Load the ``RuntimeConfig`` argument from local env.

    Notes
    -----
    Calling this function changes process-wide logging configuration: the
    current Loguru sinks are replaced by the worker JSONL file sink.

    Examples
    --------
    >>> from datetime import timedelta
    >>> config = RuntimeConfig(
    ...     workspace_root=Path("/tmp/perago/workspaces"),
    ...     log_root=Path("/tmp/perago/logs"),
    ...     log_file_max_size=1048576,
    ...     log_retention=timedelta(days=1),
    ...     worker_id_prefix="featuresBuild",
    ... )
    >>> runtime = prepare_worker_runtime(
    ...     config=config,
    ...     module_target="app.workers.features_build",
    ...     env={"PERAGO_WORKER_ID": "featuresBuild0001"},
    ... )
    >>> runtime.worker_id
    'featuresBuild0001'
    """
    # The id has to be resolved first because the log sink is set up per
    # worker id.
    resolved_id = resolve_worker_id(module_target, env)
    jsonl_path = configure_worker_logging(
        log_root=config.log_root,
        module_target=module_target,
        worker_id=resolved_id,
        max_bytes=config.log_file_max_size,
        retention=config.log_retention,
    )
    # No workspace sweep happens here, so the compatibility field is always
    # a fresh empty list.
    return WorkerRuntime(
        worker_id=resolved_id,
        log_file=jsonl_path,
        swept_workspaces=[],
    )