From ed0d08b12673abbcc55076b42ad141bfd255d08f Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Wed, 7 Apr 2021 17:11:10 -0700 Subject: [PATCH] start of the lorenode initiator --- lora.py | 246 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 lora.py diff --git a/lora.py b/lora.py new file mode 100644 index 0000000..e9d741f --- /dev/null +++ b/lora.py @@ -0,0 +1,246 @@ +import asyncio +import functools +import os +import unittest + +from Strobe.Strobe import Strobe +from Strobe.Strobe import AuthenticationFailed + +domain = b'com.funkthat.lora.irrigation.shared.v0.0.1' + +# Response to command will be the CMD and any arguments if needed. +# The command is encoded as an unsigned byte +CMD_TERMINATE = 1 # no args: terminate the sesssion, reply confirms + +# The follow commands are queue up, but will be acknoledged when queued +CMD_WAITFOR = 2 # arg: (length): waits for length seconds +CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds + +class LORANode(object): + '''Implement a LORANode initiator.''' + + def __init__(self, syncdatagram): + self.sd = syncdatagram + self.st = Strobe(domain) + + async def start(self): + msg = self.st.send_enc(b'reqreset' + os.urandom(16)) + \ + self.st.send_mac(8) + + resp = await self.sd.sendtillrecv(msg, 1) + + self.st.recv_enc(resp[:16]) + self.st.recv_mac(resp[16:]) + + resp = await self.sd.sendtillrecv( + self.st.send_enc(b'confirm') + self.st.send_mac(8), 1) + + pkt = self.st.recv_enc(resp[:9]) + self.st.recv_mac(resp[9:]) + + if pkt != b'confirmed': + raise RuntimeError + + @staticmethod + def _encodeargs(*args): + r = [] + for i in args: + r.append(i.to_bytes(4, byteorder='little')) + + return b''.join(r) + + async def _sendcmd(self, cmd, *args): + cmdbyte = cmd.to_bytes(1, byteorder='little') + pkt = await self.sd.sendtillrecv( + self.st.send_enc(cmdbyte + + self._encodeargs(*args)) + self.st.send_mac(8), 1) + + resp = self.st.recv_enc(pkt[:-8]) + self.st.recv_mac(pkt[-8:]) + + if resp[0:1] != cmdbyte: + raise RuntimeError('response does not match, got: %s, expected: %s' % (repr(resp[0:1]), repr(cmdbyte))) + + async def waitfor(self, length): + return await self._sendcmd(CMD_WAITFOR, length) + + async def runfor(self, chan, length): + return await self._sendcmd(CMD_RUNFOR, chan, length) + + async def terminate(self): + return await self._sendcmd(CMD_TERMINATE) + +class SyncDatagram(object): + '''Base interface for a more simple synchronous interface.''' + + def __init__(self): #pragma: no cover + pass + + async def recv(self, timeout=None): #pragma: no cover + '''Receive a datagram. If timeout is not None, wait that many + seconds, and if nothing is received in that time, raise an TimeoutError + exception.''' + + raise NotImplementedError + + async def send(self, data): #pragma: no cover + '''Send a datagram.''' + + raise NotImplementedError + + async def sendtillrecv(self, data, freq): + '''Send the datagram in data, every freq seconds until a datagram + is received. If timeout seconds happen w/o receiving a datagram, + then raise an TimeoutError exception.''' + + while True: + await self.send(data) + try: + return await self.recv(freq) + except TimeoutError: + pass + +class MockSyncDatagram(SyncDatagram): + '''A testing version of SyncDatagram. Define a method runner which + implements part of the sequence. In the function, await on either + self.get, to wait for the other side to send something, or await + self.put w/ data to send.''' + + def __init__(self): + self.sendq = asyncio.Queue() + self.recvq = asyncio.Queue() + self.task = None + self.task = asyncio.create_task(self.runner()) + + self.get = self.sendq.get + self.put = self.recvq.put + + async def drain(self): + '''Wait for the runner thread to finish up.''' + + return await self.task + + async def runner(self): #pragma: no cover + raise NotImplementedError + + async def recv(self, timeout=None): + return await self.recvq.get() + + async def send(self, data): + return await self.sendq.put(data) + + def __del__(self): #pragma: no cover + if self.task is not None and not self.task.done(): + self.task.cancel() + +class TestSyncData(unittest.IsolatedAsyncioTestCase): + async def test_syncsendtillrecv(self): + class MySync(SyncDatagram): + def __init__(self): + self.sendq = [] + self.resp = [ TimeoutError(), b'a' ] + + async def recv(self, timeout=None): + assert timeout == 1 + r = self.resp.pop(0) + if isinstance(r, Exception): + raise r + + return r + + async def send(self, data): + self.sendq.append(data) + + ms = MySync() + + r = await ms.sendtillrecv(b'foo', 1) + + self.assertEqual(r, b'a') + self.assertEqual(ms.sendq, [ b'foo', b'foo' ]) + +def timeout(timeout): + def timeout_wrapper(fun): + @functools.wraps(fun) + async def wrapper(*args, **kwargs): + return await asyncio.wait_for(fun(*args, **kwargs), + timeout) + + return wrapper + + return timeout_wrapper + +class TestLORANode(unittest.IsolatedAsyncioTestCase): + @timeout(2) + async def test_lora(self): + class TestSD(MockSyncDatagram): + async def runner(self): + l = Strobe(domain) + + # start handshake + r = await self.get() + + pkt = l.recv_enc(r[:-8]) + l.recv_mac(r[-8:]) + + assert pkt.startswith(b'reqreset') + + await self.put(l.send_enc(os.urandom(16)) + + l.send_mac(8)) + + r = await self.get() + c = l.recv_enc(r[:-8]) + l.recv_mac(r[-8:]) + + assert c == b'confirm' + + await self.put(l.send_enc(b'confirmed') + + l.send_mac(8)) + + r = await self.get() + cmd = l.recv_enc(r[:-8]) + l.recv_mac(r[-8:]) + + assert cmd[0] == CMD_WAITFOR + assert int.from_bytes(cmd[1:], byteorder='little') == 30 + + await self.put(l.send_enc(cmd[0:1]) + + l.send_mac(8)) + + r = await self.get() + cmd = l.recv_enc(r[:-8]) + l.recv_mac(r[-8:]) + + assert cmd[0] == CMD_RUNFOR + assert int.from_bytes(cmd[1:5], byteorder='little') == 1 + assert int.from_bytes(cmd[5:], byteorder='little') == 50 + + await self.put(l.send_enc(cmd[0:1]) + + l.send_mac(8)) + + r = await self.get() + cmd = l.recv_enc(r[:-8]) + l.recv_mac(r[-8:]) + + assert cmd[0] == CMD_TERMINATE + + await self.put(l.send_enc(cmd[0:1]) + + l.send_mac(8)) + + tsd = TestSD() + l = LORANode(tsd) + + await l.start() + + await l.waitfor(30) + + await l.runfor(1, 50) + + await l.terminate() + + await tsd.drain() + + # Make sure all messages have been processed + self.assertTrue(tsd.sendq.empty()) + self.assertTrue(tsd.recvq.empty()) + + print('done')