import argparse import asyncio import multicast import sys import unittest from unittest.mock import patch, AsyncMock, Mock, call from lora import timeout, _debprint DEFAULT_MADDR = ('239.192.76.111', 21089) async def to_loragw(rp, wtr, ignpkts): '''Take a multicast.ReceiverProtocol, and pass the packets to the lora gateway on the StreamWriter. Any packets in the set ignpkts will NOT be sent out to the gateway. This is to prevent looping packets back out. ''' while True: pkt, addr = await rp.recv() #_debprint('pkt to send:', repr(pkt), repr(ignpkts)) if pkt in ignpkts: ignpkts.remove(pkt) continue wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii')) await wtr.drain() async def from_loragw(rdr, tp, txpkts): '''Take a StreamReader, and pass the received packets from the lora gatway to the multicast.TransmitterProtocol. Each packet that will be transmitted will be added the the txpkts set that is passed in. This is to allow a receiver to ignore the loop back. ''' while True: rcv = await rdr.readuntil() #_debprint('from gw:', repr(rcv), repr(txpkts)) rcv = rcv.strip() if rcv.startswith(b'data:'): # we've received a packet data = bytes.fromhex(rcv[5:].decode('ascii')) txpkts.add(data) await tp.send(data) async def open_dev(fname, *args, **kwargs): '''coroutine that returns (reader, writer), in the same way as [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection) does. The args are passed to open.''' import functools, os, socket f = open(fname, *args, **kwargs) f.type = socket.SOCK_STREAM f.setblocking = functools.partial(os.set_blocking, f.fileno()) f.getsockname = lambda: f.name f.getpeername = lambda: f.name f.family = socket.AF_UNIX f.recv = f.read f.send = f.write return await asyncio.open_connection(sock=f) async def main(): parser = argparse.ArgumentParser() parser.add_argument('-a', metavar='maddr', type=str, help='multicastip:port to use to send/receive pkts') parser.add_argument('serdev', type=str, help='device for gateway comms') args = parser.parse_args() # open up the gateway device reader, writer = await open_dev(args.serdev, 'w+b', buffering=0) # open up the listener mr = await multicast.create_multicast_receiver(DEFAULT_MADDR) mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR) try: pkts = set() tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts)) flgtask = asyncio.create_task(from_loragw(reader, mt, pkts)) await asyncio.gather(tlgtask, flgtask) finally: mt.close() mr.close() writer.close() if __name__ == '__main__': asyncio.run(main()) class TestLoraServ(unittest.IsolatedAsyncioTestCase): @timeout(2) async def test_from_loragw(self): readermock = AsyncMock() pkts = [ b'astringofdata', b'anotherpkt', b'makeupdata', b'asigo', ] pktset = set() readermock.readuntil.side_effect = [ b'bogus data\r\n', ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts ] + [ b'moreignored\r\n', b'rssi: 123\r\n', b'txdone\r\n', asyncio.IncompleteReadError(partial=b'aa', expected=b'\n'), ] writermock = AsyncMock() with self.assertRaises(asyncio.IncompleteReadError): await from_loragw(readermock, writermock, pktset) writermock.send.assert_has_calls([ call(x) for x in pkts ]) self.assertEqual(pktset, set(pkts)) @timeout(2) async def test_to_loragw(self): readermock = AsyncMock() writermock = AsyncMock() pkts = [ (x, None) for x in (b'astringofdata', b'anotherpkt', b'makeupdata', b'asigo', ) ] + [ asyncio.CancelledError(), ] readermock.recv.side_effect = pkts writermock.write = Mock() txpkts = { pkts[-2][0] } with self.assertRaises(asyncio.CancelledError): await to_loragw(readermock, writermock, txpkts) # make sure that the ignored packet was dropped self.assertFalse(txpkts) # and that it wasn't transmitted self.assertNotIn(call(b'pkt:%s\n' % pkts[-2][0].hex().encode('ascii')), writermock.write.mock_calls) writermock.write.assert_has_calls([ call(b'pkt:%s\n' % x.hex().encode('ascii')) for x, addr in pkts[:-2] ]) writermock.drain.assert_has_calls([ call() for x in pkts[:-2] ]) @timeout(2) async def test_argerrors(self): # it'd be nice to silence the usage output here with self.assertRaises(SystemExit) as cm, \ patch.dict(sys.__dict__, dict(argv=[ 'name', ])): await main() self.assertEqual(cm.exception.code, 2) @timeout(2) @patch(__name__ + '.from_loragw') @patch(__name__ + '.to_loragw') @patch('multicast.create_multicast_receiver') @patch('multicast.create_multicast_transmitter') @patch(__name__ + '.open_dev') async def test_main(self, od, cmt, cmr, tlg, flg): # setup various mocks cmtret = Mock() cmrret = Mock() cmt.return_value = cmtret cmr.return_value = cmrret readermock = Mock() writermock = Mock() od.return_value = (readermock, writermock) # make sure that when called w/ an arg serdev = 'abc123' with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])): await main() # that open_dev is called with it od.assert_called_with(serdev, 'w+b', buffering=0) # and that the multicast functions were called cmt.assert_called_with(DEFAULT_MADDR) cmr.assert_called_with(DEFAULT_MADDR) # that there was a setobj created setobj = tlg.mock_calls[0][1][-1] self.assertIsInstance(setobj, set) # and the same object was passed self.assertIs(setobj, flg.mock_calls[0][1][-1]) # and both tasks were passed the correct objects tlg.assert_called_with(cmrret, writermock, setobj) flg.assert_called_with(readermock, cmtret, setobj) # and that they were closed in the end cmtret.close.assert_called() cmrret.close.assert_called() # and that the writer was closed as well writermock.close.assert_called()