Source code for fabsync.files

from __future__ import annotations

from abc import ABC, abstractmethod
from collections import deque
import collections.abc
from dataclasses import dataclass
from functools import cached_property, singledispatch
from itertools import accumulate, chain
import os
from pathlib import Path, PurePath, PurePosixPath
import stat
from typing import (
    AbstractSet,
    cast,
    Iterable,
    Iterator,
    Mapping,
    NamedTuple,
    Optional,
    Union,
)

from .config import CONFIG_NAME, Opts, SyncConfig
from .functools import getitem


# Type of SyncedItem.children: maps each child's name to the child item.
Children = Mapping[str, 'SyncedItem']


@singledispatch
def is_root_path(path: PurePath) -> bool:
    """
    True if `path` is the root of its file system.

    This is dispatched on the concrete path type. Path flavors without a
    registered implementation end up here and are rejected.
    """
    raise NotImplementedError()  # pragma: no cover


@is_root_path.register(PurePosixPath)
def _(path: PurePosixPath) -> bool:
    """POSIX implementation: the root is exactly ``/``."""
    posix_root = PurePosixPath('/')

    return path == posix_root


class SyncedItem(ABC, collections.abc.Mapping):
    """
    A base class representing a synced file or directory.

    Every item is also a read-only mapping from child names to child items,
    backed by :attr:`children`. Items without children behave like an empty
    mapping.
    """

    # The local path of the file or directory.
    src: Path

    # The remote (target) path.
    dest: PurePath

    # Additional options.
    opts: Opts

    # Type flags; subclasses override whichever one applies to them.
    is_dir = False
    is_file = False

    def __init__(self, src: Path, dest: PurePath, opts: Opts):
        self.src = src
        self.dest = dest
        self.opts = opts

    @abstractmethod
    def matches_mode(self, mode: int) -> bool:
        """
        True if `mode` describes the same kind of object as this item.

        :param int mode: A file mode, to be inspected with the :mod:`stat`
            helpers.
        """
        pass  # pragma: no cover

    def is_dest_root(self) -> bool:
        """True if this points to the root of the destination file system."""
        return is_root_path(self.dest)

    @property
    def children(self) -> Children:
        """The items directly below this one, by name. Empty by default."""
        return {}

    def __getitem__(self, name: str) -> SyncedItem:
        """Returns the child called `name`; raises KeyError if there is none."""
        return self.children[name]

    def __iter__(self) -> Iterator[str]:
        """
        Iterates over the names of the children.

        The Mapping protocol requires iteration to yield keys: the inherited
        keys(), items(), values() and __eq__ are all built on top of it and
        look each yielded value up again with __getitem__. Use items() to get
        (name, child) pairs.
        """
        return iter(self.children)

    def __len__(self) -> int:
        """The number of children."""
        return len(self.children)


class SyncedDir(SyncedItem):
    """
    A synced directory.

    The directory's configuration is loaded from `src` when the object is
    created. Its children are read from the local file system the first time
    :attr:`children` is accessed.
    """

    # The configuration loaded for this directory.
    config: SyncConfig

    is_dir = True

    def __init__(
        self,
        src: Path,
        dest_dir: PurePath,
        parent_config: Optional[SyncConfig] = None,
    ):
        """
        :param src: The local directory.
        :param dest_dir: The remote directory that will contain this one.
        :param parent_config: The configuration of the parent directory, or
            ``None`` if this is the root of the tree.
        """
        # Without a parent we are the root: start from the root configuration
        # and tell the loader so.
        if parent_config is None:
            parent_config = SyncConfig.root()
            is_root = True
        else:
            is_root = False

        self.config = SyncConfig.load(src, parent_config.defaults, is_root=is_root)

        # The remote name comes from the loaded configuration.
        dest = dest_dir / self.config.name

        super().__init__(src, dest, self.config.dir_opts())

    def matches_mode(self, mode: int) -> bool:
        """True if `mode` describes a directory."""
        return stat.S_ISDIR(mode)

    def branch(self, path: Union[PurePath, str]) -> list[SyncedItem]:
        """
        Returns the list of items described by a relative path. Returns an
        empty list if any of the path components don't exist.

        The list starts with this directory itself, followed by the item
        reached after each successive path component.

        :param path: A relative path in the source tree.

        """
        if isinstance(path, str):
            path = PurePath(path)

        assert (
            not path.is_absolute()
        ), f"SyncedDir.branch() requires a relative path (got {path})."

        try:
            # Look up one component at a time, keeping every intermediate
            # item. A missing component surfaces as KeyError.
            # NOTE(review): assumes getitem(item, name) returns item[name].
            items = list(
                accumulate(path.parts, getitem, initial=cast(SyncedItem, self))
            )
        except KeyError:
            items = []

        return items

    def find(self, path: Union[PurePath, str]) -> Optional[SyncedItem]:
        """
        Returns the item identified by a relative path, if it exists.

        :param path: A relative path in the source tree.

        """
        if branch := self.branch(path):
            return branch[-1]
        else:
            return None

    @cached_property
    def children(self) -> Children:
        """
        The non-ignored entries of this directory, keyed by local file name.

        Entries are visited in sorted order. The configuration file itself
        (``CONFIG_NAME``) is never a child. Computed once, on first access.

        :raises IOError: if an entry is neither a file nor a directory.
        """
        children: dict[str, SyncedItem] = {}

        for child_src in sorted(self.src.iterdir()):
            if child_src.name == CONFIG_NAME:
                continue

            child: SyncedItem
            if child_src.is_dir():
                child = SyncedDir(child_src, self.dest, self.config)
            elif child_src.is_file():
                child = SyncedFile(
                    child_src,
                    self.dest,
                    self.config.file_opts(child_src.name),
                )
            else:
                raise IOError(
                    f"Can't process {child_src}: neither file nor directory."
                )  # pragma: no cover

            # The child has to be built first: its options decide whether it
            # is ignored.
            if not child.opts.ignore:
                children[child_src.name] = child

        return children


class SyncedFile(SyncedItem):
    """
    A synced regular file.
    """

    is_file = True

    def __init__(self, src: Path, dest_dir: PurePath, opts: Opts):
        """
        :param src: The local file.
        :param dest_dir: The remote directory that will contain the file.
        :param opts: The file's options; they also supply the remote name.
        """
        dest = dest_dir / opts.name

        super().__init__(src, dest, opts)

    def matches_mode(self, mode: int) -> bool:
        """True if `mode` describes a regular file."""
        is_regular = stat.S_ISREG(mode)

        return is_regular


class SyncedRoot(SyncedDir):
    """
    The root of a SyncedItem hierarchy.

    This adds no behavior to :class:`SyncedDir`; it is a distinct type for
    the top of a tree, as returned by :func:`load`.
    """


@dataclass(frozen=True)
class ItemSelector:
    """
    Identifies a subset of items under a :class:`.SyncedRoot`.

    This essentially defines a filter to apply when traversing the source tree.
    The default values match all items.

    """

    #: A relative path into the source tree. We'll traverse the tree from
    #: this point. If the path isn't found, no items will be selected.
    subpath: Optional[PurePath] = None

    #: A set of tags to select. If empty, we will ignore tags. Otherwise, we'll
    #: only include items with at least one matching tag.
    tags: AbstractSet[str] = frozenset()

    #: If ``True`` (the default), we'll ensure that any time we select an item,
    #: all of its parent directories are selected as well. Thus, if you select
    #: a specific file—by path, tag, etc.—you're actually saying "select this
    #: item and all items necessary to reach it."
    with_parents: bool = True

    @classmethod
    def new(
        cls,
        subpath: Union[PurePath, str, None] = None,
        tags: Iterable[str] = (),
        with_parents: bool = True,
    ) -> ItemSelector:
        """
        Creates a new :class:`ItemSelector`, with more flexible argument types.

        A string `subpath` is converted to a path and `tags` may be any
        iterable of strings; it is stored as a frozenset.

        :param subpath:
        :type subpath: ~pathlib.PurePath or str or None
        :param tags:
        :type tags: ~typing.Iterable[str]
        :param bool with_parents:

        :rtype: ItemSelector

        """
        if isinstance(subpath, str):
            subpath = PurePath(subpath)

        tags = frozenset(tags)

        return cls(subpath, tags, with_parents)
def load(
    path: Union[Path, str],
    dest: Union[PurePath, str] = '/',
) -> SyncedRoot:
    """
    Loads a file tree for syncing.

    String arguments are converted to path objects before the root is built.

    :param path: A path to the local file tree (usually relative).
    :type path: ~pathlib.Path or str
    :param dest: A remote path to sync to (usually absolute).
    :type dest: ~pathlib.PurePath or str

    :rtype: ~fabsync.files.SyncedRoot

    """
    if isinstance(path, str):
        path = Path(path)
    if isinstance(dest, str):
        dest = PurePath(dest)

    return SyncedRoot(path, dest)
def walk(top: SyncedItem) -> Iterator[SyncedItem]:
    """
    Generates a simple depth-first traversal of the items rooted at `top`.

    Each item is yielded before its children, and children are visited in
    the order of the item's ``children`` mapping.

    :param top: The starting point.
    :type top: ~fabsync.files.SyncedItem

    :rtype: ~typing.Iterator[~fabsync.files.SyncedItem]

    """
    yield top

    for child in top.children.values():
        yield from walk(child)
def select(root: SyncedRoot, selector: ItemSelector) -> Iterator[SyncedItem]:
    """
    Traverses the file tree, returning only items that match `selector`.

    Items are generated in :func:`walk` order. When the selector asks for
    parents, directories that were skipped on the way to a selected item are
    generated just before that item.

    :param root: The root from :func:`fabsync.load`.
    :type root: ~fabsync.files.SyncedRoot
    :param selector: Identifies items to include.
    :type selector: ~fabsync.ItemSelector

    :rtype: ~typing.Iterator[~fabsync.files.SyncedItem]

    """
    top: SyncedItem

    # Directories we have passed without selecting them. They are only
    # generated if something beneath them turns out to be selected.
    deferred: deque[SyncedItem] = deque()

    subpath = selector.subpath
    tags = selector.tags
    with_parents = selector.with_parents

    if subpath is not None:
        if branch := root.branch(subpath):
            # Start at the last item of the branch; everything before it is
            # a parent of the starting point.
            top = branch[-1]
            if with_parents:
                deferred.extend(branch[:-1])
        else:
            # The subpath doesn't exist: nothing is selected.
            return
    else:
        top = root

    for item in walk(top):
        # An empty tag set disables tag filtering entirely.
        if tags and not (tags & item.opts.tags):
            if with_parents and isinstance(item, SyncedDir):
                deferred.append(item)
            continue

        if deferred:
            ancestors = item.src.parents
            for d in deferred:
                if d.src in ancestors:
                    yield d

            # walk() visits a directory before its descendants and finishes
            # them before moving on, so anything deferred that is not an
            # ancestor of this item can't be an ancestor of a later one.
            deferred.clear()

        yield item
class TableRow(NamedTuple):
    """
    Items generated by :func:`table`.

    Note that columns may be added in the future to include new features. Try
    not to make too many assumptions about this field list.

    """

    #: The local path
    src: str

    #: The remote path
    path: str

    #: User name or id
    user: str

    #: Group name or id
    group: str

    #: Mode
    mode: str

    #: Renderer name
    renderer: str

    #: Does this item support diffs?
    diff: str

    #: Space-separated tags
    tags: str
def table(
    items: Iterable[SyncedItem], header: bool = True, relative_src: bool = False
) -> Iterator[TableRow]:
    """
    A convenience function to generate human-readable data about items.

    This can be used to inspect your configuration by printing a table to the
    terminal or any other output format you find convenient.

    Items that point to the root of the destination file system are left out.

    :param items: A collection of :class:`SyncedItem`, presumably from
        :func:`walk` or :func:`select`.
    :type items: ~typing.Iterable[SyncedItem]
    :param bool header: If ``True`` (the default), the first item will be a
        header row.
    :param bool relative_src: If ``True``, the ``src`` property of each row
        will be relative to the working directory.

    :rtype: ~typing.Iterator[TableRow]

    """
    if header:
        yield TableRow(
            "Source", "Path", "User", "Group", "Mode", "Renderer", "Diff?", "Tags"
        )

    for item in items:
        if item.is_dest_root():
            continue

        opts = item.opts

        # Add the file type bits so that stat.filemode() renders the leading
        # type character as well as the permissions.
        # NOTE(review): -1 looks like the "no permissions set" sentinel that
        # has_perms() reports on; confirm against Opts.
        mode = opts.perms
        if mode != -1:
            if item.is_dir:
                mode |= stat.S_IFDIR
            elif item.is_file:
                mode |= stat.S_IFREG

        src = item.src
        if relative_src:
            try:
                src = item.src.relative_to(os.getcwd())
            except ValueError:  # pragma: no cover
                # Not under the working directory: show the path unchanged.
                pass

        yield TableRow(
            # Directories get a trailing slash in both path columns.
            f"{src}{'/' if item.is_dir else ''}",
            f"{item.dest}{'/' if item.is_dir else ''}",
            str(opts.user) if opts.has_user() else "",
            str(opts.group) if opts.has_group() else "",
            stat.filemode(mode) if opts.has_perms() else "",
            opts.renderer,
            # Only files with diffs turned off are called out.
            "no" if (item.is_file and not opts.diff) else "",
            " ".join(sorted(opts.tags)),
        )
def renderers(items: Iterable[SyncedItem]) -> frozenset[str]:
    """
    Returns the set of renderer names referenced by a collection of items.

    Items whose renderer is empty or ``None`` contribute nothing.

    :param items: A collection of :class:`SyncedItem`, presumably from
        :func:`walk` or :func:`select`.
    :type items: ~typing.Iterable[SyncedItem]

    :rtype: frozenset[str]

    """
    return frozenset(
        filter(
            None,
            (item.opts.renderer for item in items),
        )
    )
def tags(items: Iterable[SyncedItem]) -> frozenset[str]:
    """
    Returns the set of tags referenced by a collection of items.

    This is the union of the tags of every item.

    :param items: A collection of :class:`SyncedItem`, presumably from
        :func:`walk` or :func:`select`.
    :type items: ~typing.Iterable[SyncedItem]

    :rtype: frozenset[str]

    """
    return frozenset(
        chain.from_iterable(
            (item.opts.tags for item in items),
        )
    )