Browse Source

implement the python initiator for ecdhe key exchange...

main
John-Mark Gurney 2 years ago
parent
commit
550ee42cfb
1 changed files with 181 additions and 7 deletions
  1. +181
    -7
      lora.py

+ 181
- 7
lora.py View File

@@ -34,7 +34,7 @@ from Strobe.Strobe import Strobe, KeccakF
from Strobe.Strobe import AuthenticationFailed

import lora_comms
from lora_comms import make_pktbuf
from lora_comms import make_pktbuf, X25519
import multicast
from util import *

@@ -51,22 +51,37 @@ CMD_ADV = 6 # arg: ([cnt]): advances to the next cnt (default 1) command
CMD_CLEAR = 7 # arg: (): clears all future commands, but keeps current running

class LORANode(object):
'''Implement a LORANode initiator.'''
'''Implement a LORANode initiator.

There are currently two implemented modes, one is shared, and then
a shared key must be provided to the shared keyword argument.

The other is ecdhe mode, which requires an X25519 key to be passed
in to init_key, and the respondent's public key to be passed in to
resp_pub.
'''

SHARED_DOMAIN = b'com.funkthat.lora.irrigation.shared.v0.0.1'
ECDHE_DOMAIN = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'

MAC_LEN = 8

def __init__(self, syncdatagram, shared=None, ecdhe_key=None, resp_pub=None):
def __init__(self, syncdatagram, shared=None, init_key=None, resp_pub=None):
self.sd = syncdatagram
self.st = Strobe(self.SHARED_DOMAIN, F=KeccakF(800))
if shared is not None:
self.st = Strobe(self.SHARED_DOMAIN, F=KeccakF(800))
self.st.key(shared)
self.start = self.shared_start
elif init_key is not None and resp_pub is not None:
self.st = Strobe(self.ECDHE_DOMAIN, F=KeccakF(800))
self.key = init_key
self.resp_pub = resp_pub
self.st.key(init_key.getpub() + resp_pub)
self.start = self.ecdhe_start
else:
raise RuntimeError
raise RuntimeError('invalid combination of keys provided')

async def start(self):
async def shared_start(self):
resp = await self.sendrecvvalid(os.urandom(16) + b'reqreset')

self.st.ratchet()
@@ -77,9 +92,26 @@ class LORANode(object):
raise RuntimeError('got invalid response: %s' %
repr(pkt))

async def sendrecvvalid(self, msg):
async def ecdhe_start(self):
ephkey = X25519.gen()

resp = await self.sendrecvvalid(ephkey.getpub() + b'reqreset',
fun=lambda: self.st.key(ephkey.dh(self.resp_pub) + self.key.dh(self.resp_pub)))

self.st.key(ephkey.dh(resp) + self.key.dh(resp))

pkt = await self.sendrecvvalid(b'confirm')

if pkt != b'confirmed':
raise RuntimeError('got invalid response: %s' %
repr(pkt))

async def sendrecvvalid(self, msg, fun=None):
msg = self.st.send_enc(msg) + self.st.send_mac(self.MAC_LEN)

if fun is not None:
fun()

origstate = self.st.copy()

while True:
@@ -605,12 +637,154 @@ class TestSequencing(unittest.IsolatedAsyncioTestCase):

class TestLORANode(unittest.IsolatedAsyncioTestCase):
shared_domain = b'com.funkthat.lora.irrigation.shared.v0.0.1'
ecdhe_domain = b'com.funkthat.lora.irrigation.ecdhe.v0.0.1'

def test_initparams(self):
# make sure no keys fails
with self.assertRaises(RuntimeError):
l = LORANode(None)

@timeout(2)
async def test_lora_ecdhe(self):
_self = self
initkey = X25519.gen()
respkey = X25519.gen()

class TestSD(MockSyncDatagram):
async def sendgettest(self, msg):
'''Send the message, but make sure that if a
bad message is sent afterward, that it replies
w/ the same previous message.
'''

await self.put(msg)
resp = await self.get()

await self.put(b'bogusmsg' * 5)

resp2 = await self.get()

_self.assertEqual(resp, resp2)

return resp

async def runner(self):
# as respondant

l = Strobe(_self.ecdhe_domain, F=KeccakF(800))

l.key(initkey.getpub() + respkey.getpub())

# start handshake
r = await self.get()

# get eph key w/ reqreset
pkt = l.recv_enc(r[:-8])
l.recv_mac(r[-8:])

assert pkt.endswith(b'reqreset')

ephpub = pkt[:-len(b'reqreset')]

# make sure junk gets ignored
await self.put(b'sdlfkj')

# and that the packet remains the same
_self.assertEqual(r, await self.get())

# and a couple more times
await self.put(b'0' * 24)
_self.assertEqual(r, await self.get())
await self.put(b'0' * 32)
_self.assertEqual(r, await self.get())

# update the keys
l.key(respkey.dh(ephpub) + respkey.dh(initkey.getpub()))

# generate our eph key
ephkey = X25519.gen()

# send the response
await self.put(l.send_enc(ephkey.getpub()) +
l.send_mac(8))

l.key(ephkey.dh(ephpub) + ephkey.dh(initkey.getpub()))

# get the confirmation message
r = await self.get()

# test the resend capabilities
await self.put(b'0' * 24)
_self.assertEqual(r, await self.get())

# decode confirmation message
c = l.recv_enc(r[:-8])
l.recv_mac(r[-8:])

# assert that we got it
_self.assertEqual(c, b'confirm')

# send confirmed reply
r = await self.sendgettest(l.send_enc(
b'confirmed') + l.send_mac(8))

# test and decode remaining command messages
cmd = l.recv_enc(r[:-8])
l.recv_mac(r[-8:])

assert cmd[0] == CMD_WAITFOR
assert int.from_bytes(cmd[1:],
byteorder='little') == 30

r = await self.sendgettest(l.send_enc(
cmd[0:1]) + l.send_mac(8))

cmd = l.recv_enc(r[:-8])
l.recv_mac(r[-8:])

assert cmd[0] == CMD_RUNFOR
assert int.from_bytes(cmd[1:5],
byteorder='little') == 1
assert int.from_bytes(cmd[5:],
byteorder='little') == 50

r = await self.sendgettest(l.send_enc(
cmd[0:1]) + l.send_mac(8))

cmd = l.recv_enc(r[:-8])
l.recv_mac(r[-8:])

assert cmd[0] == CMD_TERMINATE

await self.put(l.send_enc(cmd[0:1]) +
l.send_mac(8))

tsd = TestSD()

# make sure it fails w/o both specified
with self.assertRaises(RuntimeError):
l = LORANode(tsd, init_key=initkey)

with self.assertRaises(RuntimeError):
l = LORANode(tsd, resp_pub=respkey.getpub())

l = LORANode(tsd, init_key=initkey, resp_pub=respkey.getpub())

await l.start()

await l.waitfor(30)

await l.runfor(1, 50)

await l.terminate()

await tsd.drain()

# Make sure all messages have been processed
self.assertTrue(tsd.sendq.empty())
self.assertTrue(tsd.recvq.empty())
#_debprint('done')

@timeout(2)
async def test_lora_shared(self):
_self = self


Loading…
Cancel
Save