Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

1637 lines
40 KiB

  1. # Copyright 2021 John-Mark Gurney.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions
  5. # are met:
  6. # 1. Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # 2. Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. #
  12. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  13. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  14. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  15. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  16. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  17. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  18. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  19. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  20. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  21. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  22. # SUCH DAMAGE.
  23. #
  24. import asyncio
  25. import codecs
  26. import contextlib
  27. import functools
  28. import itertools
  29. import os
  30. import pathlib
  31. import shutil
  32. import subprocess
  33. import sys
  34. import tempfile
  35. import unittest
  36. from unittest.mock import patch
  37. from subprocess import DEVNULL, PIPE
  38. from Strobe.Strobe import Strobe, KeccakF
  39. from Strobe.Strobe import AuthenticationFailed
  40. import syote_comms
  41. from syote_comms import make_pktbuf, X25519
  42. import multicast
  43. from util import *
# Response to command will be the CMD and any arguments if needed.
# The command is encoded as an unsigned byte
CMD_TERMINATE = 1 # no args: terminate the session, reply confirms

# The following commands are queued up, but will be acknowledged when queued
CMD_WAITFOR = 2 # arg: (length): waits for length seconds
CMD_RUNFOR = 3 # arg: (chan, length): turns on chan for length seconds
CMD_PING = 4 # arg: (): a no op command
CMD_SETUNSET = 5 # arg: (chan, val): sets chan to val
CMD_ADV = 6 # arg: ([cnt]): advances to the next cnt (default 1) command
CMD_CLEAR = 7 # arg: (): clears all future commands, but keeps current running
CMD_KEY = 8 # arg: ([key reports]): standard USB HID key reports, return is number added
  55. _directmap = [
  56. (40, '\n\x1b\b\t -=[]\\#;\'`,./'),
  57. (40, '\r'),
  58. ]
  59. _keymap = { chr(x): x - ord('a') + 4 for x in range(ord('a'), ord('z') + 1) } | \
  60. { '0': 39 } | \
  61. { chr(x): x - ord('1') + 30 for x in range(ord('1'), ord('9') + 1) } | \
  62. { x: i + base for base, keys in _directmap for i, x in enumerate(keys) }
  63. _shiftmaplist = '!1@2#3$4%5^6&7*8(9)0_-+=~`{[}]|\\:;"\'<,>.?/'
  64. _completemap = { x: (False, y) for x, y in _keymap.items() } | \
  65. { x.upper(): (True, y) for x, y in _keymap.items() if x != x.upper() } | \
  66. { x: (True, _keymap[y]) for x, y in zip(_shiftmaplist[::2], _shiftmaplist[1::2]) }
  67. def makekeybuf(s):
  68. blank = b'\x00' * 8
  69. # start clean
  70. ret = [ blank ]
  71. templ = bytearray([ 0 ] * 8)
  72. for i in s:
  73. shift, key = _completemap[i]
  74. if ret[-1][2] == key:
  75. ret.append(blank)
  76. templ[2] = key
  77. templ[0] = 2 if shift else 0
  78. ret.append(templ[:])
  79. # end clean
  80. ret.append(blank)
  81. return b''.join(ret)
  82. class LORANode(object):
  83. '''Implement a LORANode initiator.
  84. There are currently two implemented modes, one is shared, and then
  85. a shared key must be provided to the shared keyword argument.
  86. The other is ecdhe mode, which requires an X25519 key to be passed
  87. in to init_key, and the respondent's public key to be passed in to
  88. resp_pub.
  89. '''
  90. SHARED_DOMAIN = b'com.funkthat.lora.irrigation.shared.v0.0.1'
  91. ECDHE_DOMAIN = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'
  92. MAC_LEN = 8
  93. def __init__(self, syncdatagram, shared=None, init_key=None, resp_pub=None):
  94. self.sd = syncdatagram
  95. if shared is not None:
  96. self.st = Strobe(self.SHARED_DOMAIN, F=KeccakF(800))
  97. self.st.key(shared)
  98. self.start = self.shared_start
  99. elif init_key is not None and resp_pub is not None:
  100. self.st = Strobe(self.ECDHE_DOMAIN, F=KeccakF(800))
  101. self.key = init_key
  102. self.resp_pub = resp_pub
  103. self.st.key(init_key.getpub() + resp_pub)
  104. self.start = self.ecdhe_start
  105. else:
  106. raise RuntimeError('invalid combination of keys provided')
  107. async def shared_start(self):
  108. resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')
  109. self.st.ratchet()
  110. pkt = await self.sendrecvvalid(b'confirm')
  111. if pkt != b'confirmed':
  112. raise RuntimeError('got invalid response: %s' %
  113. repr(pkt))
  114. async def ecdhe_start(self):
  115. ephkey = X25519.gen()
  116. resp = await self.sendrecvvalid(ephkey.getpub() + b'reqreset',
  117. fun=lambda: self.st.key(ephkey.dh(self.resp_pub) + self.key.dh(self.resp_pub)))
  118. self.st.key(ephkey.dh(resp) + self.key.dh(resp))
  119. pkt = await self.sendrecvvalid(b'confirm')
  120. if pkt != b'confirmed':
  121. raise RuntimeError('got invalid response: %s' %
  122. repr(pkt))
  123. async def sendrecvvalid(self, msg, fun=None):
  124. msg = self.st.send_enc(msg) + self.st.send_mac(self.MAC_LEN)
  125. if fun is not None:
  126. fun()
  127. origstate = self.st.copy()
  128. while True:
  129. resp = await self.sd.sendtillrecv(msg, .50)
  130. #_debprint('got:', resp)
  131. # skip empty messages
  132. if len(resp) == 0:
  133. continue
  134. try:
  135. decmsg = self.st.recv_enc(resp[:-self.MAC_LEN])
  136. self.st.recv_mac(resp[-self.MAC_LEN:])
  137. break
  138. except AuthenticationFailed:
  139. # didn't get a valid packet, restore
  140. # state and retry
  141. #_debprint('failed')
  142. self.st.set_state_from(origstate)
  143. #_debprint('got rep:', repr(resp), repr(decmsg))
  144. return decmsg
  145. @staticmethod
  146. def _encodeargs(*args):
  147. r = []
  148. for i in args:
  149. if isinstance(i, bytes):
  150. r.append(i)
  151. else:
  152. r.append(i.to_bytes(4, byteorder='little'))
  153. return b''.join(r)
  154. async def _sendcmd(self, cmd, *args):
  155. cmdbyte = cmd.to_bytes(1, byteorder='little')
  156. resp = await self.sendrecvvalid(cmdbyte + self._encodeargs(*args))
  157. if resp[0:1] != cmdbyte:
  158. raise RuntimeError(
  159. 'response does not match, got: %s, expected: %s' %
  160. (repr(resp[0:1]), repr(cmdbyte)))
  161. r = resp[1:]
  162. if r:
  163. return r
  164. async def waitfor(self, length):
  165. return await self._sendcmd(CMD_WAITFOR, length)
  166. async def runfor(self, chan, length):
  167. return await self._sendcmd(CMD_RUNFOR, chan, length)
  168. async def setunset(self, chan, val):
  169. return await self._sendcmd(CMD_SETUNSET, chan, val)
  170. async def ping(self):
  171. return await self._sendcmd(CMD_PING)
  172. async def type(self, s):
  173. keys = makekeybuf(s)
  174. while keys:
  175. r = await self._sendcmd(CMD_KEY, keys[:8 * 6])
  176. r = int.from_bytes(r, byteorder='little')
  177. keys = keys[r * 8:]
  178. async def adv(self, cnt=None):
  179. args = ()
  180. if cnt is not None:
  181. args = (cnt, )
  182. return await self._sendcmd(CMD_ADV, *args)
  183. async def clear(self):
  184. return await self._sendcmd(CMD_CLEAR)
  185. async def terminate(self):
  186. return await self._sendcmd(CMD_TERMINATE)
  187. class SyncDatagram(object):
  188. '''Base interface for a more simple synchronous interface.'''
  189. async def recv(self, timeout=None): #pragma: no cover
  190. '''Receive a datagram. If timeout is not None, wait that many
  191. seconds, and if nothing is received in that time, raise an
  192. asyncio.TimeoutError exception.'''
  193. raise NotImplementedError
  194. async def send(self, data): #pragma: no cover
  195. raise NotImplementedError
  196. async def sendtillrecv(self, data, freq):
  197. '''Send the datagram in data, every freq seconds until a datagram
  198. is received. If timeout seconds happen w/o receiving a datagram,
  199. then raise an TimeoutError exception.'''
  200. while True:
  201. #_debprint('sending:', repr(data))
  202. await self.send(data)
  203. try:
  204. return await self.recv(freq)
  205. except asyncio.TimeoutError:
  206. pass
  207. class MulticastSyncDatagram(SyncDatagram):
  208. '''
  209. An implementation of SyncDatagram that uses the provided
  210. multicast address maddr as the source/sink of the packets.
  211. Note that once created, the start coroutine needs to be
  212. await'd before being passed to a LORANode so that everything
  213. is running.
  214. '''
  215. # Note: sent packets will be received. A similar method to
  216. # what was done in multicast.{to,from}_loragw could be done
  217. # here as well, that is passing in a set of packets to not
  218. # pass back up.
  219. def __init__(self, maddr):
  220. self.maddr = maddr
  221. self._ignpkts = set()
  222. async def start(self):
  223. self.mr = await multicast.create_multicast_receiver(self.maddr)
  224. self.mt = await multicast.create_multicast_transmitter(
  225. self.maddr)
  226. async def _recv(self):
  227. while True:
  228. pkt = await self.mr.recv()
  229. pkt = pkt[0]
  230. if pkt not in self._ignpkts:
  231. return pkt
  232. self._ignpkts.remove(pkt)
  233. async def recv(self, timeout=None): #pragma: no cover
  234. r = await asyncio.wait_for(self._recv(), timeout=timeout)
  235. return r
  236. async def send(self, data): #pragma: no cover
  237. self._ignpkts.add(bytes(data))
  238. await self.mt.send(data)
  239. def close(self):
  240. '''Shutdown communications.'''
  241. self.mr.close()
  242. self.mr = None
  243. self.mt.close()
  244. self.mt = None
  245. def listsplit(lst, item):
  246. try:
  247. idx = lst.index(item)
  248. except ValueError:
  249. return lst, []
  250. return lst[:idx], lst[idx + 1:]
  251. async def main():
  252. import argparse
  253. from loraserv import DEFAULT_MADDR as maddr
  254. parser = argparse.ArgumentParser(description='This is an implementation of both the server and client implementing syote secure IoT protocol.',
  255. epilog='Both -k and -p MUST be used together. One of either -s or the combone -k & -p MUST be specified.')
  256. parser.add_argument('-f', dest='schedfile', metavar='filename', type=str,
  257. help='Use commands from the file. One command per line.')
  258. parser.add_argument('-k', dest='privkey', metavar='privfile', type=str,
  259. help='File containing a hex encoded private key.')
  260. parser.add_argument('-p', dest='pubkey', metavar='pubfile', type=str,
  261. help='File containing a hex encoded public key.')
  262. parser.add_argument('-r', dest='client', metavar='module:function', type=str,
  263. help='Create a respondant instead of sending commands. Commands will be passed to the function.')
  264. parser.add_argument('-s', dest='shared_key', metavar='shared_key', type=str,
  265. help='File containing the shared key to use (note, any white space is included).')
  266. parser.add_argument('args', metavar='CMD_ARG', type=str, nargs='*',
  267. help='Various commands to send to the device.')
  268. args = parser.parse_args()
  269. # make sure a key is specified.
  270. if args.privkey is None and args.pubkey is None and \
  271. args.shared_key is None:
  272. parser.error('a key must be specified (either -k and -p, or -s)')
  273. # make sure if one is specified, both are.
  274. if (args.privkey is not None or args.pubkey is not None) and \
  275. (args.privkey is None or args.pubkey is None):
  276. parser.error('both -k and -p MUST be specified if one is.')
  277. if args.shared_key is not None:
  278. skdata = pathlib.Path(args.shared_key).read_bytes()
  279. lorakwargs = dict(shared=skdata)
  280. commsinitargs = (make_pktbuf(lorakwargs['shared']), None, None)
  281. else:
  282. privkeydata = pathlib.Path(args.privkey).read_text().strip()
  283. privkey = X25519.frombytes(codecs.decode(privkeydata, 'hex'))
  284. pubkeydata = pathlib.Path(args.pubkey).read_text().strip()
  285. pubkey = codecs.decode(pubkeydata, 'hex')
  286. lorakwargs = dict(init_key=privkey, resp_pub=pubkey)
  287. commsinitargs = (None, make_pktbuf(privkey.getpriv()),
  288. make_pktbuf(pubkey))
  289. if args.client:
  290. # Run a client
  291. mr = await multicast.create_multicast_receiver(maddr)
  292. mt = await multicast.create_multicast_transmitter(maddr)
  293. from ctypes import c_uint8
  294. # seed the RNG
  295. prngseed = os.urandom(64)
  296. syote_comms._lib.strobe_seed_prng((c_uint8 *
  297. len(prngseed))(*prngseed), len(prngseed))
  298. # Create the state for testing
  299. commstate = syote_comms.CommsState()
  300. import util_load
  301. client_func = util_load.load_application(args.client)
  302. def client_call(msg, outbuf):
  303. ret = client_func(msg._from())
  304. if len(ret) > outbuf[0].pktlen:
  305. ret = b'error, too long buffer: %d' % len(ret)
  306. outbuf[0].pktlen = min(len(ret), outbuf[0].pktlen)
  307. for i in range(outbuf[0].pktlen):
  308. outbuf[0].pkt[i] = ret[i]
  309. cb = syote_comms.process_msgfunc_t(client_call)
  310. # Initialize everything
  311. syote_comms.comms_init(commstate, cb, *commsinitargs)
  312. try:
  313. while True:
  314. pkt = await mr.recv()
  315. msg = pkt[0]
  316. #_debprint('procmsg:', repr(msg))
  317. out = syote_comms.comms_process_wrap(
  318. commstate, msg)
  319. #_debprint('resp:', repr(out))
  320. if out:
  321. await mt.send(out)
  322. finally:
  323. mr.close()
  324. mt.close()
  325. sys.exit(0)
  326. msd = MulticastSyncDatagram(maddr)
  327. await msd.start()
  328. l = LORANode(msd, **lorakwargs)
  329. await l.start()
  330. valid_cmds = {
  331. 'waitfor', 'setunset', 'runfor', 'ping', 'adv', 'clear',
  332. 'terminate', 'type',
  333. }
  334. if args.args and args.schedfile:
  335. parser.error('only one of -f or arguments can be specified.')
  336. if args.args:
  337. cmds = list(args.args)
  338. cmdargs = []
  339. while cmds:
  340. a, cmds = listsplit(cmds, '--')
  341. cmdargs.append(a)
  342. else:
  343. with open(args.schedfile) as fp:
  344. cmdargs = [ x.split() for x in fp.readlines() ]
  345. while cmdargs:
  346. cmd, *args = cmdargs.pop(0)
  347. if cmd not in valid_cmds:
  348. print('invalid command:', repr(cmd))
  349. sys.exit(1)
  350. fun = getattr(l, cmd)
  351. try:
  352. await fun(*(int(x) for x in args))
  353. except:
  354. await fun(*args)
# When run as a script, run the command line interface in a new
# event loop.
if __name__ == '__main__':
    asyncio.run(main())
  357. class MockSyncDatagram(SyncDatagram):
  358. '''A testing version of SyncDatagram. Define a method runner which
  359. implements part of the sequence. In the function, await on either
  360. self.get, to wait for the other side to send something, or await
  361. self.put w/ data to send.'''
  362. def __init__(self):
  363. self.sendq = asyncio.Queue()
  364. self.recvq = asyncio.Queue()
  365. self.task = asyncio.create_task(self.runner())
  366. self.get = self.sendq.get
  367. self.put = self.recvq.put
  368. async def drain(self):
  369. '''Wait for the runner thread to finish up.'''
  370. return await self.task
  371. async def runner(self): #pragma: no cover
  372. raise NotImplementedError
  373. async def recv(self, timeout=None):
  374. return await self.recvq.get()
  375. async def send(self, data):
  376. return await self.sendq.put(data)
  377. def __del__(self): #pragma: no cover
  378. if self.task is not None and not self.task.done():
  379. self.task.cancel()
  380. class TestSyncData(unittest.IsolatedAsyncioTestCase):
  381. async def test_syncsendtillrecv(self):
  382. class MySync(SyncDatagram):
  383. def __init__(self):
  384. self.sendq = []
  385. self.resp = [ asyncio.TimeoutError(), b'a' ]
  386. async def recv(self, timeout=None):
  387. assert timeout == 1
  388. r = self.resp.pop(0)
  389. if isinstance(r, Exception):
  390. raise r
  391. return r
  392. async def send(self, data):
  393. self.sendq.append(data)
  394. ms = MySync()
  395. r = await ms.sendtillrecv(b'foo', 1)
  396. self.assertEqual(r, b'a')
  397. self.assertEqual(ms.sendq, [ b'foo', b'foo' ])
  398. class AsyncSequence(object):
  399. '''
  400. Object used for sequencing async functions. To use, use the
  401. asynchronous context manager created by the sync method. For
  402. example:
  403. seq = AsyncSequence()
  404. async func1():
  405. async with seq.sync(1):
  406. second_fun()
  407. async func2():
  408. async with seq.sync(0):
  409. first_fun()
  410. This will make sure that function first_fun is run before running
  411. the function second_fun. If a previous block raises an Exception,
  412. it will be passed up, and all remaining blocks (and future ones)
  413. will raise a CancelledError to help ensure that any tasks are
  414. properly cleaned up.
  415. '''
  416. def __init__(self, positerfactory=lambda: itertools.count()):
  417. '''The argument positerfactory, is a factory that will
  418. create an iterator that will be used for the values that
  419. are passed to the sync method.'''
  420. self.positer = positerfactory()
  421. self.token = object()
  422. self.die = False
  423. self.waiting = {
  424. next(self.positer): self.token
  425. }
  426. async def simpsync(self, pos):
  427. async with self.sync(pos):
  428. pass
  429. @contextlib.asynccontextmanager
  430. async def sync(self, pos):
  431. '''An async context manager that will be run when it's
  432. turn arrives. It will only run when all the previous
  433. items in the iterator has been successfully run.'''
  434. if self.die:
  435. raise asyncio.CancelledError('seq cancelled')
  436. if pos in self.waiting:
  437. if self.waiting[pos] is not self.token:
  438. raise RuntimeError('pos already waiting!')
  439. else:
  440. fut = asyncio.Future()
  441. self.waiting[pos] = fut
  442. await fut
  443. # our time to shine!
  444. del self.waiting[pos]
  445. try:
  446. yield None
  447. except Exception as e:
  448. # if we got an exception, things went pear shaped,
  449. # shut everything down, and any future calls.
  450. #_debprint('dieing...', repr(e))
  451. self.die = True
  452. # cancel existing blocks
  453. while self.waiting:
  454. k, v = self.waiting.popitem()
  455. #_debprint('canceling: %s' % repr(k))
  456. if v is self.token:
  457. continue
  458. # for Python 3.9:
  459. # msg='pos %s raised exception: %s' %
  460. # (repr(pos), repr(e))
  461. v.cancel()
  462. # populate real exception up
  463. raise
  464. else:
  465. # handle next
  466. nextpos = next(self.positer)
  467. if nextpos in self.waiting:
  468. #_debprint('np:', repr(self), nextpos,
  469. # repr(self.waiting[nextpos]))
  470. self.waiting[nextpos].set_result(None)
  471. else:
  472. self.waiting[nextpos] = self.token
class TestSequencing(unittest.IsolatedAsyncioTestCase):
    # Tests for AsyncSequence.  Each test is wrapped by the timeout
    # decorator (from util) with an argument of 2.

    @timeout(2)
    async def test_seq_alreadywaiting(self):
        # Two blocks asking for the same position: the second one
        # must get a RuntimeError.
        waitseq = AsyncSequence()
        seq = AsyncSequence()

        async def fun1():
            # first to wait on position 1 of waitseq
            async with waitseq.sync(1):
                pass

        async def fun2():
            # held back by seq until fun1 is already waiting, then
            # asks for the same position of waitseq
            async with seq.sync(1):
                async with waitseq.sync(1): # pragma: no cover
                    pass

        task1 = asyncio.create_task(fun1())
        task2 = asyncio.create_task(fun2())

        # spin things to make sure things advance
        await asyncio.sleep(0)

        # release fun2
        async with seq.sync(0):
            pass

        with self.assertRaises(RuntimeError):
            await task2

        # release fun1 so that it can finish
        async with waitseq.sync(0):
            pass

        await task1

    @timeout(2)
    async def test_seqexc(self):
        # seq orders the three functions, excseq is the sequence
        # under test that has a block raise an exception.
        seq = AsyncSequence()
        excseq = AsyncSequence()

        async def excfun1():
            async with seq.sync(1):
                pass

            # the block that fails
            async with excseq.sync(0):
                raise ValueError('foo')

        # that a block that enters first, but runs after
        # raises an exception
        async def excfun2():
            async with seq.sync(0):
                pass

            async with excseq.sync(1): # pragma: no cover
                pass

        # that a block that enters after, raises an
        # exception
        async def excfun3():
            async with seq.sync(2):
                pass

            async with excseq.sync(2): # pragma: no cover
                pass

        task1 = asyncio.create_task(excfun1())
        task2 = asyncio.create_task(excfun2())
        task3 = asyncio.create_task(excfun3())

        # the failing block sees its own exception
        with self.assertRaises(ValueError):
            await task1

        # and the others get cancelled
        with self.assertRaises(asyncio.CancelledError):
            await task2

        with self.assertRaises(asyncio.CancelledError):
            await task3

    @timeout(2)
    async def test_seq(self):
        # test that a seq object when created
        # (with positions that start at 1 instead of the default)
        seq = AsyncSequence(lambda: itertools.count(1))

        # records the order in which the blocks ran
        col = []

        async def fun1():
            async with seq.sync(1):
                col.append(1)

            async with seq.sync(2):
                col.append(2)

            async with seq.sync(4):
                col.append(4)

        async def fun2():
            async with seq.sync(3):
                col.append(3)

            async with seq.sync(6):
                col.append(6)

        async def fun3():
            async with seq.sync(5):
                col.append(5)

        # and various functions are run
        task1 = asyncio.create_task(fun1())
        task2 = asyncio.create_task(fun2())
        task3 = asyncio.create_task(fun3())

        # and the functions complete
        await task3
        await task2
        await task1

        # that the order they ran in was correct
        self.assertEqual(col, list(range(1, 7)))
  558. class TestLORANode(unittest.IsolatedAsyncioTestCase):
    # Strobe domain strings used by the simulated respondents in the
    # tests below.  They carry the same values as
    # LORANode.SHARED_DOMAIN and LORANode.ECDHE_DOMAIN.
    shared_domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
    ecdhe_domain = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'
    def test_initparams(self):
        # make sure no keys fails
        # (the datagram object is not used before the check, so None
        # is enough here)
        with self.assertRaises(RuntimeError):
            l = LORANode(None)
    @timeout(2)
    async def test_lora_ecdhe(self):
        # Run a LORANode in ecdhe mode against a respondent that is
        # simulated in Python by the runner below, which also injects
        # invalid packets to check that requests are resent unchanged.

        # the runner refers to the test case as _self, as self is
        # the datagram object in there
        _self = self
        initkey = X25519.gen()
        respkey = X25519.gen()

        class TestSD(MockSyncDatagram):
            async def sendgettest(self, msg):
                '''Send the message, but make sure that if a
                bad message is sent afterward, that it replies
                w/ the same previous message.
                '''

                await self.put(msg)
                resp = await self.get()

                await self.put(b'bogusmsg' * 5)

                resp2 = await self.get()

                _self.assertEqual(resp, resp2)

                return resp

            async def runner(self):
                # as respondant
                l = Strobe(_self.ecdhe_domain, F=KeccakF(800))

                # keyed the same way as LORANode.__init__ does it
                l.key(initkey.getpub() + respkey.getpub())

                # start handshake
                r = await self.get()

                # get eph key w/ reqreset
                # (the last 8 bytes of each packet are the MAC)
                pkt = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert pkt.endswith(b'reqreset')

                ephpub = pkt[:-len(b'reqreset')]

                # make sure junk gets ignored
                await self.put(b'sdlfkj')

                # and that the packet remains the same
                _self.assertEqual(r, await self.get())

                # and a couple more times
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())
                await self.put(b'0' * 32)
                _self.assertEqual(r, await self.get())

                # update the keys
                l.key(respkey.dh(ephpub) + respkey.dh(initkey.getpub()))

                # generate our eph key
                ephkey = X25519.gen()

                # send the response
                await self.put(l.send_enc(ephkey.getpub()) +
                    l.send_mac(8))

                # second rekey, now using our ephemeral key
                l.key(ephkey.dh(ephpub) + ephkey.dh(initkey.getpub()))

                # get the confirmation message
                r = await self.get()

                # test the resend capabilities
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())

                # decode confirmation message
                c = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # assert that we got it
                _self.assertEqual(c, b'confirm')

                # send confirmed reply
                r = await self.sendgettest(l.send_enc(
                    b'confirmed') + l.send_mac(8))

                # test and decode remaining command messages
                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_WAITFOR
                assert int.from_bytes(cmd[1:],
                    byteorder='little') == 30

                # reply with just the command byte
                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # two arguments of 4 bytes each
                assert cmd[0] == CMD_RUNFOR
                assert int.from_bytes(cmd[1:5],
                    byteorder='little') == 1
                assert int.from_bytes(cmd[5:],
                    byteorder='little') == 50

                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_TERMINATE

                # last reply, nothing more is expected after it
                await self.put(l.send_enc(cmd[0:1]) +
                    l.send_mac(8))

        tsd = TestSD()

        # make sure it fails w/o both specified
        with self.assertRaises(RuntimeError):
            l = LORANode(tsd, init_key=initkey)

        with self.assertRaises(RuntimeError):
            l = LORANode(tsd, resp_pub=respkey.getpub())

        l = LORANode(tsd, init_key=initkey, resp_pub=respkey.getpub())

        await l.start()

        # these must match what the runner expects, in order
        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.terminate()

        # wait for the runner, which also brings up any failure
        # that happened in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())
        #_debprint('done')
    @timeout(2)
    async def test_lora_shared(self):
        # Run a LORANode in shared key mode against a respondent that
        # is simulated in Python by the runner below, which also
        # injects invalid packets to check that requests are resent
        # unchanged.

        # the runner refers to the test case as _self, as self is
        # the datagram object in there
        _self = self
        shared_key = os.urandom(32)

        class TestSD(MockSyncDatagram):
            async def sendgettest(self, msg):
                '''Send the message, but make sure that if a
                bad message is sent afterward, that it replies
                w/ the same previous message.
                '''

                await self.put(msg)
                resp = await self.get()

                await self.put(b'bogusmsg' * 5)

                resp2 = await self.get()

                _self.assertEqual(resp, resp2)

                return resp

            async def runner(self):
                # as respondent, keyed w/ the same shared key
                l = Strobe(TestLORANode.shared_domain, F=KeccakF(800))

                l.key(shared_key)

                # start handshake
                r = await self.get()

                # (the last 8 bytes of each packet are the MAC)
                pkt = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert pkt.endswith(b'reqreset')

                # make sure junk gets ignored
                await self.put(b'sdlfkj')

                # and that the packet remains the same
                _self.assertEqual(r, await self.get())

                # and a couple more times
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())
                await self.put(b'0' * 32)
                _self.assertEqual(r, await self.get())

                # send the response
                await self.put(l.send_enc(os.urandom(16)) +
                    l.send_mac(8))

                # require no more back tracking at this point
                l.ratchet()

                # get the confirmation message
                r = await self.get()

                # test the resend capabilities
                await self.put(b'0' * 24)
                _self.assertEqual(r, await self.get())

                # decode confirmation message
                c = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # assert that we got it
                _self.assertEqual(c, b'confirm')

                # send confirmed reply
                r = await self.sendgettest(l.send_enc(
                    b'confirmed') + l.send_mac(8))

                # test and decode remaining command messages
                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_WAITFOR
                assert int.from_bytes(cmd[1:],
                    byteorder='little') == 30

                # reply with just the command byte
                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                # two arguments of 4 bytes each
                assert cmd[0] == CMD_RUNFOR
                assert int.from_bytes(cmd[1:5],
                    byteorder='little') == 1
                assert int.from_bytes(cmd[5:],
                    byteorder='little') == 50

                r = await self.sendgettest(l.send_enc(
                    cmd[0:1]) + l.send_mac(8))

                cmd = l.recv_enc(r[:-8])
                l.recv_mac(r[-8:])

                assert cmd[0] == CMD_TERMINATE

                # last reply, nothing more is expected after it
                await self.put(l.send_enc(cmd[0:1]) +
                    l.send_mac(8))

        tsd = TestSD()
        l = LORANode(tsd, shared=shared_key)

        await l.start()

        # these must match what the runner expects, in order
        await l.waitfor(30)

        await l.runfor(1, 50)

        await l.terminate()

        # wait for the runner, which also brings up any failure
        # that happened in it
        await tsd.drain()

        # Make sure all messages have been processed
        self.assertTrue(tsd.sendq.empty())
        self.assertTrue(tsd.recvq.empty())
        #_debprint('done')
    @timeout(2)
    async def test_ccode_badmsgs(self):
        # Test to make sure that various bad messages in the
        # handshake process are rejected even if the attacker
        # has the correct key. This just keeps the protocol
        # tight allowing for variations in the future.

        # seed the RNG
        prngseed = b'abc123'
        from ctypes import c_uint8
        syote_comms.strobe_seed_prng((c_uint8 *
            len(prngseed))(*prngseed), len(prngseed))

        # Create the state for testing
        commstate = syote_comms.CommsState()

        # a callback that does nothing, as no command is sent
        cb = syote_comms.process_msgfunc_t(lambda msg, outbuf: None)

        # Generate shared key
        shared_key = os.urandom(32)

        # Initialize everything
        syote_comms.comms_init(commstate, cb, make_pktbuf(shared_key), None, None)

        # Create test fixture, only use it to init crypto state
        tsd = SyncDatagram()
        l = LORANode(tsd, shared=shared_key)

        # Each bad message below is encoded w/ a copy of the state,
        # so that l.st itself stays where it was.

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect init message
        msg = os.urandom(16) + b'othre'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # that various short messages don't cause problems
        for i in range(10):
            out = syote_comms.comms_process_wrap(commstate, b'0' * i)

            self.assertFalse(out)

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect init message
        msg = os.urandom(16) + b' eqreset'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # compose the correct init message
        # (this one uses l.st directly, so the state advances)
        msg = os.urandom(16) + b'reqreset'
        msg = l.st.send_enc(msg) + l.st.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        # process the reply the same way LORANode.shared_start does
        l.st.recv_enc(out[:-l.MAC_LEN])
        l.st.recv_mac(out[-l.MAC_LEN:])

        l.st.ratchet()

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect confirmed message
        msg = b'onfirm'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)

        # copy the crypto state
        cstate = l.st.copy()

        # compose an incorrect confirmed message
        msg = b' onfirm'
        msg = cstate.send_enc(msg) + cstate.send_mac(l.MAC_LEN)

        out = syote_comms.comms_process_wrap(commstate, msg)

        self.assertFalse(out)
  @timeout(2)
  async def test_ccode_ecdhe(self):
      '''Run a LORANode session against the C comms code initialized
      with X25519 keys.

      Each request is handed to the C code twice, as if the first
      reply had been lost; both replies must be identical and have
      the expected length.
      '''

      from ctypes import c_uint8

      testcase = self

      # seed the RNG with a fixed value
      seed = b'abc123'
      seedtype = c_uint8 * len(seed)
      syote_comms.strobe_seed_prng(seedtype(*seed), len(seed))

      # state object handed to the C entry points below
      commstate = syote_comms.CommsState()

      # commands, with their decoded arguments, that the callback
      # has to see, in this order
      pending = [
          (CMD_WAITFOR, [30]),
          (CMD_RUNFOR, [1, 50]),
          (CMD_PING, []),
          (CMD_TERMINATE, []),
      ]

      def procmsg(msg, outbuf):
          raw = msg._from()
          cmd = raw[0]

          # everything after the command byte is read as 4 byte
          # little-endian integers
          args = [
              int.from_bytes(raw[off:off + 4], byteorder='little')
              for off in range(1, len(raw), 4)
          ]

          if pending[0] != (cmd, args):  # pragma: no cover
              raise RuntimeError('cmd not found')

          pending.pop(0)

          # the reply is just the command byte
          outbuf[0].pkt[0] = cmd
          outbuf[0].pktlen = 1

      # wrap the callback function; the local keeps the wrapper
      # referenced for the rest of the test
      cb = syote_comms.process_msgfunc_t(procmsg)

      class CCodeSD(MockSyncDatagram):
          async def runner(self):
              for expectlen in (40, 17, 9, 9, 9, 9):
                  request = await self.get()

                  # first pass through the C code
                  first = syote_comms.comms_process_wrap(
                      commstate, request)
                  testcase.assertEqual(expectlen, len(first))

                  # pretend the reply was lost and replay the
                  # request; the reply must not change
                  second = syote_comms.comms_process_wrap(
                      commstate, request)
                  testcase.assertEqual(first, second)

                  # hand the reply back
                  await self.put(second)

      # generate keys for both sides
      initkey = X25519.gen()
      respkey = X25519.gen()

      # the C side gets the responder private key and the
      # initiator public key
      syote_comms.comms_init(commstate, cb, None,
          make_pktbuf(respkey.getpriv()),
          make_pktbuf(initkey.getpub()))

      # test fixture and the node under test
      tsd = CCodeSD()
      node = LORANode(tsd, init_key=initkey,
          resp_pub=respkey.getpub())

      # send the messages listed in pending
      await node.start()
      await node.waitfor(30)
      await node.runfor(1, 50)
      await node.ping()
      await node.terminate()

      await tsd.drain()

      # nothing may be left in either queue
      self.assertTrue(tsd.sendq.empty())
      self.assertTrue(tsd.recvq.empty())

      # and every expected message has been seen
      self.assertFalse(pending)
  @timeout(2)
  async def test_ccode(self):
      '''Run a LORANode session against the C comms code initialized
      with a shared key.

      Besides the plain commands, this covers CMD_KEY messages whose
      payload is compared as raw bytes and whose reply is supplied
      by the table of expected messages.
      '''

      from ctypes import c_uint8, memmove

      testcase = self

      # seed the RNG with a fixed value
      seed = b'abc123'
      seedtype = c_uint8 * len(seed)
      syote_comms.strobe_seed_prng(seedtype(*seed), len(seed))

      # state object handed to the C entry points below
      commstate = syote_comms.CommsState()

      def keyreply(value):
          # a byteorder is passed for the single command byte as
          # well ('big'); for one byte the choice has no effect
          return CMD_KEY.to_bytes(1, byteorder='big') + \
              value.to_bytes(4, byteorder='little')

      # Expected messages, in order.  Entries are (cmd, args) or
      # (cmd, payload, reply): a list of args is compared against
      # decoded integers, a bytes payload against the raw bytes
      # after the command byte.
      pending = [
          (CMD_WAITFOR, [30]),
          (CMD_RUNFOR, [1, 50]),
          (CMD_PING, []),
          (CMD_KEY, b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x16\x00\x00\x00\x00\x00\x02\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x17\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', keyreply(3)),
          (CMD_KEY, b'\x00\x00\x17\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', keyreply(2)),
          (CMD_TERMINATE, []),
      ]

      def procmsg(msg, outbuf):
          raw = msg._from()
          cmd = raw[0]
          want = pending[0]

          if isinstance(want[1], bytes):
              # raw payload, compare it as is
              args = raw[1:]
          else:
              # 4 byte little-endian integers
              args = [
                  int.from_bytes(raw[off:off + 4],
                      byteorder='little')
                  for off in range(1, len(raw), 4)
              ]

          if want[:2] != (cmd, args):  # pragma: no cover
              raise RuntimeError('cmd not found, got %d, expected %d' % (cmd, want[0]))

          if len(want) > 2:
              # the entry carries its own reply
              reply = want[2]
              memmove(outbuf[0].pkt, reply, len(reply))
              outbuf[0].pktlen = len(reply)
          else:
              # default reply is just the command byte
              outbuf[0].pkt[0] = cmd
              outbuf[0].pktlen = 1

          pending.pop(0)

      # wrap the callback function; the local keeps the wrapper
      # referenced for the rest of the test
      cb = syote_comms.process_msgfunc_t(procmsg)

      class CCodeSD(MockSyncDatagram):
          async def runner(self):
              for expectlen in (24, 17, 9, 9, 9, 13, 13, 9):
                  request = await self.get()

                  # first pass through the C code
                  first = syote_comms.comms_process_wrap(
                      commstate, request)
                  testcase.assertEqual(expectlen, len(first))

                  # pretend the reply was lost and replay the
                  # request; the reply must not change
                  second = syote_comms.comms_process_wrap(
                      commstate, request)
                  testcase.assertEqual(first, second)

                  # hand the reply back
                  await self.put(second)

      # generate the shared key and initialize the C side with it
      shared_key = os.urandom(32)
      syote_comms.comms_init(commstate, cb,
          make_pktbuf(shared_key), None, None)

      # test fixture and the node under test
      tsd = CCodeSD()
      node = LORANode(tsd, shared=shared_key)

      # send the messages listed in pending
      await node.start()
      await node.waitfor(30)
      await node.runfor(1, 50)
      await node.ping()
      await node.type('sHt')
      await node.terminate()

      await tsd.drain()

      # nothing may be left in either queue
      self.assertTrue(tsd.sendq.empty())
      self.assertTrue(tsd.recvq.empty())

      # and every expected message has been seen
      self.assertFalse(pending)
  @timeout(2)
  async def test_ccode_newsession(self):
      '''Make sure that while an existing session is running, a new
      session can be established, and that once it is, the old
      session becomes inactive.
      '''

      from ctypes import c_uint8

      testcase = self

      # orders the steps of the two datagram runners below
      seq = AsyncSequence()

      # seed the RNG with a fixed value
      seed = b'abc123'
      seedtype = c_uint8 * len(seed)
      syote_comms.strobe_seed_prng(seedtype(*seed), len(seed))

      # state object handed to the C entry points below
      commstate = syote_comms.CommsState()

      # commands, with their decoded arguments, that the callback
      # has to see, in this order
      pending = [
          (CMD_WAITFOR, [30]),
          (CMD_WAITFOR, [70]),
          (CMD_WAITFOR, [40]),
          (CMD_TERMINATE, []),
      ]

      def procmsg(msg, outbuf):
          raw = msg._from()
          cmd = raw[0]

          # everything after the command byte is read as 4 byte
          # little-endian integers
          args = [
              int.from_bytes(raw[off:off + 4], byteorder='little')
              for off in range(1, len(raw), 4)
          ]

          if pending[0] != (cmd, args):  # pragma: no cover
              raise RuntimeError('cmd not found: %d' % cmd)

          pending.pop(0)

          # the reply is just the command byte
          outbuf[0].pkt[0] = cmd
          outbuf[0].pktlen = 1

      # wrap the callback function; the local keeps the wrapper
      # referenced for the rest of the test
      cb = syote_comms.process_msgfunc_t(procmsg)

      class FlipMsg:
          async def flipmsg(self):
              # take one request, run it through the C code and
              # hand the reply back
              request = await self.get()
              reply = syote_comms.comms_process_wrap(
                  commstate, request)
              await self.put(reply)

      # this class always passes messages, this is
      # used for the first session.
      class CCodeSD1(MockSyncDatagram, FlipMsg):
          async def runner(self):
              for _ in range(3):
                  await self.flipmsg()

              async with seq.sync(0):
                  # a bogus message must not produce a response
                  bogus = b'0'*24
                  reply = syote_comms.comms_process_wrap(
                      commstate, bogus)
                  testcase.assertFalse(reply)

              await self.flipmsg()

      # this one is special in that it will pause after the first
      # message to ensure that the previous session will continue
      # to work, AND that if a new "new" session comes along, it
      # will override the previous new session that hasn't been
      # confirmed yet.
      class CCodeSD2(MockSyncDatagram, FlipMsg):
          async def runner(self):
              # pass one message from the new session
              async with seq.sync(1):
                  # There might be a missing case
                  # handled for when the confirmed
                  # message is generated, but lost.
                  await self.flipmsg()

              # and the old session is still active
              await node1.waitfor(70)

              async with seq.sync(2):
                  for _ in range(3):
                      await self.flipmsg()

      # generate the shared key and initialize the C side with it
      shared_key = os.urandom(32)
      syote_comms.comms_init(commstate, cb,
          make_pktbuf(shared_key), None, None)

      # first session
      tsd = CCodeSD1()
      node1 = LORANode(tsd, shared=shared_key)

      await node1.start()
      await node1.waitfor(30)

      # ensure that a new one can take over
      tsd2 = CCodeSD2()
      node2 = LORANode(tsd2, shared=shared_key)

      await node2.start()
      await node2.waitfor(40)
      await node2.terminate()

      await tsd.drain()
      await tsd2.drain()

      # nothing may be left in any queue
      self.assertTrue(tsd.sendq.empty())
      self.assertTrue(tsd.recvq.empty())
      self.assertTrue(tsd2.sendq.empty())
      self.assertTrue(tsd2.recvq.empty())

      # and every expected message has been seen
      self.assertFalse(pending)
  class TestLoRaNodeMulticast(unittest.IsolatedAsyncioTestCase):
      '''Tests for MulticastSyncDatagram, run against the C comms
      code acting as the responder.
      '''

      # see: https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml#multicast-addresses-1
      maddr = ('224.0.0.198', 48542)

      @timeout(2)
      async def test_multisyncdgram(self):
          '''Test the implementation of the multicast version of
          SyncDatagram.

          A background task receives packets on the multicast
          address, feeds them to the C comms code and transmits any
          reply.  A LORANode on top of MulticastSyncDatagram then
          runs a short session against it.
          '''

          from ctypes import c_uint8

          # seed the RNG with a fixed value
          prngseed = b'abc123'
          syote_comms.strobe_seed_prng((c_uint8 *
              len(prngseed))(*prngseed), len(prngseed))

          # Create the state for testing
          commstate = syote_comms.CommsState()

          # These are the expected messages and their arguments
          exptmsgs = [
              (CMD_WAITFOR, [ 30 ]),
              (CMD_PING, [ ]),
              (CMD_TERMINATE, [ ]),
          ]

          def procmsg(msg, outbuf):
              msgbuf = msg._from()
              cmd = msgbuf[0]

              # everything after the command byte is read as 4 byte
              # little-endian integers
              args = [ int.from_bytes(msgbuf[x:x + 4],
                  byteorder='little') for x in range(1, len(msgbuf),
                  4) ]

              if exptmsgs[0] == (cmd, args):
                  exptmsgs.pop(0)

                  # the reply is just the command byte
                  outbuf[0].pkt[0] = cmd
                  outbuf[0].pktlen = 1
              else: #pragma: no cover
                  raise RuntimeError('cmd not found')

          # wrap the callback function
          cb = syote_comms.process_msgfunc_t(procmsg)

          # Generate shared key
          shared_key = os.urandom(32)

          # Initialize everything
          syote_comms.comms_init(commstate, cb,
              make_pktbuf(shared_key), None, None)

          # create the object we are testing
          msd = MulticastSyncDatagram(self.maddr)

          seq = AsyncSequence()

          async def clienttask():
              mr = await multicast.create_multicast_receiver(
                  self.maddr)
              mt = await multicast.create_multicast_transmitter(
                  self.maddr)

              try:
                  # make sure the above threads are running
                  await seq.simpsync(0)

                  while True:
                      pkt = await mr.recv()
                      msg = pkt[0]

                      out = syote_comms.comms_process_wrap(
                          commstate, msg)

                      # only transmit when there is a reply
                      if out:
                          await mt.send(out)
              finally:
                  mr.close()
                  mt.close()

          task = asyncio.create_task(clienttask())

          # The cleanup lives in finally blocks so that a failing
          # step does not leave the datagram object open or the
          # responder task running.  On success the order is the
          # same as before: close msd, then cancel the task.
          try:
              # start it
              await msd.start()

              try:
                  # pass it to a node
                  node = LORANode(msd, shared=shared_key)

                  await seq.simpsync(1)

                  # Send various messages
                  await node.start()
                  await node.waitfor(30)
                  await node.ping()
                  await node.terminate()
              finally:
                  msd.close()
          finally:
              # the responder loops forever, so it can only end by
              # being cancelled
              task.cancel()

              with self.assertRaises(asyncio.CancelledError):
                  await task
  1144. class TestLORAmain(unittest.TestCase):
  def setUp(self):
      '''Create a temporary directory tree and change into the
      directory that contains this file.

      Sets basetempdir (the new temporary directory), tempdir (a
      'subdir' directory inside it) and origcwd (the working
      directory at the time of the call).
      '''

      base = pathlib.Path(tempfile.mkdtemp()).resolve()

      self.basetempdir = base
      self.tempdir = base / 'subdir'
      self.tempdir.mkdir()

      # remember where we were before moving next to this file
      self.origcwd = os.getcwd()
      here = pathlib.Path(__file__).parent
      os.chdir(here)
  def tearDown(self):
      '''Undo setUp: go back to the original working directory and
      remove the temporary directory tree.

      The tempdir, basetempdir and origcwd attributes are reset to
      None.  Errors from os.chdir or shutil.rmtree propagate.
      '''

      btd = self.basetempdir
      origcwd = self.origcwd

      self.tempdir = None
      self.basetempdir = None
      self.origcwd = None

      # Restore the working directory before removing the tree, so
      # that a failing rmtree cannot leave the process in the wrong
      # directory.
      os.chdir(origcwd)

      shutil.rmtree(btd)
  def test_sharedkey(self):
      '''Run the initiator/responder programs once with a shared
      key file and once with X25519 key files.

      Key material is written hex encoded to files in basetempdir
      and passed on the command line via run_program_keyargs.
      '''

      keys = []

      # shared key test, both sides get the same arguments
      keyfile = self.basetempdir / 'shared.key'
      with keyfile.open('w') as fp:
          fp.write(os.urandom(16).hex())

      keys.append(('shared', [ [ '-s', keyfile ] ] * 2))

      # pub key test
      # An explicit mapping from role to key replaces the lookup
      # through locals().
      keypairs = {
          'init': X25519.gen(),
          'resp': X25519.gen(),
      }

      f = {}
      for role, key in keypairs.items():
          parts = (
              ('pub', key.getpub),
              ('priv', key.getpriv),
          )
          for part, getter in parts:
              data = getter().hex()
              path = self.basetempdir / ('%s_%s.key' % (role, part))
              f['%s_%s_key_file' % (role, part)] = path
              with path.open('w') as fp:
                  fp.write(data)

      # each side gets its own private key and the public key of
      # the other side
      keys.append(('pk', (
          [ '-k', f['init_priv_key_file'],
              '-p', f['resp_pub_key_file'] ],
          [ '-k', f['resp_priv_key_file'],
              '-p', f['init_pub_key_file'] ]
      )))

      for name, (initargs, respargs) in keys:
          # XXX - macosx specific library path var
          with self.subTest(name=name), patch.dict(os.environ,
              dict(DYLD_LIBRARY_PATH=self.origcwd)):
              self.run_program_keyargs(initargs, respargs)
  def run_program_keyargs(self, initargs, respargs):
      '''Run this file as a responder and as an initiator that
      sends a ping, and check the outcome.

      initargs and respargs are lists of command line arguments
      with the key options for the initiator and the responder.

      The initiator has to exit with status 0 within 2 seconds, the
      responder has to print the received message, and neither may
      write anything to stderr.
      '''

      # Start unset so the cleanup below can tell which children
      # were actually started.
      resp = None
      init = None

      try:
          # sys.executable runs the children with the interpreter
          # that runs the tests, instead of whatever 'python' is
          # found on PATH.
          resp = subprocess.Popen([ sys.executable, __file__,
              '-r', 'dummy:TestAttr.dummy_func' ] +
              respargs, stdin=DEVNULL, stderr=PIPE,
              stdout=PIPE)

          init = subprocess.Popen([ sys.executable, __file__ ] +
              initargs + [ 'ping' ], stdin=DEVNULL, stderr=PIPE,
              stdout=DEVNULL)

          # make sure the command completes
          initret = init.wait(2)
          self.assertEqual(initret, 0)

          # terminate responder
          resp.terminate()

          stdout, errout = resp.communicate()

          # make sure we got the message
          self.assertEqual(stdout, b"got msg: b'\\x04'\n")
          self.assertEqual(errout, b'')
      finally:
          # Kill and reap every child that was started before
          # asserting, so a failed check on one of them cannot
          # leave the other running.
          leftovers = []
          for proc in (init, resp):
              if proc is None:
                  continue

              proc.kill()
              _, errout = proc.communicate()
              leftovers.append(errout)

          for errout in leftovers:
              self.assertEqual(errout, b'')