from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
import difflib
from functools import cached_property, singledispatchmethod
import hashlib
import importlib.util
import inspect
import io
import os
from pathlib import Path, PurePath
import stat
from typing import Callable, Generator, Mapping, Optional, Union
import warnings
from .files import (
ItemSelector,
load,
select,
SyncedDir,
SyncedFile,
SyncedItem,
SyncedRoot,
)
from .fs import FS, new_fs, SysInfo
# A render function. FileSyncer calls it with the local source path and the
# item's template variables (plus a ``get_content`` keyword argument for
# non-legacy renderers); it returns the file content as str or bytes.
Renderer = Callable[..., Union[str, bytes]]
# Maps a renderer key (the value of an item's ``renderer`` option) to the
# render function that should produce that item's content.
Renderers = Mapping[str, Renderer]
# Public names of this module. ItemSelector and load are imported from .files
# above and re-exported here.
__all__ = ['ItemSelector', 'load', 'isync', 'sync', 'SyncError']
class SyncError(Exception):
    """
    An unrecoverable error during sync.

    Raised by this module when processing cannot continue, for example when
    a renderer key is unknown, a remote user or group cannot be resolved, or
    the local and remote items have different filesystem types.
    """
@dataclass
class SyncOpts:
    """
    A normalized :class:`~fabsync.config.Opts`.

    User and group names have been resolved to numeric ids. A negative value
    in any field means "not specified": :meth:`Syncer.sync` only applies a
    uid, gid or permission mode that is ``>= 0``.
    """

    #: Numeric owner id, or -1 if unspecified.
    uid: int = -1
    #: Numeric group id, or -1 if unspecified.
    gid: int = -1
    #: Permission bits, or -1 if unspecified.
    perms: int = -1
class ItemSyncer(ABC):
    """
    Base class for pushing a single item to the server.

    An instance is tied to one item, which gives us a place to memoize
    remote lookups (such as ``stat``) between the individual sync steps.
    """

    item: SyncedItem

    def __init__(self, fs: FS, item: SyncedItem):
        self.fs = fs
        self.item = item

    @cached_property
    def target(self) -> str:
        """The item's remote destination, as a string."""
        return str(self.item.dest)

    @cached_property
    def stats(self) -> Optional[os.stat_result]:
        """The remote stat result for the target, or None if it is missing."""
        try:
            remote_stats = self.fs.stat(self.target)
        except FileNotFoundError:
            remote_stats = None

        return remote_stats

    def exists(self) -> bool:
        """True if the target is present on the remote side."""
        return self.stats is not None

    def matches_mode(self) -> bool:
        """
        True if the local and remote items have compatible filesystem types.

        A missing remote item is always compatible. When it does exist, the
        item itself decides whether the remote mode is acceptable, so that we
        never sync a file over a directory or the other way around.
        """
        remote_stats = self.stats
        if remote_stats is None:
            return True

        assert remote_stats.st_mode is not None

        return self.item.matches_mode(remote_stats.st_mode)

    @abstractmethod
    def matches_content(self) -> bool:
        pass  # pragma: no cover

    @abstractmethod
    def diff(self) -> bytes:
        pass  # pragma: no cover

    @abstractmethod
    def put(self) -> None:
        pass  # pragma: no cover

    def chown(self, uid: int, gid: int) -> None:
        """Change the target's owner and group, then drop cached state."""
        self.fs.chown(self.target, uid, gid)
        self.reset()

    def chmod(self, perms: int) -> None:
        """Change the target's permission bits, then drop cached state."""
        self.fs.chmod(self.target, perms)
        self.reset()

    def reset(self):
        """
        Resets all cached attributes.
        """
        # A cached_property stores its value in the instance dict; removing
        # the entry forces a fresh lookup on next access.
        self.__dict__.pop('stats', None)
class DirSyncer(ItemSyncer):
    """Syncs a directory, which only has to exist on the remote side."""

    def matches_content(self) -> bool:
        """A directory has no content to compare beyond its own existence."""
        return self.exists()

    def diff(self) -> bytes:
        """Directories never produce a diff."""
        return b''

    def put(self) -> None:
        """Create the remote directory and drop cached state."""
        self.fs.mkdir(self.target)
        self.reset()
class FileSyncer(ItemSyncer):
    """
    Syncs a regular file, rendering its expected content locally first.
    """

    file: SyncedFile
    renderers: Renderers

    def __init__(self, fs: FS, file: SyncedFile, renderers: Renderers):
        self.file = file
        self.renderers = renderers
        super().__init__(fs, file)

    def matches_content(self) -> bool:
        """
        True if the remote file already holds the expected content.

        The size is compared first so that the remote MD5 is only requested
        when the lengths agree.
        """
        remote_stats = self.stats
        if remote_stats is None:
            return False

        expected = self.content
        if len(expected) != remote_stats.st_size:
            return False

        return hashlib.md5(expected).digest() == self.md5

    def diff(self) -> bytes:
        """A unified diff of the remote and local content."""
        if not self.file.opts.diff:
            return b''

        before = self.remote_content.splitlines(True) if self.exists() else []
        after = self.content.splitlines(True)

        # Label the diff with a/<path> and b/<path>, dropping the leading
        # slash from the destination when there is one.
        try:
            label = self.file.dest.relative_to('/')
        except ValueError:
            label = self.file.dest

        chunks = difflib.diff_bytes(
            difflib.unified_diff,
            before,
            after,
            fromfile=bytes(PurePath('a') / label),
            tofile=bytes(PurePath('b') / label),
        )

        return b''.join(chunks)

    def put(self) -> None:
        """Upload the expected content and drop cached remote state."""
        self.fs.put(io.BytesIO(self.content), self.target)
        self.reset()

    @cached_property
    def md5(self) -> bytes:
        """The current MD5 hash of the target, if any."""
        return self.fs.md5(self.target)

    @cached_property
    def remote_content(self) -> bytes:
        """
        The current remote content of the file.

        This will be empty bytes if the file doesn't exist.
        """
        buffer = io.BytesIO()
        try:
            self.fs.get(self.file.dest, buffer)
        except FileNotFoundError:
            pass

        return buffer.getvalue()

    @cached_property
    def content(self) -> bytes:
        """The expected content of the file (after rendering)."""
        source = self.file
        render = self._pick_renderer()

        if self._is_legacy_renderer(render):
            warnings.warn(
                "Two-argument render functions are deprecated "
                f"({render.__module__}.{render.__qualname__}). Hint: add **kwargs.",
                DeprecationWarning,
                stacklevel=1,
            )
            rendered = render(source.src, source.opts.vars)
        else:
            rendered = render(
                source.src,
                source.opts.vars,
                get_content=lambda: self.remote_content,
            )

        # Renderers may return text; everything downstream works on bytes.
        return rendered.encode() if isinstance(rendered, str) else rendered

    def _pick_renderer(self) -> Renderer:
        """
        Resolve the file's ``renderer`` option to a callable.

        No key means the file is read verbatim. Keys under ``fabsync/`` are
        reserved for built-ins; any other key must be present in the
        configured renderers.
        """
        key = self.file.opts.renderer

        if not key:
            return self._read_file
        if key == 'fabsync/py':
            return self._render_py
        if key.startswith('fabsync/'):
            raise SyncError(f"Renderer {key} is not defined.")
        if key in self.renderers:
            return self.renderers[key]

        raise SyncError(f"Renderer {key} is not configured.")

    @staticmethod
    def _read_file(src: Path, _vars, **kwargs) -> bytes:
        """The default renderer: the raw bytes of the source file."""
        with src.open('rb') as stream:
            return stream.read()

    @staticmethod
    def _render_py(
        src: Path, vars, get_content: Callable[[], bytes], **kwargs
    ) -> bytes:
        """
        The ``fabsync/py`` renderer: load the source file as a Python module
        and delegate to its ``render`` function.
        """
        spec = importlib.util.spec_from_file_location('renderer', src)
        if not (spec and spec.loader):  # pragma: no cover
            # This should be unreachable.
            raise SyncError(f"Failed to load {src} as a python module.")

        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except SyntaxError as e:
            raise SyncError(f"Failed to load {src} as a python module.") from e

        if not hasattr(module, 'render'):
            raise SyncError(f"{src} has no 'render' function.")

        return module.render(src, vars, get_content=get_content)

    @staticmethod
    def _is_legacy_renderer(func: Callable) -> bool:
        """
        True if func takes exactly two positional-or-keyword parameters (and
        so cannot accept the keyword arguments newer renderers receive).
        """
        parameters = inspect.signature(func).parameters.values()
        legacy_kinds = [inspect.Parameter.POSITIONAL_OR_KEYWORD] * 2

        return [parameter.kind for parameter in parameters] == legacy_kinds

    def reset(self) -> None:
        """Resets the cached remote attributes; rendered content is kept."""
        super().reset()
        for name in ('md5', 'remote_content'):
            self.__dict__.pop(name, None)
@dataclass(frozen=True)
class SyncResult:
    """
    The outcome of syncing one item.
    """

    #: The item this result describes.
    item: SyncedItem

    #: True if the remote path did not exist and had to be created.
    created: bool

    #: True if anything about the remote path was changed.
    modified: bool

    #: A unified diff for an uploaded file; empty otherwise.
    diff: bytes

    @property
    def path(self) -> PurePath:
        """Shortcut for the item's remote path."""
        synced = self.item
        return synced.dest
class Syncer:
    """
    Handles the syncing of individual files.

    This must be used as a context manager. Use one Syncer for a batch of
    files.
    """

    renderers: Renderers
    dry_run: bool
    no_chown: bool
    # fs and sys only exist while the context manager is active.
    fs: FS
    sys: SysInfo

    def __init__(
        self,
        conn,
        renderers: Renderers,
        *,
        dry_run=False,
        no_chown=False,
    ):
        """
        :param conn: The connection handed to :func:`new_fs` on entry.
        :param renderers: Map of renderer keys to render functions.
        :param bool dry_run: If true, put/chown/chmod are skipped.
        :param bool no_chown: If true, ownership is never inspected or changed.
        """
        self.conn = conn
        self.renderers = renderers
        self.dry_run = dry_run
        self.no_chown = no_chown

    #
    # API
    #

    def sync(self, item: SyncedItem) -> SyncResult:
        """
        Syncs a single item and reports what happened.

        :raises SyncError: if used outside of a ``with`` block, if the local
            and remote items are different filesystem types, or if the item's
            user or group name is unknown on the remote system.
        """
        assert not item.is_dest_root()

        if not hasattr(self, 'fs'):
            raise SyncError(
                f"{self.__class__.__name__} must be used as a context manager."
            )

        created = False
        modified = False
        diff = b''

        item_syncer = self._item_syncer(item)

        # Create the file or directory if it doesn't exist.
        if not item_syncer.matches_mode():
            raise SyncError(
                f"{item.src} (local) and {item.dest} (remote) are different types."
            )
        elif not item_syncer.matches_content():
            # The diff has to be taken before the upload replaces the remote
            # content.
            diff = item_syncer.diff()
            if not item_syncer.exists():
                created = True
            self._put(item_syncer)
            modified = True

        opts = self._item_sync_opts(item)

        # stats is None when the item is missing and nothing was uploaded
        # (dry run); there is nothing to chown or chmod in that case.
        stats = item_syncer.stats
        if stats is not None:
            if not self.no_chown:
                # Negative ids mean "unspecified": keep the current owner.
                new_uid = opts.uid if (opts.uid >= 0) else (stats.st_uid or 0)
                new_gid = opts.gid if (opts.gid >= 0) else (stats.st_gid or 0)
                if (new_uid, new_gid) != (stats.st_uid, stats.st_gid):
                    self._chown(item_syncer, new_uid, new_gid)
                    modified = True

            perms = stat.S_IMODE(stats.st_mode or 0)
            if (opts.perms >= 0) and (opts.perms != perms):
                self._chmod(item_syncer, opts.perms)
                modified = True

        return SyncResult(item, created, modified, diff)

    #
    # Internal
    #

    @singledispatchmethod
    def _item_syncer(self, item: SyncedItem) -> ItemSyncer:
        """Builds the ItemSyncer matching the item's type."""
        raise TypeError(f"Unhandled item type: {item.__class__}.")  # pragma: no cover

    @_item_syncer.register
    def _(self, item: SyncedDir) -> ItemSyncer:
        return DirSyncer(self.fs, item)

    @_item_syncer.register
    def _(self, item: SyncedFile) -> ItemSyncer:
        return FileSyncer(self.fs, item, self.renderers)

    def _item_sync_opts(self, item: SyncedItem) -> SyncOpts:
        """
        Resolves item.opts into a SyncOpts.

        User and group names are looked up in the remote system info;
        non-string values are passed through unchanged.

        :raises SyncError: if a user or group name is unknown remotely.
        """
        opts = SyncOpts()

        opts.perms = item.opts.perms

        if isinstance(item.opts.user, str):
            try:
                opts.uid = self.sys.users[item.opts.user]
            except KeyError as e:
                raise SyncError(f"Unknown remote user: {item.opts.user}") from e
        else:
            opts.uid = item.opts.user

        if isinstance(item.opts.group, str):
            try:
                opts.gid = self.sys.groups[item.opts.group]
            except KeyError as e:
                raise SyncError(f"Unknown remote group: {item.opts.group}") from e
        else:
            opts.gid = item.opts.group

        return opts

    #
    # ItemSyncer wrappers
    #
    # Every mutating operation goes through these so that dry_run is honored
    # in one place.
    #

    def _put(self, item_syncer: ItemSyncer) -> None:
        if not self.dry_run:
            item_syncer.put()

    def _chown(self, item_syncer: ItemSyncer, uid: int, gid: int) -> None:
        if not self.dry_run:
            item_syncer.chown(uid, gid)

    def _chmod(self, item_syncer: ItemSyncer, perms: int) -> None:
        if not self.dry_run:
            item_syncer.chmod(perms)

    #
    # Context manager
    #

    def __enter__(self) -> Syncer:
        self.fs = new_fs(self.conn)
        try:
            self.sys = self.fs.sysinfo()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the
            # filesystem has to be released here.
            self._close()
            raise

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._close()

    def _close(self) -> None:
        """Cleans up the filesystem and leaves the 'entered' state."""
        try:
            self.fs.cleanup()
        finally:
            # Always drop fs, even if cleanup fails, so that sync() refuses
            # to run on a filesystem that is no longer valid.
            del self.fs
def isync(
    conn,
    root: SyncedRoot,
    selector: Optional[ItemSelector] = None,
    renderers: Optional[Renderers] = None,
    *,
    dry_run=False,
    no_chown=False,
) -> Generator[SyncResult, None, None]:
    """
    Synchronizes all or part of a local file tree with a remote host.

    This processes files and directories lazily, yielding each result before
    proceeding to the next item. This is useful for communicating results in
    real time as well as potentially terminating the process before the end.

    :param conn: Usually a Fabric connection. If this is an invoke Context,
        we'll operate locally instead.
    :type conn: ~fabric.connection.Connection
    :param root: The root of the tree to sync. Get this from
        :func:`fabsync.load`.
    :type root: ~fabsync.files.SyncedRoot
    :param selector: Optional parameters to select a
        subset of the tree to sync.
    :type selector: ~fabsync.ItemSelector
    :param dict renderers: Optional map of keys to render functions.
    :param bool dry_run: If true, we will inspect the remote system and report
        changes, but nothing will be modified.
    :param bool no_chown: If true, we will completely ignore file ownership.
        This is primarily useful in local mode, in which you likely don't have
        permission for :func:`os.chown`.

    :rtype: ~typing.Generator[SyncResult, None, None]
    """
    if selector is None:
        selector = ItemSelector()

    if renderers is None:
        renderers = {}

    with Syncer(conn, renderers, dry_run=dry_run, no_chown=no_chown) as syncer:
        for item in select(root, selector):
            # The destination root itself is never synced.
            if not item.is_dest_root():
                yield syncer.sync(item)
def sync(*args, **kwargs) -> dict[PurePath, SyncResult]:
    """
    Synchronizes all or part of a local file tree with a remote host.

    This is just a wrapper around :func:`isync` that gathers up all of the
    results to return at the end. It accepts the same arguments.

    :returns: Each result, keyed by its remote path.
    :rtype: dict[~pathlib.PurePath, SyncResult]
    """
    return {result.path: result for result in isync(*args, **kwargs)}