from __future__ import annotations
import json
import re
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any
from perago.errors import PublishFenceError
from perago.models import WorkspaceInput, WorkspaceSpec
[文档]
@dataclass(frozen=True)
class WorkspacePublicationPlan:
    """Immutable record of everything decided for a single workspace publish.

    The plan bundles the publish-fence outcome (the head the merge must land
    on, and the commit it supersedes) with the metadata maps written during
    the try and confirm phases of the publish.

    Attributes
    ----------
    logical_task_key : str
        Workflow-scoped identity shared by retries of one logical task. The
        publish fence uses it to decide whether branch advancement still
        belongs to the same logical task.
    staging_branch : str
        Internal LakeFS branch holding the staged attempt output until the
        publish step merges it into the target branch.
    publish_base_head : str
        Commit the publish step expects at the head of the target branch when
        the merge is attempted.
    superseded_commit : str | None
        Earlier head produced by the same logical task, or ``None`` when the
        target branch has not moved past the original input ref.
    try_metadata : dict[str, str]
        Metadata attached to the staging-branch commit during the try phase.
    confirm_metadata : dict[str, str]
        Metadata attached to the publish merge commit during the confirm
        phase.

    Notes
    -----
    ``frozen=True`` only blocks attribute reassignment. The two metadata
    dicts are still mutable in place. Because the generated ``__hash__``
    hashes every field, ``hash()`` on an instance raises ``TypeError``
    (``dict`` is unhashable).
    """

    logical_task_key: str
    staging_branch: str
    publish_base_head: str
    superseded_commit: str | None
    try_metadata: dict[str, str]
    confirm_metadata: dict[str, str]
[文档]
def logical_task_key(task: object) -> str:
    """Return the publish-fence identity shared by every retry of a task.

    Parameters
    ----------
    task : object
        Attempt-like object exposing ``workflow_instance_id``,
        ``reference_task_name``, ``seq``, ``iteration`` and ``task_def_name``.

    Returns
    -------
    str
        The five identity fields joined with ``":"`` in the order listed
        above. ``seq`` and ``iteration`` are passed through ``str()``; the
        other three fields are joined as-is.

    Raises
    ------
    AttributeError
        If ``task`` is missing any of the identity fields.
    TypeError
        If ``workflow_instance_id``, ``reference_task_name`` or
        ``task_def_name`` is not a ``str`` (``str.join`` rejects it).
    """
    workflow_id = _task_attr(task, "workflow_instance_id")
    reference_name = _task_attr(task, "reference_task_name")
    sequence = str(_task_attr(task, "seq"))
    iteration = str(_task_attr(task, "iteration"))
    definition_name = _task_attr(task, "task_def_name")
    # Attempt-specific fields (task_id, retry_count, execution_id) are not
    # part of the key, which is what lets retries share it. Compare
    # staging_branch_name, which does include them.
    return ":".join(
        (workflow_id, reference_name, sequence, iteration, definition_name)
    )
[文档]
def choose_publish_base(
    *,
    workspace: WorkspaceInput | dict[str, Any],
    current_head: str,
    commits: Sequence[object],
    logical_task_key: str,
) -> tuple[str, str | None]:
    """Decide which target-branch head this attempt may merge on top of.

    Parameters
    ----------
    workspace : WorkspaceInput | dict[str, Any]
        Original workspace input of the attempt. Dictionaries are validated
        as :class:`perago.WorkspaceInput`.
    current_head : str
        Head commit of the target branch observed at publish time.
    commits : Sequence[object]
        Commits between the original input ref and ``current_head``. Each
        commit exposes ``id`` and optional ``metadata``, either as attributes
        or as mapping keys. The last element must be ``current_head`` for
        the advancement to be accepted.
    logical_task_key : str
        Identity that is allowed to have advanced the branch without
        tripping the publish fence.

    Returns
    -------
    tuple[str, str | None]
        ``(publish_base_head, superseded_commit)``. When the branch is still
        at the input ref, the result is ``(current_head, None)``. When every
        commit in the range carries this logical task key, the result is
        ``(current_head, current_head)``.

    Raises
    ------
    PublishFenceError
        If the branch moved and the advancement cannot be fully attributed
        to ``logical_task_key``. This includes an empty ``commits`` range or
        one whose last element is not ``current_head``.
    pydantic.ValidationError
        If ``workspace`` does not validate as :class:`perago.WorkspaceInput`.
    """
    validated = WorkspaceInput.model_validate(workspace)
    if current_head == validated.ref:
        # Nothing happened on the target branch since the attempt started.
        return current_head, None
    if commits:
        # NOTE(review): the range is treated as oldest-first (the tip is
        # commits[-1]); confirm that callers pass the log in that order.
        tip_id = _commit_id(commits[-1])
        advanced_by_same_task = tip_id == current_head and all(
            _commit_metadata(entry).get("perago.logical_task_key")
            == logical_task_key
            for entry in commits
        )
        if advanced_by_same_task:
            return current_head, tip_id
    raise PublishFenceError(
        f"{validated.branch} advanced from {validated.ref} to {current_head}"
    )
[文档]
def staging_branch_name(task: object) -> str:
    """Return the internal LakeFS staging branch name for one concrete attempt.

    The name has the shape::

        perago-staging-<workflow>-<reference>-seq-<seq>-iteration-<iteration>
        -task-id-<task_id>-retry-<retry_count>-exec-<execution_id>

    (shown wrapped; the real name has no line break). Every ``<...>`` part
    is passed through ``_lakefs_branch_segment``.

    Parameters
    ----------
    task : object
        Attempt-like object exposing ``workflow_instance_id``,
        ``reference_task_name``, ``seq``, ``iteration``, ``task_id``,
        ``retry_count`` and ``execution_id``.

    Returns
    -------
    str
        Branch name scoped to a single task attempt.

    Raises
    ------
    AttributeError
        If ``task`` is missing any of the fields above.
    """
    # (label prefix, task attribute) pairs, in the order they appear in the
    # name. Attributes are read in this order, so the first missing one is
    # the one reported.
    labelled_fields = (
        ("", "workflow_instance_id"),
        ("", "reference_task_name"),
        ("seq-", "seq"),
        ("iteration-", "iteration"),
        ("task-id-", "task_id"),
        ("retry-", "retry_count"),
        ("exec-", "execution_id"),
    )
    segments = ["perago", "staging"]
    segments.extend(
        label + _lakefs_branch_segment(_task_attr(task, attribute))
        for label, attribute in labelled_fields
    )
    return "-".join(segments)
[文档]
def build_workspace_publication_plan(
    *,
    task: object,
    workspace: WorkspaceInput | dict[str, Any],
    workspace_spec: WorkspaceSpec,
    current_head: str,
    commits: Sequence[object],
    staging_commit: str,
) -> WorkspacePublicationPlan:
    """Compute every publish decision and metadata map for one task attempt.

    Parameters
    ----------
    task : object
        Attempt-like object providing the workflow, task and retry identity
        fields read by ``logical_task_key``, ``staging_branch_name`` and
        ``perago_metadata``.
    workspace : WorkspaceInput | dict[str, Any]
        Workspace input reference of the attempt.
    workspace_spec : WorkspaceSpec
        Workspace contract passed to ``perago_metadata`` for both phases.
    current_head : str
        Target branch head observed immediately before publishing.
    commits : Sequence[object]
        Commits between the original workspace ref and ``current_head``.
    staging_commit : str
        Commit id produced by staging the local workspace content.

    Returns
    -------
    WorkspacePublicationPlan
        Plan holding the logical key, staging branch, fence decision and the
        try/confirm metadata maps.

    Raises
    ------
    PublishFenceError
        If the branch advancement cannot be attributed to the same logical
        task.
    AttributeError
        If ``task`` is missing a required attempt attribute.
    pydantic.ValidationError
        If ``workspace`` does not validate as :class:`perago.WorkspaceInput`.
    TypeError
        Raised when metadata values cannot be serialized.
    """
    task_key = logical_task_key(task)
    # The fence check runs before any metadata is built, so a fenced attempt
    # fails early.
    base_head, superseded = choose_publish_base(
        workspace=workspace,
        current_head=current_head,
        commits=commits,
        logical_task_key=task_key,
    )
    branch = staging_branch_name(task)
    try_phase_metadata = perago_metadata(
        task=task,
        workspace=workspace,
        workspace_spec=workspace_spec,
        logical_task_key=task_key,
        phase="try",
    )
    confirm_extra = confirm_metadata_extra(
        staging_branch=branch,
        staging_commit=staging_commit,
        expected_head=base_head,
        superseded_commit=superseded,
    )
    confirm_phase_metadata = perago_metadata(
        task=task,
        workspace=workspace,
        workspace_spec=workspace_spec,
        logical_task_key=task_key,
        phase="confirm",
        extra=confirm_extra,
    )
    return WorkspacePublicationPlan(
        logical_task_key=task_key,
        staging_branch=branch,
        publish_base_head=base_head,
        superseded_commit=superseded,
        try_metadata=try_phase_metadata,
        confirm_metadata=confirm_phase_metadata,
    )
[文档]
def find_matching_publication_commit(
    commits: Sequence[object],
    *,
    logical_task_key: str,
    task_id: str,
    staging_commit: str,
) -> str | None:
    """Locate the target-branch commit that published a given staged attempt.

    Parameters
    ----------
    commits : Sequence[object]
        Candidate commits from the target branch history. Each exposes ``id``
        and optional ``metadata``, either as attributes or as mapping keys.
    logical_task_key : str
        Expected value of the ``perago.logical_task_key`` metadata entry.
    task_id : str
        Expected value of the ``perago.task_id`` metadata entry.
    staging_commit : str
        Expected value of the ``perago.staging_commit`` metadata entry.

    Returns
    -------
    str | None
        Id of the first commit, in iteration order, whose metadata matches
        all three values. ``None`` if no commit matches.
    """
    # Insertion order is kept so the comparisons short-circuit in the same
    # order: logical key, then task id, then staging commit.
    expected = {
        "perago.logical_task_key": logical_task_key,
        "perago.task_id": task_id,
        "perago.staging_commit": staging_commit,
    }
    for candidate in commits:
        metadata = _commit_metadata(candidate)
        if all(metadata.get(field) == value for field, value in expected.items()):
            return _commit_id(candidate)
    return None
def _commit_id(commit: object) -> str:
if isinstance(commit, Mapping):
return str(commit["id"])
return str(getattr(commit, "id"))
def _commit_metadata(commit: object) -> Mapping[str, str]:
if isinstance(commit, Mapping):
metadata = commit.get("metadata", {})
else:
metadata = getattr(commit, "metadata", {})
if not isinstance(metadata, Mapping):
return {}
return metadata
def _task_attr(task: object, name: str) -> object:
try:
return getattr(task, name)
except AttributeError as exc:
raise AttributeError(f"task is missing required attribute {name}") from exc
def _lakefs_branch_segment(value: object) -> str:
text = re.sub(r"[^A-Za-z0-9_-]+", "-", str(value)).strip("-_")
if not text or text.startswith("-"):
return "unknown"
return text