# Copyright 2021 Dominik Sekotill # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Commands for managing Docker for fixtures """ from __future__ import annotations import ipaddress import json from contextlib import contextmanager from pathlib import Path from secrets import token_hex from subprocess import DEVNULL from subprocess import PIPE from subprocess import CompletedProcess from subprocess import Popen from subprocess import run from types import TracebackType from typing import IO from typing import Any from typing import Iterable from typing import Iterator from typing import Tuple from typing import TypeVar from typing import Union from .json import JSONObject from .proc import Arguments from .proc import Environ from .proc import MutableArguments from .proc import PathArg from .proc import PathLike from .proc import coerce_args HostMount = tuple[PathLike, PathLike] NamedMount = tuple[str, PathLike] AnonMount = PathLike Mount = Union[HostMount, NamedMount, AnonMount] Volumes = Iterable[Mount] DOCKER = "docker" def docker(*args: PathArg, **env: str) -> None: """ Run a Docker command, with output going to stdout """ run([DOCKER, *coerce_args(args)], env=env, check=True) def docker_output(*args: PathArg, **env: str) -> str: """ Run a Docker command, capturing and returning its stdout """ proc = run([DOCKER, *coerce_args(args)], env=env, check=True, stdout=PIPE, text=True) return proc.stdout.strip() def docker_quiet(*args: PathArg, **env: str) -> None: """ Run a Docker command, directing its stdout to /dev/null """ run([DOCKER, *coerce_args(args)], env=env, check=True, stdout=DEVNULL) class IPv4Address(ipaddress.IPv4Address): """ Subclass of IPv4Address that handle's docker idiosyncratic tendency to add a mask suffix """ T = TypeVar("T", bound="IPv4Address") @classmethod def with_suffix(cls: type[T], address: str) -> T: """ Construct an instance with a suffixed bitmask size """ address, *_ = address.partition("/") return cls(address) class Item: """ A mix-in for Docker items that can be inspected """ def __init__(self, ident: str): self._id = ident def get_id(self) -> str: """ Return an identifier for the Docker item """ return self._id def inspect(self) -> JSONObject: """ Get the result of inspecting the Docker item """ with Popen([DOCKER, 'inspect', self.get_id()], stdout=PIPE) as proc: assert proc.stdout is not None results = json.load(proc.stdout) assert isinstance(results, list) assert len(results) == 1 and isinstance(results[0], dict) return JSONObject(results[0]) class Image(Item): """ Docker image items """ T = TypeVar('T', bound='Image') def __init__(self, iid: str): self.iid = iid @classmethod def build(cls: type[T], context: Path, target: str = "", **build_args: str|None) -> T: """ Build an image from the given context Build arguments are ignored if they are None to make it easier to supply (or not) arguments from external lookups without complex argument composing. """ cmd: Arguments = [ 'build', context, f"--target={target}", *(f"--build-arg={arg}={val}" for arg, val in build_args.items() if val is not None), ] docker(*cmd, DOCKER_BUILDKIT='1') iid = docker_output(*cmd, '-q', DOCKER_BUILDKIT='1') return cls(iid) @classmethod def pull(cls: type[T], repository: str) -> T: """ Pull an image from a registry """ docker('pull', repository) iid = Item(repository).inspect().path('$.Id', str) return cls(iid) def get_id(self) -> str: """ Return an identifier for the Docker Image """ return self.iid class Container(Item): """ Docker container items Instances can be used as context managers that ensure the container is stopped on exiting the context. """ DEFAULT_ALIASES = tuple[str]() def __init__( self, image: Image, cmd: Arguments = [], volumes: Volumes = [], env: Environ = {}, network: Network|None = None, entrypoint: HostMount|PathArg|None = None, ): if isinstance(entrypoint, tuple): volumes = [*volumes, entrypoint] entrypoint = entrypoint[1] self.image = image self.cmd = cmd self.volumes = volumes self.env = env self.entrypoint = entrypoint self.networks = dict[Network, Tuple[str, ...]]() self.cid: str|None = None if network: self.connect(network, *self.DEFAULT_ALIASES) def __enter__(self) -> Container: return self def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None: if self.cid and exc: self.show_logs() self.stop(rm=True) @contextmanager def started(self) -> Iterator[Container]: """ Return a context manager that ensures the container is started when the context is entered """ with self: self.start() yield self def is_running(self) -> bool: """ Return whether the container is running """ if self.cid is None: return False details = self.inspect() if details.path('$.State.Status', str) == 'exited': code = details.path('$.State.ExitCode', int) raise ProcessLookupError(f"container {self.cid} exited ({code})") return ( self.cid is not None and details.path('$.State.Running', bool) ) def get_id(self) -> str: """ Return an identifier for the Docker Container """ if self.cid is not None: return self.cid opts: MutableArguments = [ "--network=none", *( (f"--volume={vol[0]}:{vol[1]}" if isinstance(vol, tuple) else f"--volume={vol}") for vol in self.volumes ), *(f"--env={name}={val}" for name, val in self.env.items()), ] if self.entrypoint: opts.append(f"--entrypoint={self.entrypoint}") self.cid = docker_output('container', 'create', *opts, self.image.iid, *self.cmd) assert self.cid # Disconnect the "none" network specified as the starting network docker_quiet("network", "disconnect", "none", self.cid) return self.cid def start(self) -> None: """ Start the container """ docker_quiet('container', 'start', self.get_id()) def stop(self, rm: bool = False) -> None: """ Stop the container """ if self.cid is None: return docker_quiet('container', 'stop', self.cid) if rm: docker_quiet('container', 'rm', self.cid) self.cid = None def connect(self, network: Network, *aliases: str) -> None: """ Connect the container to a Docker network Any aliases supplied will be resolvable to the container by other containers on the network. """ cid = self.get_id() if network in self.networks: if self.networks[network] == aliases: return docker('network', 'disconnect', str(network), cid) docker( 'network', 'connect', *(f'--alias={a}' for a in aliases), str(network), cid, ) self.networks[network] = aliases def show_logs(self) -> None: """ Print the container logs to stdout """ if self.cid: docker('logs', self.cid) def get_exec_args(self, cmd: Arguments, interactive: bool = False) -> MutableArguments: """ Return a full argument list for running "cmd" inside the container """ return [DOCKER, "exec", *(("-i",) if interactive else ""), self.get_id(), *coerce_args(cmd)] def run( self, cmd: Arguments, *, stdin: IO[Any]|int|None = None, stdout: IO[Any]|int|None = None, stderr: IO[Any]|int|None = None, capture_output: bool = False, check: bool = False, input: bytes|None = None, timeout: float|None = None, ) -> CompletedProcess[bytes]: """ Run "cmd" to completion inside the container and return the result """ return run( self.get_exec_args(cmd), stdin=stdin, stdout=stdout, stderr=stderr, capture_output=capture_output, check=check, timeout=timeout, input=input, ) def exec( self, cmd: Arguments, *, stdin: IO[Any]|int|None = None, stdout: IO[Any]|int|None = None, stderr: IO[Any]|int|None = None, ) -> Popen[bytes]: """ Execute "cmd" inside the container and return a process object once started """ return Popen( self.get_exec_args(cmd), stdin=stdin, stdout=stdout, stderr=stderr, ) class Network(Item): """ A Docker network """ def __init__(self, name: str|None = None) -> None: self._name = name or f"br{token_hex(6)}" def __str__(self) -> str: return self._name def __repr__(self) -> str: cls = type(self) return f"<{cls.__module__}.{cls.__name__} {self._name}>" def __eq__(self, other: Any) -> bool: if not isinstance(other, Network): return self._name == str(other) return self._name == other._name def __hash__(self) -> int: return self._name.__hash__() def __enter__(self) -> Network: self.create() return self def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None: self.destroy() @property def name(self) -> str: """ Return the name of the Docker network """ return self._name def get_id(self) -> str: """ Return an identifier for the Docker Network """ return self._name def create(self) -> None: """ Create the network """ docker_quiet("network", "create", self._name) def destroy(self) -> None: """ Remove the network """ docker_quiet("network", "rm", self._name)