#  Copyright 2021  Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
#  This Source Code Form is subject to the terms of the Mozilla Public
#  License, v. 2.0. If a copy of the MPL was not distributed with this
#  file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Commands for managing Docker for fixtures
"""

from __future__ import annotations

import codecs
import ipaddress
import json
import logging
from contextlib import contextmanager
from os import PathLike
from os import environ
from os import fspath
from pathlib import Path
from secrets import token_hex
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import CalledProcessError
from subprocess import CompletedProcess
from subprocess import Popen
from subprocess import run
from types import TracebackType
from typing import IO
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import Literal
from typing import SupportsBytes
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import cast
from typing import overload

from .binaries import DownloadableDocker
from .json import JSONArray
from .json import JSONObject
from .proc import Argument
from .proc import Arguments
from .proc import Deserialiser
from .proc import Environ
from .proc import MutableArguments
from .proc import coerce_args
from .proc import exec_io

# Any path-like object, whether it yields str or bytes
MountPath = Union[PathLike[bytes], PathLike[str]]
# A (source, destination) pair of paths; Container joins them as "source:destination"
HostMount = tuple[MountPath, MountPath]
# A (name, destination) pair -- presumably the name of a Docker volume; verify with callers
NamedMount = tuple[str, MountPath]
# A lone MountPath is handed to "--volume" unchanged
Mount = Union[HostMount, NamedMount, MountPath]
Volumes = Iterable[Mount]


# Use a "docker" executable from the search path when there is one; only when it cannot be
# found fall back to the binary supplied by DownloadableDocker.
try:
	run([b"docker", b"version"], stdout=DEVNULL)
except FileNotFoundError:
	DOCKER: Argument = DownloadableDocker().get_binary()
else:
	DOCKER = b"docker"


def utf8_decode(buffer: bytes) -> str:
	"""
	Decode a bytes-like object as UTF-8 and return the resulting string
	"""
	decode = codecs.getdecoder("utf-8")
	# The decoder returns a (text, consumed-length) pair; only the text is wanted
	text, _consumed = decode(buffer)
	return text


def docker(*args: Argument, **env: str) -> None:
	"""
	Run a Docker command, with output going to stdout

	Keyword arguments are environment variables for the command; they are layered over
	a copy of the current process' environment, so the command still sees variables
	such as HOME and DOCKER_HOST.

	Raises CalledProcessError if the command exits with a non-zero status.
	"""
	run([DOCKER, *args], env={**environ, **env}, check=True)
def docker_output(*args: Argument, **env: str) -> str:
	"""
	Run a Docker command, capturing and returning its stdout

	The captured text is returned with leading and trailing whitespace stripped.

	Keyword arguments are environment variables for the command; they are layered over
	a copy of the current process' environment, so the command still sees variables
	such as HOME and DOCKER_HOST.

	Raises CalledProcessError if the command exits with a non-zero status.
	"""
	proc = run(
		[DOCKER, *args],
		env={**environ, **env},
		check=True, stdout=PIPE, text=True,
	)
	return proc.stdout.strip()


def docker_quiet(*args: Argument, **env: str) -> None:
	"""
	Run a Docker command, directing its stdout to /dev/null

	Keyword arguments are environment variables for the command; they are layered over
	a copy of the current process' environment, so the command still sees variables
	such as HOME and DOCKER_HOST.

	Raises CalledProcessError if the command exits with a non-zero status.
	"""
	run([DOCKER, *args], env={**environ, **env}, check=True, stdout=DEVNULL)


class IPv4Address(ipaddress.IPv4Address):
	"""
	An IPv4Address that copes with Docker's habit of appending a mask suffix to addresses
	"""

	T = TypeVar("T", bound="IPv4Address")

	@classmethod
	def with_suffix(cls: type[T], address: str) -> T:
		"""
		Return an instance built from an address that may carry a "/<bits>" suffix

		Everything from the first "/" onwards is discarded before parsing.
		"""
		host, _sep, _bits = address.partition("/")
		return cls(host)


class Item:
	"""
	Mix-in for Docker items that support inspection
	"""

	def __init__(self, ident: str):
		self._id = ident

	def get_id(self) -> str:
		"""
		Return the identifier used to refer to the Docker item
		"""
		return self._id

	def inspect(self) -> JSONObject:
		"""
		Return the details reported by "docker inspect" for the item

		The command's stdout is parsed as JSON and is expected to be a list holding
		exactly one object, which is returned wrapped in a JSONObject.
		"""
		argv = [DOCKER, 'inspect', self.get_id()]
		with Popen(argv, stdout=PIPE) as proc:
			assert proc.stdout is not None
			results = json.load(proc.stdout)
		assert isinstance(results, list)
		assert len(results) == 1
		details = results[0]
		assert isinstance(details, dict)
		return JSONObject(details)


class Image(Item):
	"""
	Docker image items
	"""

	T = TypeVar('T', bound='Image')

	def __init__(self, iid: str):
		# Initialise the base class too so that Item's own state is always present
		super().__init__(iid)
		self.iid = iid

	@classmethod
	def build(cls: type[T], context: Path, target: str = "", **build_args: str|None) -> T:
		"""
		Build an image from the given context

		Build arguments are ignored if they are None to make it easier to supply (or not)
		arguments from external lookups without complex argument composing.

		Raises CalledProcessError if either build invocation fails.
		"""
		cmd: Arguments = [
			b"build", context, f"--target={target}",
			*(f"--build-arg={arg}={val}" for arg, val in build_args.items() if val is not None),
		]
		# The first invocation lets the build output go to stdout; the second repeats the
		# same build with "-q" and its captured output is used as the image ID.
		docker(*cmd, DOCKER_BUILDKIT='1')
		iid = docker_output(*cmd, '-q', DOCKER_BUILDKIT='1')
		return cls(iid)

	@classmethod
	def pull(cls: type[T], repository: str) -> T:
		"""
		Pull an image from a registry

		The image ID is read from the "Id" field of the inspection result for
		"repository" once the pull has completed.
		"""
		docker(b"pull", repository)
		iid = Item(repository).inspect().path('$.Id', str)
		return cls(iid)

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Image
		"""
		return self.iid


class Container(Item):
	"""
	Docker container items

	Instances can be used as context managers that ensure the container is stopped on
	exiting the context.
	"""

	DEFAULT_ALIASES = tuple[str]()

	def __init__(
		self,
		image: Image,
		cmd: Arguments|None = None,
		volumes: Volumes|None = None,
		env: Environ|None = None,
		network: Network|None = None,
		entrypoint: HostMount|Argument|None = None,
		privileged: bool = False,
	):
		"""
		Describe a container to be created from "image"

		"cmd", "volumes" and "env" default to being empty.  If "entrypoint" is a tuple it
		is added to the volumes and its second item is used as the entrypoint.  If
		"network" is given the container is connected to it straight away, which creates
		the container.
		"""
		# Fresh objects per instance; the values are stored on the instance, so shared
		# default objects would leak changes between containers
		cmd = [] if cmd is None else cmd
		volumes = [] if volumes is None else volumes
		env = {} if env is None else env

		if isinstance(entrypoint, tuple):
			volumes = [*volumes, entrypoint]
			entrypoint = entrypoint[1]

		self.image = image
		self.cmd = cmd
		self.volumes = volumes
		self.env = env
		self.entrypoint = entrypoint
		self.privileged = privileged
		self.networks = dict[Network, Tuple[str, ...]]()
		self.cid: str|None = None

		if network:
			self.connect(network, *self.DEFAULT_ALIASES)

	def __enter__(self) -> Container:
		return self

	def __exit__(
		self,
		etype: type[BaseException]|None,
		exc: BaseException|None,
		tb: TracebackType|None,
	) -> None:
		try:
			# Show the logs to help with diagnosing whatever caused the exception
			if self.cid and exc:
				self.show_logs()
		finally:
			# Always stop and remove the container, even when showing the logs fails
			self.stop(rm=True)

	@contextmanager
	def started(self) -> Iterator[Container]:
		"""
		Return a context manager that ensures the container is started when the context is entered
		"""
		with self:
			self.start()
			yield self

	def is_running(self, raise_on_exit: bool = False) -> bool:
		"""
		Return whether the container is running

		A container that has not been created is not running.  A warning is logged if the
		container has exited with a non-zero code.  If "raise_on_exit" is true
		CalledProcessError is raised if the container has exited, whatever its exit code.
		"""
		if self.cid is None:
			return False
		details = self.inspect()
		if details.path('$.State.Status', str) == 'exited':
			code = details.path('$.State.ExitCode', int)
			if code != 0:
				logging.getLogger(__name__).warning(
					f"container {self.cid} exited ({code})",
				)
			if raise_on_exit:
				cmd = details.path("$.Config.Entrypoint", list[str])
				cmd.extend(details.path("$.Config.Cmd", list[str]))
				raise CalledProcessError(code, cmd)
		return details.path('$.State.Running', bool)

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Container

		The container is created the first time an identifier is needed; afterwards the
		identifier from that creation is returned.
		"""
		if self.cid is not None:
			return self.cid

		opts: MutableArguments = [
			b"--network=none",
			*(f"--env={name}={val}" for name, val in self.env.items()),
		]

		for vol in self.volumes:
			if isinstance(vol, tuple):
				# Pairs are joined into a single "source:destination" string
				src = fspath(vol[0])
				dst = fspath(vol[1])
				if isinstance(src, bytes):
					src = src.decode()
				if isinstance(dst, bytes):
					dst = dst.decode()
				arg: Argument = f"{src}:{dst}"
			else:
				arg = vol
			opts.extend((b"--volume", arg))

		if self.entrypoint:
			opts.extend((b"--entrypoint", self.entrypoint))
		if self.privileged:
			opts.append(b"--privileged")
		self.cid = docker_output(b"container", b"create", *opts, self.image.iid, *self.cmd)
		assert self.cid

		# Disconnect the "none" network specified as the starting network
		docker_quiet(b"network", b"disconnect", b"none", self.cid)
		return self.cid

	def start(self) -> None:
		"""
		Start the container

		Nothing is done if the container is already running.
		"""
		if self.is_running():
			return
		docker_quiet(b"container", b"start", self.get_id())

	def stop(self, rm: bool = False) -> None:
		"""
		Stop the container

		Nothing is done if the container has not been created.  If "rm" is true the
		container is also removed, even if stopping it failed.
		"""
		if self.cid is None:
			return
		try:
			if self.is_running():
				docker_quiet(b"container", b"stop", self.cid)
		finally:
			if rm:
				docker_quiet(b"container", b"rm", self.cid)
				self.cid = None

	def connect(
		self,
		network: Network,
		*aliases: str,
		address: ipaddress.IPv4Address|ipaddress.IPv6Address|None = None,
	) -> None:
		"""
		Connect the container to a Docker network

		Any aliases supplied will be resolvable to the container by other containers on the
		network.

		If no address is given a free one is requested from the network.  Nothing is done
		if the container is already connected to the network with the same aliases; if the
		aliases differ the container is disconnected first and then connected again.
		"""
		cid = self.get_id()

		# Checked before anything else so that an unchanged connection does not cost an
		# inspection of the network to find a free address
		connected = network in self.networks
		if connected and self.networks[network] == aliases:
			return

		opts = [f'--alias={a}' for a in aliases]

		if address is None:
			address = network.get_free_address()
		opts.append(
			f"--ip={address}" if isinstance(address, ipaddress.IPv4Address) else
			f"--ip6={address}",
		)

		if connected:
			docker(b"network", b"disconnect", str(network), cid)
		docker(b"network", b"connect", *opts, str(network), cid)
		self.networks[network] = aliases

	def show_logs(self) -> None:
		"""
		Print the container logs to stdout

		Nothing is done if the container has not been created.
		"""
		if self.cid:
			docker(b"logs", self.cid)

	def get_exec_args(self, cmd: Arguments, interactive: bool = False) -> MutableArguments:
		"""
		Return a full argument list for running "cmd" inside the container

		If "interactive" is true the "-i" option is included in the arguments.
		"""
		return [DOCKER, b"exec", *((b"-i",) if interactive else []), self.get_id(), *cmd]

	def run(
		self,
		cmd: Arguments,
		*,
		stdin: IO[Any]|int|None = None,
		stdout: IO[Any]|int|None = None,
		stderr: IO[Any]|int|None = None,
		capture_output: bool = False,
		check: bool = False,
		input: bytes|None = None,
		timeout: float|None = None,
	) -> CompletedProcess[bytes]:
		"""
		Run "cmd" to completion inside the container and return the result

		The keyword arguments are passed on unchanged to subprocess.run().
		"""
		return run(
			self.get_exec_args(cmd),
			stdin=stdin, stdout=stdout, stderr=stderr,
			capture_output=capture_output,
			check=check, timeout=timeout, input=input,
		)

	def exec(
		self,
		cmd: Arguments,
		*,
		stdin: IO[Any]|int|None = None,
		stdout: IO[Any]|int|None = None,
		stderr: IO[Any]|int|None = None,
	) -> Popen[bytes]:
		"""
		Execute "cmd" inside the container and return a process object once started

		The keyword arguments are passed on unchanged to subprocess.Popen().
		"""
		return Popen(
			self.get_exec_args(cmd),
			stdin=stdin, stdout=stdout, stderr=stderr,
		)


class Network(Item):
	"""
	A Docker network

	Instances can be used as context managers that create the network on entering the
	context and remove it on exiting.
	"""

	# The range from which subnets for new networks are taken
	DOCKER_SUBNET = ipaddress.IPv4Network("172.16.0.0/12")

	def __init__(self, name: str|None = None) -> None:
		# A random name is generated when none (or an empty one) is given
		self._name = name or f"br{token_hex(6)}"
		self._nid: str|None = None

	def __str__(self) -> str:
		return self._name

	def __repr__(self) -> str:
		cls = type(self)
		return f"<{cls.__module__}.{cls.__name__} {self._name}>"

	def __eq__(self, other: Any) -> bool:
		# Networks are compared by name; anything else by its string form
		if not isinstance(other, Network):
			return self._name == str(other)
		return self._name == other._name

	def __hash__(self) -> int:
		return hash(self._name)

	def __enter__(self) -> Network:
		self.create()
		return self

	def __exit__(
		self,
		etype: type[BaseException]|None,
		exc: BaseException|None,
		tb: TracebackType|None,
	) -> None:
		self.destroy()

	@property
	def name(self) -> str:
		"""
		Return the name of the Docker network
		"""
		return self._name

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Network

		The network is created first if there is no known identifier for it.
		"""
		if self._nid is None:
			self.create()
		assert self._nid is not None
		return self._nid

	def create(self) -> None:
		"""
		Create the network

		The network is given a free subnet with the first host address in it as the
		gateway.  If creation fails but inspecting the name returns a result, the ID
		from that result is used; otherwise the CalledProcessError is re-raised.
		"""
		subnet = self.get_free_subnet()
		gateway = next(subnet.hosts())
		try:
			self._nid = docker_output(
				b"network", b"create", self._name,
				f"--subnet={subnet}", f"--gateway={gateway}",
			)
		except CalledProcessError:
			data = exec_io(
				[DOCKER, b"network", b"inspect", self._name],
				deserialiser=JSONArray.from_string,
			)
			if len(data) == 0:
				raise
			self._nid = data.path("$[0].Id", str)
		assert self._nid is not None

	def destroy(self) -> None:
		"""
		Remove the network

		Nothing is done if there is no known identifier for the network.
		"""
		if self._nid:
			docker_quiet(b"network", b"rm", self._nid)
			# Forget the ID so that repeated calls do nothing and get_id() does not
			# return the ID of a network that has been removed
			self._nid = None

	@classmethod
	def get_free_subnet(cls) -> ipaddress.IPv4Network:
		"""
		Return a free private subnet

		The subnet is the first division of DOCKER_SUBNET that overlaps none of the
		subnets configured for the existing Docker networks.  LookupError is raised if
		every division is in use.
		"""
		networks = exec_io(
			[DOCKER, b"network", b"ls", b"--format={{.ID}}"],
			deserialiser=utf8_decode,
		).splitlines()
		subnets = exec_io(
			[DOCKER, b"network", b"inspect", *networks],
			deserialiser=JSONArray.from_string,
		).path(
			"$[*].IPAM.Config[*].Subnet", list[str],
			lambda subnets: {ipaddress.ip_network(net) for net in subnets},
		)
		# Candidate subnets have a prefix 8 bits longer than that of DOCKER_SUBNET
		for subnet in cls.DOCKER_SUBNET.subnets(8):
			if not any(net.overlaps(subnet) for net in subnets):
				return subnet
		raise LookupError(f"No free subnets found in subnet {cls.DOCKER_SUBNET}")

	def get_free_address(self) -> ipaddress.IPv4Address:
		"""
		Return a free address in the network

		Note that the address is not reserved; any changes made to the network such as
		adding a container may invalidate the assurance that the address is free.

		LookupError is raised if every host address in the subnet is in use.
		"""
		# TODO: support IPv6
		data = self.inspect()
		# Considering only the first listed subnet
		net = data.path("$.IPAM.Config[0].Subnet", str, ipaddress.IPv4Network)
		reserved = data.path(
			"$.Containers.*.IPv4Address", list[str],
			lambda addrs: {IPv4Address.with_suffix(a) for a in addrs},
		)
		# The gateway is not listed with the containers but is taken all the same
		reserved.add(data.path("$.IPAM.Config[0].Gateway", str, IPv4Address))
		for addr in net.hosts():
			if addr not in reserved:
				return addr
		raise LookupError(f"No free addresses found in subnet {net}")


class Cli:
	"""
	Manage calling executables in a container

	Any arguments passed to the constructor will prefix the arguments passed when the object
	is called.
	"""

	T = TypeVar("T")

	def __init__(self, container: Container, *cmd: Argument):
		self.container = container
		self.cmd = cmd

	@overload
	def __call__(
		self,
		*args: Argument,
		input: str|bytes|SupportsBytes|None = ...,
		deserialiser: Deserialiser[T],
		query: Literal[False] = False,
		**kwargs: Any,
	) -> T: ...

	@overload
	def __call__(
		self,
		*args: Argument,
		input: str|bytes|SupportsBytes|None = ...,
		deserialiser: None = None,
		query: Literal[True],
		**kwargs: Any,
	) -> int: ...

	@overload
	def __call__(
		self,
		*args: Argument,
		input: str|bytes|SupportsBytes|None = ...,
		deserialiser: None = None,
		query: Literal[False] = False,
		**kwargs: Any,
	) -> None: ...

	def __call__(
		self,
		*args: Argument,
		input: str|bytes|SupportsBytes|None = None,
		deserialiser: Deserialiser[Any]|None = None,
		query: bool = False,
		**kwargs: Any,
	) -> Any:
		"""
		Run the container executable with the given arguments

		Input:
			Any bytes passed as "input" will be fed into the process' stdin pipe.

		Output:
			If "deserialiser" is provided it will be called with a memoryview of a buffer
			containing any bytes from the process' stdout; whatever is returned by
			"deserialiser" will be returned.

			If "query" is true the return code of the process will be returned.

			Otherwise nothing is returned.

			Note that "deserialiser" and "query" are mutually exclusive; if debugging is
			enabled an AssertionError will be raised if both are non-None/non-False, otherwise
			"query" is ignored.

		Errors:
			If "query" is not true any non-zero return code will cause CalledProcessError to
			be raised.
		"""
		assert not deserialiser or not query

		# Normalise the input to bytes: str is encoded, anything else is converted with bytes()
		data = (
			b"" if input is None else
			input.encode() if isinstance(input, str) else
			bytes(input)
		)
		# The exec is made interactive only when there is data to feed to stdin
		cmd = self.container.get_exec_args([*self.cmd, *args], interactive=bool(data))

		if deserialiser:
			return exec_io(cmd, data=data, deserialiser=deserialiser, **kwargs)

		rcode = exec_io(cmd, data=data, **kwargs)
		if query:
			return rcode
		if not isinstance(rcode, int):
			raise TypeError(f"got rcode {rcode!r}")
		if rcode != 0:
			raise CalledProcessError(rcode, ' '.join(coerce_args(cmd)))
		return None