Browse Source

start ecdhe support, wrap x25519 as this will be needed..

main
John-Mark Gurney 2 years ago
parent
commit
27619d0050
2 changed files with 61 additions and 1 deletions
  1. +26
    -1
      lora.py
  2. +35
    -0
      lora_comms.py

+ 26
- 1
lora.py View File

@@ -54,14 +54,17 @@ class LORANode(object):
'''Implement a LORANode initiator.'''

SHARED_DOMAIN = b'com.funkthat.lora.irrigation.shared.v0.0.1'
ECDHE_DOMAIN = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'

MAC_LEN = 8

def __init__(self, syncdatagram, shared=None):
def __init__(self, syncdatagram, shared=None, ecdhe_key=None, resp_pub=None):
self.sd = syncdatagram
self.st = Strobe(self.SHARED_DOMAIN, F=KeccakF(800))
if shared is not None:
self.st.key(shared)
else:
raise RuntimeError

async def start(self):
resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')
@@ -600,9 +603,31 @@ class TestSequencing(unittest.IsolatedAsyncioTestCase):
# that the order they ran in was correct
self.assertEqual(col, list(range(1, 7)))

class TestX25519(unittest.TestCase):
def test_basic(self):
aprivkey = lora_comms.x25519_genkey()
apubkey = lora_comms.x25519_base(aprivkey, 1)

bprivkey = lora_comms.x25519_genkey()
bpubkey = lora_comms.x25519_base(bprivkey, 1)

self.assertNotEqual(aprivkey, bprivkey)
self.assertNotEqual(apubkey, bpubkey)

ra = lora_comms.x25519_wrap(apubkey, aprivkey, bpubkey, 1)

rb = lora_comms.x25519_wrap(bpubkey, bprivkey, apubkey, 1)

self.assertEquals(ra, rb)

class TestLORANode(unittest.IsolatedAsyncioTestCase):
shared_domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'

def test_initparams(self):
# make sure no keys fails
with self.assertRaises(RuntimeError):
l = LORANode(None)

@timeout(2)
async def test_lora(self):
_self = self


+ 35
- 0
lora_comms.py View File

@@ -22,6 +22,8 @@
# SUCH DAMAGE.
#

import os

from ctypes import Structure, POINTER, CFUNCTYPE, pointer, sizeof
from ctypes import c_uint8, c_uint16, c_ssize_t, c_size_t, c_uint64, c_int
from ctypes import CDLL
@@ -95,6 +97,9 @@ class CommsState(Structure,StructureRepr):
('cs_prevmsgrespbuf', c_uint8 * 64),
]

EC_PUBLIC_BYTES = 32
EC_PRIVATE_BYTES = 32

if _lib is not None:
_lib._comms_state_size.restype = c_size_t
_lib._comms_state_size.argtypes = ()
@@ -102,17 +107,47 @@ if _lib is not None:
if _lib._comms_state_size() != sizeof(CommsState): # pragma: no cover
raise RuntimeError('CommsState structure size mismatch!')

X25519_BASE_POINT = (c_uint8 * (256//8)).in_dll(_lib, 'X25519_BASE_POINT')

for func, ret, args in [
('comms_init', None, (POINTER(CommsState), process_msgfunc_t,
POINTER(PktBuf))),
('comms_process', None, (POINTER(CommsState), PktBuf, POINTER(PktBuf))),
('strobe_seed_prng', None, (POINTER(c_uint8), c_ssize_t)),
('x25519', c_int, (c_uint8 * EC_PUBLIC_BYTES, c_uint8 * EC_PRIVATE_BYTES, c_uint8 * EC_PUBLIC_BYTES, c_int)),
]:
f = getattr(_lib, func)
f.restype = ret
f.argtypes = args
locals()[func] = f

def x25519_wrap(out, scalar, base, clamp):
outptr = (c_uint8 * EC_PUBLIC_BYTES).from_buffer_copy(out)
scalarptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(scalar)
baseptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(base)

r = x25519(outptr, scalarptr, baseptr, clamp)

if r != 0:
raise RuntimeError('x25519 failed')

return bytes(outptr)

def x25519_genkey():
return os.urandom(EC_PRIVATE_BYTES)

def x25519_base(scalar, clamp):
out = bytearray(EC_PUBLIC_BYTES)
outptr = (c_uint8 * EC_PUBLIC_BYTES).from_buffer(out)
scalarptr = (c_uint8 * EC_PRIVATE_BYTES).from_buffer_copy(scalar)

r = x25519(outptr, scalarptr, X25519_BASE_POINT, clamp)

if r != 0:
raise RuntimeError('x25519 failed')

return bytes(out)

def comms_process_wrap(state, input):
'''A wrapper around comms_process that converts the argument
into the buffer, and the returns the message as a bytes string.


Loading…
Cancel
Save