Newer
Older
# Copyright 2021, 2024 Dom Sekotill <dom.sekotill@kodo.org.uk>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Work-arounds for lack of AF_UNIX datagram socket support in Anyio
"""
from __future__ import annotations

import errno
import os
import socket
import tempfile
from contextlib import suppress
from os import PathLike
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import Protocol
from typing import Union

import sniffio
# Signature shared by the backend connector coroutines: called with two path
# strings (the local path first, then the remote path) and awaited to obtain
# a DatagramSocket.
ConnectorFn = Callable[[str, str], Awaitable['DatagramSocket']]
# Registry of connector coroutines, looked up by the name of the async library
# that is currently running.
connectors: Dict[str, ConnectorFn] = {}
class DatagramSocket(Protocol):
    """
    Structural type describing the connected datagram sockets used by this module
    """

    @property
    def _raw_socket(self) -> socket.socket:
        """
        The standard-library socket object behind this socket
        """

    async def aclose(self) -> None:
        """
        Close the socket
        """

    async def receive(self) -> bytes:
        """
        Wait for data from the socket and return it as bytes
        """

    async def send(self, item: bytes) -> None:
        """
        Send the bytes in *item* through the socket
        """
class ConnectedUNIXMixin:
    """
    Mixin for connected Unix datagram sockets that removes the local socket file on close

    It must come before the concrete socket class in the bases so that its
    `aclose()` wraps the one provided by that class.
    """

    async def aclose(self: DatagramSocket) -> None:
        """
        Close the socket, then delete the filesystem entry it was bound to

        The bound path is read before closing, as it cannot be queried from
        a closed socket.  Nothing is unlinked for sockets that are unbound
        (empty name) or that use a non-filesystem name (returned as bytes).
        """
        local_path = self._raw_socket.getsockname()
        try:
            await super().aclose()  # type: ignore  # Mypy doesn't handle super() well in mixins
        finally:
            if isinstance(local_path, str) and local_path:
                # The file may already have been removed by something else
                with suppress(FileNotFoundError):
                    os.unlink(local_path)
async def connect_unix_datagram(path: Union[str, PathLike[str]]) -> DatagramSocket:
    """
    Return an AnyIO socket connected to a Unix datagram socket

    This behaviour is currently missing from AnyIO.

    The connector registered for the running async library is called with
    a freshly generated temporary filename as the local path and *path* as
    the remote path.  Up to ten filenames are tried.

    Raises KeyError if no connector is registered for the running async
    library, and FileExistsError if every generated filename was already
    taken.  Any other error from the connector is propagated unchanged.
    """
    # Neither of these depends on the generated name, so resolve them once
    connector = connectors[sniffio.current_async_library()]
    remote_path = os.fspath(path)

    for _ in range(10):
        # mktemp() only generates a name; something else may create a file
        # with that name before it is used, hence the retries.
        fname = tempfile.mktemp(suffix=".sock", prefix="wpa_ctrl.")
        try:
            return await connector(fname, remote_path)
        except FileExistsError:
            continue
        except OSError as exc:
            # Binding a Unix socket to a path that already exists is reported
            # as EADDRINUSE, which is a plain OSError, not FileExistsError.
            if exc.errno != errno.EADDRINUSE:
                raise

    raise FileExistsError(
        errno.EEXIST, "No usable temporary filename found",
    )
try:
    import trio
except ImportError: ...
else:
    from anyio._backends import _trio

    class TrioConnectedUNIXSocket(ConnectedUNIXMixin, _trio.ConnectedUDPSocket):
        """
        AnyIO's trio connected datagram socket combined with ConnectedUNIXMixin
        """

    async def trio_connect_unix_datagram(
        local_path: str,
        remote_path: str,
    ) -> TrioConnectedUNIXSocket:
        """
        Return a Unix datagram socket bound to *local_path* and connected to *remote_path*

        The socket is bound so that the peer has an address to send replies
        to.  If binding or connecting fails the socket is closed, any socket
        file created by the bind is removed, and the error is re-raised.
        """
        sock = trio.socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        bound = False
        try:
            await sock.bind(local_path)
            bound = True
            await sock.connect(remote_path)
        except BaseException:  # pragma: no cover
            sock.close()
            # Only remove the file if this call created it; a failed bind
            # may mean the path belongs to something else.
            if bound:
                with suppress(FileNotFoundError):
                    os.unlink(local_path)
            raise
        else:
            return TrioConnectedUNIXSocket(sock)

    connectors['trio'] = trio_connect_unix_datagram
# asyncio is in the stdlib, but let's make the layout match trio 😉
try:
    import asyncio
except ImportError: ...
else:
    from anyio._backends import _asyncio

    class AsyncioConnectedUNIXSocket(ConnectedUNIXMixin, _asyncio.ConnectedUDPSocket):
        """
        AnyIO's asyncio connected datagram socket combined with ConnectedUNIXMixin
        """

    async def asyncio_connect_unix_datagram(
        local_path: str,
        remote_path: str,
    ) -> AsyncioConnectedUNIXSocket:
        """
        Return a Unix datagram socket bound to *local_path* and connected to *remote_path*

        The socket is bound so that the peer has an address to send replies
        to.  If binding or connecting fails the socket is closed, any socket
        file created by the bind is removed, and the error is re-raised.  An
        exception recorded by the datagram protocol while the endpoint is
        being created is raised after closing the transport.
        """
        await asyncio.sleep(0.0)
        loop = asyncio.get_running_loop()
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        bound = False
        try:
            sock.setblocking(False)
            sock.bind(local_path)
            bound = True
            while True:
                try:
                    sock.connect(remote_path)
                except BlockingIOError:
                    # The connect could not complete immediately: wait until
                    # the loop reports the socket writable, then try again.
                    future: asyncio.Future[None] = loop.create_future()
                    loop.add_writer(sock, future.set_result, None)
                    future.add_done_callback(lambda _: loop.remove_writer(sock))
                    await future
                else:
                    break
        except BaseException:
            sock.close()
            # Only remove the file if this call created it; a failed bind
            # may mean the path belongs to something else.
            if bound:
                with suppress(FileNotFoundError):
                    os.unlink(local_path)
            raise

        transport, protocol = await loop.create_datagram_endpoint(
            _asyncio.DatagramProtocol,
            sock=sock,
        )
        if protocol.exception:
            transport.close()
            raise protocol.exception
        return AsyncioConnectedUNIXSocket(transport, protocol)

    connectors['asyncio'] = asyncio_connect_unix_datagram