Newer
Older
# Copyright 2021-2022 Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Commands for managing Docker for fixtures
"""
from __future__ import annotations
from pathlib import Path
from secrets import token_hex
from subprocess import DEVNULL
from subprocess import PIPE
from subprocess import CompletedProcess
from subprocess import Popen
from subprocess import run
from types import TracebackType
from typing import IO
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import MutableMapping
from typing import Tuple
from typing import TypeVar
from typing import Union
from urllib.parse import urlparse
from .binaries import DownloadableDocker
from .proc import Executor
from .utils import wait
LOCALHOST = ipaddress.IPv4Address(0x7f000001)
MountPath = Union[PathLike[bytes], PathLike[str]]
HostMount = tuple[MountPath, MountPath]
NamedMount = tuple[str, MountPath]
Mount = Union[HostMount, NamedMount, MountPath]
IPAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
run([b"docker", b"version"], stdout=DEVNULL)
except FileNotFoundError:
DOCKER: Argument = DownloadableDocker().get_binary()
def utf8_decode(buffer: bytes) -> str:
"""
Return a decoded string from a bytes-like sequence of bytes
"""
return codecs.getdecoder("utf-8")(buffer)[0]
def docker(*args: Argument, **env: str) -> None:
"""
Run a Docker command, with output going to stdout
"""
run([DOCKER, *args], env={**environ, **env}, check=True)
def docker_output(*args: Argument, **env: str) -> str:
"""
Run a Docker command, capturing and returning its stdout
"""
proc = run([DOCKER, *args], env={**environ, **env}, check=True, stdout=PIPE, text=True)
def docker_quiet(*args: Argument, **env: str) -> None:
"""
Run a Docker command, directing its stdout to /dev/null
"""
run([DOCKER, *args], env={**environ, **env}, check=True, stdout=DEVNULL)
def _get_docker_host_ip() -> IPAddress:
"""
Return an IP address from the DOCKER_HOST environment variable, or a loopback address
This function is *far* from complete, and there needs to be a much better way of
accessing ports on the Docker host.
Currently, only IP addresses are supported, not names.
"""
url = environ.get("DOCKER_HOST")
if url is None:
return LOCALHOST
if not "://" in url:
url = f"tcp://{url}"
purl = urlparse(url)
if not purl.hostname or purl.scheme not in ("tcp", "ssh"):
return LOCALHOST
return ipaddress.ip_address(purl.hostname)
class IPv4Address(ipaddress.IPv4Address):
"""
Subclass of IPv4Address that handle's docker idiosyncratic tendency to add a mask suffix
"""
T = TypeVar("T", bound="IPv4Address")
@classmethod
def with_suffix(cls: type[T], address: str) -> T:
"""
Construct an instance with a suffixed bitmask size
"""
address, *_ = address.partition("/")
return cls(address)
class IPProtocol(Enum):
"""
IP protocols supported by Docker port forwarding
"""
TCP = 'tcp'
UDP = 'udp'
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class Item:
"""
A mix-in for Docker items that can be inspected
"""
def __init__(self, ident: str):
self._id = ident
def get_id(self) -> str:
"""
Return an identifier for the Docker item
"""
return self._id
def inspect(self) -> JSONObject:
"""
Get the result of inspecting the Docker item
"""
with Popen([DOCKER, 'inspect', self.get_id()], stdout=PIPE) as proc:
assert proc.stdout is not None
results = json.load(proc.stdout)
assert isinstance(results, list)
assert len(results) == 1 and isinstance(results[0], dict)
return JSONObject(results[0])
class Image(Item):
"""
Docker image items
"""
T = TypeVar('T', bound='Image')
def __init__(self, iid: str):
self.iid = iid
@classmethod
def build(cls: type[T], context: Path, target: str = "", **build_args: str|None) -> T:
"""
Build an image from the given context
Build arguments are ignored if they are None to make it easier to supply (or not)
arguments from external lookups without complex argument composing.
"""
cmd: Arguments = [
b"build", context, f"--target={target}",
*(f"--build-arg={arg}={val}" for arg, val in build_args.items() if val is not None),
]
docker(*cmd, DOCKER_BUILDKIT='1')
iid = docker_output(*cmd, '-q', DOCKER_BUILDKIT='1')
return cls(iid)
@classmethod
def pull(cls: type[T], repository: str) -> T:
"""
Pull an image from a registry
"""
try:
iid = cls._cache[repository]
except KeyError:
docker(b"pull", repository)
iid = cls._process_image(repository)
@classmethod
def _process_image(cls, reference: str) -> str:
report = Item(reference).inspect()
iid = report.path("$.Id", str)
cls._cache.update(
((tag, iid) for tag in report.path("$.RepoTags", list[str])),
reference=iid,
)
return iid
def get_id(self) -> str:
"""
Return an identifier for the Docker Image
"""
return self.iid
class Container(Item):
"""
Docker container items
Instances can be used as context managers that ensure the container is stopped on
exiting the context.
"""
if TYPE_CHECKING:
T = TypeVar('T', bound='Container')
DEFAULT_ALIASES = tuple[str]()
def __init__(
self,
image: Image,
cmd: Arguments = [],
volumes: Volumes = [],
env: Environ = {},
network: Network|None = None,
publish: bool|list[int] = False,
):
if isinstance(entrypoint, tuple):
volumes = [*volumes, entrypoint]
entrypoint = entrypoint[1]
self.image = image
self.cmd = cmd
self.volumes = volumes
self.env = env
self.entrypoint = entrypoint
self.networks = dict[Network, Tuple[str, ...]]()
self.cid: str|None = None
if network:
self.connect(network, *self.DEFAULT_ALIASES)
return self
def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None:
if self.cid and exc:
self.show_logs()
self.stop(rm=True)
@contextmanager
"""
Return a context manager that ensures the container is started when the context is entered
"""
with self:
self.start()
wait(lambda: self.is_running(raise_on_exit=True))
def is_running(self, raise_on_exit: bool = False) -> bool:
"""
Return whether the container is running
"""
if self.cid is None:
return False
details = self.inspect()
if details.path('$.State.Status', str) == 'exited':
code = details.path('$.State.ExitCode', int)
if code != 0:
logging.getLogger(__name__).warning(
f"container {self.cid} exited ({code})",
)
if raise_on_exit:
cmd = details.path("$.Config.Entrypoint", list[str])
cmd.extend(details.path("$.Config.Cmd", list[str]))
raise CalledProcessError(code, cmd)
return (
self.cid is not None
and details.path('$.State.Running', bool)
)
def get_id(self) -> str:
"""
Return an identifier for the Docker Container
"""
if self.cid is not None:
return self.cid
opts: MutableArguments = [
f"--env={name}={val}" for name, val in self.env.items()
for vol in self.volumes:
if isinstance(vol, tuple):
src = fspath(vol[0])
dst = fspath(vol[1])
if isinstance(src, bytes):
src = src.decode()
if isinstance(dst, bytes):
dst = dst.decode()
arg: Argument = f"{src}:{dst}"
else:
arg = vol
opts.extend((b"--volume", arg))
opts.extend((b"--entrypoint", self.entrypoint))
if isinstance(self.publish, list):
opts.extend(f"--publish={p}" for p in self.publish)
elif self.publish:
opts.append(b"--publish-all")
else:
opts.append(b"--network=none")
self.cid = docker_output(b"container", b"create", *opts, self.image.iid, *self.cmd)
# Disconnect the "none" network specified as the starting network
if not self.publish:
docker_quiet(b"network", b"disconnect", b"none", self.cid)
return self.cid
def start(self) -> None:
"""
Start the container
"""
docker_quiet(b"container", b"start", self.get_id())
assert self.inspect().path('$.State.Status', str) != 'created', \
"please report this at https://code.kodo.org.uk/dom/behave-utils/-/issues/11"
def stop(self, rm: bool = False) -> None:
"""
Stop the container
"""
if self.cid is None:
return
docker_quiet(b"container", b"stop", self.cid)
docker_quiet(b"container", b"rm", self.cid)
def connect(
self,
network: Network,
*aliases: str,
address: ipaddress.IPv4Address|ipaddress.IPv6Address|None = None,
) -> None:
"""
Connect the container to a Docker network
Any aliases supplied will be resolvable to the container by other containers on the
network.
"""
opts = [f'--alias={a}' for a in aliases]
if address is None:
address = network.reserve_address()
opts.append(
f"--ip={address}" if isinstance(address, ipaddress.IPv4Address) else
f"--ip6={address}",
)
if network in self.networks:
if self.networks[network] == aliases:
return
docker(b"network", b"disconnect", str(network), cid)
docker(b"network", b"connect", *opts, str(network), cid)
self.networks[network] = aliases
def show_logs(self) -> None:
"""
Print the container logs to stdout
"""
if self.cid:
def get_exec_args(self, cmd: Arguments, interactive: bool = False) -> MutableArguments:
"""
Return a full argument list for running "cmd" inside the container
"""
return [DOCKER, b"exec", *((b"-i",) if interactive else []), self.get_id(), *cmd]
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def run(
self,
cmd: Arguments,
*,
stdin: IO[Any]|int|None = None,
stdout: IO[Any]|int|None = None,
stderr: IO[Any]|int|None = None,
capture_output: bool = False,
check: bool = False,
input: bytes|None = None,
timeout: float|None = None,
) -> CompletedProcess[bytes]:
"""
Run "cmd" to completion inside the container and return the result
"""
return run(
self.get_exec_args(cmd),
stdin=stdin, stdout=stdout, stderr=stderr,
capture_output=capture_output,
check=check, timeout=timeout, input=input,
)
def exec(
self,
cmd: Arguments,
*,
stdin: IO[Any]|int|None = None,
stdout: IO[Any]|int|None = None,
stderr: IO[Any]|int|None = None,
) -> Popen[bytes]:
"""
Execute "cmd" inside the container and return a process object once started
"""
return Popen(
self.get_exec_args(cmd),
stdin=stdin, stdout=stdout, stderr=stderr,
)
def get_external_ports(self, port: int, proto: IPProtocol = IPProtocol.TCP) -> Iterable[tuple[IPAddress, int]]:
"""
Yield (address, port) combinations exposed on the host that map to the given container port
"""
name = f"{port}/{proto.name.lower()}"
ports = self.inspect().path(
f"$.NetworkSettings.Ports.{name}",
list[dict[str, str]],
)
if not ports:
raise KeyError(f"port {name} has not been published")
for portd in ports:
addr = ipaddress.ip_address(portd["HostIp"])
port = int(portd["HostPort"])
yield (_get_docker_host_ip() if addr.is_unspecified else addr), port
class Network(Item):
"""
A Docker network
"""
DOCKER_SUBNET = ipaddress.IPv4Network("172.16.0.0/12")
def __init__(self, name: str|None = None) -> None:
self._name = name or f"br{token_hex(6)}"
self._assigned = set[ipaddress.IPv4Address]()
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
def __str__(self) -> str:
return self._name
def __repr__(self) -> str:
cls = type(self)
return f"<{cls.__module__}.{cls.__name__} {self._name}>"
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Network):
return self._name == str(other)
return self._name == other._name
def __hash__(self) -> int:
return self._name.__hash__()
def __enter__(self) -> Network:
self.create()
return self
def __exit__(self, etype: type[BaseException], exc: BaseException, tb: TracebackType) -> None:
self.destroy()
@property
def name(self) -> str:
"""
Return the name of the Docker network
"""
return self._name
def get_id(self) -> str:
"""
Return an identifier for the Docker Network
"""
if self._nid is None:
self.create()
assert self._nid is not None
return self._nid
def create(self) -> None:
"""
Create the network
"""
subnet = self.get_free_subnet()
gateway = next(subnet.hosts())
try:
self._nid = docker_output(
b"network", b"create", self._name,
f"--subnet={subnet}", f"--gateway={gateway}",
)
except CalledProcessError:
data = exec_io(
[DOCKER, b"network", b"inspect", self._name],
deserialiser=JSONArray.from_string,
)
if len(data) == 0:
raise
self._nid = data.path("$[0].Id", str)
self._assigned.update(
data.path(
"$[0].IPAM.Config[*].Gateway",
list[str], lambda ls: (IPv4Address(s) for s in ls),
),
)
else:
self._assigned.add(gateway)
assert len(self._assigned) > 0, \
"Expected gateways address(es) to be added to assigned addresses set"
def destroy(self) -> None:
"""
Remove the network
"""
docker_quiet(b"network", b"rm", self._nid)
@classmethod
def get_free_subnet(cls) -> ipaddress.IPv4Network:
"""
Return a free private subnet
"""
networks = exec_io(
[DOCKER, b"network", b"ls", b"--format={{.ID}}"],
deserialiser=utf8_decode,
).splitlines()
subnets = exec_io(
[DOCKER, b"network", b"inspect"] + cast(list[Argument], networks),
deserialiser=JSONArray.from_string,
).path(
"$[*].IPAM.Config[*].Subnet", list[str],
lambda subnets: {ipaddress.ip_network(net) for net in subnets},
)
for subnet in cls.DOCKER_SUBNET.subnets(8):
if not any(net.overlaps(subnet) for net in subnets):
return subnet
raise LookupError(f"No free subnets found in subnet {cls.DOCKER_SUBNET}")
def reserve_address(self) -> ipaddress.IPv4Address:
"""
Return a free address in the network
Note that the address is not reserved; any changes made to the network such as
adding a container may invalidate the assurance that the address is free.
"""
# TODO: support IPv6
data = self.inspect()
# Considering only the first listed subnet
net = data.path("$.IPAM.Config[0].Subnet", str, ipaddress.IPv4Network)
# Recycle some old code for an assertion about assigned addresses
if __debug__:
reserved: set[ipaddress.IPv4Address] = data.path(
"$.Containers.*.IPv4Address", list[str],
lambda addrs: {IPv4Address.with_suffix(a) for a in addrs},
)
reserved.add(data.path("$.IPAM.Config[0].Gateway", str, IPv4Address))
missing = reserved - self._assigned
assert len(missing) == 0, f"Missing addresses from assigned set: {missing}"
# Optimise for CPython 3.x without early binding
assigned = self._assigned
if addr not in assigned:
assigned.add(addr)
return addr
raise LookupError(f"No free addresses found in subnet {net}")
class Cli(Executor):
"""
Manage calling executables in a container
Any arguments passed to the constructor will prefix the arguments passed when the object
is called.
"""
def __init__(self, container: Container, *cmd: Argument):
Executor.__init__(self, *cmd)
def get_arguments(
cmd: Arguments,
kwargs: MutableMapping[str, Any],
has_input: bool,
is_query: bool,
deserialiser: Deserialiser[Any]|None,
) -> Arguments:
Prefix the command arguments with a command necessary for executing in a container
return self.container.get_exec_args(cmd, interactive=has_input)