diff --git a/lora.py b/lora.py index a356077..c4f1baf 100644 --- a/lora.py +++ b/lora.py @@ -27,6 +27,7 @@ import contextlib import functools import itertools import os +import sys import unittest from Strobe.Strobe import Strobe, KeccakF @@ -203,6 +204,105 @@ class MulticastSyncDatagram(SyncDatagram): self.mt.close() self.mt = None +def listsplit(lst, item): + try: + idx = lst.index(item) + except ValueError: + return lst, [] + + return lst[:idx], lst[idx + 1:] + +async def main(): + import argparse + + from loraserv import DEFAULT_MADDR as maddr + + parser = argparse.ArgumentParser() + + parser.add_argument('-r', dest='client', metavar='module:function', type=str, + help='Create a respondant instead of sending commands. Commands will be passed to the function.') + parser.add_argument('-s', dest='shared_key', metavar='shared_key', type=str, required=True, + help='The shared key (encoded as UTF-8) to use.') + parser.add_argument('args', metavar='CMD_ARG', type=str, nargs='*', + help='Various commands to send to the device.') + + args = parser.parse_args() + + shared_key = args.shared_key.encode('utf-8') + + if args.client: + # Run a client + mr = await multicast.create_multicast_receiver(maddr) + mt = await multicast.create_multicast_transmitter(maddr) + + from ctypes import c_uint8 + + # seed the RNG + prngseed = os.urandom(64) + lora_comms.strobe_seed_prng((c_uint8 * + len(prngseed))(*prngseed), len(prngseed)) + + # Create the state for testing + commstate = lora_comms.CommsState() + + import util_load + client_func = util_load.load_application(args.client) + + def client_call(msg, outbuf): + ret = client_func(msg._from()) + + if len(ret) > outbuf[0].pktlen: + ret = b'error, too long buffer: %d' % len(ret) + + outbuf[0].pktlen = min(len(ret), outbuf[0].pktlen) + for i in range(outbuf[0].pktlen): + outbuf[0].pkt[i] = ret[i] + + cb = lora_comms.process_msgfunc_t(client_call) + + # Initialize everything + lora_comms.comms_init(commstate, cb, make_pktbuf(shared_key)) + + try: + while True: + pkt = await mr.recv() + msg = pkt[0] + + out = lora_comms.comms_process_wrap( + commstate, msg) + + if out: + await mt.send(out) + finally: + mr.close() + mt.close() + sys.exit(0) + + msd = MulticastSyncDatagram(maddr) + await msd.start() + + l = LORANode(msd, shared=shared_key) + + await l.start() + + valid_cmds = { 'waitfor', 'runfor', 'ping', 'terminate', } + + cmdargs = list(args.args) + while cmdargs: + cmd = cmdargs.pop(0) + + if cmd not in valid_cmds: + print('invalid command:', repr(cmd)) + sys.exit(1) + + fun = getattr(l, cmd) + args, cmdargs = listsplit(cmdargs, '--') + + await fun(*(int(x) for x in args)) + +if __name__ == '__main__': + asyncio.run(main()) + class MockSyncDatagram(SyncDatagram): '''A testing version of SyncDatagram. Define a method runner which implements part of the sequence. In the function, await on either diff --git a/util_load.py b/util_load.py new file mode 100644 index 0000000..c96a78f --- /dev/null +++ b/util_load.py @@ -0,0 +1,72 @@ +# Copyright P G Jones 2018. +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# + +# Obtained from: +# https://github.com/pgjones/hypercorn/blob/master/src/hypercorn/utils.py +# +# Slightly modified to adapt to my uses. +# +# Remove import path from sys.path as part of clean up. +# +# Use getattr instead of eval. + +import sys + +from pathlib import Path +from importlib import import_module + +def load_application(path: str): + try: + module_name, app_name = path.split(":", 1) + except ValueError: + module_name, app_name = path, "app" + except AttributeError: + raise ValueError() + + module_path = Path(module_name).resolve() + added_module = str(module_path.parent) + sys.path.insert(0, added_module) + try: + if module_path.is_file(): + import_name = module_path.with_suffix("").name + else: + import_name = module_path.name + try: + module = import_module(import_name) + except ModuleNotFoundError as error: + if error.name == import_name: + raise ValueError('module %s not found' % + repr(import_name)) + else: + raise + + try: + for i in app_name.split('.'): + module = getattr(module, i) + return module + except AttributeError: + raise ValueError('attribute %s not found on %s' % + (repr(i), repr(module))) + finally: + sys.path.remove(added_module)