Implement a secure ICS protocol targeting the LoRa Node151 microcontroller for controlling irrigation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

229 lines
5.9 KiB

  1. import argparse
  2. import asyncio
  3. import io
  4. import multicast
  5. import sys
  6. import unittest
  7. from unittest.mock import patch, AsyncMock, Mock, call
  8. from lora import timeout, _debprint
  9. DEFAULT_MADDR = ('239.192.76.111', 21089)
  10. async def to_loragw(rp, wtr, ignpkts):
  11. '''Take a multicast.ReceiverProtocol, and pass the packets
  12. to the lora gateway on the StreamWriter.
  13. Any packets in the set ignpkts will NOT be sent out to the
  14. gateway. This is to prevent looping packets back out.
  15. '''
  16. while True:
  17. pkt, addr = await rp.recv()
  18. #_debprint('pkt to send:', repr(pkt), repr(ignpkts))
  19. if pkt in ignpkts:
  20. ignpkts.remove(pkt)
  21. continue
  22. wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii'))
  23. await wtr.drain()
  24. async def from_loragw(rdr, tp, txpkts):
  25. '''Take a StreamReader, and pass the received packets from
  26. the lora gatway to the multicast.TransmitterProtocol.
  27. Each packet that will be transmitted will be added the the txpkts
  28. set that is passed in. This is to allow a receiver to ignore
  29. the loop back.
  30. '''
  31. while True:
  32. rcv = await rdr.readuntil()
  33. #_debprint('from gw:', repr(rcv), repr(txpkts))
  34. rcv = rcv.strip()
  35. if rcv.startswith(b'data:'):
  36. # we've received a packet
  37. data = bytes.fromhex(rcv[5:].decode('ascii'))
  38. txpkts.add(data)
  39. await tp.send(data)
  40. async def open_dev(fname, *args, **kwargs):
  41. '''coroutine that returns (reader, writer), in the same way as
  42. [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection)
  43. does. The args are passed to open.'''
  44. import functools, os, socket
  45. f = open(fname, *args, **kwargs)
  46. f.type = socket.SOCK_STREAM
  47. f.setblocking = functools.partial(os.set_blocking, f.fileno())
  48. f.getsockname = lambda: f.name
  49. f.getpeername = lambda: f.name
  50. f.family = socket.AF_UNIX
  51. f.recv = f.read
  52. f.send = f.write
  53. return await asyncio.open_connection(sock=f)
  54. async def main():
  55. parser = argparse.ArgumentParser()
  56. parser.add_argument('-a', metavar='maddr', type=str,
  57. help='multicastip:port to use to send/receive pkts')
  58. parser.add_argument('serdev', type=str,
  59. help='device for gateway comms')
  60. args = parser.parse_args()
  61. # open up the gateway device
  62. reader, writer = await open_dev(args.serdev, 'w+b', buffering=0)
  63. # open up the listener
  64. mr = await multicast.create_multicast_receiver(DEFAULT_MADDR)
  65. mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR)
  66. try:
  67. pkts = set()
  68. tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts))
  69. flgtask = asyncio.create_task(from_loragw(reader, mt, pkts))
  70. await asyncio.gather(tlgtask, flgtask)
  71. finally:
  72. mt.close()
  73. mr.close()
  74. writer.close()
  75. if __name__ == '__main__':
  76. asyncio.run(main())
  77. class TestLoraServ(unittest.IsolatedAsyncioTestCase):
  78. @timeout(2)
  79. async def test_from_loragw(self):
  80. readermock = AsyncMock()
  81. pkts = [ b'astringofdata',
  82. b'anotherpkt',
  83. b'makeupdata',
  84. b'asigo',
  85. ]
  86. pktset = set()
  87. readermock.readuntil.side_effect = [
  88. b'bogus data\r\n',
  89. ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts
  90. ] + [
  91. b'moreignored\r\n',
  92. b'rssi: 123\r\n',
  93. b'txdone\r\n',
  94. asyncio.IncompleteReadError(partial=b'aa',
  95. expected=b'\n'),
  96. ]
  97. writermock = AsyncMock()
  98. with self.assertRaises(asyncio.IncompleteReadError):
  99. await from_loragw(readermock, writermock, pktset)
  100. writermock.send.assert_has_calls([ call(x) for x in pkts ])
  101. self.assertEqual(pktset, set(pkts))
  102. @timeout(2)
  103. async def test_to_loragw(self):
  104. readermock = AsyncMock()
  105. writermock = AsyncMock()
  106. pkts = [ (x, None) for x in (b'astringofdata',
  107. b'anotherpkt',
  108. b'makeupdata',
  109. b'asigo', )
  110. ] + [
  111. asyncio.CancelledError(),
  112. ]
  113. readermock.recv.side_effect = pkts
  114. writermock.write = Mock()
  115. txpkts = { pkts[-2][0] }
  116. with self.assertRaises(asyncio.CancelledError):
  117. await to_loragw(readermock, writermock, txpkts)
  118. # make sure that the ignored packet was dropped
  119. self.assertFalse(txpkts)
  120. # and that it wasn't transmitted
  121. self.assertNotIn(call(b'pkt:%s\n' %
  122. pkts[-2][0].hex().encode('ascii')),
  123. writermock.write.mock_calls)
  124. writermock.write.assert_has_calls([ call(b'pkt:%s\n' %
  125. x.hex().encode('ascii')) for x, addr in pkts[:-2] ])
  126. writermock.drain.assert_has_calls([ call() for x in pkts[:-2]
  127. ])
  128. @timeout(2)
  129. async def test_argerrors(self):
  130. with io.StringIO() as fp:
  131. with self.assertRaises(SystemExit) as cm, \
  132. patch.dict(sys.__dict__, dict(argv=[ 'name', ],
  133. stderr=fp)):
  134. await main()
  135. errout = fp.getvalue()
  136. self.assertEqual(errout, 'usage: name [-h] [-a maddr] serdev\nname: error: the following arguments are required: serdev\n')
  137. self.assertEqual(cm.exception.code, 2)
  138. @timeout(2)
  139. @patch(__name__ + '.from_loragw')
  140. @patch(__name__ + '.to_loragw')
  141. @patch('multicast.create_multicast_receiver')
  142. @patch('multicast.create_multicast_transmitter')
  143. @patch(__name__ + '.open_dev')
  144. async def test_main(self, od, cmt, cmr, tlg, flg):
  145. # setup various mocks
  146. cmtret = Mock()
  147. cmrret = Mock()
  148. cmt.return_value = cmtret
  149. cmr.return_value = cmrret
  150. readermock = Mock()
  151. writermock = Mock()
  152. od.return_value = (readermock, writermock)
  153. # make sure that when called w/ an arg
  154. serdev = 'abc123'
  155. with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])):
  156. await main()
  157. # that open_dev is called with it
  158. od.assert_called_with(serdev, 'w+b', buffering=0)
  159. # and that the multicast functions were called
  160. cmt.assert_called_with(DEFAULT_MADDR)
  161. cmr.assert_called_with(DEFAULT_MADDR)
  162. # that there was a setobj created
  163. setobj = tlg.mock_calls[0][1][-1]
  164. self.assertIsInstance(setobj, set)
  165. # and the same object was passed
  166. self.assertIs(setobj, flg.mock_calls[0][1][-1])
  167. # and both tasks were passed the correct objects
  168. tlg.assert_called_with(cmrret, writermock, setobj)
  169. flg.assert_called_with(readermock, cmtret, setobj)
  170. # and that they were closed in the end
  171. cmtret.close.assert_called()
  172. cmrret.close.assert_called()
  173. # and that the writer was closed as well
  174. writermock.close.assert_called()