Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

223 lines
5.8 KiB

  1. import argparse
  2. import asyncio
  3. import multicast
  4. import sys
  5. import unittest
  6. from unittest.mock import patch, AsyncMock, Mock, call
  7. from lora import timeout, _debprint
  8. DEFAULT_MADDR = ('239.192.76.111', 21089)
  9. async def to_loragw(rp, wtr, ignpkts):
  10. '''Take a multicast.ReceiverProtocol, and pass the packets
  11. to the lora gateway on the StreamWriter.
  12. Any packets in the set ignpkts will NOT be sent out to the
  13. gateway. This is to prevent looping packets back out.
  14. '''
  15. while True:
  16. pkt, addr = await rp.recv()
  17. #_debprint('pkt to send:', repr(pkt), repr(ignpkts))
  18. if pkt in ignpkts:
  19. ignpkts.remove(pkt)
  20. continue
  21. wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii'))
  22. await wtr.drain()
  23. async def from_loragw(rdr, tp, txpkts):
  24. '''Take a StreamReader, and pass the received packets from
  25. the lora gatway to the multicast.TransmitterProtocol.
  26. Each packet that will be transmitted will be added the the txpkts
  27. set that is passed in. This is to allow a receiver to ignore
  28. the loop back.
  29. '''
  30. while True:
  31. rcv = await rdr.readuntil()
  32. #_debprint('from gw:', repr(rcv), repr(txpkts))
  33. rcv = rcv.strip()
  34. if rcv.startswith(b'data:'):
  35. # we've received a packet
  36. data = bytes.fromhex(rcv[5:].decode('ascii'))
  37. txpkts.add(data)
  38. await tp.send(data)
  39. async def open_dev(fname, *args, **kwargs):
  40. '''coroutine that returns (reader, writer), in the same way as
  41. [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection)
  42. does. The args are passed to open.'''
  43. import functools, os, socket
  44. f = open(fname, *args, **kwargs)
  45. f.type = socket.SOCK_STREAM
  46. f.setblocking = functools.partial(os.set_blocking, f.fileno())
  47. f.getsockname = lambda: f.name
  48. f.getpeername = lambda: f.name
  49. f.family = socket.AF_UNIX
  50. f.recv = f.read
  51. f.send = f.write
  52. return await asyncio.open_connection(sock=f)
  53. async def main():
  54. parser = argparse.ArgumentParser()
  55. parser.add_argument('-a', metavar='maddr', type=str,
  56. help='multicastip:port to use to send/receive pkts')
  57. parser.add_argument('serdev', type=str,
  58. help='device for gateway comms')
  59. args = parser.parse_args()
  60. # open up the gateway device
  61. reader, writer = await open_dev(args.serdev, 'w+b', buffering=0)
  62. # open up the listener
  63. mr = await multicast.create_multicast_receiver(DEFAULT_MADDR)
  64. mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR)
  65. try:
  66. pkts = set()
  67. tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts))
  68. flgtask = asyncio.create_task(from_loragw(reader, mt, pkts))
  69. await asyncio.gather(tlgtask, flgtask)
  70. finally:
  71. mt.close()
  72. mr.close()
  73. writer.close()
  74. if __name__ == '__main__':
  75. asyncio.run(main())
  76. class TestLoraServ(unittest.IsolatedAsyncioTestCase):
  77. @timeout(2)
  78. async def test_from_loragw(self):
  79. readermock = AsyncMock()
  80. pkts = [ b'astringofdata',
  81. b'anotherpkt',
  82. b'makeupdata',
  83. b'asigo',
  84. ]
  85. pktset = set()
  86. readermock.readuntil.side_effect = [
  87. b'bogus data\r\n',
  88. ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts
  89. ] + [
  90. b'moreignored\r\n',
  91. b'rssi: 123\r\n',
  92. b'txdone\r\n',
  93. asyncio.IncompleteReadError(partial=b'aa',
  94. expected=b'\n'),
  95. ]
  96. writermock = AsyncMock()
  97. with self.assertRaises(asyncio.IncompleteReadError):
  98. await from_loragw(readermock, writermock, pktset)
  99. writermock.send.assert_has_calls([ call(x) for x in pkts ])
  100. self.assertEqual(pktset, set(pkts))
  101. @timeout(2)
  102. async def test_to_loragw(self):
  103. readermock = AsyncMock()
  104. writermock = AsyncMock()
  105. pkts = [ (x, None) for x in (b'astringofdata',
  106. b'anotherpkt',
  107. b'makeupdata',
  108. b'asigo', )
  109. ] + [
  110. asyncio.CancelledError(),
  111. ]
  112. readermock.recv.side_effect = pkts
  113. writermock.write = Mock()
  114. txpkts = { pkts[-2][0] }
  115. with self.assertRaises(asyncio.CancelledError):
  116. await to_loragw(readermock, writermock, txpkts)
  117. # make sure that the ignored packet was dropped
  118. self.assertFalse(txpkts)
  119. # and that it wasn't transmitted
  120. self.assertNotIn(call(b'pkt:%s\n' %
  121. pkts[-2][0].hex().encode('ascii')),
  122. writermock.write.mock_calls)
  123. writermock.write.assert_has_calls([ call(b'pkt:%s\n' %
  124. x.hex().encode('ascii')) for x, addr in pkts[:-2] ])
  125. writermock.drain.assert_has_calls([ call() for x in pkts[:-2]
  126. ])
  127. @timeout(2)
  128. async def test_argerrors(self):
  129. # it'd be nice to silence the usage output here
  130. with self.assertRaises(SystemExit) as cm, \
  131. patch.dict(sys.__dict__, dict(argv=[ 'name', ])):
  132. await main()
  133. self.assertEqual(cm.exception.code, 2)
  134. @timeout(2)
  135. @patch(__name__ + '.from_loragw')
  136. @patch(__name__ + '.to_loragw')
  137. @patch('multicast.create_multicast_receiver')
  138. @patch('multicast.create_multicast_transmitter')
  139. @patch(__name__ + '.open_dev')
  140. async def test_main(self, od, cmt, cmr, tlg, flg):
  141. # setup various mocks
  142. cmtret = Mock()
  143. cmrret = Mock()
  144. cmt.return_value = cmtret
  145. cmr.return_value = cmrret
  146. readermock = Mock()
  147. writermock = Mock()
  148. od.return_value = (readermock, writermock)
  149. # make sure that when called w/ an arg
  150. serdev = 'abc123'
  151. with patch.dict(sys.__dict__, dict(argv=[ 'name', serdev ])):
  152. await main()
  153. # that open_dev is called with it
  154. od.assert_called_with(serdev, 'w+b', buffering=0)
  155. # and that the multicast functions were called
  156. cmt.assert_called_with(DEFAULT_MADDR)
  157. cmr.assert_called_with(DEFAULT_MADDR)
  158. # that there was a setobj created
  159. setobj = tlg.mock_calls[0][1][-1]
  160. self.assertIsInstance(setobj, set)
  161. # and the same object was passed
  162. self.assertIs(setobj, flg.mock_calls[0][1][-1])
  163. # and both tasks were passed the correct objects
  164. tlg.assert_called_with(cmrret, writermock, setobj)
  165. flg.assert_called_with(readermock, cmtret, setobj)
  166. # and that they were closed in the end
  167. cmtret.close.assert_called()
  168. cmrret.close.assert_called()
  169. # and that the writer was closed as well
  170. writermock.close.assert_called()