Source code for craft_parts.overlays.chroot

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Execute a callable in a chroot environment."""

import logging
import multiprocessing
import os
import sys
from collections import namedtuple
from multiprocessing.connection import Connection
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple

from craft_parts.utils import os_utils

from . import errors

logger = logging.getLogger(__name__)


[docs]def chroot(path: Path, target: Callable, *args: Any, **kwargs: Any) -> Any: """Execute a callable in a chroot environment. :param path: The new filesystem root. :param target: The callable to run in the chroot environment. :param args: Arguments for target. :param kwargs: Keyword arguments for target. :returns: The target function return value. """ logger.debug("[pid=%d] parent process", os.getpid()) parent_conn, child_conn = multiprocessing.Pipe() child = multiprocessing.Process( target=_runner, args=(Path(path), child_conn, target, args, kwargs) ) logger.debug("[pid=%d] set up chroot", os.getpid()) _setup_chroot(path) try: child.start() res, err = parent_conn.recv() child.join() finally: logger.debug("[pid=%d] clean up chroot", os.getpid()) _cleanup_chroot(path) if isinstance(err, str): raise errors.OverlayChrootExecutionError(err) return res
def _runner( path: Path, conn: Connection, target: Callable, args: Tuple, kwargs: Dict, ) -> None: """Chroot to the execution directory and call the target function.""" logger.debug("[pid=%d] child process: target=%r", os.getpid(), target) try: logger.debug("[pid=%d] chroot to %r", os.getpid(), path) os.chdir(path) os.chroot(path) res = target(*args, **kwargs) except Exception as exc: # pylint: disable=broad-except conn.send((None, str(exc))) return conn.send((res, None)) def _setup_chroot(path: Path) -> None: """Prepare the chroot environment before executing the target function.""" logger.debug("setup chroot: %r", path) if sys.platform == "linux": _setup_chroot_linux(path) def _cleanup_chroot(path: Path) -> None: """Clean the chroot environment after executing the target function.""" logger.debug("cleanup chroot: %r", path) if sys.platform == "linux": _cleanup_chroot_linux(path) _Mount = namedtuple("_Mount", ["fstype", "src", "mountpoint", "options"]) # Essential filesystems to mount in order to have basic utilities and # name resolution working inside the chroot environment. _linux_mounts: List[_Mount] = [ _Mount(None, "/etc/resolv.conf", "/etc/resolv.conf", ["--bind"]), _Mount("proc", "proc", "/proc", None), _Mount("sysfs", "sysfs", "/sys", None), # Device nodes require MS_REC to be bind mounted inside a container. _Mount(None, "/dev", "/dev", ["--rbind", "--make-rprivate"]), ] def _setup_chroot_linux(path: Path) -> None: """Linux-specific chroot environment preparation.""" # Some images (such as cloudimgs) symlink ``/etc/resolv.conf`` to # ``/run/systemd/resolve/stub-resolv.conf``. We want resolv.conf to be # a regular file to bind-mount the host resolver configuration on. # # There's no need to restore the file to its original condition because # this operation happens on a temporary filesystem layer. resolv_conf = path / "etc/resolv.conf" if resolv_conf.is_symlink(): resolv_conf.unlink() resolv_conf.touch() elif not resolv_conf.exists() and resolv_conf.parent.is_dir(): resolv_conf.touch() pid = os.getpid() for entry in _linux_mounts: args = [] if entry.options: args.extend(entry.options) if entry.fstype: args.append(f"-t{entry.fstype}") mountpoint = path / entry.mountpoint.lstrip("/") # Only mount if mountpoint exists. if mountpoint.exists(): logger.debug("[pid=%d] mount %r on chroot", pid, str(mountpoint)) os_utils.mount(entry.src, str(mountpoint), *args) else: logger.debug("[pid=%d] mountpoint %r does not exist", pid, str(mountpoint)) logger.debug("chroot setup complete") def _cleanup_chroot_linux(path: Path) -> None: """Linux-specific chroot environment cleanup.""" pid = os.getpid() for entry in reversed(_linux_mounts): mountpoint = path / entry.mountpoint.lstrip("/") if mountpoint.exists(): logger.debug("[pid=%d] umount: %r", pid, str(mountpoint)) if entry.options and "--rbind" in entry.options: # Mount points under /dev may be in use and make the bind mount # unmountable. This may happen in destructive mode depending on # the host environment, so use MNT_DETACH to defer unmounting. os_utils.umount(str(mountpoint), "--recursive", "--lazy") else: os_utils.umount(str(mountpoint))