from __future__ import annotations
import os
import re
import tempfile
from datetime import timedelta
from decimal import Decimal, ROUND_CEILING
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, ConfigDict, SecretStr
from perago.errors import RuntimeConfigError
LOG_SIZE_UNITS = {
"KB": 1024,
"MB": 1024 * 1024,
"GB": 1024 * 1024 * 1024,
}
ExecutionMode = Literal["process", "thread"]
DEFAULT_LOG_FILE_MAX_SIZE = 100 * LOG_SIZE_UNITS["MB"]
DEFAULT_LOG_RETENTION = timedelta(days=30)
DEFAULT_EXECUTION_MODE: ExecutionMode = "process"
DEFAULT_WORKSPACE_GC_TTL = timedelta(hours=24)
DEFAULT_WORKSPACE_GC_INTERVAL = timedelta(hours=1)
DEFAULT_FAILURE_REASON_MAX_LENGTH = 500
[文档]
class ConductorConfig(BaseModel):
"""
Worker-local Conductor connection settings.
``ConductorConfig`` is loaded from process environment variables and local
``.env`` files by :func:`load_runtime_config`. It is runtime-only
configuration: the server URL is not written into generated TaskDefs and is
not passed through Conductor task input.
Parameters
----------
server_url : str
Conductor API endpoint read from ``CONDUCTOR_SERVER_URL``. Surrounding
whitespace is stripped during environment parsing, empty values are
treated as not configured, and the placeholder value ``"replace-me"``
is rejected before model construction.
See Also
--------
load_runtime_config : Load this model from worker environment settings.
RuntimeConfig : Full runtime configuration containing this model.
Notes
-----
The model is frozen and rejects unknown fields. ``perago check`` and
``perago extract`` can run without this config, but ``perago start`` requires
it before starting worker child processes.
Examples
--------
>>> ConductorConfig(server_url="http://localhost:8080/api")
ConductorConfig(...)
"""
model_config = ConfigDict(frozen=True, extra="forbid")
server_url: str
[文档]
class LakeFSConfig(BaseModel):
"""
Worker-local LakeFS connection settings.
``LakeFSConfig`` is assembled from the LakeFS environment variables used by
the worker runtime. The values stay local to the worker process and are not
serialized into Conductor task input, task output, or generated TaskDefs.
Parameters
----------
endpoint_url : str
LakeFS endpoint read from ``LAKECTL_SERVER_ENDPOINT_URL``.
access_key_id : str
LakeFS access key id read from
``LAKECTL_CREDENTIALS_ACCESS_KEY_ID``.
secret_access_key : pydantic.SecretStr
LakeFS secret access key read from
``LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY``. Pydantic redacts this value
from model reprs and JSON dumps.
Raises
------
RuntimeConfigError
Raised by :func:`load_runtime_config` when only part of the LakeFS
environment variable set is present or when a value is still
``"replace-me"``.
See Also
--------
load_runtime_config : Load this model from worker environment settings.
RuntimeConfig : Full runtime configuration containing this model.
Notes
-----
The model is frozen and rejects unknown fields. The three LakeFS variables
must be configured together for ``perago start``; ``perago check`` and
``perago extract`` may omit all three.
Examples
--------
>>> LakeFSConfig(
... endpoint_url="http://localhost:8000",
... access_key_id="key",
... secret_access_key="secret",
... )
LakeFSConfig(...)
"""
model_config = ConfigDict(frozen=True, extra="forbid")
endpoint_url: str
access_key_id: str
secret_access_key: SecretStr
[文档]
class RuntimeConfig(BaseModel):
"""
Complete worker-local runtime configuration.
``RuntimeConfig`` describes local workspace storage, worker logging,
process identity, and optional Conductor and LakeFS connection settings. It
is loaded before task module import by the CLI and stays outside the task
author contract.
Parameters
----------
workspace_root : pathlib.Path
Root directory for attempt-local workspaces. Defaults to
``<tempdir>/perago/workspaces`` when ``PERAGO_WORKSPACE_ROOT`` is not
configured.
log_root : pathlib.Path
Root directory for worker JSONL logs. Defaults to
``<tempdir>/perago/logs`` when ``PERAGO_LOG_ROOT`` is not configured.
log_file_max_size : int
Log rotation threshold in bytes, parsed from
``PERAGO_LOG_FILE_MAX_SIZE``. The default is ``100MB``.
log_retention : datetime.timedelta
Log retention period parsed from ``PERAGO_LOG_RETENTION``. The default
is ``30d``.
worker_id_prefix : str
ASCII alphanumeric prefix used by the supervisor to generate child
``PERAGO_WORKER_ID`` values.
execution_mode : "process" or "thread"
Worker execution model. Defaults to ``"process"`` and may be
overridden by ``PERAGO_EXECUTION_MODE`` or the ``perago start`` CLI.
workspace_gc_ttl : datetime.timedelta, default=24h
Minimum age before supervisor periodic GC removes an abandoned
attempt-local workspace. Parsed from ``PERAGO_WORKSPACE_GC_TTL``.
workspace_gc_interval : datetime.timedelta, default=1h
Interval for the supervisor workspace GC loop. Parsed from
``PERAGO_WORKSPACE_GC_INTERVAL``.
shutdown_force_kill_after : datetime.timedelta or None, default=None
Optional shutdown drain deadline. When configured through
``PERAGO_SHUTDOWN_FORCE_KILL_AFTER``, child processes still alive after
the deadline are force-killed.
failure_reason_max_length : int, default=500
Maximum number of characters written to Conductor
``reasonForIncompletion`` for failed attempts. Parsed from
``PERAGO_FAILURE_REASON_MAX_LENGTH``.
conductor : ConductorConfig or None, default=None
Optional Conductor connection config. ``perago start`` requires it.
lakefs : LakeFSConfig or None, default=None
Optional LakeFS connection config. ``perago start`` requires it.
See Also
--------
load_runtime_config : Build a runtime config from ``.env`` and process
environment values.
WorkerRuntime : Prepared runtime values for a running worker process.
Notes
-----
The model is frozen and rejects unknown fields. None of these values are
embedded in generated Conductor TaskDefs or task payloads.
Examples
--------
>>> from datetime import timedelta
>>> from pathlib import Path
>>> RuntimeConfig(
... workspace_root=Path("/tmp/perago/workspaces"),
... log_root=Path("/tmp/perago/logs"),
... log_file_max_size=104857600,
... log_retention=timedelta(days=30),
... worker_id_prefix="appworkersfeaturesbuild",
... )
RuntimeConfig(...)
"""
model_config = ConfigDict(frozen=True, extra="forbid")
workspace_root: Path
log_root: Path
log_file_max_size: int
log_retention: timedelta
worker_id_prefix: str
execution_mode: ExecutionMode = DEFAULT_EXECUTION_MODE
workspace_gc_ttl: timedelta = DEFAULT_WORKSPACE_GC_TTL
workspace_gc_interval: timedelta = DEFAULT_WORKSPACE_GC_INTERVAL
shutdown_force_kill_after: timedelta | None = None
failure_reason_max_length: int = DEFAULT_FAILURE_REASON_MAX_LENGTH
conductor: ConductorConfig | None = None
lakefs: LakeFSConfig | None = None
[文档]
def load_runtime_config(
module_target: str,
*,
cwd: Path | None = None,
process_env: dict[str, str] | None = None,
probe_roots: bool = True,
) -> RuntimeConfig:
"""
Load worker-local runtime configuration.
``load_runtime_config`` reads a simple ``.env`` file from ``cwd`` and then
overlays process environment variables. It parses Perago local directory
settings, worker identity settings, and optional Conductor and LakeFS
connection settings into a frozen :class:`RuntimeConfig`.
Parameters
----------
module_target : str
Python module import path for the single task module. It is used to
derive the default worker id prefix when
``PERAGO_WORKER_ID_PREFIX`` is not configured.
cwd : pathlib.Path or None, default=None
Directory used to locate ``.env``. ``None`` uses the current working
directory.
process_env : dict of str to str or None, default=None
Environment mapping that overrides ``.env`` values. ``None`` reads
:data:`os.environ`; an empty dictionary intentionally prevents reading
the real process environment.
probe_roots : bool, default=True
Whether to create and remove temporary probe files under the resolved
workspace and log roots to verify that both directories are writable.
Returns
-------
RuntimeConfig
Parsed runtime configuration for CLI commands and worker processes.
Raises
------
RuntimeConfigError
If a configured value is malformed, a required LakeFS variable is
missing from a partial LakeFS configuration, a connection placeholder is
still set to ``"replace-me"``, or a probed root directory is not
writable.
See Also
--------
RuntimeConfig : Parsed configuration returned by this loader.
prepare_worker_runtime : Prepare runtime identity and logging for a worker
process.
Notes
-----
``perago check`` and ``perago extract`` use this loader but do not require
Conductor or LakeFS config to be present. ``perago start`` performs
additional checks after loading and requires both external service configs.
Examples
--------
>>> load_runtime_config(
... "app.workers.features_build",
... process_env={"PERAGO_WORKER_ID_PREFIX": "featuresBuild"},
... probe_roots=False,
... )
RuntimeConfig(...)
"""
base = cwd or Path.cwd()
current_env = dict(os.environ) if process_env is None else process_env
env = load_runtime_env(current_env, read_dotenv(base / ".env"))
temp_root = Path(tempfile.gettempdir()) / "perago"
config = RuntimeConfig(
workspace_root=Path(env.get("PERAGO_WORKSPACE_ROOT", temp_root / "workspaces")),
log_root=Path(env.get("PERAGO_LOG_ROOT", temp_root / "logs")),
log_file_max_size=parse_log_file_max_size(env.get("PERAGO_LOG_FILE_MAX_SIZE")),
log_retention=parse_log_retention(env.get("PERAGO_LOG_RETENTION")),
worker_id_prefix=resolve_worker_id_prefix(module_target, env),
execution_mode=parse_execution_mode(env.get("PERAGO_EXECUTION_MODE")),
workspace_gc_ttl=parse_duration(
env.get("PERAGO_WORKSPACE_GC_TTL"),
default=DEFAULT_WORKSPACE_GC_TTL,
name="PERAGO_WORKSPACE_GC_TTL",
),
workspace_gc_interval=parse_duration(
env.get("PERAGO_WORKSPACE_GC_INTERVAL"),
default=DEFAULT_WORKSPACE_GC_INTERVAL,
name="PERAGO_WORKSPACE_GC_INTERVAL",
),
shutdown_force_kill_after=parse_optional_duration(
env.get("PERAGO_SHUTDOWN_FORCE_KILL_AFTER"),
name="PERAGO_SHUTDOWN_FORCE_KILL_AFTER",
),
failure_reason_max_length=parse_failure_reason_max_length(
env.get("PERAGO_FAILURE_REASON_MAX_LENGTH")
),
conductor=parse_conductor_config(env),
lakefs=parse_lakefs_config(env),
)
if probe_roots:
check_writable_root(config.workspace_root)
check_writable_root(config.log_root)
return config
def read_dotenv(path: Path) -> dict[str, str]:
if not path.exists():
return {}
values: dict[str, str] = {}
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
if line.startswith("export "):
line = line.removeprefix("export ").strip()
key, value = line.split("=", 1)
values[key.strip()] = _strip_env_value(value.strip())
return values
def load_runtime_env(process_env: dict[str, str], dotenv_env: dict[str, str]) -> dict[str, str]:
merged = dict(dotenv_env)
merged.update(process_env)
return merged
def check_writable_root(path: Path) -> None:
try:
path.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix=".perago-check-", dir=path) as probe:
probe_file = Path(probe) / "write-test"
probe_file.write_text("ok", encoding="utf-8")
probe_file.unlink()
except OSError as exc:
raise RuntimeConfigError(f"{path} is not writable: {exc}") from exc
def parse_log_file_max_size(value: str | None) -> int:
if value is None or value.strip() == "":
return DEFAULT_LOG_FILE_MAX_SIZE
match = re.fullmatch(
r"((?:0|[1-9][0-9]*)(?:\.[0-9]+)?)\s*(KB|MB|GB)",
value.strip(),
flags=re.IGNORECASE,
)
if not match:
raise RuntimeConfigError(
"PERAGO_LOG_FILE_MAX_SIZE must be a positive size such as '512KB', '100MB', or '1.5GB'"
)
amount = Decimal(match.group(1))
if amount <= 0:
raise RuntimeConfigError("PERAGO_LOG_FILE_MAX_SIZE must be greater than zero")
unit = match.group(2).upper()
return int((amount * LOG_SIZE_UNITS[unit]).to_integral_value(rounding=ROUND_CEILING))
def parse_log_retention(value: str | None) -> timedelta:
if value is None or value.strip() == "":
return DEFAULT_LOG_RETENTION
match = re.fullmatch(r"([1-9][0-9]*)d", value.strip(), flags=re.IGNORECASE)
if not match:
raise RuntimeConfigError("PERAGO_LOG_RETENTION must be a positive day count such as '7d' or '30d'")
return timedelta(days=int(match.group(1)))
def parse_execution_mode(value: str | None) -> ExecutionMode:
if value is None or value.strip() == "":
return DEFAULT_EXECUTION_MODE
mode = value.strip().lower()
if mode not in {"process", "thread"}:
raise RuntimeConfigError("PERAGO_EXECUTION_MODE must be either 'process' or 'thread'")
return mode
def parse_duration(value: str | None, *, default: timedelta, name: str) -> timedelta:
if value is None or value.strip() == "":
return default
match = re.fullmatch(r"([1-9][0-9]*)([smhd])", value.strip(), flags=re.IGNORECASE)
if not match:
raise RuntimeConfigError(f"{name} must be a positive duration such as '30s', '5m', '1h', or '24h'")
amount = int(match.group(1))
unit = match.group(2).lower()
if unit == "s":
return timedelta(seconds=amount)
if unit == "m":
return timedelta(minutes=amount)
if unit == "h":
return timedelta(hours=amount)
return timedelta(days=amount)
def parse_optional_duration(value: str | None, *, name: str) -> timedelta | None:
if value is None or value.strip() == "":
return None
return parse_duration(value, default=timedelta(seconds=1), name=name)
def parse_failure_reason_max_length(value: str | None) -> int:
if value is None or value.strip() == "":
return DEFAULT_FAILURE_REASON_MAX_LENGTH
stripped = value.strip()
if not re.fullmatch(r"[0-9]+", stripped):
raise RuntimeConfigError("PERAGO_FAILURE_REASON_MAX_LENGTH must be a positive integer")
parsed = int(stripped)
if parsed <= 0:
raise RuntimeConfigError("PERAGO_FAILURE_REASON_MAX_LENGTH must be greater than zero")
return parsed
def parse_conductor_config(env: dict[str, str]) -> ConductorConfig | None:
server_url = _env_optional(env, "CONDUCTOR_SERVER_URL")
if server_url is None:
return None
return ConductorConfig(server_url=server_url)
def parse_lakefs_config(env: dict[str, str]) -> LakeFSConfig | None:
endpoint_url = _env_optional(env, "LAKECTL_SERVER_ENDPOINT_URL")
access_key_id = _env_optional(env, "LAKECTL_CREDENTIALS_ACCESS_KEY_ID")
secret_access_key = _env_optional(env, "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY")
if endpoint_url is None and access_key_id is None and secret_access_key is None:
return None
missing = [
name
for name, value in [
("LAKECTL_SERVER_ENDPOINT_URL", endpoint_url),
("LAKECTL_CREDENTIALS_ACCESS_KEY_ID", access_key_id),
("LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY", secret_access_key),
]
if value is None
]
if missing:
raise RuntimeConfigError(f"LakeFS config is incomplete; missing {', '.join(missing)}")
return LakeFSConfig(
endpoint_url=endpoint_url,
access_key_id=access_key_id,
secret_access_key=secret_access_key,
)
def validate_worker_id_prefix(value: str) -> str:
if not value:
raise RuntimeConfigError("PERAGO_WORKER_ID_PREFIX must not be empty")
if not re.fullmatch(r"[A-Za-z0-9]+", value):
raise RuntimeConfigError("PERAGO_WORKER_ID_PREFIX must contain only ASCII letters and digits")
return value
def default_worker_id_prefix(module_target: str) -> str:
candidate = re.sub(r"[^A-Za-z0-9]+", "", module_target)
return validate_worker_id_prefix(candidate)
def resolve_worker_id_prefix(module_target: str, env: dict[str, str]) -> str:
configured = env.get("PERAGO_WORKER_ID_PREFIX")
if configured is not None:
return validate_worker_id_prefix(configured.strip())
return default_worker_id_prefix(module_target)
def worker_id_for_child(prefix: str, index: int) -> str:
return f"{prefix}{index:04d}"
def child_environment(base_env: dict[str, str], module_target: str, index: int) -> dict[str, str]:
env = dict(base_env)
prefix = resolve_worker_id_prefix(module_target, env)
env["PERAGO_WORKER_ID_PREFIX"] = prefix
env["PERAGO_WORKER_ID"] = worker_id_for_child(prefix, index)
return env
def resolve_worker_id(module_target: str, env: dict[str, str]) -> str:
configured = env.get("PERAGO_WORKER_ID")
if configured:
return configured
return f"{default_worker_id_prefix(module_target)}-pid-{os.getpid()}"
def _strip_env_value(value: str) -> str:
if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}:
return value[1:-1]
return value
def _env_optional(env: dict[str, str], name: str) -> str | None:
value = env.get(name)
if value is None or value.strip() == "":
return None
stripped = value.strip()
if stripped == "replace-me":
raise RuntimeConfigError(f"{name} must be replaced with a real value")
return stripped