Implement a secure ICS protocol targeting LoRa Node151 microcontroller for controlling irrigation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

166 lines
6.2 KiB

  1. """
  2. An example implementation of STROBE. Doesn't include the key tree.
  3. Copyright (c) Mike Hamburg, Cryptography Research, 2016.
  4. I will need to contact legal to get a license for this; in the mean time it is
  5. for example purposes only.
  6. """
  7. from __future__ import absolute_import
  8. from Strobe.Keccak import KeccakF
  9. class AuthenticationFailed(Exception):
  10. """Thrown when a MAC fails."""
  11. pass
  12. I,A,C,T,M,K = 1<<0, 1<<1, 1<<2, 1<<3, 1<<4, 1<<5
  13. class Strobe(object):
  14. def __init__(self, proto, F = KeccakF(1600), security = 128, copy_of=None, doInit=True):
  15. if copy_of is None:
  16. self.pos = self.posbegin = 0
  17. self.I0 = None
  18. self.F = F
  19. self.R = F.nbytes - security//4
  20. # Domain separation doesn't use Strobe padding
  21. self.initialized = False
  22. self.st = bytearray(F.nbytes)
  23. domain = bytearray([1,self.R,1,0,1,12*8]) \
  24. + bytearray(b"STROBEv1.0.2")
  25. if doInit: self._duplex(domain, forceF=True)
  26. # cSHAKE separation is done.
  27. # Turn on Strobe padding and do per-proto separation
  28. self.R -= 2
  29. self.initialized = True
  30. if doInit: self.operate(A|M, proto)
  31. else:
  32. self.R,self.pos,self.posbegin,self.I0,self.F = \
  33. (copy_of.R,copy_of.pos,copy_of.posbegin,copy_of.I0,
  34. copy_of.F.copy())
  35. self.st = bytearray(copy_of.st)
  36. self.initialized = True
  37. def copy(self): return Strobe(None,copy_of=self)
  38. def deepcopy(self): return self.copy()
  39. def _runF(self):
  40. if self.initialized:
  41. self.st[self.pos] ^= self.posbegin
  42. self.st[self.pos+1] ^= 0x04
  43. self.st[self.R+1] ^= 0x80
  44. self.st = self.F(self.st)
  45. self.pos = self.posbegin = 0
  46. def _duplex(self, data, cbefore=False, cafter=False, forceF=False):
  47. assert not (cbefore and cafter)
  48. # Copy data, and convert string or int to bytearray
  49. # This converts an integer n to an array of n zeros
  50. data = bytearray(data)
  51. for i in range(len(data)):
  52. if cbefore: data[i] ^= self.st[self.pos]
  53. self.st[self.pos] ^= data[i]
  54. if cafter: data[i] = self.st[self.pos]
  55. self.pos += 1
  56. if self.pos == self.R: self._runF()
  57. if forceF and self.pos != 0: self._runF()
  58. return data
  59. def _beginOp(self, flags):
  60. # Adjust direction information so that sender and receiver agree
  61. if flags & T:
  62. if self.I0 is None: self.I0 = flags & I
  63. flags ^= self.I0
  64. # Update posbegin
  65. oldbegin, self.posbegin = self.posbegin, self.pos+1
  66. self._duplex([oldbegin,flags], forceF = flags&(C|K))
  67. def operate(self, flags, data, more=False, meta_flags=A|M, metadata=None):
  68. """
  69. STROBE main duplexing mode.
  70. Op is a byte which describes the operating mode, per the STROBE paper.
  71. Data is either a string or bytearray of data, or else a length. If it
  72. is given as a length, the data is that many bytes of zeros.
  73. If metadata is not None, first apply the given metadata in the given
  74. meta_op.
  75. STROBE operations are streamable. If more is true, this operation
  76. continues the previous operation. It therefore ignores metadata and
  77. doesn't use the beginOp code from the paper.
  78. Certain operations return data. If an operation returns no data
  79. (for example, AD and KEY don't return any data), it returns the empty
  80. byte array.
  81. The meta-operation might also return data. This is convenient for
  82. explicit framing (meta_op = 0b11010/0b11011) or encrypted explicit
  83. framing (meta_op = 0b11110/0b11111)
  84. If the operation is a MAC verification, this function returns the
  85. empty byte array (plus any metadata returned) on success, and throws
  86. AuthenticationFailed on failure.
  87. """
  88. assert not (flags & (K|1<<6|1<<7)) # Not implemented here
  89. meta_out = bytearray()
  90. if more:
  91. assert flags == self.cur_flags
  92. else:
  93. if metadata is not None:
  94. meta_out = self.operate(meta_flags, metadata)
  95. self._beginOp(flags)
  96. self.cur_flags = flags
  97. if (flags & (I|T) != (I|T)) and (flags & (I|A) != A):
  98. # Operation takes no input
  99. assert isinstance(data,int)
  100. # The actual processing code is just duplex
  101. cafter = (flags & (C|I|T)) == (C|T)
  102. cbefore = (flags & C) and not cafter
  103. processed = self._duplex(data, cbefore, cafter)
  104. # Determine what to do with the output.
  105. if (flags & (I|A)) == (I|A):
  106. # Return data to the application
  107. return meta_out + processed
  108. elif (flags & (I|T)) == T:
  109. # Return data to the transport.
  110. # A fancier implementation might send it directly.
  111. return meta_out + processed
  112. elif (flags & (I|A|T)) == (I|T):
  113. # Check MAC
  114. assert not more
  115. failures = 0
  116. for byte in processed: failures |= byte
  117. if failures != 0: raise AuthenticationFailed()
  118. return meta_out
  119. else:
  120. # Operation has no output data, but maybe output metadata
  121. return meta_out
  122. def ad (self,data, **kw): return self.operate(0b0010,data,**kw)
  123. def key (self,data, **kw): return self.operate(0b0110,data,**kw)
  124. def prf (self,data, **kw): return self.operate(0b0111,data,**kw)
  125. def send_clr(self,data, **kw): return self.operate(0b1010,data,**kw)
  126. def recv_clr(self,data, **kw): return self.operate(0b1011,data,**kw)
  127. def send_enc(self,data, **kw): return self.operate(0b1110,data,**kw)
  128. def recv_enc(self,data, **kw): return self.operate(0b1111,data,**kw)
  129. def send_mac(self,data=16,**kw): return self.operate(0b1100,data,**kw)
  130. def recv_mac(self,data ,**kw): return self.operate(0b1101,data,**kw)
  131. def ratchet (self,data=32,**kw): return self.operate(0b0100,data,**kw)