perago.assert_current_attempt_snapshot

perago.assert_current_attempt_snapshot(task, fresh)

Assert that a fresh Conductor task still represents the same attempt.

This is Perago's attempt fence. Workspace publication calls it before and after staging so a worker fails closed when Conductor no longer reports the same in-progress workflow, task id, and retry count.
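The before-and-after pattern described above can be sketched roughly as follows. This is an illustration under stated assumptions, not Perago's publication code: `publish_with_fence`, `load_fresh`, `stage`, and `publish` are hypothetical names, and `check` stands in for assert_current_attempt_snapshot.

```python
from typing import Any, Callable


def publish_with_fence(
    task: Any,
    load_fresh: Callable[[], Any],
    check: Callable[[Any, Any], None],
    stage: Callable[[], Any],
    publish: Callable[[Any], None],
) -> None:
    """Stage and publish only while `task` is still the current attempt.

    Every parameter is a hypothetical stand-in used for illustration.
    `check` plays the role of assert_current_attempt_snapshot: it returns
    None when the attempt is current and raises otherwise.
    """
    # Fence before staging: do no work for an attempt that is already stale.
    check(task, load_fresh())
    staged = stage()
    # Fence after staging: the attempt may have changed while staging ran,
    # so re-read the task before anything becomes visible.
    check(task, load_fresh())
    publish(staged)
```

Because either check raises instead of returning a flag, a caller that forgets to inspect a result cannot publish by accident, which is what failing closed means here.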

Parameters:
task : object

Original attempt-like object captured before task execution. It must expose status, workflow_instance_id, task_id, and retry_count attributes.

fresh : object

Fresh attempt-like object loaded from Conductor for comparison. It must expose the same attributes as task.

Raises:
StaleAttemptError

If fresh.status is not IN_PROGRESS, or fresh no longer matches the original workflow instance id, task id, or retry count.

AttributeError

If either object is missing a required attempt identity attribute.

Return type:

None
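The contract laid out by the parameters and exceptions above can be approximated with a short sketch. It is a hypothetical re-implementation for illustration only, not Perago's source: `check_attempt_snapshot` is an invented name, the `StaleAttemptError` class is a local stand-in, and comparing status against the string "IN_PROGRESS" is an assumption.

```python
from types import SimpleNamespace


class StaleAttemptError(Exception):
    """Local stand-in for Perago's exception of the same name."""


def check_attempt_snapshot(task, fresh):
    """Hypothetical sketch of the documented attempt-fence contract."""
    # Read every required attribute up front, so a missing one on either
    # object surfaces as AttributeError before any comparison happens.
    _ = task.status  # required on the original, though not compared here
    original = (task.workflow_instance_id, task.task_id, task.retry_count)
    current = (fresh.workflow_instance_id, fresh.task_id, fresh.retry_count)
    # Assumption: status is the plain string "IN_PROGRESS".
    if fresh.status != "IN_PROGRESS":
        raise StaleAttemptError(f"attempt is {fresh.status}, not IN_PROGRESS")
    if current != original:
        raise StaleAttemptError(f"attempt changed: {original} -> {current}")


original = SimpleNamespace(
    status="IN_PROGRESS", workflow_instance_id="wf-1", task_id="task-1", retry_count=0
)
retried = SimpleNamespace(
    status="IN_PROGRESS", workflow_instance_id="wf-1", task_id="task-1", retry_count=1
)

check_attempt_snapshot(original, original)  # same attempt: returns None
try:
    check_attempt_snapshot(original, retried)  # retry count moved on
except StaleAttemptError as exc:
    print(f"rejected: {exc}")
```

Comparing the three identity fields as a tuple keeps the rejection message specific about what changed, while the status check is reported separately because it concerns only the fresh object.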

See also

StaleAttemptError

Exception raised when the attempt fence rejects.

run_workspace_task_attempt

Runtime flow that checks the fence around workspace staging and publication.

Examples

>>> from dataclasses import dataclass
>>> from perago import assert_current_attempt_snapshot
>>> @dataclass(frozen=True)
... class Attempt:
...     status: str
...     workflow_instance_id: str
...     task_id: str
...     retry_count: int
>>> attempt = Attempt("IN_PROGRESS", "wf-1", "task-1", 0)
>>> assert_current_attempt_snapshot(attempt, attempt)