# Copyright 2021 Dominik Sekotill # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Commands for managing Docker for fixtures """ from __future__ import annotations import ipaddress import json from contextlib import contextmanager from pathlib import Path from secrets import token_hex from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from subprocess import CompletedProcess from subprocess import Popen from subprocess import run from types import TracebackType from typing import IO from typing import Any from typing import Iterable from typing import Iterator from typing import Literal from typing import SupportsBytes from typing import Tuple from typing import TypeVar from typing import Union from typing import overload from .json import JSONObject from .proc import Argument from .proc import Arguments from .proc import Deserialiser from .proc import Environ from .proc import MutableArguments from .proc import PathLike from .proc import coerce_args from .proc import exec_io HostMount = tuple[PathLike, PathLike] NamedMount = tuple[str, PathLike] AnonMount = PathLike Mount = Union[HostMount, NamedMount, AnonMount] Volumes = Iterable[Mount] DOCKER = "docker" def docker(*args: Argument, **env: str) -> None: """ Run a Docker command, with output going to stdout """ run([DOCKER, *coerce_args(args)], env=env, check=True) def docker_output(*args: Argument, **env: str) -> str: """ Run a Docker command, capturing and returning its stdout """ proc = run([DOCKER, *coerce_args(args)], env=env, check=True, stdout=PIPE, text=True) return proc.stdout.strip() def docker_quiet(*args: Argument, **env: str) -> None: """ Run a Docker command, directing its stdout to /dev/null """ run([DOCKER, *coerce_args(args)], env=env, check=True, stdout=DEVNULL) class IPv4Address(ipaddress.IPv4Address): """ Subclass of IPv4Address that handle's docker idiosyncratic tendency to add a mask suffix """ T = TypeVar("T", bound="IPv4Address") @classmethod def with_suffix(cls: type[T], address: str) -> T: """ Construct an instance with a suffixed bitmask size """ address, *_ = address.partition("/") return cls(address) class Item: """ A mix-in for Docker items that can be inspected """ def __init__(self, ident: str): self._id = ident def get_id(self) -> str: """ Return an identifier for the Docker item """ return self._id def inspect(self) -> JSONObject: """ Get the result of inspecting the Docker item """ with Popen([DOCKER, 'inspect', self.get_id()], stdout=PIPE) as proc: assert proc.stdout is not None results = json.load(proc.stdout) assert isinstance(results, list) assert len(results) == 1 and isinstance(results[0], dict) return JSONObject(results[0]) class Image(Item): """ Docker image items """ T = TypeVar('T', bound='Image') def __init__(self, iid: str): self.iid = iid @classmethod def build(cls: type[T], context: Path, target: str = "", **build_args: str|None) -> T: """ Build an image from the given context Build arguments are ignored if they are None to make it easier to supply (or not) arguments from external lookups without complex argument composing. """ cmd: Arguments = [ 'build', context, f"--target={target}", *(f"--build-arg={arg}={val}" for arg, val in build_args.items() if val is not None), ] docker(*cmd, DOCKER_BUILDKIT='1') iid = docker_output(*cmd, '-q', DOCKER_BUILDKIT='1') return cls(iid) @classmethod def pull(cls: type[T], repository: str) -> T: """ Pull an image from a registry """ docker('pull', repository) iid = Item(repository).inspect().path('$.Id', str) return cls(iid) def get_id(self) -> str: """ Return an identifier for the Docker Image """ return self.iid class Container(Item): """ Docker container items Instances can be used as context managers that ensure the container is stopped on exiting the context. """ DEFAULT_ALIASES = tuple[str]() def __init__( self, image: Image, cmd: Arguments = [], volumes: Volumes = [], env: Environ = {}, network: Network|None = None, entrypoint: HostMount|Argument|None = None, ): if isinstance(entrypoint, tuple): volumes = [*volumes, entrypoint] entrypoint = entrypoint[1] self.image = image self.cmd = cmd self.volumes = volumes self.env = env self.entrypoint = entrypoint self.networks = dict[Network, Tuple[str, ...]]() self.cid: str|None = None if network: self.connect(network, *self.DEFAULT_ALIASES) def __enter__(self) -> Container: return self def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None: if self.cid and exc: self.show_logs() self.stop(rm=True) @contextmanager def started(self) -> Iterator[Container]: """ Return a context manager that ensures the container is started when the context is entered """ with self: self.start() yield self def is_running(self) -> bool: """ Return whether the container is running """ if self.cid is None: return False details = self.inspect() if details.path('$.State.Status', str) == 'exited': code = details.path('$.State.ExitCode', int) raise ProcessLookupError(f"container {self.cid} exited ({code})") return ( self.cid is not None and details.path('$.State.Running', bool) ) def get_id(self) -> str: """ Return an identifier for the Docker Container """ if self.cid is not None: return self.cid opts: MutableArguments = [ "--network=none", *( (f"--volume={vol[0]}:{vol[1]}" if isinstance(vol, tuple) else f"--volume={vol}") for vol in self.volumes ), *(f"--env={name}={val}" for name, val in self.env.items()), ] if self.entrypoint: opts.append(f"--entrypoint={self.entrypoint}") self.cid = docker_output('container', 'create', *opts, self.image.iid, *self.cmd) assert self.cid # Disconnect the "none" network specified as the starting network docker_quiet("network", "disconnect", "none", self.cid) return self.cid def start(self) -> None: """ Start the container """ docker_quiet('container', 'start', self.get_id()) def stop(self, rm: bool = False) -> None: """ Stop the container """ if self.cid is None: return docker_quiet('container', 'stop', self.cid) if rm: docker_quiet('container', 'rm', self.cid) self.cid = None def connect(self, network: Network, *aliases: str) -> None: """ Connect the container to a Docker network Any aliases supplied will be resolvable to the container by other containers on the network. """ cid = self.get_id() if network in self.networks: if self.networks[network] == aliases: return docker('network', 'disconnect', str(network), cid) docker( 'network', 'connect', *(f'--alias={a}' for a in aliases), str(network), cid, ) self.networks[network] = aliases def show_logs(self) -> None: """ Print the container logs to stdout """ if self.cid: docker('logs', self.cid) def get_exec_args(self, cmd: Arguments, interactive: bool = False) -> MutableArguments: """ Return a full argument list for running "cmd" inside the container """ return [DOCKER, "exec", *(("-i",) if interactive else ""), self.get_id(), *coerce_args(cmd)] def run( self, cmd: Arguments, *, stdin: IO[Any]|int|None = None, stdout: IO[Any]|int|None = None, stderr: IO[Any]|int|None = None, capture_output: bool = False, check: bool = False, input: bytes|None = None, timeout: float|None = None, ) -> CompletedProcess[bytes]: """ Run "cmd" to completion inside the container and return the result """ return run( self.get_exec_args(cmd), stdin=stdin, stdout=stdout, stderr=stderr, capture_output=capture_output, check=check, timeout=timeout, input=input, ) def exec( self, cmd: Arguments, *, stdin: IO[Any]|int|None = None, stdout: IO[Any]|int|None = None, stderr: IO[Any]|int|None = None, ) -> Popen[bytes]: """ Execute "cmd" inside the container and return a process object once started """ return Popen( self.get_exec_args(cmd), stdin=stdin, stdout=stdout, stderr=stderr, ) class Network(Item): """ A Docker network """ def __init__(self, name: str|None = None) -> None: self._name = name or f"br{token_hex(6)}" def __str__(self) -> str: return self._name def __repr__(self) -> str: cls = type(self) return f"<{cls.__module__}.{cls.__name__} {self._name}>" def __eq__(self, other: Any) -> bool: if not isinstance(other, Network): return self._name == str(other) return self._name == other._name def __hash__(self) -> int: return self._name.__hash__() def __enter__(self) -> Network: self.create() return self def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None: self.destroy() @property def name(self) -> str: """ Return the name of the Docker network """ return self._name def get_id(self) -> str: """ Return an identifier for the Docker Network """ return self._name def create(self) -> None: """ Create the network """ docker_quiet("network", "create", self._name) def destroy(self) -> None: """ Remove the network """ docker_quiet("network", "rm", self._name) class Cli: """ Manage calling executables in a container Any arguments passed to the constructor will prefix the arguments passed when the object is called. """ T = TypeVar("T") def __init__(self, container: Container, *cmd: Argument): self.container = container self.cmd = cmd @overload def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: Deserialiser[T], query: Literal[False] = False, **kwargs: Any, ) -> T: ... @overload def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[True], **kwargs: Any, ) -> int: ... @overload def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[False] = False, **kwargs: Any, ) -> None: ... def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = None, deserialiser: Deserialiser[Any]|None = None, query: bool = False, **kwargs: Any, ) -> Any: """ Run the container executable with the given arguments Input: Any bytes passed as "input" will be fed into the process' stdin pipe. Output: If "deserialiser" is provided it will be called with a memoryview of a buffer containing any bytes from the process' stdout; whatever is returned by "deserialiser" will be returned. If "query" is true the return code of the process will be returned. Otherwise nothing is returned. Note that "deserialiser" and "query" are mutually exclusive; if debugging is enabled an AssertionError will be raised if both are non-None/non-False, otherwise "query" is ignored. Errors: If "query" is not true any non-zero return code will cause CalledProcessError to be raised. """ # deserialiser = kwargs.pop('deserialiser', None) assert not deserialiser or not query data = ( b"" if input is None else input.encode() if isinstance(input, str) else bytes(input) ) cmd = self.container.get_exec_args([*self.cmd, *args], interactive=bool(data)) if deserialiser: return exec_io(cmd, data, deserialiser=deserialiser, **kwargs) rcode = exec_io(cmd, data, **kwargs) if query: return rcode if not isinstance(rcode, int): raise TypeError(f"got rcode {rcode!r}") if 0 != rcode: raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None