perago.workspace.sync 源代码

from __future__ import annotations

from pathlib import Path

from perago.errors import TaskInputError
from perago.models import WorkspaceSpec
from perago.workspace.models import (
    ATTEMPT_WORKSPACE_MARKER,
    WorkspaceDownloadFile,
    WorkspaceSyncPlan,
    WorkspaceUploadFile,
)
from perago.workspace.paths import workspace_local_path, workspace_object_path


def workspace_upload_files(
    workspace_dir: Path,
    workspace_spec: WorkspaceSpec,
) -> list[WorkspaceUploadFile]:
    """
    Collect the regular files of a local workspace as upload records.

    Every entry below ``workspace_dir`` is visited recursively. Each regular
    file is paired with the object path that ``workspace_object_path``
    derives from the workspace spec and the file's path relative to
    ``workspace_dir``. Perago's attempt marker is internal bookkeeping and is
    never turned into an upload record.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Root of the attempt-local workspace to scan.
    workspace_spec : WorkspaceSpec
        Workspace declaration handed to ``workspace_object_path`` to build
        each object path.

    Returns
    -------
    list of WorkspaceUploadFile
        One record per regular file, in sorted local-path order. Directories
        and entries named ``ATTEMPT_WORKSPACE_MARKER`` are left out.

    Raises
    ------
    TaskInputError
        If a symbolic link is found in the workspace; only regular files can
        be published.
    TaskDefinitionError
        Propagated from ``workspace_object_path`` when a relative path cannot
        be expressed as a safe workspace object path.

    See Also
    --------
    build_workspace_sync_plan : Pair these uploads with stale remote deletes.
    workspace_delete_object_paths : Compute the stale remote objects.

    Examples
    --------
    >>> files = workspace_upload_files(workspace_dir, WorkspaceSpec(prefix="/audio/render"))
    >>> [file.object_path for file in files]
    ['audio/render/raw/input.wav']
    """
    # Materialize the walk before sorting so records come out in a stable,
    # path-ordered sequence.
    candidates = list(workspace_dir.rglob("*"))
    candidates.sort()

    uploads: list[WorkspaceUploadFile] = []
    for candidate in candidates:
        rel = candidate.relative_to(workspace_dir)
        # The marker is matched by its final path component and is checked
        # before the symlink test, so a marker entry is skipped regardless of
        # what kind of filesystem entry it is.
        if rel.name == ATTEMPT_WORKSPACE_MARKER:
            continue
        if candidate.is_symlink():
            raise TaskInputError(f"workspace publication does not support symlinks: {rel.as_posix()}")
        if candidate.is_file():
            uploads.append(
                WorkspaceUploadFile(
                    local_path=candidate,
                    object_path=workspace_object_path(workspace_spec, rel),
                )
            )
    return uploads
def workspace_download_files(
    workspace_dir: Path,
    workspace_spec: WorkspaceSpec,
    object_paths: list[str],
) -> list[WorkspaceDownloadFile]:
    """
    Plan which remote objects to download and where to place them locally.

    Each object path is passed through ``workspace_local_path``. Paths for
    which that helper returns ``None`` are dropped; the rest are anchored
    under ``workspace_dir`` to form the local destination.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Attempt-local workspace root that destinations are joined onto.
    workspace_spec : WorkspaceSpec
        Workspace declaration handed to ``workspace_local_path``; its
        ``prefix`` determines which objects are visible to the task.
    object_paths : list of str
        LakeFS object paths listed from the input workspace ref.

    Returns
    -------
    list of WorkspaceDownloadFile
        Download records in sorted object-path order. Objects that
        ``workspace_local_path`` maps to ``None`` (documented as objects
        outside the prefix and Perago's attempt marker) are omitted.

    Raises
    ------
    TaskDefinitionError
        Propagated from ``workspace_local_path`` when an object path cannot
        be expressed as a safe workspace path.

    See Also
    --------
    workspace_local_path : Map a single object path to a local path.

    Examples
    --------
    >>> files = workspace_download_files(
    ...     workspace_dir,
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/raw/input.wav", "other/input.wav"],
    ... )
    >>> [file.local_path.relative_to(workspace_dir).as_posix() for file in files]
    ['raw/input.wav']
    """
    # Lazily pair every remote path with its workspace-relative destination;
    # the comprehension below consumes one pair at a time.
    resolved = (
        (remote_path, workspace_local_path(workspace_spec, remote_path))
        for remote_path in sorted(object_paths)
    )
    return [
        WorkspaceDownloadFile(
            object_path=remote_path,
            local_path=workspace_dir / relative_target,
        )
        for remote_path, relative_target in resolved
        if relative_target is not None
    ]
def workspace_delete_object_paths(
    workspace_spec: WorkspaceSpec,
    existing_object_paths: list[str],
    uploaded_files: list[WorkspaceUploadFile],
) -> list[str]:
    """
    Select remote objects that are stale relative to the local workspace.

    An existing object is selected for deletion when it is not the target of
    any upload record and ``workspace_local_path`` maps it to something other
    than ``None``.

    Parameters
    ----------
    workspace_spec : WorkspaceSpec
        Workspace declaration handed to ``workspace_local_path``; its
        ``prefix`` bounds the delete scope.
    existing_object_paths : list of str
        Object paths currently present on the staging branch.
    uploaded_files : list of WorkspaceUploadFile
        Upload records produced from the local attempt workspace.

    Returns
    -------
    list of str
        Sorted existing object paths that are absent from the upload list and
        that ``workspace_local_path`` does not map to ``None`` (documented as
        excluding objects outside the prefix and Perago's attempt marker).

    Raises
    ------
    TaskDefinitionError
        Propagated from ``workspace_local_path`` when an existing object path
        cannot be expressed as a safe workspace path.

    See Also
    --------
    workspace_upload_files : Build the upload half of the sync plan.
    build_workspace_sync_plan : Build the combined upload/delete plan.

    Examples
    --------
    >>> uploaded = [WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")]
    >>> workspace_delete_object_paths(
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/raw/input.wav", "audio/render/old.tmp"],
    ...     uploaded,
    ... )
    ['audio/render/old.tmp']
    """
    kept_remote_paths = frozenset(record.object_path for record in uploaded_files)
    # The membership test stays first: paths about to be re-uploaded are
    # skipped without ever being passed to workspace_local_path.
    return [
        remote_path
        for remote_path in sorted(existing_object_paths)
        if remote_path not in kept_remote_paths
        and workspace_local_path(workspace_spec, remote_path) is not None
    ]
def build_workspace_sync_plan(
    workspace_dir: Path,
    workspace_spec: WorkspaceSpec,
    existing_object_paths: list[str],
) -> WorkspaceSyncPlan:
    """
    Assemble the upload and delete operations for one workspace prefix.

    The uploads come from ``workspace_upload_files`` and the deletes from
    ``workspace_delete_object_paths``, which is given those same uploads so
    that objects about to be re-uploaded are not scheduled for deletion.

    Parameters
    ----------
    workspace_dir : pathlib.Path
        Attempt-local workspace root after the task body has finished.
    workspace_spec : WorkspaceSpec
        Workspace declaration whose ``prefix`` defines the LakeFS projection
        being synchronized.
    existing_object_paths : list of str
        Object paths present on the staging branch before the new local
        contents are uploaded.

    Returns
    -------
    WorkspaceSyncPlan
        Plan carrying ``upload_files`` and ``delete_object_paths``.

    Raises
    ------
    TaskInputError
        Propagated from ``workspace_upload_files`` when the local workspace
        contains a symbolic link.
    TaskDefinitionError
        Propagated from the helpers when a local or remote path is not a
        valid workspace-relative path.

    See Also
    --------
    workspace_upload_files : Discover local files selected for upload.
    workspace_delete_object_paths : Discover remote objects selected for deletion.
    WorkspaceSyncPlan : Return type holding both operation lists.

    Notes
    -----
    The local workspace is treated as the desired state for the whole prefix:
    remote objects under the prefix with no local counterpart are scheduled
    for deletion.

    Examples
    --------
    >>> plan = build_workspace_sync_plan(
    ...     workspace_dir,
    ...     WorkspaceSpec(prefix="/audio/render"),
    ...     ["audio/render/old.tmp"],
    ... )
    >>> plan.changed_object_count
    1
    """
    uploads = workspace_upload_files(workspace_dir, workspace_spec)
    stale_remote_paths = workspace_delete_object_paths(
        workspace_spec,
        existing_object_paths,
        uploads,
    )
    return WorkspaceSyncPlan(
        upload_files=uploads,
        delete_object_paths=stale_remote_paths,
    )