Newer
Older
#  Copyright 2021-2024  Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
#  This Source Code Form is subject to the terms of the Mozilla Public
#  License, v. 2.0. If a copy of the MPL was not distributed with this
#  file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Start and manage a test Kubernetes cluster with Kubernetes-in-Docker (kind)
"""
from __future__ import annotations
import platform
import re
from abc import ABC
from abc import abstractmethod
from collections.abc import Iterator
from io import BytesIO
from pathlib import Path
from shutil import copyfileobj
from tarfile import TarFile
from typing import IO
import requests
from packaging.version import Version
from xdg_base_dirs import xdg_cache_home
from behave_utils.json import JSONObject
from behave_utils.url import URL
CACHE_DIR: Path = xdg_cache_home() / "behave-testing"
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class DownloadableExecutable(ABC):
	"""
	Base class for downloading static binaries to local paths
	Subclasses should implement `get_latest` and `get_stream` methods.  They may use the
	"kernel", "arch" and "goarch" attributes to select the correct source for the current
	platform.
	Subclasses must also provide the "name" attribute, either as a class or instance
	attribute.  It is used to generate a cache path.
	Users of the subclasses SHOULD ONLY call the `get_binary` method to get a path pointing
	to a locally cached copy of the downloaded binary.
	"""
	# Map of `uname -m` output to architecture values accepted by Go
	# Many Go binaries include the architecture value accepted by `go` in their names, so
	# the "goarch" class attribute is added for convenience, generated from this map.
	# This map may not be fully complete. Only non-equal values need to be added.
	GOARCH_MAP = {
		"i386": "386",
		"i686": "386",
		"x86": "386",
		"x86_64": "amd64",
		"armv6l": "arm",
		"armv7l": "arm",
		"aarch64": "arm64",
	}
	kernel = platform.system().lower()
	arch = platform.machine()
	goarch = GOARCH_MAP.get(arch, arch)
	name: str
	def __init_subclass__(cls, name: str|None = None):
		if name and hasattr(cls, "name"):
			raise TypeError(f"Got two 'name' attributes for {cls}: {name} and {cls.name}")
		if name:
			cls.name = name
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
	def __init__(self, version: str = "latest"):
		self.version = version
	@abstractmethod
	def get_latest(self, session: requests.Session) -> str:
		"""
		Return the latest release string for a supported binary
		Implementations must discover and return the latest release or tag string
		`session` is provided for performing HTTP requests.  Although its use is not
		required, it has and automatic code check hook so there is no need to manually check
		the return code and handle errors.
		"""
		raise NotImplementedError
	@abstractmethod
	def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
		"""
		Return a stream that emits the requested version of a supported binary
		Implementations must perform a request for the binary and return a file-like reader
		The return object must be a readable FileIO like instance, returning bytes.  If the
		source is uncompressed the "raw" attribute of a `requests.Response` object opened
		with `stream=True` will suffice.  See examples below.
		`version` specifies the wanted version of the binary, which MAY be different from
		the "version" instance attribute.  Other attributes such as "kernel" and "arch" (or
		"goarch" if appropriate) MUST be honoured when selecting a source.
		`session` is provided for performing HTTP requests.  Although its use is not
		required, it has and automatic code check hook so there is no need to manually check
		the return code and handle errors.
		Examples:
		1) Get an uncompressed binary:
		>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
		...     url = "https://example.com/binary"
		...     return session.get(url, stream=True).raw
		2) Get a binary from a GZip compressed tar archive, storing the tar file in memory:
		Note: Avoid this for very large downloads. Unfortunately the Python tarfile
		implementation cannot handle non-seekable streams.
		>>> from tarfile import TarFile
		>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
		...     url = "https://example.com/binary.tar.gz"
		...     buf = BytesIO(session.get(url).content)
		...     tar = TarFile.gzopen("buffer", fileobj=buf)
		...     return tar.extractfile(self.name)
		3) Get a binary from a GZip compressed tar archive, storing the tar file in the file
		system:
		>>> from tarfile import TarFile
		>>> from tempfile import TemporaryFile
		>>> from shutil import copyfileobj
		>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
		...     url = "https://example.com/binary.tar.gz"
		...     resp = session.get(url, stream=True)
		...     temp = TemporaryFile()
		...     copyfileobj(resp.raw, temp)
		...     tar = TarFile.gzopen("buffer", fileobj=temp)
		...     return tar.extractfile(self.name)
		"""
		raise NotImplementedError
	def get_binary(self) -> Path:
		"""
		Return a Path to a locally cached executable, downloading it if necessary
		"""
		CACHE_DIR.mkdir(0o775, True, True)
		version = self.version
		with requests.Session() as session:
			assert isinstance(session.hooks["response"], list)
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
			session.hooks["response"].append(lambda r, *a, **k: r.raise_for_status())
			if version == "latest":
				version = self.get_latest(session)
			binary = CACHE_DIR / f"{self.name}-{version}-{self.kernel}-{self.arch}"
			if binary.exists():
				return binary
			stream = self.get_stream(session, version)
			try:
				with binary.open("wb") as f:
					copyfileobj(stream, f)
			except BaseException:
				binary.unlink()
				raise
			binary.chmod(0o755)
		return binary
class DownloadableDocker(DownloadableExecutable):
	"""
	Download class for the Docker client binary
	"""
	URL = "https://download.docker.com/{kernel}/static/stable/{arch}/docker-{version}.tgz"
	LATEST_URL = "https://download.docker.com/{kernel}/static/stable/{arch}/"
	VERSION_RE = re.compile(rb'href="docker-(?P<release>[0-9.]+).tgz"')
	name = "docker"
	def get_latest(self, session: requests.Session) -> str:
		"""
		Return latest Docker release
		"""
		url = self.LATEST_URL.format(kernel=self.kernel, arch=self.arch)
		doc = session.get(url).content
		latest = max(self._extract_versions(doc))
		return str(latest)
	def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
		"""
		Return a stream that emits theDocker CLI binary
		"""
		url = self.URL.format(version=version, kernel=self.kernel, arch=self.arch)
		buf = BytesIO(session.get(url).content)
		tar = TarFile.gzopen("buffer", fileobj=buf)
		stream = tar.extractfile("docker/docker")
		if stream is None:
			raise FileNotFoundError(f"'docker/docker' in {url}")
		return stream
	@classmethod
	def _extract_versions(cls, doc: bytes) -> Iterator[Version]:
		for match in cls.VERSION_RE.finditer(doc):
			yield Version(match.group("release").decode())
class DownloadableKubeTools(DownloadableExecutable):
	"""
	Download class for the kubernetes binaries "kubectl", "kubelet" and "kubeadm"
	"""
	URL = "https://dl.k8s.io/release/{version}/bin/{kernel}/{arch}/{name}"
	LATEST_URL = "https://dl.k8s.io/release/stable.txt"
	def __init__(self, name: str, version: str = "latest"):
		DownloadableExecutable.__init__(self, version)
		self.name = name
		self._latest = ""
	def get_latest(self, session: requests.Session) -> str:
		"""
		Return that latest release of Kubernetes
		"""
		if not self._latest:
			self._latest = session.get(self.LATEST_URL).content.decode().strip()
		return self._latest
	def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
		"""
		Return a stream that emits the requested Kubernetes binary
		"""
		url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch, name=self.name)
		stream: IO[bytes] = session.get(url, stream=True).raw
		return stream
class DownloadableCrictl(DownloadableExecutable):
	"""
	Download class for the "crictl" binary
	"""
	URL = "https://github.com/kubernetes-sigs/cri-tools/releases/download/{version}/crictl-{version}-{kernel}-{arch}.tar.gz"
	LATEST_URL = "https://api.github.com/repos/kubernetes-sigs/cri-tools/releases/latest"
	name = "cri"
	def get_latest(self, session: requests.Session) -> str:
		"""
		Return the latest "crictl" release
		"""
		json = JSONObject.from_string(session.get(self.LATEST_URL).content)
		return json.path("$.name", str).replace("cri-tools ", "")
	def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
		"""
		Return a stream that emits the requested "crictl" binary
		"""
		url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch)
		buf = BytesIO(session.get(url).content)
		tar = TarFile.gzopen("buffer", fileobj=buf)
		stream = tar.extractfile("crictl")
		if stream is None:
			raise FileNotFoundError(f"'crictl' in {url}")
		return stream
class DownloadableKind(DownloadableExecutable):
	"""
	Download class for the "kind" (Kubernetes-in-Docker) binary
	"""
	URL = "https://kind.sigs.k8s.io/dl/{version}/kind-{kernel}-{arch}"
	LATEST_URL = "https://api.github.com/repos/kubernetes-sigs/kind/releases/latest"
	name = "kind"
	def get_latest(self, session: requests.Session) -> str:
		"""
		Return the latest Kind binary
		"""
		json = JSONObject.from_string(session.get(self.LATEST_URL).content)
		return json.path("$.name", str)
	def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
		"""
		Return a stream that emits the requested Kind binary
		"""
		url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch)
		stream: IO[bytes] = session.get(url, stream=True).raw
		return stream