Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

223 lines
5.8 KiB

  1. import argparse
  2. import asyncio
  3. import multicast
  4. import sys
  5. import unittest
  6. from unittest.mock import patch, AsyncMock, Mock, call
  7. from lora import timeout, _debprint
  8. DEFAULT_MADDR = ('239.192.76.111', 21089)
  9. async def to_loragw(rp, wtr, ignpkts):
  10. '''Take a multicast.ReceiverProtocol, and pass the packets
  11. to the lora gateway on the StreamWriter.
  12. Any packets in the set ignpkts will NOT be sent out to the
  13. gateway. This is to prevent looping packets back out.
  14. '''
  15. while True:
  16. pkt, addr = await rp.recv()
  17. #_debprint('pkt to send:', repr(pkt), repr(ignpkts))
  18. if pkt in ignpkts:
  19. ignpkts.remove(pkt)
  20. continue
  21. wtr.write(b'pkt:%s\n' % pkt.hex().encode('ascii'))
  22. await wtr.drain()
  23. async def from_loragw(rdr, tp, txpkts):
  24. '''Take a StreamReader, and pass the received packets from
  25. the lora gatway to the multicast.TransmitterProtocol.
  26. Each packet that will be transmitted will be added the the txpkts
  27. set that is passed in. This is to allow a receiver to ignore
  28. the loop back.
  29. '''
  30. while True:
  31. rcv = await rdr.readuntil()
  32. #_debprint('from gw:', repr(rcv), repr(txpkts))
  33. rcv = rcv.strip()
  34. if rcv.startswith(b'data:'):
  35. # we've received a packet
  36. data = bytes.fromhex(rcv[5:].decode('ascii'))
  37. txpkts.add(data)
  38. await tp.send(data)
  39. async def open_dev(fname, *args, **kwargs):
  40. '''coroutine that returns (reader, writer), in the same way as
  41. [open_connection](https://docs.python.org/3/library/asyncio-stream.html#asyncio.open_connection)
  42. does. The args are passed to open.'''
  43. import functools, os, socket
  44. f = open(fname, *args, **kwargs)
  45. f.type = socket.SOCK_STREAM
  46. f.setblocking = functools.partial(os.set_blocking, f.fileno())
  47. f.getsockname = lambda: f.name
  48. f.getpeername = lambda: f.name
  49. f.family = socket.AF_UNIX
  50. f.recv = f.read
  51. f.send = f.write
  52. return await asyncio.open_connection(sock=f)
  53. async def main():
  54. parser = argparse.ArgumentParser()
  55. parser.add_argument('-a', metavar='maddr', type=str,
  56. help='multicastip:port to use to send/receive pkts')
  57. parser.add_argument('serdev', type=str,
  58. help='device for gateway comms')
  59. args = parser.parse_args()
  60. # open up the gateway device
  61. reader, writer = await open_dev(args.serdev, 'w+b', buffering=0)
  62. # open up the listener
  63. mr = await multicast.create_multicast_receiver(DEFAULT_MADDR)
  64. mt = await multicast.create_multicast_transmitter(DEFAULT_MADDR)
  65. try:
  66. pkts = set()
  67. tlgtask = asyncio.create_task(to_loragw(mr, writer, pkts))
  68. flgtask = asyncio.create_task(from_loragw(reader, mt, pkts))
  69. await asyncio.gather(tlgtask, flgtask)
  70. finally:
  71. mt.close()
  72. mr.close()
  73. writer.close()
  74. if __name__ == '__main__':
  75. asyncio.run(main())
  76. class TestLoraServ(unittest.IsolatedAsyncioTestCase):
  77. @timeout(2)
  78. async def test_from_loragw(self):
  79. readermock = AsyncMock()
  80. pkts = [ b'astringofdata',
  81. b'anotherpkt',
  82. b'makeupdata',
  83. b'asigo',
  84. ]
  85. pktset = set()
  86. readermock.readuntil.side_effect = [
  87. b'bogus data\r\n',
  88. ] + [ b'data: %s\r\n' % x.hex().encode('ascii') for x in pkts
  89. ] + [
  90. b'moreignored\r\n',
  91. b'rssi: 123\r\n',
  92. b'txdone\r\n',
  93. asyncio.IncompleteReadError(partial=b'aa',
  94. expected=b'\n'),
  95. ]
  96. writermock = AsyncMock()
  97. with self.assertRaises(asyncio.IncompleteReadError):
  98. await from_loragw(readermock, writermock, pktset)
  99. writermock.send.assert_has_calls([ call(x) for x in pkts ])
  100. self.assertEqual(pktset, set(pkts))
  101. @timeout(2)
  102. async def test_to_loragw(self):
  103. readermock = AsyncMock()
  104. writermock = AsyncMock()
  105. pkts = [ (x, None) for x in (b'astringofdata',
  106. b'anotherpkt',
  107. b'makeupdata',
  108. b'asigo', )
  109. ] + [
  110. asyncio.CancelledError(),
  111. ]
  112. readermock.recv.side_effect = pkts
  113. writermock.write = Mock()
  114. txpkts = { pkts[-2][0] }