perago.workspace 源代码

from __future__ import annotations

import json
import os
import shutil
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from os import PathLike
from pathlib import Path
from threading import Lock
from typing import Any
from uuid import uuid4

from loguru import logger

from perago._segments import safe_segment
from perago.errors import TaskInputError
from perago.guards import _canonical_workspace_path
from perago.models import WorkspaceSpec


# File name of the JSON marker written into every attempt-local workspace.
# Paths with this name are never uploaded and are hidden from downloads.
ATTEMPT_WORKSPACE_MARKER = ".perago-attempt.json"
# Tokens of workspace owners registered as active in this process; garbage
# collection using the default token set skips workspaces owned by them.
_ACTIVE_OWNER_TOKENS: set[str] = set()
# Guards every read and write of _ACTIVE_OWNER_TOKENS.
_ACTIVE_OWNER_TOKENS_LOCK = Lock()


@dataclass(frozen=True)
class WorkspaceUploadFile:
    """
    A single local file scheduled for upload into a workspace prefix.

    Records of this type are created while a workspace task output is being
    staged. Each one pairs a file picked from the attempt workspace with the
    LakeFS object path that will receive its bytes.

    Parameters
    ----------
    local_path : pathlib.Path
        Path of the local file chosen for upload, either absolute or relative
        to the attempt workspace.
    object_path : str
        Destination LakeFS object path. It already includes the task's
        ``WorkspaceSpec.prefix``.

    See Also
    --------
    workspace_upload_files : Produce upload records by scanning a workspace.
    WorkspaceSyncPlan : Bundle upload records with stale-object deletions.

    Notes
    -----
    This is an internal planning record. Task code works with the local
    workspace directory and does not normally build these records by hand.

    Examples
    --------
    >>> record = WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")
    >>> record.object_path
    'audio/render/raw/input.wav'
    """

    # File on the local filesystem whose contents are uploaded.
    local_path: Path
    # Target object key in LakeFS, workspace prefix included.
    object_path: str
@dataclass(frozen=True)
class WorkspaceDownloadFile:
    """
    A remote object scheduled for download into an attempt workspace.

    Records of this type are derived from the object listing of a workspace
    input ref. The destination always lies under the attempt-local
    workspace directory.

    Parameters
    ----------
    object_path : str
        LakeFS object path chosen from the input workspace ref.
    local_path : pathlib.Path
        Destination under the attempt-local workspace root, obtained by
        removing the task's ``WorkspaceSpec.prefix`` from ``object_path``.

    See Also
    --------
    workspace_download_files : Produce download records from LakeFS paths.
    workspace_local_path : Translate one remote object path to a local path.

    Notes
    -----
    Objects outside the task prefix never appear as download records; they
    are filtered out before the download plan is returned.

    Examples
    --------
    >>> record = WorkspaceDownloadFile("audio/render/raw/input.wav", Path("raw/input.wav"))
    >>> record.local_path
    PosixPath('raw/input.wav')
    """

    # Source object key in LakeFS.
    object_path: str
    # Where the object's contents are written locally.
    local_path: Path
@dataclass(frozen=True)
class WorkspaceSyncPlan:
    """
    Upload and delete operations that mirror an attempt workspace into LakeFS.

    The stage phase of workspace publication computes both lists together.
    Once the plan is applied, the staging branch holds exactly the local
    projection of the task prefix.

    Parameters
    ----------
    upload_files : list of WorkspaceUploadFile
        Local files to write to the staging branch.
    delete_object_paths : list of str
        Object paths under the workspace prefix that exist on the staging
        branch but have no local counterpart, and so must be removed.

    Attributes
    ----------
    changed_object_count : int
        Number of uploads plus deletions in the plan.
    upload_bytes : int
        Combined size in bytes of all files to upload. The sizes are read
        from the filesystem each time the property is accessed.

    See Also
    --------
    build_workspace_sync_plan : Compute the plan from local and remote state.
    WorkspaceUploadFile : A single upload entry.

    Notes
    -----
    The plan describes the complete projected contents of a
    ``WorkspaceSpec.prefix``. It is not an append-only list of the files the
    current task body happened to create.

    Examples
    --------
    >>> plan = WorkspaceSyncPlan(
    ...     upload_files=[WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")],
    ...     delete_object_paths=["audio/render/old.tmp"],
    ... )
    >>> plan.changed_object_count
    2
    """

    upload_files: list[WorkspaceUploadFile]
    delete_object_paths: list[str]

    @property
    def changed_object_count(self) -> int:
        """Count of remote objects this plan uploads or deletes."""
        return sum(map(len, (self.upload_files, self.delete_object_paths)))

    @property
    def upload_bytes(self) -> int:
        """Sum of the on-disk sizes of every file scheduled for upload."""
        total = 0
        for upload in self.upload_files:
            total += upload.local_path.stat().st_size
        return total
def workspace_object_prefix(workspace_spec: WorkspaceSpec) -> str:
    """
    Return the LakeFS object-key prefix for a workspace declaration.

    Leading and trailing slashes are removed, so the result can be joined to
    a relative object path with a single ``/``. The root prefix ``"/"`` maps
    to the empty string, and ``"/audio/render"`` maps to ``"audio/render"``.

    Parameters
    ----------
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` is converted.

    Returns
    -------
    str
        Slash-free prefix, or ``""`` when the workspace covers the whole
        repository.
    """
    # Object paths handled by this module are relative (for example
    # "audio/render/raw/input.wav"). A prefix that kept its leading "/" would
    # therefore never match them. Stripping is a no-op for prefixes that are
    # already normalized.
    return workspace_spec.prefix.strip("/")


def workspace_object_path(workspace_spec: WorkspaceSpec, workspace_path: str | PathLike[str]) -> str:
    """
    Map a workspace-relative path to its LakeFS object path.

    Parameters
    ----------
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` is prepended.
    workspace_path : str or os.PathLike[str]
        Path relative to the attempt-local workspace root.

    Returns
    -------
    str
        ``"<prefix>/<path>"``, or just the canonical path when the prefix is
        the repository root.

    Raises
    ------
    Exception
        Whatever ``_canonical_workspace_path`` raises for a path that is not
        a safe workspace-relative path (documented elsewhere in this module
        as ``TaskDefinitionError``).
    """
    local_path = _canonical_workspace_path(workspace_path)
    prefix = workspace_object_prefix(workspace_spec)
    return f"{prefix}/{local_path}" if prefix else local_path
def workspace_local_path(workspace_spec: WorkspaceSpec, object_path: str | PathLike[str]) -> Path | None:
    """
    Translate a LakeFS object path into a path inside the local workspace.

    The workspace prefix is removed from remote paths before they reach the
    task body. Objects outside the prefix have no local counterpart.

    Parameters
    ----------
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` selects the visible subtree of
        LakeFS objects.
    object_path : str or os.PathLike[str]
        LakeFS object path to translate.

    Returns
    -------
    pathlib.Path or None
        Workspace-relative path for objects inside the prefix. ``None`` is
        returned for objects outside the prefix and for any file named like
        Perago's attempt marker.

    Raises
    ------
    TaskDefinitionError
        Raised by path canonicalization when ``object_path`` is not a safe
        relative POSIX path.

    See Also
    --------
    workspace_download_files : Build download records from object paths.
    WorkspaceSpec : Declares the workspace prefix.

    Examples
    --------
    >>> workspace_local_path(WorkspaceSpec(prefix="/audio/render"), "audio/render/raw/input.wav")
    PosixPath('raw/input.wav')
    >>> workspace_local_path(WorkspaceSpec(prefix="/audio/render"), "other/raw/input.wav") is None
    True
    """
    canonical = _canonical_workspace_path(object_path)
    object_prefix = workspace_object_prefix(workspace_spec)
    if object_prefix:
        head = object_prefix + "/"
        # Only strict descendants of the prefix are visible locally.
        if not canonical.startswith(head):
            return None
        canonical = canonical[len(head):]
    candidate = Path(canonical)
    # The attempt marker is Perago bookkeeping and never surfaces locally.
    return None if candidate.name == ATTEMPT_WORKSPACE_MARKER else candidate
@dataclass(frozen=True)
class WorkspaceOwner:
    """
    Identity of the process that owns an attempt workspace.

    ``prepare_attempt_workspace`` copies these values into the attempt marker.
    The garbage collector compares them against active owners to decide
    whether a workspace may be removed.
    """

    # Worker identifier. prepare_attempt_workspace defaults it to
    # $PERAGO_WORKER_ID, or "pid-<pid>" when that variable is unset.
    worker_id: str
    # Operating-system process id of the owner (os.getpid() in new_workspace_owner).
    pid: int
    # Random per-owner token (uuid4 hex) used to recognize live owners.
    token: str


def attempt_workspace_dir(workspace_root: Path, task: object) -> Path:
    """
    Return the attempt-local workspace directory for ``task``.

    The directory name is ``task_id=<task_id>``. When the task has a
    non-``None`` ``execution_id``, ``-exec=<execution_id>`` is appended. Both
    values pass through ``safe_segment`` before becoming a path component.

    Raises
    ------
    AttributeError
        If ``task`` has no ``task_id`` attribute.
    """
    task_part = f"task_id={safe_segment(_task_attr(task, 'task_id'))}"
    execution_id = _optional_task_attr(task, "execution_id")
    if execution_id is None:
        return workspace_root / task_part
    return workspace_root / f"{task_part}-exec={safe_segment(execution_id)}"


def new_workspace_owner(worker_id: str) -> WorkspaceOwner:
    """Create an owner for the current process with a fresh random token."""
    return WorkspaceOwner(worker_id=worker_id, pid=os.getpid(), token=uuid4().hex)


def register_active_workspace_owner(owner: WorkspaceOwner) -> None:
    """Mark ``owner``'s token as active in this process."""
    with _ACTIVE_OWNER_TOKENS_LOCK:
        _ACTIVE_OWNER_TOKENS.add(owner.token)


def unregister_active_workspace_owner(owner: WorkspaceOwner) -> None:
    """Remove ``owner``'s token from the active set; unknown tokens are ignored."""
    with _ACTIVE_OWNER_TOKENS_LOCK:
        _ACTIVE_OWNER_TOKENS.discard(owner.token)


def active_workspace_owner_tokens() -> set[str]:
    """Return a copy of the tokens currently registered as active, taken under the lock."""
    with _ACTIVE_OWNER_TOKENS_LOCK:
        return set(_ACTIVE_OWNER_TOKENS)


def prepare_attempt_workspace(workspace_root: Path, task: object, owner: WorkspaceOwner | None = None) -> Path:
    """
    Create a fresh attempt-local workspace directory and write its marker.

    Parameters
    ----------
    workspace_root : pathlib.Path
        Directory under which attempt workspaces live. Missing parents are
        created.
    task : object
        Task object. It must expose ``workflow_instance_id``, ``task_id``,
        ``retry_count`` and ``task_def_name``. ``execution_id`` is optional,
        exactly as in ``attempt_workspace_dir``, and is stored as a string or
        ``null``.
    owner : WorkspaceOwner or None, optional
        Owner recorded in the marker. When omitted, a new owner is created
        from ``$PERAGO_WORKER_ID``, falling back to ``pid-<pid>``.

    Returns
    -------
    pathlib.Path
        The newly created workspace directory.

    Raises
    ------
    AttributeError
        If a required task attribute is missing. This is raised before any
        directory is created.
    FileExistsError
        If the workspace directory already exists.
    """
    if owner is None:
        owner = new_workspace_owner(os.environ.get("PERAGO_WORKER_ID", f"pid-{os.getpid()}"))
    workspace_dir = attempt_workspace_dir(workspace_root, task)
    # Gather and serialize everything before touching the filesystem. A
    # directory without a marker is refused by cleanup_attempt_workspace and
    # is invisible to garbage collection, so it would leak forever.
    marker = {
        "workflow_instance_id": _task_attr(task, "workflow_instance_id"),
        "task_id": _task_attr(task, "task_id"),
        "execution_id": _optional_task_attr(task, "execution_id"),
        "retry_count": _task_attr(task, "retry_count"),
        "task_def_name": _task_attr(task, "task_def_name"),
        "owner_worker_id": owner.worker_id,
        "owner_pid": owner.pid,
        "owner_token": owner.token,
        "started_at": datetime.now(timezone.utc).isoformat(),
    }
    payload = json.dumps(marker, sort_keys=True)
    workspace_dir.mkdir(parents=True, exist_ok=False)
    try:
        (workspace_dir / ATTEMPT_WORKSPACE_MARKER).write_text(payload, encoding="utf-8")
    except BaseException:
        # Same leak as above: never leave a marker-less directory behind.
        shutil.rmtree(workspace_dir, ignore_errors=True)
        raise
    return workspace_dir
def workspace_upload_files(workspace_dir: Path, workspace_spec: WorkspaceSpec) -> list[WorkspaceUploadFile]:
    """
    Collect the local files to upload when publishing a workspace.

    The attempt workspace is walked recursively, and every regular file is
    paired with a LakeFS object path under the task prefix. Any file named
    like Perago's attempt marker is internal state and is never published.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Attempt-local workspace root to walk.
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` is prepended to every object
        path.

    Returns
    -------
    list of WorkspaceUploadFile
        Upload records ordered by local path. Directories, other non-regular
        entries and marker files are left out.

    Raises
    ------
    TaskInputError
        If a symbolic link is found. Only regular files can be published.
    TaskDefinitionError
        If a relative path cannot be expressed as a safe workspace object
        path.

    See Also
    --------
    build_workspace_sync_plan : Combine uploads with stale remote deletes.
    workspace_delete_object_paths : Find stale remote objects.

    Examples
    --------
    >>> files = workspace_upload_files(workspace_dir, WorkspaceSpec(prefix="/audio/render"))
    >>> [file.object_path for file in files]
    ['audio/render/raw/input.wav']
    """
    selected: list[WorkspaceUploadFile] = []
    for candidate in sorted(workspace_dir.rglob("*")):
        relative = candidate.relative_to(workspace_dir)
        if relative.name == ATTEMPT_WORKSPACE_MARKER:
            continue
        if candidate.is_symlink():
            raise TaskInputError(f"workspace publication does not support symlinks: {relative.as_posix()}")
        if candidate.is_file():
            selected.append(
                WorkspaceUploadFile(
                    local_path=candidate,
                    object_path=workspace_object_path(workspace_spec, relative),
                )
            )
    return selected
def workspace_download_files(
    workspace_dir: Path,
    workspace_spec: WorkspaceSpec,
    object_paths: list[str],
) -> list[WorkspaceDownloadFile]:
    """
    Plan the downloads for the objects a workspace task is allowed to see.

    The workspace prefix is applied before local destinations are computed.
    The resulting records let runtime code fetch object contents without
    exposing paths outside the prefix to the task.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Attempt-local workspace root that receives the files.
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` filters the remote listing.
    object_paths : list of str
        LakeFS object paths listed from the input workspace ref.

    Returns
    -------
    list of WorkspaceDownloadFile
        Download records ordered by object path. Objects outside the prefix
        and marker files are dropped.

    Raises
    ------
    TaskDefinitionError
        If an object path cannot be expressed as a safe workspace path.

    See Also
    --------
    workspace_local_path : Translate a single object path.

    Examples
    --------
    >>> files = workspace_download_files(
    ...     workspace_dir,
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/raw/input.wav", "other/input.wav"],
    ... )
    >>> [file.local_path.relative_to(workspace_dir).as_posix() for file in files]
    ['raw/input.wav']
    """
    return [
        WorkspaceDownloadFile(object_path=remote, local_path=workspace_dir / relative)
        for remote in sorted(object_paths)
        if (relative := workspace_local_path(workspace_spec, remote)) is not None
    ]
def workspace_delete_object_paths(
    workspace_spec: WorkspaceSpec,
    existing_object_paths: list[str],
    uploaded_files: list[WorkspaceUploadFile],
) -> list[str]:
    """
    Select stale remote objects to delete from a staging branch.

    Only objects inside the task's workspace prefix are considered. An object
    is stale when it still exists on the staging branch but no longer has a
    counterpart in the local attempt workspace.

    Parameters
    ----------
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` bounds the deletions.
    existing_object_paths : list of str
        Object paths currently on the staging branch.
    uploaded_files : list of WorkspaceUploadFile
        Upload records computed from the local attempt workspace.

    Returns
    -------
    list of str
        Sorted object paths under the prefix that are not being uploaded.
        Objects outside the prefix and marker files are never included.

    Raises
    ------
    TaskDefinitionError
        If an existing object path cannot be expressed as a safe workspace
        path.

    See Also
    --------
    workspace_upload_files : Compute the upload half of the plan.
    build_workspace_sync_plan : Compute the full upload/delete plan.

    Examples
    --------
    >>> uploaded = [WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")]
    >>> workspace_delete_object_paths(
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/raw/input.wav", "audio/render/old.tmp"],
    ...     uploaded,
    ... )
    ['audio/render/old.tmp']
    """
    already_uploaded = frozenset(upload.object_path for upload in uploaded_files)
    # Membership is tested first, so uploaded paths never reach path mapping.
    return [
        remote
        for remote in sorted(existing_object_paths)
        if remote not in already_uploaded and workspace_local_path(workspace_spec, remote) is not None
    ]
def build_workspace_sync_plan(
    workspace_dir: Path,
    workspace_spec: WorkspaceSpec,
    existing_object_paths: list[str],
) -> WorkspaceSyncPlan:
    """
    Compute the full sync plan for a workspace prefix.

    LakeFS staging code uses this high-level helper. It merges the local
    upload records with the stale-object deletions for the same prefix.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Attempt-local workspace root after the task body has finished.
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` defines the LakeFS projection
        to synchronize.
    existing_object_paths : list of str
        Object paths on the staging branch before the new local contents are
        uploaded.

    Returns
    -------
    WorkspaceSyncPlan
        Uploads and deletions that make the staging prefix match the local
        attempt workspace.

    Raises
    ------
    TaskInputError
        If the local workspace contains a symbolic link.
    TaskDefinitionError
        If a local or remote path is not a valid workspace-relative path.

    See Also
    --------
    workspace_upload_files : Select local files to upload.
    workspace_delete_object_paths : Select remote objects to delete.
    WorkspaceSyncPlan : The returned plan type.

    Notes
    -----
    The local workspace is the desired state of the whole prefix. Any remote
    object under the prefix without a local counterpart is scheduled for
    deletion.

    Examples
    --------
    >>> plan = build_workspace_sync_plan(
    ...     workspace_dir,
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/old.tmp"],
    ... )
    >>> plan.changed_object_count
    1
    """
    uploads = workspace_upload_files(workspace_dir, workspace_spec)
    stale = workspace_delete_object_paths(workspace_spec, existing_object_paths, uploads)
    return WorkspaceSyncPlan(upload_files=uploads, delete_object_paths=stale)
def cleanup_attempt_workspace(workspace_dir: Path) -> None: _require_attempt_marker(workspace_dir) shutil.rmtree(workspace_dir) def cleanup_attempt_workspace_safely(workspace_dir: Path, task: object) -> bool: try: cleanup_attempt_workspace(workspace_dir) except OSError as exc: logger.bind( workspace_dir=str(workspace_dir), workflow_instance_id=_task_attr(task, "workflow_instance_id"), task_id=_task_attr(task, "task_id"), retry_count=_task_attr(task, "retry_count"), ).opt(exception=exc).error("failed to clean attempt-local workspace") return False return True def sweep_abandoned_attempt_workspaces(workspace_root: Path) -> list[Path]: return garbage_collect_attempt_workspaces( workspace_root, ttl=timedelta(seconds=0), active_process_owners=set(), active_owner_tokens=active_workspace_owner_tokens(), ) def garbage_collect_workspace_owner( workspace_root: Path, *, owner_worker_id: str, owner_pid: int, ) -> list[Path]: return garbage_collect_attempt_workspaces( workspace_root, ttl=timedelta(seconds=0), active_process_owners=set(), active_owner_tokens=set(), target_process_owner=(owner_worker_id, owner_pid), ) def garbage_collect_attempt_workspaces( workspace_root: Path, *, ttl: timedelta, active_process_owners: set[tuple[str, int]] | None = None, active_owner_tokens: set[str] | None = None, target_process_owner: tuple[str, int] | None = None, now: datetime | None = None, ) -> list[Path]: if not workspace_root.exists(): return [] current_time = now or datetime.now(timezone.utc) process_owners = active_process_owners or set() owner_tokens = active_owner_tokens if active_owner_tokens is not None else active_workspace_owner_tokens() removed: list[Path] = [] for marker in sorted(workspace_root.rglob(ATTEMPT_WORKSPACE_MARKER)): workspace_dir = marker.parent _require_inside(workspace_root, workspace_dir) marker_data = _read_gc_marker(marker) if marker_data is None: continue owner_worker_id = marker_data["owner_worker_id"] owner_pid = marker_data["owner_pid"] owner_token = 
marker_data["owner_token"] if target_process_owner is not None and (owner_worker_id, owner_pid) != target_process_owner: continue if (owner_worker_id, owner_pid) in process_owners: continue if owner_token in owner_tokens: continue started_at = marker_data["started_at"] if current_time - started_at < ttl: continue shutil.rmtree(workspace_dir) removed.append(workspace_dir) return removed def _require_attempt_marker(workspace_dir: Path) -> None: marker = workspace_dir / ATTEMPT_WORKSPACE_MARKER if not marker.is_file(): raise FileNotFoundError(f"{workspace_dir} is not a Perago attempt workspace") def _require_inside(root: Path, child: Path) -> None: child.resolve().relative_to(root.resolve()) def _task_attr(task: object, name: str) -> Any: try: return getattr(task, name) except AttributeError as exc: raise AttributeError(f"task is missing required attribute {name}") from exc def _optional_task_attr(task: object, name: str) -> str | None: value = getattr(task, name, None) if value is None: return None return str(value) def _read_gc_marker(marker: Path) -> dict[str, Any] | None: try: data = json.loads(marker.read_text(encoding="utf-8")) owner_worker_id = data["owner_worker_id"] owner_pid = data["owner_pid"] owner_token = data["owner_token"] execution_id = data["execution_id"] started_at = data["started_at"] except (OSError, json.JSONDecodeError, KeyError, TypeError): return None if not isinstance(owner_worker_id, str) or not owner_worker_id: return None if not isinstance(owner_pid, int): return None if not isinstance(owner_token, str) or not owner_token: return None if not isinstance(execution_id, str) or not execution_id: return None if not isinstance(started_at, str): return None try: parsed_started_at = datetime.fromisoformat(started_at) except ValueError: return None if parsed_started_at.tzinfo is None: parsed_started_at = parsed_started_at.replace(tzinfo=timezone.utc) return { "owner_worker_id": owner_worker_id, "owner_pid": owner_pid, "owner_token": owner_token, 
"execution_id": execution_id, "started_at": parsed_started_at, }