Skip to content
docker.py 16.1 KiB
Newer Older
#  Copyright 2021-2022  Dominik Sekotill <dom.sekotill@kodo.org.uk>
Dom Sekotill's avatar
Dom Sekotill committed
#
#  This Source Code Form is subject to the terms of the Mozilla Public
#  License, v. 2.0. If a copy of the MPL was not distributed with this
#  file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Commands for managing Docker for fixtures
"""

from __future__ import annotations

Dom Sekotill's avatar
Dom Sekotill committed
import ipaddress
import json
import logging
Dom Sekotill's avatar
Dom Sekotill committed
from contextlib import contextmanager
from enum import Enum
from os import PathLike
from os import environ
from os import fspath
Dom Sekotill's avatar
Dom Sekotill committed
from pathlib import Path
from secrets import token_hex
from subprocess import DEVNULL
from subprocess import PIPE
Dom Sekotill's avatar
Dom Sekotill committed
from subprocess import CalledProcessError
Dom Sekotill's avatar
Dom Sekotill committed
from subprocess import CompletedProcess
from subprocess import Popen
from subprocess import run
from types import TracebackType
from typing import IO
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import MutableMapping
Dom Sekotill's avatar
Dom Sekotill committed
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import cast
from urllib.parse import urlparse
from .binaries import DownloadableDocker
from .json import JSONArray
Dom Sekotill's avatar
Dom Sekotill committed
from .json import JSONObject
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import Argument
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import Arguments
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import Deserialiser
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import Environ
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import MutableArguments
Dom Sekotill's avatar
Dom Sekotill committed
from .proc import exec_io
# The IPv4 loopback address (127.0.0.1)
LOCALHOST = ipaddress.IPv4Address(0x7f000001)

# A filesystem path, as either a bytes-based or a str-based path-like object
MountPath = Union[PathLike[bytes], PathLike[str]]
# Two-item mounts are passed to "--volume" as "<first>:<second>"; single paths are passed as-is
HostMount = tuple[MountPath, MountPath]
NamedMount = tuple[str, MountPath]
Mount = Union[HostMount, NamedMount, MountPath]
Volumes = Iterable[Mount]

IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]

# Use a "docker" executable from the search path when there is one; only when launching it
# fails with FileNotFoundError is the downloadable binary used instead.  The "else" branch
# matters: without it the downloaded binary would be overwritten by the bare name again.
try:
	run([b"docker", b"version"], stdout=DEVNULL)
except FileNotFoundError:
	DOCKER: Argument = DownloadableDocker().get_binary()
else:
	DOCKER = b"docker"
def utf8_decode(buffer: bytes) -> str:
	"""
	Return a decoded string from a bytes-like sequence of bytes

	Decoding is strict: input that is not valid UTF-8 raises UnicodeDecodeError.
	"""
	# The built-in constructor accepts any bytes-like object and needs no codec-registry
	# module to be imported
	return str(buffer, "utf-8")


Dom Sekotill's avatar
Dom Sekotill committed
def docker(*args: Argument, **env: str) -> None:
	"""
	Execute Docker with the given arguments, leaving its output streams untouched

	Keyword arguments are layered over the current process environment for the command.
	CalledProcessError is raised if the command exits with a non-zero status.
	"""
	command = [DOCKER, *args]
	environment = {**environ, **env}
	run(command, env=environment, check=True)
Dom Sekotill's avatar
Dom Sekotill committed
def docker_output(*args: Argument, **env: str) -> str:
	"""
	Execute Docker with the given arguments and return what it wrote to stdout

	The output is decoded as text and has surrounding whitespace stripped.  Keyword
	arguments are layered over the current process environment for the command.
	CalledProcessError is raised if the command exits with a non-zero status.
	"""
	result = run(
		[DOCKER, *args],
		env={**environ, **env},
		stdout=PIPE,
		text=True,
		check=True,
	)
	captured = result.stdout
	return captured.strip()


Dom Sekotill's avatar
Dom Sekotill committed
def docker_quiet(*args: Argument, **env: str) -> None:
	"""
	Execute Docker with the given arguments, discarding anything it writes to stdout

	Keyword arguments are layered over the current process environment for the command.
	CalledProcessError is raised if the command exits with a non-zero status.
	"""
	command = [DOCKER, *args]
	environment = {**environ, **env}
	run(command, env=environment, stdout=DEVNULL, check=True)
def _get_docker_host_ip() -> IPAddress:
	"""
	Return the address named by the DOCKER_HOST environment variable, else a loopback address

	The loopback address is returned when DOCKER_HOST is unset, when it has no host part,
	or when its scheme is neither "tcp" nor "ssh".

	This is a stop-gap: only literal IP addresses are understood.  A host *name* in
	DOCKER_HOST is passed to ipaddress.ip_address() unchanged, which raises ValueError.
	"""
	host = environ.get("DOCKER_HOST")
	if host is None:
		return LOCALHOST

	# A value without a scheme is treated as a TCP endpoint
	parsed = urlparse(host if "://" in host else f"tcp://{host}")
	if parsed.hostname and parsed.scheme in ("tcp", "ssh"):
		return ipaddress.ip_address(parsed.hostname)
	return LOCALHOST


Dom Sekotill's avatar
Dom Sekotill committed
class IPv4Address(ipaddress.IPv4Address):
	"""
	An IPv4Address that copes with Docker's habit of appending a mask suffix to addresses
	"""

	T = TypeVar("T", bound="IPv4Address")

	@classmethod
	def with_suffix(cls: type[T], address: str) -> T:
		"""
		Construct an instance from an address that may carry a "/<bits>" suffix

		Everything from the first "/" onwards is discarded before parsing.
		"""
		bare_address = address.split("/", 1)[0]
		return cls(bare_address)


class IPProtocol(Enum):
	"""
	The IP protocols for which Docker can forward ports
	"""

	# Member values are the lower-case protocol names
	TCP = "tcp"
	UDP = "udp"


Dom Sekotill's avatar
Dom Sekotill committed
class Item:
	"""
	Base for Docker items that can be examined with "docker inspect"
	"""

	def __init__(self, ident: str):
		self._id = ident

	def get_id(self) -> str:
		"""
		Return the identifier the item was constructed with
		"""
		return self._id

	def inspect(self) -> JSONObject:
		"""
		Run "docker inspect" on the item and return the single report it produces
		"""
		command = [DOCKER, 'inspect', self.get_id()]
		with Popen(command, stdout=PIPE) as proc:
			assert proc.stdout is not None
			reports = json.load(proc.stdout)

		# The output is expected to be a one-element array holding the report object
		assert isinstance(reports, list)
		assert len(reports) == 1
		report = reports[0]
		assert isinstance(report, dict)
		return JSONObject(report)


class Image(Item):
	"""
	Docker image items
	"""

	# Maps references (names passed to pull(), repository tags) to the IDs of known images;
	# shared by all instances
	_cache = dict[str, str]()

	T = TypeVar('T', bound='Image')

	def __init__(self, iid: str):
		super().__init__(iid)
		self.iid = iid

	@classmethod
	def build(cls: type[T], context: Path, target: str = "", **build_args: str|None) -> T:
		"""
		Build an image from the given context

		Build arguments are ignored if they are None to make it easier to supply (or not)
		arguments from external lookups without complex argument composing.
		"""
		cmd: Arguments = [
			b"build", context, f"--target={target}",
			*(f"--build-arg={arg}={val}" for arg, val in build_args.items() if val is not None),
		]
		# The first run lets the build output through; the second repeats the same command
		# with "-q" and captures its stdout, which is taken to be the image ID
		docker(*cmd, DOCKER_BUILDKIT='1')
		iid = docker_output(*cmd, '-q', DOCKER_BUILDKIT='1')
		return cls(iid)

	@classmethod
	def pull(cls: type[T], repository: str) -> T:
		"""
		Pull an image from a registry

		A repository that is already in the class-wide cache is not pulled again.
		"""
		try:
			iid = cls._cache[repository]
		except KeyError:
			docker(b"pull", repository)
			iid = cls._process_image(repository)
		return cls(iid)

	@classmethod
	def _process_image(cls, reference: str) -> str:
		"""
		Inspect an image, cache its ID under the reference and each of its tags, return the ID
		"""
		report = Item(reference).inspect()
		iid = report.path("$.Id", str)
		cls._cache.update((tag, iid) for tag in report.path("$.RepoTags", list[str]))
		# Key on the *value* of reference: handing "reference=iid" to update() as a keyword
		# would instead store the ID under the literal string "reference"
		cls._cache[reference] = iid
		return iid

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Image
		"""
		return self.iid


class Container(Item):
	"""
	Docker container items

	Instances can be used as context managers that ensure the container is stopped on
	exiting the context.
	"""

	T = TypeVar('T', bound='Container')

	# Aliases given to the container on the network passed to the constructor
	DEFAULT_ALIASES = tuple[str]()

	def __init__(
		self,
		image: Image,
		cmd: Arguments = [],
		volumes: Volumes = [],
		env: Environ = {},
		network: Network|None = None,
		entrypoint: HostMount|Argument|None = None,
		privileged: bool = False,
		publish: bool|list[int] = False,
	):
		"""
		Describe a container; it is only created in Docker once an identifier is needed

		image: the image to create the container from
		cmd: arguments placed after the image in "docker container create"
		volumes: mounts, each passed as a "--volume" option
		env: variables, each passed as an "--env" option
		network: a network to connect to straight away, which also creates the container
		entrypoint: passed as "--entrypoint"; a two-item tuple is additionally mounted as a
			volume and its second item used as the entrypoint
		privileged: whether to pass "--privileged"
		publish: a list of ports to pass as "--publish" options, or True for "--publish-all"
		"""
		# NOTE: the mutable defaults are shared between calls; this class only ever reads
		# them, it never modifies them in place
		if isinstance(entrypoint, tuple):
			volumes = [*volumes, entrypoint]
			entrypoint = entrypoint[1]

		self.image = image
		self.cmd = cmd
		self.volumes = volumes
		self.env = env
		self.entrypoint = entrypoint
		self.privileged = privileged
		self.publish = publish
		self.networks = dict[Network, Tuple[str, ...]]()
		self.cid: str|None = None

		if network is not None:
			self.connect(network, *self.DEFAULT_ALIASES)

	def __enter__(self: T) -> T:
		return self

	def __exit__(
		self,
		etype: type[BaseException]|None,
		exc: BaseException|None,
		tb: TracebackType|None,
	) -> None:
		# Show the logs of a created container when the context is left through an exception
		if self.cid and exc:
			self.show_logs()
		self.stop(rm=True)

	@contextmanager
	def started(self) -> Iterator[Container]:
		"""
		Return a context manager that ensures the container is started when the context is entered

		On leaving the context the container is stopped and removed.
		"""
		with self:
			self.start()
			# is_running() raises CalledProcessError here if the container has already exited
			wait(lambda: self.is_running(raise_on_exit=True))
			yield self

	def is_running(self, raise_on_exit: bool = False) -> bool:
		"""
		Return whether the container is running

		A container that has not been created yet is not running.  A non-zero exit code of
		an exited container is logged as a warning.  With raise_on_exit, CalledProcessError
		is raised if the container has exited, whatever its exit code.
		"""
		if self.cid is None:
			return False
		details = self.inspect()
		if details.path('$.State.Status', str) == 'exited':
			code = details.path('$.State.ExitCode', int)
			if code != 0:
				logging.getLogger(__name__).warning(
					f"container {self.cid} exited ({code})",
				)
			if raise_on_exit:
				cmd = details.path("$.Config.Entrypoint", list[str])
				cmd.extend(details.path("$.Config.Cmd", list[str]))
				raise CalledProcessError(code, cmd)
		return details.path('$.State.Running', bool)

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Container

		The container is created on the first call, using the options given to the
		constructor; later calls return the same identifier.
		"""
		if self.cid is not None:
			return self.cid

		opts: MutableArguments = [
			f"--env={name}={val}" for name, val in self.env.items()
		]

		for vol in self.volumes:
			if isinstance(vol, tuple):
				src = fspath(vol[0])
				dst = fspath(vol[1])
				if isinstance(src, bytes):
					src = src.decode()
				if isinstance(dst, bytes):
					dst = dst.decode()
				arg: Argument = f"{src}:{dst}"
			else:
				arg = vol
			opts.extend((b"--volume", arg))

		if self.entrypoint:
			opts.extend((b"--entrypoint", self.entrypoint))
		if self.privileged:
			opts.append(b"--privileged")

		# One flag decides both whether the container starts on the "none" network and
		# whether it is disconnected from it below, so the two can never disagree (an empty
		# publish list used to skip the former but still attempt the latter)
		unpublished = not self.publish
		if unpublished:
			opts.append(b"--network=none")
		elif isinstance(self.publish, list):
			opts.extend(f"--publish={p}" for p in self.publish)
		else:
			opts.append(b"--publish-all")

		self.cid = docker_output(b"container", b"create", *opts, self.image.iid, *self.cmd)
		assert self.cid

		# Disconnect the "none" network specified as the starting network
		if unpublished:
			docker_quiet(b"network", b"disconnect", b"none", self.cid)

		return self.cid

	def start(self) -> None:
		"""
		Start the container

		Does nothing if the container is already running.
		"""
		if self.is_running():
			return
		docker_quiet(b"container", b"start", self.get_id())
		assert self.inspect().path('$.State.Status', str) != 'created', \
			"please report this at https://code.kodo.org.uk/dom/behave-utils/-/issues/11"

	def stop(self, rm: bool = False) -> None:
		"""
		Stop the container

		With rm the container is also removed, even when checking for or stopping a running
		container fails.  Does nothing if the container has not been created.
		"""
		if self.cid is None:
			return
		try:
			if self.is_running():
				docker_quiet(b"container", b"stop", self.cid)
		finally:
			if rm:
				docker_quiet(b"container", b"rm", self.cid)
				self.cid = None

	def connect(
		self,
		network: Network,
		*aliases: str,
		address: ipaddress.IPv4Address|ipaddress.IPv6Address|None = None,
	) -> None:
		"""
		Connect the container to a Docker network

		Any aliases supplied will be resolvable to the container by other containers on the
		network.

		If no address is given one is reserved from the network.  Connecting again to a
		network with the same aliases does nothing; with different aliases the container is
		disconnected and reconnected.
		"""
		cid = self.get_id()

		# Check for an existing connection *before* reserving an address, so that a
		# redundant call does not use up one of the network's addresses
		if network in self.networks:
			if self.networks[network] == aliases:
				return
			docker(b"network", b"disconnect", str(network), cid)

		opts = [f'--alias={a}' for a in aliases]
		if address is None:
			address = network.reserve_address()
		opts.append(
			f"--ip={address}" if isinstance(address, ipaddress.IPv4Address) else
			f"--ip6={address}",
		)

		docker(b"network", b"connect", *opts, str(network), cid)
		self.networks[network] = aliases

	def show_logs(self) -> None:
		"""
		Print the container logs to stdout

		Does nothing if the container has not been created.
		"""
		if self.cid:
			docker(b"logs", self.cid)

	def get_exec_args(self, cmd: Arguments, interactive: bool = False) -> MutableArguments:
		"""
		Return a full argument list for running "cmd" inside the container

		With interactive the "-i" option is added to the "docker exec" command.
		"""
		return [DOCKER, b"exec", *((b"-i",) if interactive else []), self.get_id(), *cmd]

	def run(
		self,
		cmd: Arguments,
		*,
		stdin: IO[Any]|int|None = None,
		stdout: IO[Any]|int|None = None,
		stderr: IO[Any]|int|None = None,
		capture_output: bool = False,
		check: bool = False,
		input: bytes|None = None,
		timeout: float|None = None,
	) -> CompletedProcess[bytes]:
		"""
		Run "cmd" to completion inside the container and return the result

		The keyword arguments are handed on unchanged to subprocess.run().
		"""
		# Within a method body "run" is the module-level subprocess function, not this method
		return run(
			self.get_exec_args(cmd),
			stdin=stdin, stdout=stdout, stderr=stderr,
			capture_output=capture_output,
			check=check, timeout=timeout, input=input,
		)

	def exec(
		self,
		cmd: Arguments,
		*,
		stdin: IO[Any]|int|None = None,
		stdout: IO[Any]|int|None = None,
		stderr: IO[Any]|int|None = None,
	) -> Popen[bytes]:
		"""
		Execute "cmd" inside the container and return a process object once started

		The keyword arguments are handed on unchanged to subprocess.Popen.
		"""
		return Popen(
			self.get_exec_args(cmd),
			stdin=stdin, stdout=stdout, stderr=stderr,
		)

	def get_external_ports(self, port: int, proto: IPProtocol = IPProtocol.TCP) -> Iterable[tuple[IPAddress, int]]:
		"""
		Yield (address, port) combinations exposed on the host that map to the given container port

		An unspecified host address is replaced with the address of the Docker host.  As
		this is a generator, the KeyError for a port that has not been published is raised
		when iteration begins rather than when the method is called.
		"""
		name = f"{port}/{proto.value}"
		ports = self.inspect().path(
			f"$.NetworkSettings.Ports.{name}",
			list[dict[str, str]],
		)
		if not ports:
			raise KeyError(f"port {name} has not been published")
		for portd in ports:
			addr = ipaddress.ip_address(portd["HostIp"])
			host_port = int(portd["HostPort"])
			yield (_get_docker_host_ip() if addr.is_unspecified else addr), host_port

Dom Sekotill's avatar
Dom Sekotill committed

class Network(Item):
	"""
	A Docker network

	Instances can be used as context managers that create the network on entering the
	context and remove it on exiting.
	"""

	# The range from which subnets for new networks are taken
	DOCKER_SUBNET = ipaddress.IPv4Network("172.16.0.0/12")

	def __init__(self, name: str|None = None) -> None:
		# Without a name, a random one is generated
		self._name = name or f"br{token_hex(6)}"
		self._nid: str|None = None
		# Addresses known to be in use on the network, gateway(s) included
		self._assigned = set[ipaddress.IPv4Address]()

	def __str__(self) -> str:
		return self._name

	def __repr__(self) -> str:
		cls = type(self)
		return f"<{cls.__module__}.{cls.__name__} {self._name}>"

	def __eq__(self, other: Any) -> bool:
		# Networks are equal by name; anything else is compared through its string form
		if isinstance(other, Network):
			return self._name == other._name
		return self._name == str(other)

	def __hash__(self) -> int:
		return hash(self._name)

	def __enter__(self) -> Network:
		self.create()
		return self

	def __exit__(
		self,
		etype: type[BaseException]|None,
		exc: BaseException|None,
		tb: TracebackType|None,
	) -> None:
		self.destroy()

	@property
	def name(self) -> str:
		"""
		Return the name of the Docker network
		"""
		return self._name

	def get_id(self) -> str:
		"""
		Return an identifier for the Docker Network

		The network is created first if that has not happened yet.
		"""
		if self._nid is None:
			self.create()
		assert self._nid is not None
		return self._nid

	def create(self) -> None:
		"""
		Create the network

		The network is given a free subnet, with the first host address as its gateway.  If
		creating it fails but a network of this name can be inspected, that network's ID and
		gateway address(es) are adopted instead; otherwise the CalledProcessError from the
		failed creation is re-raised.
		"""
		subnet = self.get_free_subnet()
		gateway = next(subnet.hosts())
		try:
			self._nid = docker_output(
				b"network", b"create", self._name,
				f"--subnet={subnet}", f"--gateway={gateway}",
			)
		except CalledProcessError:
			data = exec_io(
				[DOCKER, b"network", b"inspect", self._name],
				deserialiser=JSONArray.from_string,
			)
			if len(data) == 0:
				raise
			self._nid = data.path("$[0].Id", str)
			self._assigned.update(
				data.path(
					"$[0].IPAM.Config[*].Gateway",
					list[str], lambda ls: (IPv4Address(s) for s in ls),
				),
			)
		else:
			self._assigned.add(gateway)
		assert self._nid is not None
		assert len(self._assigned) > 0, \
			"Expected gateways address(es) to be added to assigned addresses set"

	def destroy(self) -> None:
		"""
		Remove the network

		Does nothing if the network has not been created, or has already been destroyed.
		"""
		if self._nid:
			docker_quiet(b"network", b"rm", self._nid)
			# Forget the removed network, so that destroying twice is harmless and a later
			# get_id() creates a fresh network rather than returning a dead identifier
			self._nid = None
			self._assigned.clear()

	@classmethod
	def get_free_subnet(cls) -> ipaddress.IPv4Network:
		"""
		Return a free private subnet

		The subnets of all networks listed by Docker are collected, and the first subnet of
		DOCKER_SUBNET that overlaps none of them is returned.  LookupError is raised if
		there is no such subnet.
		"""
		networks = exec_io(
			[DOCKER, b"network", b"ls", b"--format={{.ID}}"],
			deserialiser=utf8_decode,
		).splitlines()
		subnets = exec_io(
			[DOCKER, b"network", b"inspect"] + cast(list[Argument], networks),
			deserialiser=JSONArray.from_string,
		).path(
			"$[*].IPAM.Config[*].Subnet", list[str],
			lambda subnets: {ipaddress.ip_network(net) for net in subnets},
		)
		# Candidate subnets have a prefix eight bits longer than DOCKER_SUBNET's
		for subnet in cls.DOCKER_SUBNET.subnets(8):
			if not any(net.overlaps(subnet) for net in subnets):
				return subnet
		raise LookupError(f"No free subnets found in subnet {cls.DOCKER_SUBNET}")

	def reserve_address(self) -> ipaddress.IPv4Address:
		"""
		Return a free address in the network, recording it as assigned

		The reservation is only recorded in this object: the address will not be returned
		again by this instance, but nothing is reserved in Docker itself.  LookupError is
		raised if every host address of the subnet has been assigned.
		"""
		# TODO: support IPv6
		data = self.inspect()
		# Considering only the first listed subnet
		net = data.path("$.IPAM.Config[0].Subnet", str, ipaddress.IPv4Network)

		# Recycle some old code for an assertion about assigned addresses: every address
		# that Docker reports as in use should already be in the assigned set
		if __debug__:
			reserved: set[ipaddress.IPv4Address] = data.path(
				"$.Containers.*.IPv4Address", list[str],
				lambda addrs: {IPv4Address.with_suffix(a) for a in addrs},
			)
			reserved.add(data.path("$.IPAM.Config[0].Gateway", str, IPv4Address))
			missing = reserved - self._assigned
			assert len(missing) == 0, f"Missing addresses from assigned set: {missing}"

		# Optimise for CPython 3.x without early binding
		assigned = self._assigned

		for addr in net.hosts():
			if addr not in assigned:
				assigned.add(addr)
				return addr
		raise LookupError(f"No free addresses found in subnet {net}")

Dom Sekotill's avatar
Dom Sekotill committed
	"""
	Manage calling executables in a container

	Any arguments passed to the constructor will prefix the arguments passed when the object
	is called.
	"""

Dom Sekotill's avatar
Dom Sekotill committed
	def __init__(self, container: Container, *cmd: Argument):
Dom Sekotill's avatar
Dom Sekotill committed
		self.container = container

Dom Sekotill's avatar
Dom Sekotill committed
		self,
		cmd: Arguments,
		kwargs: MutableMapping[str, Any],
		has_input: bool,
		is_query: bool,
		deserialiser: Deserialiser[Any]|None,
	) -> Arguments:
Dom Sekotill's avatar
Dom Sekotill committed
		"""
		Prefix the command arguments with a command necessary for executing in a container
Dom Sekotill's avatar
Dom Sekotill committed
		"""
		return self.container.get_exec_args(cmd, interactive=has_input)