perago.attempt 源代码

from __future__ import annotations

from typing import Any

from perago.errors import StaleAttemptError


[文档] def assert_current_attempt_snapshot(task: object, fresh: object) -> None: """ Assert that a fresh Conductor task still represents the same attempt. This is Perago's attempt fence. Workspace publication calls it before and after staging so a worker fails closed when Conductor no longer reports the same in-progress workflow, task id, and retry count. Parameters ---------- task : object Original attempt-like object captured before task execution. It must expose ``status``, ``workflow_instance_id``, ``task_id``, and ``retry_count`` attributes. fresh : object Fresh attempt-like object loaded from Conductor for comparison. It must expose the same attributes as ``task``. Raises ------ StaleAttemptError If ``fresh`` is not ``IN_PROGRESS`` or no longer matches the original workflow id, task id, or retry count. AttributeError If either object is missing a required attempt identity attribute. See Also -------- StaleAttemptError : Exception raised when the attempt fence rejects. run_workspace_task_attempt : Runtime flow that checks the fence around workspace staging and publication. Examples -------- >>> from dataclasses import dataclass >>> @dataclass(frozen=True) ... class Attempt: ... status: str ... workflow_instance_id: str ... task_id: str ... retry_count: int >>> attempt = Attempt("IN_PROGRESS", "wf-1", "task-1", 0) >>> assert_current_attempt_snapshot(attempt, attempt) """ if ( _task_attr(fresh, "status") != "IN_PROGRESS" or _task_attr(fresh, "workflow_instance_id") != _task_attr(task, "workflow_instance_id") or _task_attr(fresh, "task_id") != _task_attr(task, "task_id") or _task_attr(fresh, "retry_count") != _task_attr(task, "retry_count") ): raise StaleAttemptError(_task_attr(task, "task_id"))
def _task_attr(task: object, name: str) -> Any: try: return getattr(task, name) except AttributeError as exc: raise AttributeError(f"task is missing required attribute {name}") from exc