Skip to content
test_base_client.py 9.43 KiB
Newer Older
#  Copyright 2019-2021  Dom Sekotill <dom.sekotill@kodo.org.uk>
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
Test cases for wpa_supplicant.client.base.BaseClient
"""

import unittest
from unittest import mock

import anyio
from tests import _anyio as anyio_mock
from wpa_supplicant import errors
from wpa_supplicant.client import base


Dom Sekotill's avatar
Dom Sekotill committed
@mock.patch(
	"wpa_supplicant.client.base.connect_unix_datagram",
	new_callable=anyio_mock.AsyncMock,
)
@mock.patch(
	"wpa_supplicant.client.base.BaseClient.send_command",
	new_callable=anyio_mock.AsyncMock,
Dom Sekotill's avatar
Dom Sekotill committed
)
class ConnectTests(unittest.TestCase):
	"""
	Tests for the connect() method
	"""

	@anyio_mock.with_anyio()
	async def test_connect(self, _, connect_mock):
		"""
		Check connect() calls socket.connect()
		"""
		async with base.BaseClient() as client:
			await client.connect("foo")

		connect_mock.assert_called_once_with("foo")

	@anyio_mock.with_anyio()
	async def test_connect_timeout_1(self, _, connect_mock):
		"""
		Check a socket.connect() delay causes TimeoutError to be raised
		"""
		connect_mock.delay = 2

		async with base.BaseClient() as client:
			with self.assertRaises(TimeoutError):
				await client.connect("foo")

	@anyio_mock.with_anyio()
	async def test_connect_timeout_2(self, send_mock, _):
		"""
		Check a send/recv delay causes a TimeoutError to be raised
		"""
		send_mock.delay = 2

		async with base.BaseClient() as client:
			with self.assertRaises(TimeoutError):
				await client.connect("foo")


class SendMessageTests(unittest.TestCase):
	"""
	Tests for the send_command() method
	"""

	def setUp(self):
		self.client = client = base.BaseClient()
		client.sock = anyio_mock.AsyncMock(spec=anyio.abc.SocketStream)
		client.sock.send.return_value = None
		assert isinstance(client.sock, anyio.abc.SocketStream)

	@anyio_mock.with_anyio()
	async def test_simple(self):
		"""
		Check that a response is processed after a command
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"OK"
			assert await client.send_command("SOME_COMMAND") is None

	@anyio_mock.with_anyio()
	async def test_simple_expect(self):
		"""
		Check that an alternate expected response is processed
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"PONG"
			assert await client.send_command("PING", expect="PONG") is None

	@anyio_mock.with_anyio()
	async def test_simple_no_expect(self):
		"""
		Check that an unexpected response raises an UnexpectedResponseError
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"DING"
			with self.assertRaises(errors.UnexpectedResponseError):
				await client.send_command("PING")
			with self.assertRaises(errors.UnexpectedResponseError):
				await client.send_command("PING", expect="PONG")

	@anyio_mock.with_anyio()
	async def test_simple_convert(self):
		"""
		Check that a response is passed through a converter if given
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"FOO\nBAR\nBAZ\n"
			self.assertListEqual(
				await client.send_command(
Dom Sekotill's avatar
Dom Sekotill committed
					"SOME_COMMAND", convert=lambda x: x.splitlines(),
Dom Sekotill's avatar
Dom Sekotill committed
				["FOO", "BAR", "BAZ"],
			)

	@anyio_mock.with_anyio()
	async def test_simple_convert_over_expect(self):
		"""
		Check that 'convert' overrides 'expect'
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"FOO\nBAR\nBAZ\n"
			self.assertListEqual(
				await client.send_command(
Dom Sekotill's avatar
Dom Sekotill committed
					"SOME_COMMAND", convert=lambda x: x.splitlines(), expect="PONG",
Dom Sekotill's avatar
Dom Sekotill committed
				["FOO", "BAR", "BAZ"],
			)

	@anyio_mock.with_anyio()
	async def test_simple_fail(self):
		"""
		Check that a response of 'FAIL' causes CommandFailed to be raised
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"FAIL"
			with self.assertRaises(errors.CommandFailed):
				await client.send_command("SOME_COMMAND")

	@anyio_mock.with_anyio()
	async def test_simple_bad_command(self):
		"""
		Check that a response of 'UNKNOWN COMMAND' causes ValueError to be raised
		"""
		async with self.client as client:
			client.sock.receive.return_value = b"UNKNOWN COMMAND"
			with self.assertRaises(ValueError):
				await client.send_command("SOME_COMMAND")

	@anyio_mock.with_anyio()
	async def test_interleaved(self):
		"""
		Check that messages are processed alongside replies
		"""
		async with self.client as client:
			client.sock.receive.side_effect = [
				b"<2>SOME-MESSAGE",
				b"<1>SOME-OTHER-MESSAGE with|args",
				b"OK",
Dom Sekotill's avatar
Dom Sekotill committed
				b"<2>SOME-MESSAGE",
			assert await client.send_command("SOME_COMMAND") is None
	@anyio_mock.with_anyio()
	async def test_unexpected(self):
		"""
		Check that unexpected replies are logged cleanly
		"""
		async with self.client as client:
			client.sock.receive.side_effect = [
				b"OK",  # Response to "ATTACH"
				b"UNEXPECTED1",
				b"UNEXPECTED2",
				b"<2>CTRL-EVENT-EXAMPLE",
				b"OK",  # Response to "DETACH"
			]
			assert await client.event("CTRL-EVENT-EXAMPLE")
	@anyio_mock.with_anyio()
	async def test_unconnected(self):
		"""
		Check that calling send_command() on an unconnected client raises RuntimeError
		"""
		client = base.BaseClient()

		with self.assertRaises(RuntimeError):
			await client.send_command("SOME_COMMAND")

	@anyio_mock.with_anyio()
	async def test_multi_task(self):
		"""
		Check that calling send_command() from multiple tasks works as expected
		"""
		recv_responses = iter([
			b"OK",           # Response to ATTACH
			b"OK",           # Response to SOME_COMMAND1
			b"<2>CTRL-FOO",  # Event
			b"REPLY2",       # Response to SOME_COMMAND2
			b"OK",           # Response to DETACH
		])

		async def recv():
			await anyio.sleep(0.1)
			return next(recv_responses)

		async with self.client as client, anyio.create_task_group() as task_group:
			client.sock.receive.side_effect = recv

			@task_group.start_soon
			async def wait_for_event():
				self.assertTupleEqual(
					await client.event("CTRL-FOO"),
					(base.EventPriority.INFO, "CTRL-FOO", None),
				)

			await anyio.sleep(0.0)
			task_group.start_soon(client.send_command, "SOME_COMMAND1")

			await anyio.sleep(0.0)
			await client.send_command("SOME_COMMAND2", expect="REPLY2")


class EventTests(unittest.TestCase):
	"""
	Tests for the event() method
	"""

	def setUp(self):
		self.client = client = base.BaseClient()
		client.sock = anyio_mock.AsyncMock()
		client.sock.send.return_value = None

	@anyio_mock.with_anyio()
	async def test_simple(self):
		"""
		Check that an awaited message is returned when is arrives
		"""
		with anyio.fail_after(2):
			async with self.client as client:
				client.sock.receive.side_effect = [
					b"OK",  # Respond to ATTACH
					b"<2>CTRL-EVENT-EXAMPLE",
					b"OK",  # Respond to DETACH
				]
				prio, evt, args = await client.event("CTRL-EVENT-EXAMPLE")
				assert prio == 2
				assert evt == "CTRL-EVENT-EXAMPLE"
				assert args is None

	@anyio_mock.with_anyio()
	async def test_multiple(self):
		"""
		Check that an awaited messages is returned when it arrives between others
		"""
		with anyio.fail_after(2):
			async with self.client as client:
				client.sock.receive.side_effect = [
					b"OK",  # Respond to ATTACH
					b"<1>OTHER-MESSAGE",
					b"<2>CTRL-EVENT-OTHER",
					b"<4>CTRL-EVENT-EXAMPLE",
					b"OK",  # Respond to DETACH
					b"<3>OTHER-MESSAGE",
				]
				prio, evt, args = await client.event("CTRL-EVENT-EXAMPLE")
				assert prio == 4
				assert evt == "CTRL-EVENT-EXAMPLE"
				assert args is None

	@anyio_mock.with_anyio()
	async def test_wait_multiple(self):
		"""
		Check that the first of several awaited events is returned
		"""
		with anyio.fail_after(2):
			async with self.client as client:
				client.sock.receive.side_effect = [
					b"OK",  # Respond to ATTACH
					b"<1>OTHER-MESSAGE",
					b"<2>CTRL-EVENT-OTHER",
					b"<4>CTRL-EVENT-EXAMPLE3",
					b"<4>CTRL-EVENT-EXAMPLE1",
					b"OK",  # Respond to DETACH
					b"<3>CTRL-EVENT-OTHER",
				]
				prio, evt, args = await client.event(
					"CTRL-EVENT-EXAMPLE1", "CTRL-EVENT-EXAMPLE2", "CTRL-EVENT-EXAMPLE3",
				)
				assert prio == 4
				assert evt == "CTRL-EVENT-EXAMPLE3"
				assert args is None

	@anyio_mock.with_anyio()
	async def test_interleaved(self):
		"""
		Check that messages are processed as well as replies
		"""
		with anyio.fail_after(2):
			async with self.client as client:
				client.sock.receive.side_effect = [
					b"<1>OTHER-MESSAGE",
					b"OK",  # Respond to SOME_COMMAND
					b"OK",  # Respond to ATTACH
					b"<2>CTRL-EVENT-OTHER",
					b"<4>CTRL-EVENT-EXAMPLE",
					b"<3>CTRL-EVENT-OTHER",
					b"OK",  # Respond to DETACH
					b"FOO",
				]

				assert await client.send_command("SOME_COMMAND") is None

				prio, evt, args = await client.event("CTRL-EVENT-EXAMPLE")
				assert prio == 4
				assert evt == "CTRL-EVENT-EXAMPLE"
				assert args is None

				assert await client.send_command("SOME_COMMAND", expect="FOO") is None

	@anyio_mock.with_anyio()
	async def test_unconnected(self):
		"""
		Check that calling event() on an unconnected client raises RuntimeError
		"""
		client = base.BaseClient()

		with self.assertRaises(RuntimeError):
			await client.event("some", "events")