From f968246195a0513bd165119dd92d426aa2dae54c Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Sun, 19 Nov 2023 20:43:12 -0800 Subject: [PATCH] first cut of the cryptography->pycryptodome compatibility/translation layer --- .gitignore | 7 + Makefile | 16 +++ README.md | 33 +++++ cryptography/__init__.py | 0 cryptography/exceptions.py | 5 + cryptography/hazmat/__init__.py | 0 cryptography/hazmat/backends.py | 2 + cryptography/hazmat/primitives/__init__.py | 0 .../hazmat/primitives/asymmetric/ec.py | 87 ++++++++++++ .../hazmat/primitives/asymmetric/utils.py | 6 + cryptography/hazmat/primitives/ciphers.py | 70 ++++++++++ cryptography/hazmat/primitives/hashes.py | 4 + .../hazmat/primitives/kdf/__init__.py | 0 cryptography/hazmat/primitives/kdf/hkdf.py | 15 ++ .../hazmat/primitives/serialization.py | 15 ++ cryptography/tests/__init__.py | 2 + cryptography/tests/aes.py | 61 +++++++++ cryptography/tests/ecc.py | 128 ++++++++++++++++++ requirements.txt | 3 + setup.py | 15 ++ 20 files changed, 469 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 cryptography/__init__.py create mode 100644 cryptography/exceptions.py create mode 100644 cryptography/hazmat/__init__.py create mode 100644 cryptography/hazmat/backends.py create mode 100644 cryptography/hazmat/primitives/__init__.py create mode 100644 cryptography/hazmat/primitives/asymmetric/ec.py create mode 100644 cryptography/hazmat/primitives/asymmetric/utils.py create mode 100644 cryptography/hazmat/primitives/ciphers.py create mode 100644 cryptography/hazmat/primitives/hashes.py create mode 100644 cryptography/hazmat/primitives/kdf/__init__.py create mode 100644 cryptography/hazmat/primitives/kdf/hkdf.py create mode 100644 cryptography/hazmat/primitives/serialization.py create mode 100644 cryptography/tests/__init__.py create mode 100644 cryptography/tests/aes.py create mode 100644 cryptography/tests/ecc.py create mode 100644 requirements.txt create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ded11c9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.coverage +__pycache__ + +cryptography.egg-info + +pycaenv +venv diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..af254ca --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +VIRTUALENV ?= python3 -m venv +VRITUALENVARGS = + +MODULES=cryptography.tests + +test: venv pycaenv + find cryptography -name '*.py' | entr make test-noentr + +test-noentr: + ( . ./pycaenv/bin/activate && cd cryptography && python -m unittest tests) && ( . ./venv/bin/activate && pip install -e . && python -m coverage run -m unittest $(MODULES) && coverage report --omit=p/\* -m -i) + +venv: + ($(VIRTUALENV) $(VIRTUALENVARGS) venv && . ./venv/bin/activate && pip install -r requirements.txt) + +pycaenv: + ($(VIRTUALENV) $(VIRTUALENVARGS) pycaenv && . ./pycaenv/bin/activate && pip install cryptography) diff --git a/README.md b/README.md new file mode 100644 index 0000000..be7a0b1 --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +pycryptowrap +============ + +This is a translation layer to let software written for +[cryptography](https://cryptography.io/en/latest/) to used with +[pycryptodome](https://www.pycryptodome.org/). + +It currently only implements a minimal interface to get +[pywebpush](https://github.com/web-push-libs/pywebpush) working. + +Currently implemented, tested, and working: +* AES-GCM +* ECC SECP256R1 - DSS and ECDH + +It shouldn't be too hard to add some additional ciphers or curves. + +Testing +======= + +The `Makefile` does the neccessary work to build an environment. To +run the tests: +``` +make test-noentr +``` + +This will first run the tests against the cryptography module, and then +run the tests against this module. This makes sure that the tests are +valid, and also makes it easier to add known answers to make sure +you don't end up only self compatible. + +To make developing easier, the `test` target has one dependency, +[entr](https://github.com/eradman/entr). This will watch for changes +in the `py` files, and automatically rerun the test. diff --git a/cryptography/__init__.py b/cryptography/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cryptography/exceptions.py b/cryptography/exceptions.py new file mode 100644 index 0000000..87268e1 --- /dev/null +++ b/cryptography/exceptions.py @@ -0,0 +1,5 @@ +class InvalidTag(Exception): + pass + +class InvalidSignature(Exception): + pass diff --git a/cryptography/hazmat/__init__.py b/cryptography/hazmat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cryptography/hazmat/backends.py b/cryptography/hazmat/backends.py new file mode 100644 index 0000000..e3faff2 --- /dev/null +++ b/cryptography/hazmat/backends.py @@ -0,0 +1,2 @@ +def default_backend(): + pass diff --git a/cryptography/hazmat/primitives/__init__.py b/cryptography/hazmat/primitives/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cryptography/hazmat/primitives/asymmetric/ec.py b/cryptography/hazmat/primitives/asymmetric/ec.py new file mode 100644 index 0000000..a1e4b8b --- /dev/null +++ b/cryptography/hazmat/primitives/asymmetric/ec.py @@ -0,0 +1,87 @@ +from cryptography.exceptions import InvalidSignature + +from Crypto.PublicKey import ECC +from Crypto.Protocol.DH import _compute_ecdh +from Crypto.Signature import DSS + +# https://www.pycryptodome.org/src/public_key/ecc# + +SECP256R1 = lambda: 'secp256r1' + +class ECDSA: + def __init__(self, algorithm): + self._algo = algorithm + +class ECDH: + pass + +class EllipticCurvePrivateNumbers: + def __init__(self, key): + self._key = key + + @property + def public_numbers(self): + return self._key._ecc.pointQ + + @property + def private_value(self): + return self._key._ecc.d + +class ECCWrapper: + def __init__(self, ecckey): + self._ecc = ecckey + + def private_numbers(self): + return EllipticCurvePrivateNumbers(self) + + def public_key(self): + return EllipticCurvePublicKey(self._ecc.public_key()) + + _format_to_compress = dict(nocompress=False, compress=True) + def public_bytes(self, encoding, format): + return self.public_key()._ecc.export_key(format=encoding, compress=self._format_to_compress[format]) + + def private_bytes(self, encoding, format, encryption_algorithm): + return self._ecc.export_key(format=encoding, compress=format) + + # https://www.pycryptodome.org/src/protocol/dh + def exchange(self, typ, pubkey): + assert isinstance(typ, ECDH) + + return _compute_ecdh(self._ecc, pubkey._ecc) + + @classmethod + def generate_private_key(cls, keytype, backend=None): + if callable(keytype): + keytype = keytype() + + return cls(ECC.generate(curve=keytype)) + + def sign(self, data, signature_algorithm): + h = signature_algorithm._algo.new(data) + + signer = DSS.new(self._ecc, 'fips-186-3', 'der') + + return signer.sign(h) + + def verify(self, signature, data, signature_algorithm): + h = signature_algorithm._algo.new(data) + + verifier = DSS.new(self._ecc, 'fips-186-3', 'der') + + try: + verifier.verify(h, signature) + except ValueError: + raise InvalidSignature + +class EllipticCurvePublicKey(ECCWrapper): + @classmethod + def from_encoded_point(cls, curve, key): + return cls(ECC.import_key(key, curve_name=curve)) + +generate_private_key = ECCWrapper.generate_private_key + +def load_der_private_key(key, password, backend=None): + if password is not None: + raise ValueError('unsupported') + return ECCWrapper(ECC.import_key(key)) diff --git a/cryptography/hazmat/primitives/asymmetric/utils.py b/cryptography/hazmat/primitives/asymmetric/utils.py new file mode 100644 index 0000000..7054c1f --- /dev/null +++ b/cryptography/hazmat/primitives/asymmetric/utils.py @@ -0,0 +1,6 @@ +from Crypto.Util.asn1 import DerSequence + +def decode_dss_signature(signature): + obj = DerSequence().decode(der_encoded=signature) + + return tuple(obj) diff --git a/cryptography/hazmat/primitives/ciphers.py b/cryptography/hazmat/primitives/ciphers.py new file mode 100644 index 0000000..63341fc --- /dev/null +++ b/cryptography/hazmat/primitives/ciphers.py @@ -0,0 +1,70 @@ +from Crypto.Cipher import AES as ccAES +from cryptography.exceptions import InvalidTag + +# https://www.pycryptodome.org/src/cipher/modern#gcm-mode + +class CipherEncryptor: + def __init__(self, encor): + self._encor = encor + + self.authenticate_additional_data = encor.update + self.update = encor.encrypt + + @property + def tag(self): + return self._encor.digest() + + def finalize(self): + return b'' + +class CipherDecryptor: + def __init__(self, decor, tag=None): + self._decor = decor + self._tag = tag + + self.authenticate_additional_data = decor.update + self.update = decor.decrypt + + def finalize(self): + try: + #print(repr(self._decor)) + self._decor.verify(self._tag) + except ValueError: + raise InvalidTag('tag mismatch') + + return b'' + +class Cipher: + def __init__(self, algo, mode, backend=None): + self._algo = algo + self._mode = mode + + def _getmode(self): + if isinstance(self._mode, GCM): + return ccAES.MODE_GCM + + def _nonce(self): + return self._mode._iv + + def encryptor(self): + return CipherEncryptor(ccAES.new(self._algo._key, + self._getmode(), nonce=self._nonce())) + + def decryptor(self): + return CipherDecryptor(ccAES.new(self._algo._key, + self._getmode(), nonce=self._nonce()), tag=self._mode._tag) + +class AES: + def __init__(self, key): + self._key = key + +class algorithms: + AES = AES + +class GCM: + def __init__(self, iv, tag=None): + self._iv = iv + self._tag = tag + +class modes: + GCM = GCM diff --git a/cryptography/hazmat/primitives/hashes.py b/cryptography/hazmat/primitives/hashes.py new file mode 100644 index 0000000..4582f5b --- /dev/null +++ b/cryptography/hazmat/primitives/hashes.py @@ -0,0 +1,4 @@ +def SHA256(): + from Crypto.Hash import SHA256 + + return SHA256 diff --git a/cryptography/hazmat/primitives/kdf/__init__.py b/cryptography/hazmat/primitives/kdf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cryptography/hazmat/primitives/kdf/hkdf.py b/cryptography/hazmat/primitives/kdf/hkdf.py new file mode 100644 index 0000000..a9dcd8b --- /dev/null +++ b/cryptography/hazmat/primitives/kdf/hkdf.py @@ -0,0 +1,15 @@ +from Crypto.Protocol.KDF import HKDF as baseHKDF + +# https://www.pycryptodome.org/src/protocol/kdf#hkdf + +# https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#hkdf + +class HKDF: + def __init__(self, algorithm, length, salt, info, backend=None): + self._algo = algorithm + self._len = length + self._salt = salt + self._info = info + + def derive(self, key): + return baseHKDF(key, self._len, self._salt, self._algo, context=self._info) diff --git a/cryptography/hazmat/primitives/serialization.py b/cryptography/hazmat/primitives/serialization.py new file mode 100644 index 0000000..da983e3 --- /dev/null +++ b/cryptography/hazmat/primitives/serialization.py @@ -0,0 +1,15 @@ +from .asymmetric.ec import load_der_private_key + +class Encoding: + X962 = 'SEC1' + Raw = 'raw' + DER = 'DER' + +class PublicFormat: + UncompressedPoint = 'nocompress' + +class PrivateFormat: + PKCS8 = 'pkcs8' + +def NoEncryption(): + return None diff --git a/cryptography/tests/__init__.py b/cryptography/tests/__init__.py new file mode 100644 index 0000000..c10cdb8 --- /dev/null +++ b/cryptography/tests/__init__.py @@ -0,0 +1,2 @@ +from .ecc import TestECC +from .aes import TestAES diff --git a/cryptography/tests/aes.py b/cryptography/tests/aes.py new file mode 100644 index 0000000..451a512 --- /dev/null +++ b/cryptography/tests/aes.py @@ -0,0 +1,61 @@ +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import ( + Cipher, algorithms, modes +) +from cryptography.exceptions import InvalidTag + +import random +import unittest + +TAG_LENGTH = 16 + +class TestAES(unittest.TestCase): + def test_aesgcm_cav(self): + pass + + def test_aesgcm(self): + buf = bytes.fromhex('00000000000000000000000000000000') + + key = bytes.fromhex('00000000000000000000000000000000') + + iv = bytes.fromhex('000000000000000000000000') + + origdata = buf + + # encryption + encryptor = Cipher(algorithms.AES(key), + modes.GCM(iv), backend=default_backend() + ).encryptor() + + last = True + #if version == 'aes128gcm': + data = encryptor.update(buf) + + self.assertEqual(data, bytes.fromhex('0388dace60b6a392f328c2b971b2fe78')) + data += encryptor.finalize() + + self.assertEqual(encryptor.tag, bytes.fromhex('ab6e47d42cec13bdf53a67b21257bddf')) + + data += encryptor.tag + + encdata = data + + # decryption + content = encdata + decryptor = Cipher(algorithms.AES(key), + modes.GCM(iv, tag=content[-TAG_LENGTH:]), + backend=default_backend() + ).decryptor() + decdata = decryptor.update(content[:-TAG_LENGTH]) + decryptor.finalize() + + self.assertEqual(origdata, decdata) + + # decryption + content = encdata + decryptor = Cipher(algorithms.AES(key), + modes.GCM(iv, tag=b'\x00' * TAG_LENGTH), + backend=default_backend() + ).decryptor() + decdata = decryptor.update(content[:-TAG_LENGTH]) + + self.assertRaises(InvalidTag, decryptor.finalize) diff --git a/cryptography/tests/ecc.py b/cryptography/tests/ecc.py new file mode 100644 index 0000000..aa7ffc8 --- /dev/null +++ b/cryptography/tests/ecc.py @@ -0,0 +1,128 @@ +import unittest + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature +from cryptography.hazmat.primitives.kdf.hkdf import HKDF + + +class TestECC(unittest.TestCase): + def test_dh(self): + # slightly modified from: + # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/ec/#elliptic-curve-key-exchange-algorithm + + # private keys for use in the exchange. + keya = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \x04\xa3X\xbd\x0e\xff*\x8cw\xf8\x9f\x05BD<\nY\xb3\xf1\xd2\xc1\xb0\r\x1e\xedu\x92]4M?\x01\xa1D\x03B\x00\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4' + keyb = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \xfb\xf8\xf7\x9f\xa3\xb7\xed\x8cT@`\xf6\x9c\xbbv\x0e?\x87\xb1(\xf6\xa8\xb3`\x91\xb4\x92W\xc6\xaa\xf5~\xa1D\x03B\x00\x04\xb8\xfe\xe4\x8dkukc\xa4^\x87\x98\x9c\xb9\xa8\xec\x86\xf8\xc2\x89\xaeF\xe8q\xb9q\x92I\x98n\xfe\xe3<{\x1c&R\x82\xb1\x94=\xa5h*)m/\x13\xfb\x05\x1d\x98u\xec\x1ew\xdfW\x84\xfe\x9eSl\x83' + + #server_private_key = ec.generate_private_key(ec.SECP256R1()) + #print('spk:', repr(server_private_key.private_bytes( + # encoding=serialization.Encoding.DER, + # format=serialization.PublicFormat.UncompressedPoint, + # algorithm=None))) + + server_private_key = serialization.load_der_private_key(keya, + None, backend=default_backend()) + + pubpoint = server_private_key.private_numbers().public_numbers + + self.assertEqual(server_private_key.private_numbers(). \ + private_value, + 2097859916579721232322403601989230314767884081400167668022347085958538411777) + + self.assertEqual(pubpoint.x, + 36569272757924220784927781299997671003354453138026426189464987345196826688394) + self.assertEqual(pubpoint.y, + 95759458837377270694950453035845273914475242769728982836658929275588228093908) + + self.assertEqual(keya, server_private_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption())) + + # In a real handshake the peer is a remote client. For this + # example we'll generate another local private key though. + peer_private_key = serialization.load_der_private_key(keyb, + None) + + shared_key = server_private_key.exchange( + ec.ECDH(), peer_private_key.public_key()) + + self.assertEqual(shared_key, + b'\x1a\x9f\x93c\xb0s\xa2\x15]{\xa3\xcc\xcf&Q\xd6g\x83\x86%\x7f\t\xfem@\xcb\xe9:U\x16\x07\x02') + + # Perform key derivation. + derived_key = HKDF(algorithm=hashes.SHA256(), + length=32, salt=None, info=b'handshake data', backend=None + ).derive(shared_key) + + # And now we can demonstrate that the handshake performed in + # the opposite direction gives the same final value + + same_shared_key = peer_private_key.exchange( + ec.ECDH(), server_private_key.public_key()) + + self.assertEqual(shared_key, same_shared_key) + + # Perform key derivation. + same_derived_key = HKDF(algorithm=hashes.SHA256(), + length=32, salt=None, info=b'handshake data', + ).derive(same_shared_key) + + self.assertEqual(derived_key, + b'\x89\r\xf7\xf0\xa6\xb9Z\xb9\xd7\xd0\x9b\x95y\xe0M\x11,\xb4\xe1Z\xe5\xa2j\xee)\xa0I\xb5Q\x18\x94\xd1') + + self.assertEqual(derived_key, same_derived_key) + + def test_decode_sig(self): + sig = b"0D\x02 P\x92\xaf\xffoN\xadq\r=\x92\xb5\r\xe0l3\xf2\x80*\xdd|\xfe\xd8'\xb8\\\xe8\x94\xd6\xa1\xdb\xea\x02 \x18\x89j\xa8P\x83jk*\xb8\xa2\x15r&d\xa1\x9e\xf6\xec\xd2\xf4 \xd6\x08\x91bs\x18\xb5\x11/\x04" + + res = (36444202250238074078057463719437572015031876874679459625290239663827367091178, 11098302536735876471048588108325227764001515075886106999029234198831298588420) + + self.assertEqual(decode_dss_signature(sig), res) + + def test_sign(self): + private_key = ec.generate_private_key(ec.SECP256R1()) + + data = b"this is some data I'd like to sign" + + signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) + + # make sure we can decode our own signatures + decode_dss_signature(signature) + + public_key = private_key.public_key() + + public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) + + wrongsig = bytearray(signature) + wrongsig[0] ^= 1 + wrongsig[1] ^= 4 + wrongsig = bytes(wrongsig) + + self.assertRaises(InvalidSignature, public_key.verify, wrongsig, data, ec.ECDSA(hashes.SHA256())) + + def test_misc(self): + keya = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \x04\xa3X\xbd\x0e\xff*\x8cw\xf8\x9f\x05BD<\nY\xb3\xf1\xd2\xc1\xb0\r\x1e\xedu\x92]4M?\x01\xa1D\x03B\x00\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4' + skey = serialization.load_der_private_key(keya, + None, backend=default_backend()) + + ckey = skey.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint + ) + + self.assertEqual(ckey, b'\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4') + + pubkey = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), ckey) + + self.assertTrue(isinstance(pubkey, ec.EllipticCurvePublicKey)) + + self.assertEqual(ckey, pubkey.public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint + )) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4a87fa5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +-e . + +-e .[dev] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ba85213 --- /dev/null +++ b/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup, Extension + +setup(name = "cryptography", version = "41.0.5", + description = "Emulate cryptography enough for some needs", + author = "John-Mark Gurney", + author_email = "jmg@funkthat.com", + url = 'about:blank', + packages = [ 'cryptography' ], + extras_require = { + 'dev': [ 'coverage' ], + }, + install_requires=[ + 'pycryptodome', + ], + )