Source code for craft_parts.utils.file_utils

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2016-2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""File-related utilities."""

import contextlib
import errno
import hashlib
import logging
import os
import shutil
import sys
from pathlib import Path
from typing import Callable, Generator, List, Optional, Set

from craft_parts import errors
from craft_parts.permissions import Permissions, apply_permissions

logger = logging.getLogger(__name__)


[docs]class NonBlockingRWFifo: """A non-blocking FIFO for reading and writing.""" def __init__(self, path: str) -> None: os.mkfifo(path) self._path = path # Using RDWR for every FIFO just so we can open them reliably whenever # (i.e. write-only FIFOs can't be opened successfully until the reader # is in place) self._fd = os.open(self._path, os.O_RDWR | os.O_NONBLOCK) @property def path(self) -> str: """Return the path to the FIFO file.""" return self._path
[docs] def read(self) -> str: """Read from the FIFO.""" total_read = "" with contextlib.suppress(BlockingIOError): value = os.read(self._fd, 1024) while value: total_read += value.decode(sys.getfilesystemencoding()) value = os.read(self._fd, 1024) return total_read
[docs] def write(self, data: str) -> int: """Write to the FIFO. :param data: The data to write. """ return os.write(self._fd, data.encode(sys.getfilesystemencoding()))
[docs] def close(self) -> None: """Close the FIFO.""" if self._fd is not None: os.close(self._fd)
[docs]def copy( source: str, destination: str, *, follow_symlinks: bool = False, permissions: Optional[List[Permissions]] = None, ) -> None: """Copy source and destination files. This function overwrites the destination if it already exists, and also tries to copy ownership information. :param source: The source to be copied to destination. :param destination: Where to put the copy. :param follow_symlinks: Whether or not symlinks should be followed. :param permissions: The permissions definitions that should be applied to the new file. :raises CopyFileNotFound: If source doesn't exist. """ # If os.link raised an I/O error, it may have left a file behind. Skip on # OSError in case it doesn't exist or is a directory. with contextlib.suppress(OSError): os.unlink(destination) try: shutil.copy2(source, destination, follow_symlinks=follow_symlinks) except FileNotFoundError as err: raise errors.CopyFileNotFound(source) from err uid = os.stat(source, follow_symlinks=follow_symlinks).st_uid gid = os.stat(source, follow_symlinks=follow_symlinks).st_gid try: os.chown(destination, uid, gid, follow_symlinks=follow_symlinks) except PermissionError as err: logger.debug("Unable to chown %s: %s", destination, err) if permissions: apply_permissions(destination, permissions)
[docs]def create_similar_directory( source: str, destination: str, permissions: Optional[List[Permissions]] = None ) -> None: """Create a directory with the same permission bits and owner information. :param source: Directory from which to copy name, permission bits, and owner information. :param destination: Directory to create and to which the ``source`` information will be copied. :param permissions: The permission definitions to apply to the new directory. If omitted, the new directory will have the same permissions and ownership of ``source``. """ stat = os.stat(source, follow_symlinks=False) uid = stat.st_uid gid = stat.st_gid os.makedirs(destination, exist_ok=True) # Windows does not have "os.chown" implementation and copystat # is unlikely to be useful, so just bail after creating directory. if sys.platform == "win32": return try: os.chown(destination, uid, gid, follow_symlinks=False) except PermissionError as exception: logger.debug("Unable to chown %s: %s", destination, exception) shutil.copystat(source, destination, follow_symlinks=False) if permissions: apply_permissions(destination, permissions)
[docs]def calculate_hash(filename: Path, *, algorithm: str) -> str: """Calculate the hash of the given file. :param filename: The path to the file to digest. :param algorithm: The algorithm to use, as defined by ``hashlib``. :return: The file hash. :raise ValueError: If the algorithm is unsupported. """ if algorithm not in hashlib.algorithms_available: raise ValueError(f"unsupported algorithm {algorithm!r}") hasher = hashlib.new(algorithm) for block in _file_reader_iter(filename): hasher.update(block) return hasher.hexdigest()
def _file_reader_iter( path: Path, block_size: int = 2**20 ) -> Generator[bytes, None, None]: """Read a file in blocks. :param path: The path to the file to read. :param block_size: The size of the block to read, default is 1MiB. """ with path.open("rb") as file: block = file.read(block_size) while len(block) > 0: yield block block = file.read(block_size)