Newer
Older
# Copyright 2021,2022 Dominik Sekotill <dom.sekotill@kodo.org.uk>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Start and manage a test Kubernetes cluster with Kubernetes-in-Docker (kind)
"""
from __future__ import annotations
import platform
import re
from abc import ABC
from abc import abstractmethod
from io import BytesIO
from pathlib import Path
from shutil import copyfileobj
from tarfile import TarFile
from typing import IO
from typing import Iterator
import requests
import xdg
from packaging.version import Version
from behave_utils.json import JSONObject
from behave_utils.url import URL
CACHE_DIR: Path = xdg.xdg_cache_home() / "behave-testing"
class DownloadableExecutable(ABC):
"""
Base class for downloading static binaries to local paths
Subclasses should implement `get_latest` and `get_stream` methods. They may use the
"kernel", "arch" and "goarch" attributes to select the correct source for the current
platform.
Subclasses must also provide the "name" attribute, either as a class or instance
attribute. It is used to generate a cache path.
Users of the subclasses SHOULD ONLY call the `get_binary` method to get a path pointing
to a locally cached copy of the downloaded binary.
"""
# Map of `uname -m` output to architecture values accepted by Go
# Many Go binaries include the architecture value accepted by `go` in their names, so
# the "goarch" class attribute is added for convenience, generated from this map.
# This map may not be fully complete. Only non-equal values need to be added.
GOARCH_MAP = {
"i386": "386",
"i686": "386",
"x86": "386",
"x86_64": "amd64",
"armv6l": "arm",
"armv7l": "arm",
"aarch64": "arm64",
}
kernel = platform.system().lower()
arch = platform.machine()
goarch = GOARCH_MAP.get(arch, arch)
name: str
def __init__(self, version: str = "latest"):
self.version = version
@abstractmethod
def get_latest(self, session: requests.Session) -> str:
"""
Return the latest release string for a supported binary
Implementations must discover and return the latest release or tag string
`session` is provided for performing HTTP requests. Although its use is not
required, it has and automatic code check hook so there is no need to manually check
the return code and handle errors.
"""
raise NotImplementedError
@abstractmethod
def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
"""
Return a stream that emits the requested version of a supported binary
Implementations must perform a request for the binary and return a file-like reader
The return object must be a readable FileIO like instance, returning bytes. If the
source is uncompressed the "raw" attribute of a `requests.Response` object opened
with `stream=True` will suffice. See examples below.
`version` specifies the wanted version of the binary, which MAY be different from
the "version" instance attribute. Other attributes such as "kernel" and "arch" (or
"goarch" if appropriate) MUST be honoured when selecting a source.
`session` is provided for performing HTTP requests. Although its use is not
required, it has and automatic code check hook so there is no need to manually check
the return code and handle errors.
Examples:
1) Get an uncompressed binary:
>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
... url = "https://example.com/binary"
... return session.get(url, stream=True).raw
2) Get a binary from a GZip compressed tar archive, storing the tar file in memory:
Note: Avoid this for very large downloads. Unfortunately the Python tarfile
implementation cannot handle non-seekable streams.
>>> from tarfile import TarFile
>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
... url = "https://example.com/binary.tar.gz"
... buf = BytesIO(session.get(url).content)
... tar = TarFile.gzopen("buffer", fileobj=buf)
... return tar.extractfile(self.name)
3) Get a binary from a GZip compressed tar archive, storing the tar file in the file
system:
>>> from tarfile import TarFile
>>> from tempfile import TemporaryFile
>>> from shutil import copyfileobj
>>> def get_stream(session: requests.Session, version: str) -> IO[bytes]:
... url = "https://example.com/binary.tar.gz"
... resp = session.get(url, stream=True)
... temp = TemporaryFile()
... copyfileobj(resp.raw, temp)
... tar = TarFile.gzopen("buffer", fileobj=temp)
... return tar.extractfile(self.name)
"""
raise NotImplementedError
def get_binary(self) -> Path:
"""
Return a Path to a locally cached executable, downloading it if necessary
"""
CACHE_DIR.mkdir(0o775, True, True)
version = self.version
with requests.Session() as session:
assert isinstance(session.hooks["response"], list)
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
session.hooks["response"].append(lambda r, *a, **k: r.raise_for_status())
if version == "latest":
version = self.get_latest(session)
binary = CACHE_DIR / f"{self.name}-{version}-{self.kernel}-{self.arch}"
if binary.exists():
return binary
stream = self.get_stream(session, version)
try:
with binary.open("wb") as f:
copyfileobj(stream, f)
except BaseException:
binary.unlink()
raise
binary.chmod(0o755)
return binary
class DownloadableDocker(DownloadableExecutable):
"""
Download class for the Docker client binary
"""
URL = "https://download.docker.com/{kernel}/static/stable/{arch}/docker-{version}.tgz"
LATEST_URL = "https://download.docker.com/{kernel}/static/stable/{arch}/"
VERSION_RE = re.compile(rb'href="docker-(?P<release>[0-9.]+).tgz"')
name = "docker"
def get_latest(self, session: requests.Session) -> str:
"""
Return latest Docker release
"""
url = self.LATEST_URL.format(kernel=self.kernel, arch=self.arch)
doc = session.get(url).content
latest = max(self._extract_versions(doc))
return str(latest)
def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
"""
Return a stream that emits theDocker CLI binary
"""
url = self.URL.format(version=version, kernel=self.kernel, arch=self.arch)
buf = BytesIO(session.get(url).content)
tar = TarFile.gzopen("buffer", fileobj=buf)
stream = tar.extractfile("docker/docker")
if stream is None:
raise FileNotFoundError(f"'docker/docker' in {url}")
return stream
@classmethod
def _extract_versions(cls, doc: bytes) -> Iterator[Version]:
for match in cls.VERSION_RE.finditer(doc):
yield Version(match.group("release").decode())
class DownloadableKubeTools(DownloadableExecutable):
"""
Download class for the kubernetes binaries "kubectl", "kubelet" and "kubeadm"
"""
URL = "https://dl.k8s.io/release/{version}/bin/{kernel}/{arch}/{name}"
LATEST_URL = "https://dl.k8s.io/release/stable.txt"
def __init__(self, name: str, version: str = "latest"):
DownloadableExecutable.__init__(self, version)
self.name = name
self._latest = ""
def get_latest(self, session: requests.Session) -> str:
"""
Return that latest release of Kubernetes
"""
if not self._latest:
self._latest = session.get(self.LATEST_URL).content.decode().strip()
return self._latest
def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
"""
Return a stream that emits the requested Kubernetes binary
"""
url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch, name=self.name)
stream: IO[bytes] = session.get(url, stream=True).raw
return stream
class DownloadableCrictl(DownloadableExecutable):
"""
Download class for the "crictl" binary
"""
URL = "https://github.com/kubernetes-sigs/cri-tools/releases/download/{version}/crictl-{version}-{kernel}-{arch}.tar.gz"
LATEST_URL = "https://api.github.com/repos/kubernetes-sigs/cri-tools/releases/latest"
name = "cri"
def get_latest(self, session: requests.Session) -> str:
"""
Return the latest "crictl" release
"""
json = JSONObject.from_string(session.get(self.LATEST_URL).content)
return json.path("$.name", str).replace("cri-tools ", "")
def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
"""
Return a stream that emits the requested "crictl" binary
"""
url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch)
buf = BytesIO(session.get(url).content)
tar = TarFile.gzopen("buffer", fileobj=buf)
stream = tar.extractfile("crictl")
if stream is None:
raise FileNotFoundError(f"'crictl' in {url}")
return stream
class DownloadableKind(DownloadableExecutable):
"""
Download class for the "kind" (Kubernetes-in-Docker) binary
"""
URL = "https://kind.sigs.k8s.io/dl/{version}/kind-{kernel}-{arch}"
LATEST_URL = "https://api.github.com/repos/kubernetes-sigs/kind/releases/latest"
name = "kind"
def get_latest(self, session: requests.Session) -> str:
"""
Return the latest Kind binary
"""
json = JSONObject.from_string(session.get(self.LATEST_URL).content)
return json.path("$.name", str)
def get_stream(self, session: requests.Session, version: str) -> IO[bytes]:
"""
Return a stream that emits the requested Kind binary
"""
url = self.URL.format(version=version, kernel=self.kernel, arch=self.goarch)
stream: IO[bytes] = session.get(url, stream=True).raw
return stream