# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright 2021 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Execute a callable in a chroot environment."""
from __future__ import annotations
import logging
import multiprocessing
import os
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar
from craft_parts.utils import os_utils
from . import errors
if TYPE_CHECKING:
from collections.abc import Callable
from multiprocessing.connection import Connection
logger = logging.getLogger(__name__)
_T = TypeVar("_T")
[docs]
def chroot(path: Path, target: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
"""Execute a callable in a chroot environment.
:param path: The new filesystem root.
:param target: The callable to run in the chroot environment.
:param args: Arguments for target.
:param kwargs: Keyword arguments for target.
:returns: The target function return value.
"""
logger.debug("[pid=%d] parent process", os.getpid())
# This typehint technically should be "Connection[Any, tuple[_T, None] | tuple[None, str]]"
# However, types surrounding multiprocessing are finnicky at best and the way we handle the
# result here makes the typehint effectively true, since we don't attempt to access the first
# field of the tuple unless the second field is None.
parent_conn: Connection[Any, tuple[_T, str | None]]
parent_conn, child_conn = multiprocessing.Pipe()
child = multiprocessing.Process(
target=_runner, args=(Path(path), child_conn, target, args, kwargs)
)
logger.debug("[pid=%d] set up chroot", os.getpid())
_setup_chroot(path)
try:
child.start()
res, err = parent_conn.recv()
child.join()
finally:
logger.debug("[pid=%d] clean up chroot", os.getpid())
_cleanup_chroot(path)
if isinstance(err, str):
raise errors.OverlayChrootExecutionError(err)
return res
def _runner(
path: Path,
conn: Connection[tuple[_T, str | None], Any],
target: Callable[..., _T],
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> None:
"""Chroot to the execution directory and call the target function."""
logger.debug("[pid=%d] child process: target=%r", os.getpid(), target)
try:
logger.debug("[pid=%d] chroot to %r", os.getpid(), path)
os.chdir(path)
os.chroot(path)
res = target(*args, **kwargs)
except Exception as exc: # noqa: BLE001
# Just send None for data since it won't be accessed anyways
conn.send((None, str(exc))) # type: ignore[arg-type]
return
conn.send((res, None))
def _setup_chroot(path: Path) -> None:
"""Prepare the chroot environment before executing the target function."""
logger.debug("setup chroot: %r", path)
if sys.platform == "linux":
_setup_chroot_linux(path)
def _cleanup_chroot(path: Path) -> None:
"""Clean the chroot environment after executing the target function."""
logger.debug("cleanup chroot: %r", path)
if sys.platform == "linux":
_cleanup_chroot_linux(path)
class _Mount(NamedTuple):
"""Mount entry for chroot setup."""
fstype: str | None
src: str
mountpoint: str
options: list[str] | None
# Essential filesystems to mount in order to have basic utilities and
# name resolution working inside the chroot environment.
_linux_mounts: list[_Mount] = [
_Mount(None, "/etc/resolv.conf", "/etc/resolv.conf", ["--bind"]),
_Mount("proc", "proc", "/proc", None),
_Mount("sysfs", "sysfs", "/sys", None),
# Device nodes require MS_REC to be bind mounted inside a container.
_Mount(None, "/dev", "/dev", ["--rbind", "--make-rprivate"]),
]
def _setup_chroot_linux(path: Path) -> None:
"""Linux-specific chroot environment preparation."""
# Some images (such as cloudimgs) symlink ``/etc/resolv.conf`` to
# ``/run/systemd/resolve/stub-resolv.conf``. We want resolv.conf to be
# a regular file to bind-mount the host resolver configuration on.
#
# There's no need to restore the file to its original condition because
# this operation happens on a temporary filesystem layer.
resolv_conf = path / "etc/resolv.conf"
if resolv_conf.is_symlink():
resolv_conf.unlink()
resolv_conf.touch()
elif not resolv_conf.exists() and resolv_conf.parent.is_dir():
resolv_conf.touch()
pid = os.getpid()
for entry in _linux_mounts:
args = entry.options or []
if entry.fstype:
args.append(f"-t{entry.fstype}")
mountpoint = path / entry.mountpoint.lstrip("/")
# Only mount if mountpoint exists.
if mountpoint.exists():
logger.debug("[pid=%d] mount %r on chroot", pid, str(mountpoint))
os_utils.mount(entry.src, str(mountpoint), *args)
else:
logger.debug("[pid=%d] mountpoint %r does not exist", pid, str(mountpoint))
logger.debug("chroot setup complete")
def _cleanup_chroot_linux(path: Path) -> None:
"""Linux-specific chroot environment cleanup."""
pid = os.getpid()
for entry in reversed(_linux_mounts):
mountpoint = path / entry.mountpoint.lstrip("/")
if mountpoint.exists():
logger.debug("[pid=%d] umount: %r", pid, str(mountpoint))
# The activity executed in the chroot can lead to additional mounts
# under those mounted to prepare the chroot.
# Remount as private to ease unmounting.
os_utils.mount(str(mountpoint), "--make-rprivate")
args: list[str] = ["--recursive"]
if entry.options and "--rbind" in entry.options:
# Mount points under /dev may be in use and make the bind mount
# unmountable. This may happen in destructive mode depending on
# the host environment, so use MNT_DETACH to defer unmounting.
args.append("--lazy")
os_utils.umount(str(mountpoint), *args)