Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

265 lines
7.4 KiB

  1. # Copyright 2021 John-Mark Gurney.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions
  5. # are met:
  6. # 1. Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # 2. Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. #
  12. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  13. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  14. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  15. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  16. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  17. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  18. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  19. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  20. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  21. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  22. # SUCH DAMAGE.
  23. #
  24. import os
  25. import unittest
  26. from binascii import a2b_hex
  27. from ctypes import Structure, POINTER, CFUNCTYPE, pointer, sizeof
  28. from ctypes import c_uint8, c_uint16, c_ssize_t, c_size_t, c_uint64, c_int
  29. from ctypes import CDLL
  30. class StructureRepr(object):
  31. def __repr__(self): #pragma: no cover
  32. return '%s(%s)' % (self.__class__.__name__, ', '.join('%s=%s' %
  33. (k, getattr(self, k)) for k, v in self._fields_))
  34. class PktBuf(Structure):
  35. _fields_ = [
  36. ('pkt', POINTER(c_uint8)),
  37. ('pktlen', c_uint16),
  38. ]
  39. def _from(self):
  40. return bytes(self.pkt[:self.pktlen])
  41. def __repr__(self): #pragma: no cover
  42. return 'PktBuf(pkt=%s, pktlen=%s)' % (repr(self._from()),
  43. self.pktlen)
  44. def make_pktbuf(s):
  45. pb = PktBuf()
  46. if isinstance(s, bytearray):
  47. obj = s
  48. pb.pkt = pointer(c_uint8.from_buffer(s))
  49. else:
  50. obj = (c_uint8 * len(s))(*s)
  51. pb.pkt = obj
  52. pb.pktlen = len(s)
  53. pb._make_pktbuf_ref = (obj, s)
  54. return pb
  55. process_msgfunc_t = CFUNCTYPE(None, PktBuf, POINTER(PktBuf))
  56. try:
  57. _lib = CDLL('liblora_test.dylib')
  58. except OSError:
  59. _lib = None
  60. if _lib is not None:
  61. _lib._strobe_state_size.restype = c_size_t
  62. _lib._strobe_state_size.argtypes = ()
  63. _strobe_state_u64_cnt = (_lib._strobe_state_size() + 7) // 8
  64. else:
  65. _strobe_state_u64_cnt = 1
  66. class CommsSession(Structure,StructureRepr):
  67. _fields_ = [
  68. ('cs_crypto', c_uint64 * _strobe_state_u64_cnt),
  69. ('cs_state', c_int),
  70. ]
  71. class CommsState(Structure,StructureRepr):
  72. _fields_ = [
  73. # The alignment of these may be off
  74. ('cs_active', CommsSession),
  75. ('cs_pending', CommsSession),
  76. ('cs_start', c_uint64 * _strobe_state_u64_cnt),
  77. ('cs_procmsg', process_msgfunc_t),
  78. ('cs_prevmsg', PktBuf),
  79. ('cs_prevmsgresp', PktBuf),
  80. ('cs_prevmsgbuf', c_uint8 * 64),
  81. ('cs_prevmsgrespbuf', c_uint8 * 64),
  82. ]
  83. EC_PUBLIC_BYTES = 32
  84. EC_PRIVATE_BYTES = 32
  85. if _lib is not None:
  86. _lib._comms_state_size.restype = c_size_t
  87. _lib._comms_state_size.argtypes = ()
  88. if _lib._comms_state_size() != sizeof(CommsState): # pragma: no cover
  89. raise RuntimeError('CommsState structure size mismatch!')
  90. X25519_BASE_POINT = (c_uint8 * (256//8)).in_dll(_lib, 'X25519_BASE_POINT')
  91. for func, ret, args in [
  92. ('comms_init', None, (POINTER(CommsState), process_msgfunc_t,
  93. POINTER(PktBuf))),
  94. ('comms_process', None, (POINTER(CommsState), PktBuf, POINTER(PktBuf))),
  95. ('strobe_seed_prng', None, (POINTER(c_uint8), c_ssize_t)),
  96. ('x25519', c_int, (c_uint8 * EC_PUBLIC_BYTES, c_uint8 * EC_PRIVATE_BYTES, c_uint8 * EC_PUBLIC_BYTES, c_int)),
  97. ]:
  98. f = getattr(_lib, func)
  99. f.restype = ret
  100. f.argtypes = args
  101. locals()[func] = f
  102. def x25519_wrap(out, scalar, base, clamp):
  103. outptr = (c_uint8 * EC_PUBLIC_BYTES).from_buffer_copy(out)
  104. scalarptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(scalar)
  105. baseptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(base)
  106. r = x25519(outptr, scalarptr, baseptr, clamp)
  107. if r != 0:
  108. raise RuntimeError('x25519 failed')
  109. return bytes(outptr)
  110. def x25519_genkey():
  111. return os.urandom(EC_PRIVATE_BYTES)
  112. def x25519_base(scalar, clamp):
  113. out = bytearray(EC_PUBLIC_BYTES)
  114. outptr = (c_uint8 * EC_PUBLIC_BYTES).from_buffer(out)
  115. scalarptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(scalar)
  116. r = x25519(outptr, scalarptr, X25519_BASE_POINT, clamp)
  117. if r != 0:
  118. raise RuntimeError('x25519 failed')
  119. return bytes(out)
  120. class X25519:
  121. '''Class to wrap the x25519 functions into something a bit more
  122. usable. This provides better key ingestion and better support
  123. for other key formats.
  124. Use either the gen method to generate a random key, or the frombytes
  125. method.
  126. a = X25519.gen()
  127. b = X25519.gen()
  128. a.dh(b.getpub()) == b.dh(a.getpub())
  129. That is, each party generates a key, sends their public part to the
  130. other party, and then uses their received public part as an argument
  131. to the dh method. The resulting value will be shared between the
  132. two parties.
  133. '''
  134. def __init__(self, key):
  135. self.privkey = key
  136. self.pubkey = x25519_base(key, 1)
  137. def dh(self, pub):
  138. '''Perform a DH operation using the public part pub.'''
  139. return x25519_wrap(self.pubkey, self.privkey, pub, 1)
  140. def getpub(self):
  141. '''Get the public part of the key. This is to be sent
  142. to the other party for key exchange.'''
  143. return self.pubkey
  144. def getpriv(self):
  145. return self.privkey
  146. @classmethod
  147. def gen(cls):
  148. '''Generate a random X25519 key.'''
  149. return cls(x25519_genkey())
  150. @classmethod
  151. def frombytes(cls, key):
  152. '''Generate an X25519 key from 32 bytes.'''
  153. return cls(key)
  154. def comms_process_wrap(state, input):
  155. '''A wrapper around comms_process that converts the argument
  156. into the buffer, and the returns the message as a bytes string.
  157. '''
  158. inpkt = make_pktbuf(input)
  159. outbytes = bytearray(64)
  160. outbuf = make_pktbuf(outbytes)
  161. comms_process(state, inpkt, outbuf)
  162. return outbuf._from()
  163. class TestX25519(unittest.TestCase):
  164. PUBLIC_BYTES = EC_PUBLIC_BYTES
  165. PRIVATE_BYTES = EC_PRIVATE_BYTES
  166. def test_class(self):
  167. key = X25519.gen()
  168. pubkey = key.getpub()
  169. privkey = key.getpriv()
  170. apubkey = x25519_base(privkey, 1)
  171. self.assertEqual(apubkey, pubkey)
  172. self.assertEqual(X25519.frombytes(privkey).getpub(), pubkey)
  173. with self.assertRaises(ValueError):
  174. X25519(b'0'*31)
  175. def test_rfc7748_6_1(self):
  176. # KAT from https://datatracker.ietf.org/doc/html/rfc7748#section-6.1
  177. apriv = a2b_hex('77076d0a7318a57d3c16c17251b26645df4c2f87ebc0992ab177fba51db92c2a')
  178. akey = X25519(apriv)
  179. self.assertEqual(akey.getpub(), a2b_hex('8520f0098930a754748b7ddcb43ef75a0dbf3a0d26381af4eba4a98eaa9b4e6a'))
  180. bpriv = a2b_hex('5dab087e624a8a4b79e17f8b83800ee66f3bb1292618b6fd1c2f8b27ff88e0eb')
  181. bkey = X25519(bpriv)
  182. self.assertEqual(bkey.getpub(), a2b_hex('de9edb7d7b7dc1b4d35b61c2ece435373f8343c85b78674dadfc7e146f882b4f'))
  183. ss = a2b_hex('4a5d9d5ba4ce2de1728e3bf480350f25e07e21c947d19e3376f09b3c1e161742')
  184. self.assertEqual(akey.dh(bkey.getpub()), ss)
  185. self.assertEqual(bkey.dh(akey.getpub()), ss)
  186. def test_basic_ops(self):
  187. aprivkey = x25519_genkey()
  188. apubkey = x25519_base(aprivkey, 1)
  189. bprivkey = x25519_genkey()
  190. bpubkey = x25519_base(bprivkey, 1)
  191. self.assertNotEqual(aprivkey, bprivkey)
  192. self.assertNotEqual(apubkey, bpubkey)
  193. ra = x25519_wrap(apubkey, aprivkey, bpubkey, 1)
  194. rb = x25519_wrap(bpubkey, bprivkey, apubkey, 1)
  195. self.assertEqual(ra, rb)