from __future__ import annotations
import re
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from perago.errors import TaskDefinitionError
from perago.guards import _WorkspaceGuardrail
# --- Retry defaults -------------------------------------------------------
# Default value of ``RetryPolicy.count``.
DEFAULT_RETRY_COUNT = 3
# Inclusive upper bound enforced on ``RetryPolicy.count``.
MAX_RETRY_COUNT = 10
# Default value of ``RetryPolicy.logic``; annotated with the same Literal set
# that the field itself accepts.
DEFAULT_RETRY_LOGIC: Literal["FIXED", "EXPONENTIAL_BACKOFF", "LINEAR_BACKOFF"] = "FIXED"
# Default value of ``RetryPolicy.delay_seconds``.
DEFAULT_RETRY_DELAY_SECONDS = 60
# Default value of ``RetryPolicy.max_delay_seconds``.
DEFAULT_RETRY_MAX_DELAY_SECONDS = 0
# Default value of ``RetryPolicy.jitter_ms``.
DEFAULT_RETRY_JITTER_MS = 0
# --- Timeout defaults -----------------------------------------------------
# Default value of ``TimeoutPolicy.policy``; annotated with the same Literal
# set that the field itself accepts.
DEFAULT_TIMEOUT_POLICY: Literal["RETRY", "TIME_OUT_WF", "ALERT_ONLY"] = "TIME_OUT_WF"
# Default value of ``TimeoutPolicy.seconds``.
DEFAULT_TIMEOUT_SECONDS = 0
# Default value of ``TimeoutPolicy.response_seconds``.
DEFAULT_TIMEOUT_RESPONSE_SECONDS = 600
# Default value of ``TimeoutPolicy.poll_seconds``.
DEFAULT_TIMEOUT_POLL_SECONDS = 0
# Default value of ``TimeoutPolicy.total_seconds``.
DEFAULT_TIMEOUT_TOTAL_SECONDS = 0
# [docs]
class RetryPolicy(BaseModel):
    """Retry settings emitted into the generated Conductor TaskDef.

    These are the retry-related TaskDef fields that :class:`TaskControls`
    exposes. Unknown fields are rejected, and every numeric setting must be a
    non-negative integer.

    Parameters
    ----------
    count : int, default=3
        How many retries to allow; emitted as ``retryCount``. The accepted
        range is ``0`` to ``10`` inclusive.
    logic : {"FIXED", "EXPONENTIAL_BACKOFF", "LINEAR_BACKOFF"}, default="FIXED"
        Retry algorithm; emitted as ``retryLogic``.
    delay_seconds : int, default=60
        Initial delay before a retry; emitted as ``retryDelaySeconds``.
    max_delay_seconds : int, default=0
        Upper bound on the retry delay; emitted as ``maxRetryDelaySeconds``.
    jitter_ms : int, default=0
        Backoff jitter; emitted as ``backoffJitterMs``.

    Notes
    -----
    Only the Conductor retry fields are modelled here. Whether a particular
    Perago failure is retryable is not decided by this model.

    Examples
    --------
    >>> RetryPolicy(count=4, logic="FIXED", delay_seconds=30)
    RetryPolicy(...)
    """

    # Reject any keyword that is not one of the declared fields.
    model_config = ConfigDict(extra="forbid")

    count: int = Field(
        default=DEFAULT_RETRY_COUNT,
        ge=0,
        le=MAX_RETRY_COUNT,
    )
    logic: Literal["FIXED", "EXPONENTIAL_BACKOFF", "LINEAR_BACKOFF"] = DEFAULT_RETRY_LOGIC
    delay_seconds: int = Field(
        default=DEFAULT_RETRY_DELAY_SECONDS,
        ge=0,
    )
    max_delay_seconds: int = Field(
        default=DEFAULT_RETRY_MAX_DELAY_SECONDS,
        ge=0,
    )
    jitter_ms: int = Field(
        default=DEFAULT_RETRY_JITTER_MS,
        ge=0,
    )
# [docs]
class TimeoutPolicy(BaseModel):
    """Timeout settings emitted into the generated Conductor TaskDef.

    Groups the general Conductor timeout fields that the generated TaskDef
    carries.

    Parameters
    ----------
    policy : {"RETRY", "TIME_OUT_WF", "ALERT_ONLY"}, default="TIME_OUT_WF"
        What happens on timeout; emitted as ``timeoutPolicy``.
    seconds : int, default=0
        Task timeout; emitted as ``timeoutSeconds``.
    response_seconds : int, default=600
        Response timeout; emitted as ``responseTimeoutSeconds``.
    poll_seconds : int, default=0
        Poll timeout; emitted as ``pollTimeoutSeconds``.
    total_seconds : int, default=0
        Total timeout; emitted as ``totalTimeoutSeconds``.

    Notes
    -----
    Every numeric value must be a non-negative integer, and fields that are
    not declared on the model are rejected.

    Examples
    --------
    >>> TimeoutPolicy(response_seconds=900)
    TimeoutPolicy(...)
    """

    # Reject any keyword that is not one of the declared fields.
    model_config = ConfigDict(extra="forbid")

    policy: Literal["RETRY", "TIME_OUT_WF", "ALERT_ONLY"] = DEFAULT_TIMEOUT_POLICY
    seconds: int = Field(
        default=DEFAULT_TIMEOUT_SECONDS,
        ge=0,
    )
    response_seconds: int = Field(
        default=DEFAULT_TIMEOUT_RESPONSE_SECONDS,
        ge=0,
    )
    poll_seconds: int = Field(
        default=DEFAULT_TIMEOUT_POLL_SECONDS,
        ge=0,
    )
    total_seconds: int = Field(
        default=DEFAULT_TIMEOUT_TOTAL_SECONDS,
        ge=0,
    )
# [docs]
class ExecutionLimits(BaseModel):
    """Optional concurrency and rate-limit settings for a TaskDef.

    The fields correspond to Conductor's concurrency and rate-limit TaskDef
    fields. A field left as ``None`` is omitted from the generated TaskDef.

    Parameters
    ----------
    concurrent_exec_limit : int or None, default=None
        Optional value emitted as ``concurrentExecLimit``.
    rate_limit_frequency_in_seconds : int or None, default=None
        Optional rate-limit window emitted as
        ``rateLimitFrequencyInSeconds``. Has to be supplied together with
        ``rate_limit_per_frequency``.
    rate_limit_per_frequency : int or None, default=None
        Optional rate-limit count emitted as ``rateLimitPerFrequency``. Has
        to be supplied together with ``rate_limit_frequency_in_seconds``.

    Raises
    ------
    TaskDefinitionError
        When only one half of the rate-limit pair is supplied.

    Examples
    --------
    >>> ExecutionLimits(concurrent_exec_limit=2)
    ExecutionLimits(...)
    """

    # Reject any keyword that is not one of the declared fields.
    model_config = ConfigDict(extra="forbid")

    concurrent_exec_limit: int | None = Field(default=None, ge=0)
    rate_limit_frequency_in_seconds: int | None = Field(default=None, ge=0)
    rate_limit_per_frequency: int | None = Field(default=None, ge=0)

    @model_validator(mode="after")
    def validate_rate_limit_pair(self) -> ExecutionLimits:
        """Check that both rate-limit fields are set, or neither is.

        Returns
        -------
        ExecutionLimits
            This instance, once it has passed the check.

        Raises
        ------
        TaskDefinitionError
            When exactly one of the two rate-limit fields is set.
        """
        frequency_missing = self.rate_limit_frequency_in_seconds is None
        limit_missing = self.rate_limit_per_frequency is None

        # Both present or both absent is the only consistent configuration.
        if frequency_missing == limit_missing:
            return self

        raise TaskDefinitionError(
            "rate_limit_frequency_in_seconds and rate_limit_per_frequency must be configured together"
        )
# [docs]
class PublishBudget(BaseModel):
    """Operational time budget for publishing a workspace.

    The model derives a publication budget that TaskDef generation can
    compare against the task response timeout. The LakeFS merge field is used
    as a LakeFS SDK request timeout. The Conductor completion field is, for
    now, only a reserve for the completion phase; it is not wired to the
    Conductor SDK ``TaskRunner`` result-update HTTP request timeout. The
    budget is an operational time allowance, not a proof of exactly-once
    publication.

    Parameters
    ----------
    observed_merge_p99_seconds : int
        High-percentile LakeFS merge latency observed under the expected
        workload.
    safety_margin_seconds : int
        Extra margin added on top of the observed merge latency.
    lakefs_merge_timeout_seconds : int
        Request timeout for the LakeFS merge operation. It must be at least
        ``observed_merge_p99_seconds + safety_margin_seconds``.
    conductor_completion_timeout_seconds : int
        Reserve for the Conductor completion phase. The SDK ``TaskRunner``
        owns the result update, and Perago does not currently apply this
        value as its internal HTTP request timeout.
    worker_shutdown_grace_seconds : int
        Grace period set aside for worker shutdown after publication.
    heartbeat_interval_seconds : int
        Heartbeat interval counted into the derived response timeout.

    Attributes
    ----------
    response_timeout_seconds : int
        Derived publication budget that TaskDef generation can compare with
        ``TimeoutPolicy.response_seconds``.

    Raises
    ------
    TaskDefinitionError
        When the LakeFS merge timeout is below the observed latency plus the
        safety margin.

    Notes
    -----
    A publish budget is only valid on workspace tasks. Workspace-free tasks
    reject ``TaskControls(publish_budget=...)`` during task definition
    validation.

    Examples
    --------
    >>> budget = PublishBudget(
    ...     observed_merge_p99_seconds=20,
    ...     safety_margin_seconds=10,
    ...     lakefs_merge_timeout_seconds=45,
    ...     conductor_completion_timeout_seconds=15,
    ...     worker_shutdown_grace_seconds=30,
    ...     heartbeat_interval_seconds=10,
    ... )
    >>> budget.response_timeout_seconds
    100
    """

    # Instances are immutable and reject undeclared fields.
    model_config = ConfigDict(frozen=True, extra="forbid")

    # The two observation inputs may be zero; the four budget parts must be >= 1.
    observed_merge_p99_seconds: int = Field(ge=0)
    safety_margin_seconds: int = Field(ge=0)
    lakefs_merge_timeout_seconds: int = Field(ge=1)
    conductor_completion_timeout_seconds: int = Field(ge=1)
    worker_shutdown_grace_seconds: int = Field(ge=1)
    heartbeat_interval_seconds: int = Field(ge=1)

    @model_validator(mode="after")
    def validate_merge_timeout_budget(self) -> PublishBudget:
        """Check that the LakeFS merge timeout covers the observed budget.

        Returns
        -------
        PublishBudget
            This instance, once it has passed the check.

        Raises
        ------
        TaskDefinitionError
            When the merge timeout is below the observed latency plus the
            safety margin.
        """
        minimum_merge_timeout = (
            self.observed_merge_p99_seconds + self.safety_margin_seconds
        )

        if minimum_merge_timeout <= self.lakefs_merge_timeout_seconds:
            return self

        raise TaskDefinitionError(
            "lakefs_merge_timeout_seconds must cover observed_merge_p99_seconds plus safety_margin_seconds"
        )

    @property
    def response_timeout_seconds(self) -> int:
        """Derived publication budget for workspace publication."""
        budget_parts = (
            self.lakefs_merge_timeout_seconds,
            self.conductor_completion_timeout_seconds,
            self.worker_shutdown_grace_seconds,
            self.heartbeat_interval_seconds,
        )
        return sum(budget_parts)
# [docs]
class TaskControls(BaseModel):
    """Task-level controls read by TaskDef generation and runtime code.

    This is the single public entry point for retry, timeout, execution-limit
    and publish-budget settings. Task authors hand it to :func:`perago.task`,
    and Perago expands the supported fields into the generated Conductor
    TaskDef.

    Parameters
    ----------
    retry : RetryPolicy, optional
        Retry settings. Defaults to ``RetryPolicy()``.
    timeout : TimeoutPolicy, optional
        General timeout settings. Defaults to ``TimeoutPolicy()``.
    limits : ExecutionLimits, optional
        Optional concurrency and rate-limit settings. Defaults to
        ``ExecutionLimits()``.
    publish_budget : PublishBudget or None, default=None
        Workspace publication budget. Only workspace tasks may set it.

    Attributes
    ----------
    response_timeout_seconds : int
        Alias for ``timeout.response_seconds``. ``publish_budget`` has no
        effect on it.

    Examples
    --------
    >>> TaskControls(
    ...     retry=RetryPolicy(count=4),
    ...     timeout=TimeoutPolicy(response_seconds=900),
    ...     limits=ExecutionLimits(concurrent_exec_limit=2),
    ... )
    TaskControls(...)
    """

    # Reject any keyword that is not one of the declared fields.
    model_config = ConfigDict(extra="forbid")

    # Each sub-policy is built fresh per instance via its default factory.
    retry: RetryPolicy = Field(default_factory=RetryPolicy)
    timeout: TimeoutPolicy = Field(default_factory=TimeoutPolicy)
    limits: ExecutionLimits = Field(default_factory=ExecutionLimits)
    publish_budget: PublishBudget | None = None

    @property
    def response_timeout_seconds(self) -> int:
        """Alias for timeout.response_seconds; publish_budget does not override it."""
        timeout_policy = self.timeout
        return timeout_policy.response_seconds
# [docs]
class WorkspaceSpec(BaseModel):
    """Workspace declaration attached to a workspace task.

    It tells Perago which LakeFS object prefix to project into the local
    attempt workspace, and which guardrails to run before and after the task
    body.

    Parameters
    ----------
    prefix : str, default="/"
        Workspace object prefix. ``"/"`` stands for the whole repository
        root; any other value is normalized to a relative POSIX-style prefix
        with no leading slash.
    pre : list of workspace guardrails, optional
        Guardrails evaluated after the workspace download and before the
        task body.
    post : list of workspace guardrails, optional
        Guardrails evaluated after the task body and before publication.
    read_only : bool, default=False
        Whether the task consumes workspace input without publishing
        workspace changes. A read-only task still receives a local workspace
        path and returns workspace output, but the output ref stays equal to
        the input ref.

    Raises
    ------
    TaskDefinitionError
        When ``prefix`` is empty, escapes the repository, or uses backslash
        separators.

    Notes
    -----
    The model is frozen and rejects unknown fields. Guardrails are runtime
    metadata and are not serialized into generated Conductor TaskDefs.

    Examples
    --------
    >>> WorkspaceSpec(prefix="/audio/render", pre=[require_file("raw/manifest.json")])
    WorkspaceSpec(...)
    """

    # Instances are immutable and reject undeclared fields.
    model_config = ConfigDict(frozen=True, extra="forbid")

    prefix: str = "/"
    pre: list[_WorkspaceGuardrail] = Field(default_factory=list)
    post: list[_WorkspaceGuardrail] = Field(default_factory=list)
    read_only: bool = False

    @field_validator("prefix")
    @classmethod
    def validate_prefix(cls, value: str) -> str:
        """Normalize a workspace object prefix and reject unsafe values.

        Parameters
        ----------
        value : str
            Prefix value passed to ``WorkspaceSpec``.

        Returns
        -------
        str
            The normalized prefix: ``"/"`` is kept as the repository-root
            marker, and any other value has its leading slashes removed.

        Raises
        ------
        TaskDefinitionError
            When the prefix is empty, escapes the repository, or uses
            backslash separators.
        """
        # The backslash check runs on the raw value, before any trimming.
        if value.find("\\") != -1:
            raise TaskDefinitionError("WorkspaceSpec.prefix must use '/' separators")

        trimmed = value.strip()
        if trimmed == "/":
            return "/"

        relative = trimmed.lstrip("/")
        if relative == "":
            raise TaskDefinitionError("WorkspaceSpec.prefix must not be empty")

        # Empty, "." and ".." segments are all rejected with the same error.
        for segment in relative.split("/"):
            if segment in ("", ".", ".."):
                raise TaskDefinitionError("WorkspaceSpec.prefix must stay inside the repository")

        # Splitting on "/" and re-joining with "/" would reproduce `relative`.
        return relative
class WorkspaceRef(BaseModel):
    """Base model for Conductor workspace references."""

    # Reject any keyword that is not one of the declared fields.
    model_config = ConfigDict(extra="forbid")

    repository: str = Field(min_length=1)
    branch: str = Field(min_length=1)
    ref_type: Literal["commit"]
    ref: str = Field(min_length=1)

    @field_validator("repository", "branch", "ref")
    @classmethod
    def validate_non_blank_ref_fields(cls, value: str) -> str:
        """Reject workspace reference strings that contain only whitespace.

        Parameters
        ----------
        value : str
            Value of the repository, branch, or commit ref field.

        Returns
        -------
        str
            The value exactly as supplied, when it is not blank.

        Raises
        ------
        TaskDefinitionError
            When the value is blank.
        """
        # Anything left after stripping whitespace means the value is usable.
        if value.strip():
            return value

        raise TaskDefinitionError("workspace repository, branch, and ref must not be blank")
# [docs]
class WorkspaceOutput(WorkspaceRef):
"""Workspace reference returned after successful publication.
``WorkspaceOutput`` is generated by the runtime for completed workspace
tasks. It preserves the input repository and branch, and points ``ref`` to
the LakeFS commit produced by publication.
Parameters
----------
repository : str
LakeFS repository name.
branch : str
Branch that was successfully advanced.
ref_type : {"commit"}
Type of output reference. Perago currently emits commit references.
ref : str
Published commit ref.
"""