perago.workspace.models 源代码
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
ATTEMPT_WORKSPACE_MARKER = ".perago-attempt.json"
[文档]
@dataclass(frozen=True, init=False)
class WorkspaceUploadFile:
"""
Local file that should be uploaded into a workspace prefix.
``WorkspaceUploadFile`` is produced while staging a workspace task output.
The record keeps the local file selected from the attempt workspace next to
the LakeFS object path that will receive its contents.
Parameters
----------
local_path : pathlib.Path
Absolute or attempt-workspace-relative path to the local file selected
for upload.
object_path : str
LakeFS object path where the file should be written. The path already
includes the task's ``WorkspaceSpec.prefix``.
See Also
--------
workspace_upload_files : Build upload records from a local workspace.
WorkspaceSyncPlan : Combine upload records with stale-object deletes.
Notes
-----
This is a runtime planning object. Task authors normally interact with the
local workspace directory instead of constructing upload records directly.
Examples
--------
>>> record = WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")
>>> record.object_path
'audio/render/raw/input.wav'
"""
local_path: Path
object_path: str
[文档]
def __init__(self, local_path: Path, object_path: str) -> None:
"""
Initialize a workspace upload record.
Parameters
----------
local_path : pathlib.Path
Absolute or attempt-workspace-relative path to the local file
selected for upload.
object_path : str
LakeFS object path where the file should be written.
"""
object.__setattr__(self, "local_path", local_path)
object.__setattr__(self, "object_path", object_path)
[文档]
@dataclass(frozen=True, init=False)
class WorkspaceDownloadFile:
"""
Remote object that should be downloaded into an attempt workspace.
``WorkspaceDownloadFile`` is produced from the object listing for a
workspace input ref. The destination path always points under the
attempt-local workspace directory.
Parameters
----------
object_path : str
LakeFS object path selected from the input workspace ref.
local_path : pathlib.Path
Destination path under the attempt-local workspace root after removing
the task's ``WorkspaceSpec.prefix``.
See Also
--------
workspace_download_files : Build download records from LakeFS object paths.
workspace_local_path : Convert a remote object path to a local path.
Notes
-----
Objects outside the task prefix are not represented by this class because
they are filtered before the download plan is returned.
Examples
--------
>>> record = WorkspaceDownloadFile("audio/render/raw/input.wav", Path("raw/input.wav"))
>>> record.local_path
PosixPath('raw/input.wav')
"""
object_path: str
local_path: Path
[文档]
def __init__(self, object_path: str, local_path: Path) -> None:
"""
Initialize a workspace download record.
Parameters
----------
object_path : str
LakeFS object path selected from the input workspace ref.
local_path : pathlib.Path
Destination path under the attempt-local workspace root.
"""
object.__setattr__(self, "object_path", object_path)
object.__setattr__(self, "local_path", local_path)
[文档]
@dataclass(frozen=True, init=False)
class WorkspaceSyncPlan:
"""
Plan for synchronizing an attempt workspace to a LakeFS prefix.
Runtime code uses this plan during the stage phase of workspace
publication. Uploads and deletes are calculated together so the staging
branch mirrors the complete local projection for the task prefix.
Parameters
----------
upload_files : list of WorkspaceUploadFile
Local files that should be uploaded to the staging branch.
delete_object_paths : list of str
Existing object paths under the workspace prefix that should be deleted
from the staging branch because they are absent locally.
Attributes
----------
changed_object_count : int
Number of uploaded plus deleted objects represented by the plan.
upload_bytes : int
Total size, in bytes, of the files selected for upload.
See Also
--------
build_workspace_sync_plan : Build the complete plan from local and remote state.
WorkspaceUploadFile : One file selected for upload.
Notes
-----
The plan represents the complete projected contents of a
``WorkspaceSpec.prefix``. It is not an append-only list of files created by
the current task body.
Examples
--------
>>> plan = WorkspaceSyncPlan(
... upload_files=[WorkspaceUploadFile(Path("raw/input.wav"), "audio/render/raw/input.wav")],
... delete_object_paths=["audio/render/old.tmp"],
... )
>>> plan.changed_object_count
2
"""
upload_files: list[WorkspaceUploadFile]
delete_object_paths: list[str]
[文档]
def __init__(self, upload_files: list[WorkspaceUploadFile], delete_object_paths: list[str]) -> None:
"""
Initialize a workspace sync plan.
Parameters
----------
upload_files : list of WorkspaceUploadFile
Local files that should be uploaded to the staging branch.
delete_object_paths : list of str
Existing object paths under the workspace prefix that should be
deleted.
"""
object.__setattr__(self, "upload_files", upload_files)
object.__setattr__(self, "delete_object_paths", delete_object_paths)
@property
def changed_object_count(self) -> int:
"""Number of remote object changes in this sync plan."""
return len(self.upload_files) + len(self.delete_object_paths)
@property
def upload_bytes(self) -> int:
"""Total byte size of files selected for upload."""
return sum(file.local_path.stat().st_size for file in self.upload_files)
@dataclass(frozen=True)
class WorkspaceOwner:
worker_id: str
pid: int
token: str