#!/usr/bin/env python
'''Command line tool and library for attaching signed metadata to files.

Objects (metadata, identities and file records) are thin wrappers around
dictionaries, serialized with a canonical (sorted key) pasn1 encoding so that
signatures are made over a stable byte representation.'''

#import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace()

from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \
    Ed448PublicKey
from cryptography.hazmat.primitives.serialization import Encoding, \
    PrivateFormat, PublicFormat, NoEncryption

from unittest import mock

from .hostid import hostuuid

import base58
import copy
import datetime
import functools
import hashlib
import io
import json
import os.path
import pathlib
import pasn1
import re
import shutil
import string
import sys
import tempfile
import unittest
import uuid

# The UUID for the namespace representing the path to a file
_NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6')

# useful for debugging when stderr is redirected/captured
_real_stderr = sys.stderr

_defaulthash = 'sha512'
_validhashes = set([ 'sha256', 'sha512' ])

# Maps the length of a hex digest to the name of the hash producing it, so
# that a bare hex string can be classified by its length.
_hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in
    _validhashes }

def _keyordering(x):
    '''Sort key for (key, value) pairs: common properties first, in the
    order given by MDBase._common_names_list, then everything else.'''

    k, v = x
    try:
        return (MDBase._common_names_list.index(k), k, v)
    except ValueError:
        # not a common name, sort after all of the common ones
        return (2**32, k, v)

def _iterdictlist(obj, **kwargs):
    '''Yield (key, value) pairs from obj.items(**kwargs) in _keyordering
    order.  A list value is expanded into one pair per (sorted) element.'''

    l = list(sorted(obj.items(**kwargs), key=_keyordering))
    for k, v in l:
        if isinstance(v, list):
            for i in sorted(v):
                yield k, i
        else:
            yield k, v

def _makeuuid(s):
    '''Return s as a uuid.UUID.  s is either already a UUID, or the 16
    bytes of one.'''

    if isinstance(s, uuid.UUID):
        return s

    return uuid.UUID(bytes=s)

# XXX - known issue, store is not atomic/safe, overwrites in place instead of
# renames
# XXX - add validation
# XXX - how to add singletons
class MDBase(object):
    '''This is a simple wrapper that turns a JSON object into a pythonesc
    object where attribute accesses work.

    Subclasses set _type; use MDBase.create_obj to get an instance of the
    subclass matching an object's 'type' key.'''

    _type = 'invalid'

    # Properties that are filled in (by calling the value) when missing.
    _generated_properties = {
        'uuid': uuid.uuid4,
        'modified': lambda: datetime.datetime.now(
            tz=datetime.timezone.utc),
    }

    # When decoding, the decoded value should be passed to this function
    # to get the correct type
    _instance_properties = {
        'uuid': _makeuuid,
        'created_by_ref': _makeuuid,
        #'parent_refs': lambda x: [ _makeuuid(y) for y in x ],
    }

    _common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang?
    _common_optional = set(('parent_refs', 'sig'))
    _common_names = set(_common_properties + list(
        _generated_properties.keys()))
    _common_names_list = _common_properties + list(
        _generated_properties.keys())

    def __init__(self, obj=None, **kwargs):
        '''Create the object from the dictionary obj, updated w/ kwargs.

        obj is deep copied, so the caller's dictionary is never modified.
        Raises ValueError if called on MDBase itself, if the type does
        not match the class, or if a required common property is
        missing.'''

        # None instead of a mutable {} default
        obj = {} if obj is None else copy.deepcopy(obj)
        obj.update(kwargs)

        if self._type == MDBase._type:
            raise ValueError('call MDBase.create_obj instead so correct class is used.')

        if 'type' in obj and obj['type'] != self._type:
            raise ValueError(
                'trying to create the wrong type of object, got: %s, expected: %s' %
                (repr(obj['type']), repr(self._type)))

        if 'type' not in obj:
            obj['type'] = self._type

        for x in self._common_properties:
            if x not in obj:
                raise ValueError('common property %s not present' % repr(x))

        # convert decoded values to their proper types
        for x, fun in self._instance_properties.items():
            if x in obj:
                obj[x] = fun(obj[x])

        # fill in anything that is generated and not provided
        for x, fun in self._generated_properties.items():
            if x not in obj:
                obj[x] = fun()

        self._obj = obj

    @classmethod
    def create_obj(cls, obj):
        '''Using obj as a base, create an instance of MDBase of the
        correct type.

        If the correct type is not found, a ValueError is raised.'''

        if isinstance(obj, cls):
            obj = obj._obj

        ty = obj['type']

        # Only direct subclasses of MDBase are considered.
        for i in MDBase.__subclasses__():
            if i._type == ty:
                return i(obj)
        else:
            raise ValueError('Unable to find class for type %s' %
                repr(ty))

    def new_version(self, *args):
        '''For each k, v pair, add the property k as an additional one
        (or new one if first), with the value v.

        Common properties are set directly instead of being appended.
        The returned object gets a freshly generated modified time.'''

        obj = copy.deepcopy(self._obj)

        common = self._common_names | self._common_optional
        for k, v in args:
            if k in common:
                obj[k] = v
            else:
                obj.setdefault(k, []).append(v)

        # drop it so that it gets regenerated
        del obj['modified']

        return self.create_obj(obj)

    def __repr__(self): # pragma: no cover
        return '%s(%s)' % (self.__class__.__name__, repr(self._obj))

    def __getattr__(self, k):
        # If _obj itself is missing (instance not yet initialized, e.g.
        # while being copied), looking it up below would recurse forever.
        if k == '_obj':
            raise AttributeError(k)

        try:
            return self._obj[k]
        except KeyError:
            raise AttributeError(k)

    def __setattr__(self, k, v):
        if k[0] == '_': # direct attribute
            self.__dict__[k] = v
        else:
            self._obj[k] = v

    def __getitem__(self, k):
        return self._obj[k]

    def __to_dict__(self):
        '''Return the underlying dictionary (not a copy).'''

        return self._obj

    def __eq__(self, o):
        return self._obj == o

    def __contains__(self, k):
        return k in self._obj

    def items(self, skipcommon=True):
        '''Return a list of (key, value) pairs.  Unless skipcommon is
        False, the keys in _common_names are left out.'''

        return [ (k, v) for k, v in self._obj.items() if
            not skipcommon or k not in self._common_names ]

    def encode(self):
        '''Return the canonical serialization of this object.'''

        return _asn1coder.dumps(self)

    @classmethod
    def decode(cls, s):
        '''Create an object from the serialized form s.'''

        return cls.create_obj(_asn1coder.loads(s))

class MetaData(MDBase):
    _type = 'metadata'

class Identity(MDBase):
    _type = 'identity'

    # Identites don't need a created by
    _common_properties = [ x for x in MDBase._common_properties if x !=
        'created_by_ref' ]
    _common_optional = set([ x for x in MDBase._common_optional if x !=
        'parent_refs' ] + [ 'name', 'pubkey' ])
    _common_names = set(_common_properties + list(
        MDBase._generated_properties.keys()))

def _trytodict(o):
    '''Coerce function for the coder: UUIDs are encoded as their bytes,
    anything else must provide __to_dict__.'''

    if isinstance(o, uuid.UUID):
        return 'bytes', o.bytes

    try:
        return 'dict', o.__to_dict__()
    except Exception: # pragma: no cover
        raise TypeError('unable to find __to_dict__ on %s: %s' %
            (type(o), repr(o)))

class CanonicalCoder(pasn1.ASN1DictCoder):
    '''A coder that encodes dictionaries with their items sorted, so that
    equal dictionaries always serialize to the same bytes.'''

    def enc_dict(self, obj, **kwargs):
        # Hand the base class an object whose items() is sorted.
        class FakeIter:
            def items(self):
                return iter(sorted(obj.items()))

        return pasn1.ASN1DictCoder.enc_dict(self, FakeIter(), **kwargs)

_asn1coder = CanonicalCoder(coerce=_trytodict)

class Persona(object):
    '''The object that represents a persona, or identity.  It will
    create the proper identity object, serialize for saving keys,
    create objects for that persona and other management.'''

    def __init__(self, identity=None, key=None):
        '''identity is the Identity object to use, a new one is created
        if None.  key, if provided, is the private key used to sign.'''

        if identity is None:
            self._identity = Identity()
        else:
            self._identity = identity

        self._key = key
        self._pubkey = None

        if 'pubkey' in self._identity:
            pubkeybytes = self._identity.pubkey
            self._pubkey = Ed448PublicKey.from_public_bytes(
                pubkeybytes)

        self._created_by_ref = self._identity.uuid

    def MetaData(self, *args, **kwargs):
        '''Create a signed MetaData object created by this persona.'''

        kwargs['created_by_ref'] = self.uuid

        return self.sign(MetaData(*args, **kwargs))

    @property
    def uuid(self):
        '''Return the UUID of the identity represented.'''

        return self._identity.uuid

    def __repr__(self): # pragma: no cover
        # The format string must consume all three arguments, an empty
        # one raises TypeError.
        r = '<Persona: has key: %s, has pubkey: %s, identity: %s>' % \
            (self._key is not None, self._pubkey is not None,
            repr(self._identity))

        return r

    @classmethod
    def from_pubkey(cls, pubkeystr):
        '''Create a Persona from the string returned by get_pubkey.'''

        pubstr = base58.b58decode_check(pubkeystr)

        identuuid, pubkey = _asn1coder.loads(pubstr)

        ident = Identity(uuid=identuuid, pubkey=pubkey)

        return cls(ident)

    def get_identity(self):
        '''Return the Identity object for this Persona.'''

        return self._identity

    def get_pubkey(self):
        '''Get a printable version of the public key.  This is used
        for importing into different programs, or for shared.'''

        idobj = self._identity
        pubstr = _asn1coder.dumps([ idobj.uuid, idobj.pubkey ])

        return base58.b58encode_check(pubstr)

    def new_version(self, *args):
        '''Update the Persona's Identity object.'''

        self._identity = self.sign(self._identity.new_version(*args))

        return self._identity

    def store(self, fname):
        '''Store the Persona to a file.  If there is a private
        key associated w/ the Persona, it will be saved as well.'''

        obj = {
            'identity': self._identity,
        }
        if self._key is not None:
            obj['key'] = \
                self._key.private_bytes(Encoding.Raw,
                PrivateFormat.Raw, NoEncryption())

        # Serialize before opening, so that a failure to encode does
        # not truncate an existing file.
        data = _asn1coder.dumps(obj)

        with open(fname, 'wb') as fp:
            fp.write(data)

    @classmethod
    def load(cls, fname):
        '''Load the Persona from the provided file.'''

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        kwargs = {}
        if 'key' in objs:
            kwargs['key'] = Ed448PrivateKey.from_private_bytes(
                objs['key'])

        return cls(Identity(objs['identity']), **kwargs)

    def generate_key(self):
        '''Generate a key for this Identity.

        Raises a RuntimeError if a key is already present.'''

        if self._key:
            raise RuntimeError('a key already exists')

        self._key = Ed448PrivateKey.generate()
        self._pubkey = self._key.public_key()
        pubkey = self._pubkey.public_bytes(Encoding.Raw,
            PublicFormat.Raw)
        self._identity = self.sign(self._identity.new_version(('pubkey',
            pubkey)))

    def _makesigbytes(self, obj):
        '''Return the bytes that get signed for obj: the canonical
        encoding of all of its properties except for sig.'''

        obj = dict(obj.items(False))
        obj.pop('sig', None)

        return _asn1coder.dumps(obj)

    def sign(self, obj):
        '''Takes the object, adds a signature, and returns the new
        object.  This uses the private key of the Persona.'''

        sigbytes = self._makesigbytes(obj)
        sig = self._key.sign(sigbytes)
        newobj = MDBase.create_obj(obj)
        newobj.sig = sig

        return newobj

    def verify(self, obj):
        '''Verify the signature on obj using the public key of this
        Persona.  Returns True on success, the exception from the
        public key's verify method propagates on failure.'''

        sigbytes = self._makesigbytes(obj)

        self._pubkey.verify(obj['sig'], sigbytes)

        return True

    def by_file(self, fname):
        '''Return a file object for the file named fname.'''

        fobj = FileObject.from_file(fname, self._created_by_ref)

        return self.sign(fobj)

class ObjectStore(object):
    '''A container to store for the various Metadata objects.'''

    # The _uuids property contains both the UUIDv4 for objects, and
    # looking up the UUIDv5 for FileObjects.

    def __init__(self, created_by_ref):
        self._created_by_ref = created_by_ref
        self._uuids = {}
        self._hashes = {}

    @staticmethod
    def makehash(hashstr, strict=True):
        '''Take a hash or hash string, and return a valid hash
        string from it.

        This makes sure that it is of the correct type and length.

        If strict is False, the function will detect the length and
        return a valid hash string if one can be found.

        By default, the string must be prepended by the type,
        followed by a colon, followed by the value in hex in all
        lower case characters.

        Raises ValueError if the string is not valid.'''

        try:
            hash, value = hashstr.split(':')
        except ValueError:
            if strict:
                raise

            # no type present, figure it out from the length
            hash = _hashlengths[len(hashstr)]
            value = hashstr

        bvalue = value.encode('ascii')
        # anything left after removing the valid digits is invalid
        if strict and len(bvalue.translate(None,
            string.hexdigits.lower().encode('ascii'))) != 0:
            raise ValueError('value has invalid hex digits (must be lower case)', value)

        if hash in _validhashes:
            return ':'.join((hash, value))

        raise ValueError('unsupported hash type: %s' % repr(hash))

    def __len__(self):
        return len(self._uuids)

    def __iter__(self):
        return iter(self._uuids.values())

    def store(self, fname):
        '''Write out the objects in the store to the file named
        fname.'''

        # eliminate objs stored by multiple uuids (FileObjects)
        objs = { id(x): x for x in self._uuids.values() }

        obj = {
            'created_by_ref': self._created_by_ref,
            'objects': list(objs.values()),
        }

        # Serialize before opening, so that a failure to encode does
        # not truncate an existing file.
        data = _asn1coder.dumps(obj)

        with open(fname, 'wb') as fp:
            fp.write(data)

    def loadobj(self, obj):
        '''Load obj into the data store.'''

        obj = MDBase.create_obj(obj)

        self._uuids[obj.uuid] = obj

        # files are also found by the id made from their path
        if obj.type == 'file':
            self._uuids[_makeuuid(obj.id)] = obj

        for j in obj.hashes:
            h = self.makehash(j)
            self._hashes.setdefault(h, []).append(obj)

    @classmethod
    def load(cls, fname):
        '''Load objects from the provided file name.

        Basic validation will be done on the objects in the file.

        The objects will be accessible via other methods.'''

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        obj = cls(objs['created_by_ref'])
        for i in objs['objects']:
            obj.loadobj(i)

        return obj

    def by_id(self, id):
        '''Look up an object by it's UUID.'''

        if not isinstance(id, uuid.UUID):
            uid = uuid.UUID(id)
        else:
            uid = id

        return self._uuids[uid]

    def by_hash(self, hash):
        '''Look up an object by it's hash value.'''

        h = self.makehash(hash, strict=False)

        return self._hashes[h]

    def by_file(self, fname, types=('metadata', )):
        '''Return a metadata object for the file named fname.

        Will raise a KeyError if this file does not exist in
        the database.

        Will raise a ValueError if fname currently does not
        match what is in the database.
        '''

        fid = FileObject.make_id(fname)

        fobj = self.by_id(fid)
        fobj.verify()

        for i in fobj.hashes:
            j = self.by_hash(i)

            # Filter out non-metadata objects
            j = [ x for x in j if x.type in types ]
            if j:
                return j
        else:
            raise KeyError('unable to find metadata for file: %s' %
                repr(fname))

def _readfp(fp):
    '''Yield the contents of fp in blocks of up to 64 KiB.'''

    while True:
        r = fp.read(64*1024)
        if r == b'':
            return

        yield r

def _hashfile(fname):
    '''Return the hash string (type:hexvalue) of the contents of the file
    fname, using the default hash.'''

    hash = getattr(hashlib, _defaulthash)()
    with open(fname, 'rb') as fp:
        for r in _readfp(fp):
            hash.update(r)

    return '%s:%s' % (_defaulthash, hash.hexdigest())

class FileObject(MDBase):
    _type = 'file'

    @staticmethod
    def make_id(fname):
        '''Take a local file name, and make the id for it.

        The id is a UUIDv5 of the host uuid followed by the real path
        of the file, with the directory and the file name joined by a
        forward slash.'''

        fname = os.path.realpath(fname)
        return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
            str(hostuuid()) + '/'.join(os.path.split(fname)))

    @classmethod
    def from_file(cls, filename, created_by_ref):
        '''Create a FileObject describing the file filename, including
        the hash of its contents.'''

        s = os.stat(filename)
        # XXX - add host uuid?
        obj = {
            'created_by_ref': created_by_ref,
            'hostid': hostuuid(),
            'dir': os.path.dirname(filename),
            'filename': os.path.basename(filename),
            'id': cls.make_id(filename),
            'mtime': datetime.datetime.fromtimestamp(s.st_mtime,
                tz=datetime.timezone.utc),
            'size': s.st_size,
            'hashes': [ _hashfile(filename), ],
        }

        return cls(obj)

    def verify(self, complete=False):
        '''Verify that this FileObject is still valid.  It will
        by default, only do a mtime verification.

        It will raise a ValueError if the file does not match.

        complete is currently not used.'''

        s = os.stat(os.path.join(self.dir, self.filename))
        mtimets = datetime.datetime.fromtimestamp(s.st_mtime,
            tz=datetime.timezone.utc).timestamp()

        if self.mtime.timestamp() != mtimets or \
            self.size != s.st_size:
            raise ValueError('file %s has changed' %
                repr(self.filename))

def enumeratedir(_dir, created_by_ref):
    '''Enumerate all the files (not recursive, directories are skipped)
    in _dir.

    Returned is a list of FileObjects.'''

    return [FileObject.from_file(os.path.join(_dir, x),
        created_by_ref) for x in os.listdir(_dir) if not
        os.path.isdir(os.path.join(_dir, x)) ]

def get_objstore(options):
    '''Return the tuple (persona, object store).  If the store file does
    not exist, an empty store is returned.'''

    persona = get_persona(options)
    storefname = os.path.expanduser('~/.medashare_store.pasn1')
    try:
        objstr = ObjectStore.load(storefname)
    except FileNotFoundError:
        objstr = ObjectStore(persona.get_identity().uuid)

    return persona, objstr

def write_objstore(options, objstr):
    '''Write the object store objstr to the store file.'''

    storefname = os.path.expanduser('~/.medashare_store.pasn1')

    objstr.store(storefname)

def get_persona(options):
    '''Load and return the Persona.  If the identity file does not exist,
    print an error and exit w/ a status of 1.'''

    identfname = os.path.expanduser('~/.medashare_identity.pasn1')
    try:
        persona = Persona.load(identfname)
    except FileNotFoundError:
        print('ERROR: Identity not created, create w/ -g.',
            file=sys.stderr)
        sys.exit(1)

    return persona

def cmd_genident(options):
    '''Generate a new identity, refusing to replace an existing one.'''

    identfname = os.path.expanduser('~/.medashare_identity.pasn1')

    if os.path.exists(identfname):
        print('Error: Identity already created.', file=sys.stderr)
        sys.exit(1)

    persona = Persona()
    persona.generate_key()

    persona.new_version(*(x.split('=', 1) for x in options.tagvalue))

    persona.store(identfname)

def cmd_ident(options):
    '''Update the identity w/ the provided tag=value pairs, or print the
    identity if none were provided.'''

    identfname = os.path.expanduser('~/.medashare_identity.pasn1')

    persona = Persona.load(identfname)

    if options.tagvalue:
        persona.new_version(*(x.split('=', 1) for x in
            options.tagvalue))

        persona.store(identfname)
    else:
        ident = persona.get_identity()
        for k, v in _iterdictlist(ident, skipcommon=False):
            print('%s:\t%s' % (k, v))

def cmd_pubkey(options):
    '''Print the public key of the identity.'''

    identfname = os.path.expanduser('~/.medashare_identity.pasn1')

    persona = Persona.load(identfname)

    print(persona.get_pubkey().decode('ascii'))

def cmd_modify(options):
    '''Add (+tag=value) or remove (-tag or -tag=value) tags on the
    metadata of the specified files.'''

    persona, objstr = get_objstore(options)

    # each entry is [ sign, tag ] or [ sign, tag, value ]
    props = [[ x[0] ] + x[1:].split('=', 1) for x in options.modtagvalues]
    if any(x[0] not in ('+', '-') for x in props):
        print('ERROR: tag needs to start with a "+" (add) or a "-" (remove).',
            file=sys.stderr)
        sys.exit(1)

    badtags = list(x[1] for x in props if x[1] in (MDBase._common_names |
        MDBase._common_optional))
    if any(badtags):
        print('ERROR: invalid tag%s: %s.' % ( 's' if
            len(badtags) > 1 else '', repr(badtags)), file=sys.stderr)
        sys.exit(1)

    adds = [ x[1:] for x in props if x[0] == '+' ]

    if any((len(x) != 2 for x in adds)):
        print('ERROR: invalid tag, needs an "=".', file=sys.stderr)
        sys.exit(1)

    dels = [ x[1:] for x in props if x[0] == '-' ]

    for i in options.files:
        # Get MetaData
        try:
            objs = objstr.by_file(i)
        except KeyError:
            fobj = persona.by_file(i)
            objstr.loadobj(fobj)
            objs = [ persona.MetaData(hashes=fobj.hashes) ]

        for j in objs:
            # make into key/values
            obj = j.__to_dict__()

            # delete tags
            for k in dels:
                try:
                    key, v = k
                except ValueError:
                    # no value, remove the whole tag
                    del obj[k[0]]
                else:
                    obj[key].remove(v)

            # add tags
            for k, v in adds:
                obj.setdefault(k, []).append(v)

            # drop it so that it gets regenerated
            del obj['modified']
            nobj = MDBase.create_obj(obj)

            objstr.loadobj(nobj)

    write_objstore(options, objstr)

def cmd_dump(options):
    '''Print the repr of every object in the store.'''

    persona, objstr = get_objstore(options)

    for i in objstr:
        print(repr(i))

def cmd_list(options):
    '''Print the tags of the metadata for the specified files.'''

    persona, objstr = get_objstore(options)

    for i in options.files:
        try:
            objs = objstr.by_file(i)
        except (ValueError, KeyError):
            # create the file, it may have the same hash
            # as something else
            try:
                fobj = persona.by_file(i)
                objstr.loadobj(fobj)

                objs = objstr.by_file(i)
            except (FileNotFoundError, KeyError):
                print('ERROR: file not found: %s' % repr(i),
                    file=sys.stderr)
                sys.exit(1)
        except FileNotFoundError:
            # XXX - tell the difference?
            print('ERROR: file not found: %s' % repr(i),
                file=sys.stderr)
            sys.exit(1)

        # objs was just looked up above, no need to do it again
        for j in objs:
            for k, v in _iterdictlist(j):
                print('%s:\t%s' % (k, v))

    # This is needed so that if it creates a FileObj, which may be
    # expensive (hashing large file), that it gets saved.
    write_objstore(options, objstr)

def main():
    '''Parse sys.argv and run the requested subcommand.'''

    import argparse

    parser = argparse.ArgumentParser()

    parser.add_argument('--db', '-d', type=str,
        help='base name for storage')

    subparsers = parser.add_subparsers(title='subcommands',
        description='valid subcommands', help='additional help')

    parser_gi = subparsers.add_parser('genident', help='generate identity')
    parser_gi.add_argument('tagvalue', nargs='+',
        help='add the arg as metadata for the identity, tag=[value]')
    parser_gi.set_defaults(func=cmd_genident)

    parser_i = subparsers.add_parser('ident', help='update identity')
    parser_i.add_argument('tagvalue', nargs='*',
        help='add the arg as metadata for the identity, tag=[value]')
    parser_i.set_defaults(func=cmd_ident)

    parser_pubkey = subparsers.add_parser('pubkey',
        help='print public key of identity')
    parser_pubkey.set_defaults(func=cmd_pubkey)

    # used so that - isn't treated as an option
    parser_mod = subparsers.add_parser('modify',
        help='modify tags on file(s)', prefix_chars='@')
    parser_mod.add_argument('modtagvalues', nargs='+',
        help='add (+) or delete (-) the tag=[value], for the specified files')
    parser_mod.add_argument('files', nargs='+',
        help='files to modify')
    parser_mod.set_defaults(func=cmd_modify)

    parser_list = subparsers.add_parser('list', help='list tags on file(s)')
    parser_list.add_argument('files', nargs='+',
        help='files to modify')
    parser_list.set_defaults(func=cmd_list)

    parser_dump = subparsers.add_parser('dump', help='dump all the objects')
    parser_dump.set_defaults(func=cmd_dump)

    options = parser.parse_args()

    # func is only set when a subcommand was given
    fun = getattr(options, 'func', None)
    if fun is None:
        parser.error('a subcommand is required')

    fun(options)

if __name__ == '__main__': # pragma: no cover
    main()

class _TestCononicalCoder(unittest.TestCase):
    def test_con(self):
        # make a dict
        obja = {
            'foo': 23984732, 'a': 5, 'b': 6,
            'something': '2398472398723498273dfasdfjlaksdfj'
        }

        # reorder the items in it
        objaitems = list(obja.items())
        objaitems.sort()
        objb = dict(objaitems)

        # and they are still the same
        self.assertEqual(obja, objb)

        # This is to make sure that item order changed
        self.assertNotEqual(list(obja.items()), list(objb.items()))

        astr = pasn1.dumps(obja)
        bstr = pasn1.dumps(objb)

        # that they normally will be serialized differently
        self.assertNotEqual(astr, bstr)

        # but w/ the special encoder
        astr = _asn1coder.dumps(obja)
        bstr = _asn1coder.dumps(objb)

        # they are now encoded the same
        self.assertEqual(astr, bstr)

class _TestCases(unittest.TestCase):
    def setUp(self):
        self.fixtures = pathlib.Path('fixtures').resolve()

        d = pathlib.Path(tempfile.mkdtemp()).resolve()
        self.basetempdir = d
        self.tempdir = d / 'subdir'

        persona = Persona.load(os.path.join('fixtures',
            'sample.persona.pasn1'))
        self.created_by_ref = persona.get_identity().uuid

        shutil.copytree(self.fixtures / 'testfiles', self.tempdir)

        self.oldcwd = os.getcwd()

    def tearDown(self):
        # leave the temp dir before removing it
        os.chdir(self.oldcwd)

        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_mdbase(self):
        self.assertRaises(ValueError, MDBase, created_by_ref='')
        self.assertRaises(ValueError, MDBase.create_obj,
            { 'type': 'unknosldkfj' })
        self.assertRaises(ValueError, MDBase.create_obj,
            { 'type': 'metadata' })

        baseobj = {
            'type': 'metadata',
            'created_by_ref': self.created_by_ref,
        }
        origbase = copy.deepcopy(baseobj)

        # that when an MDBase object is created
        md = MDBase.create_obj(baseobj)

        # it doesn't modify the passed in object (when adding
        # generated properties)
        self.assertEqual(baseobj, origbase)

        # and it has the generated properties
        # Note: cannot mock the functions as they are already
        # referenced at creation time
        self.assertIn('uuid', md)
        self.assertIn('modified', md)

        # That you can create a new version using new_version
        md2 = md.new_version(('dc:creator', 'Jim Bob',))

        # that they are different
        self.assertNotEqual(md, md2)

        # and that the new modified time is different from the old
        self.assertNotEqual(md.modified, md2.modified)

        # and that the modification is present
        self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ])

        # that providing a value from common property
        fvalue = 'fakesig'
        md3 = md.new_version(('sig', fvalue))

        # gets set directly, and is not a list
        self.assertEqual(md3.sig, fvalue)

        # that invalid attribute access raises correct exception
        self.assertRaises(AttributeError, getattr, md,
            'somerandombogusattribute')

    def test_mdbase_encode_decode(self):
        # that an object
        baseobj = {
            'type': 'metadata',
            'created_by_ref': self.created_by_ref,
        }
        obj = MDBase.create_obj(baseobj)

        # can be encoded
        coded = obj.encode()

        # and that the results can be decoded
        decobj = MDBase.decode(coded)

        # and that they are equal
        self.assertEqual(obj, decobj)

        # and in the encoded object
        eobj = _asn1coder.loads(coded)

        # the uuid property is a bytes instance
        self.assertIsInstance(eobj['uuid'], bytes)

        # and has the length of 16
        self.assertEqual(len(eobj['uuid']), 16)

    def test_mdbase_wrong_type(self):
        # that created_by_ref can be passed by kw
        obj = MetaData(created_by_ref=self.created_by_ref)

        self.assertRaises(ValueError, FileObject, dict(obj.items(False)))

    def test_makehash(self):
        self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
        self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
        self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False)

        self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
        self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')

    def test_enumeratedir(self):
        files = enumeratedir(self.tempdir, self.created_by_ref)
        ftest = files[1]
        fname = 'test.txt'

        # make sure that they are of type MDBase
        self.assertIsInstance(ftest, MDBase)

        oldid = ftest.id
        self.assertEqual(ftest.filename, fname)
        self.assertEqual(ftest.dir, str(self.tempdir))
        # XXX - do we add host information?
        self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
            str(hostuuid()) + '/'.join(os.path.split(self.tempdir) +
            ( fname, ))))
        self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20,
            21, 47, 36, tzinfo=datetime.timezone.utc))
        self.assertEqual(ftest.size, 15)
        self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes)

        # XXX - make sure works w/ relative dirs
        files = enumeratedir(os.path.relpath(self.tempdir),
            self.created_by_ref)
        self.assertEqual(oldid, files[1].id)

    def test_mdbaseoverlay(self):
        objst = ObjectStore(self.created_by_ref)

        # that a base object
        bid = uuid.uuid4()
        objst.loadobj({
            'type': 'metadata',
            'uuid': bid,
            'modified': datetime.datetime(2019, 6, 10, 14, 3, 10),
            'created_by_ref': self.created_by_ref,
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'someprop': [ 'somevalue' ],
            'lang': 'en',
        })

        # can have an overlay object
        oid = uuid.uuid4()
        dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787'
        objst.loadobj({
            'type': 'metadata',
            'uuid': oid,
            'modified': datetime.datetime(2019, 6, 10, 18, 3, 10),
            'created_by_ref': self.created_by_ref,
            'hashes': [ dhash ],
            'parent_refs': [ bid ],
            'lang': 'en',
        })

        # and that when you get it's properties
        oobj = objst.by_id(oid)
        odict = dict(list(oobj.items()))

        # that is has the overlays property
        self.assertEqual(odict['parent_refs'], [ bid ])

        # that it doesn't have a common property
        self.assertNotIn('type', odict)

        # that when skipcommon is False
        odict = dict(oobj.items(False))

        # that it does have a common property
        self.assertIn('type', odict)

    def test_persona(self):
        # that a newly created persona
        persona = Persona()

        # has an identity object
        idobj = persona.get_identity()

        # and that it has a uuid attribute that matches
        self.assertEqual(persona.uuid, idobj['uuid'])

        # that a key can be generated
        persona.generate_key()

        # that the pubkey property is present
        idobj = persona.get_identity()
        self.assertIsInstance(idobj['pubkey'], bytes)

        # that get_pubkey returns the correct thing
        pubstr = _asn1coder.dumps([ idobj.uuid, idobj['pubkey'] ])
        self.assertEqual(persona.get_pubkey(),
            base58.b58encode_check(pubstr))

        # and that there is a signature
        self.assertIsInstance(idobj['sig'], bytes)

        # and that it can verify itself
        persona.verify(idobj)

        # and that a new persona can be created from the pubkey
        pkpersona = Persona.from_pubkey(persona.get_pubkey())

        # and that it can verify the old identity
        self.assertTrue(pkpersona.verify(idobj))

        # that a second time, it raises an exception
        self.assertRaises(RuntimeError, persona.generate_key)

        # that a file object created by it
        testfname = os.path.join(self.tempdir, 'test.txt')
        testobj = persona.by_file(testfname)

        # has the correct created_by_ref
        self.assertEqual(testobj.created_by_ref, idobj.uuid)

        self.assertEqual(testobj.type, 'file')

        # and has a signature
        self.assertIn('sig', testobj)

        # that a persona created from the identity object
        vpersona = Persona(idobj)

        # can verify the sig
        self.assertTrue(vpersona.verify(testobj))

        # and that a bogus signature
        bogussig = 'somebogussig'
        bogusobj = MDBase.create_obj(testobj)
        bogusobj.sig = bogussig

        # fails to verify
        self.assertRaises(Exception, vpersona.verify, bogusobj)

        # and that a modified object
        otherobj = testobj.new_version(('customprop', 'value'))

        # fails to verify
        self.assertRaises(Exception, vpersona.verify, otherobj)

        # that a persona object can be written
        perpath = os.path.join(self.basetempdir, 'persona.pasn1')
        persona.store(perpath)

        # and that when loaded back
        loadpersona = Persona.load(perpath)

        # the new persona object can sign an object
        nvtestobj = loadpersona.sign(testobj.new_version())

        # and the old persona can verify it.
        self.assertTrue(vpersona.verify(nvtestobj))

    def test_persona_metadata(self):
        # that a persona
        persona = Persona()
        persona.generate_key()

        # can create a metadata object
        hashobj = ['asdlfkj']
        mdobj = persona.MetaData(hashes=hashobj)

        # that the object has the correct created_by_ref
        self.assertEqual(mdobj.created_by_ref, persona.uuid)

        # and has the provided hashes
        self.assertEqual(mdobj.hashes, hashobj)

        # and that it can be verified
        persona.verify(mdobj)

    def test_objectstore(self):
        persona = Persona.load(os.path.join('fixtures',
            'sample.persona.pasn1'))
        objst = ObjectStore.load(os.path.join('fixtures',
            'sample.data.pasn1'))

        objst.loadobj({
            'type': 'metadata',
            'uuid': uuid.UUID('c9a1d1e2-3109-4efd-8948-577dc15e44e7'),
            'modified': datetime.datetime(2019, 5, 31, 14, 3, 10,
                tzinfo=datetime.timezone.utc),
            'created_by_ref': self.created_by_ref,
            'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
            'lang': 'en',
        })

        lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
        self.assertEqual(len(lst), 2)

        byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')

        self.assertIsInstance(byid, MetaData)
        self.assertIn(byid, lst)

        r = byid

        self.assertEqual(r.uuid,
            uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96'))
        self.assertEqual(r['dc:creator'], [ 'John-Mark Gurney' ])

        # test storing the object store
        fname = 'testfile.pasn1'
        objst.store(fname)

        with open(fname, 'rb') as fp:
            objs = _asn1coder.loads(fp.read())

        os.unlink(fname)

        self.assertEqual(len(objs), len(objst))

        self.assertEqual(objs['created_by_ref'],
            self.created_by_ref.bytes)

        # make sure that the read back data matches
        for i in objs['objects']:
            i['created_by_ref'] = uuid.UUID(bytes=i['created_by_ref'])
            i['uuid'] = uuid.UUID(bytes=i['uuid'])
            self.assertEqual(objst.by_id(i['uuid']), i)

        # that a file
        testfname = os.path.join(self.tempdir, 'test.txt')

        # when registered
        objst.loadobj(persona.by_file(testfname))

        # can be found
        self.assertEqual(objst.by_file(testfname), [ byid ])
        self.assertEqual(objst.by_file(testfname), [ byid ])

        self.assertRaises(KeyError, objst.by_file, '/dev/null')

        # XXX make sure that object store contains fileobject

        # Tests to add:
        # Non-duplicates when same metadata is located by multiple hashes.

    def run_command_file(self, f):
        with open(f) as fp:
            cmds = json.load(fp)

        # setup object store
        storefname = self.tempdir / 'storefname'
        identfname = self.tempdir / 'identfname'

        # setup path mapping
        def expandusermock(arg):
            if arg == '~/.medashare_store.pasn1':
                return storefname
            elif arg == '~/.medashare_identity.pasn1':
                return identfname

        # setup test fname
        testfname = os.path.join(self.tempdir, 'test.txt')
        newtestfname = os.path.join(self.tempdir, 'newfile.txt')

        patches = []

        # Make sure that the patches get stopped even when a check
        # fails, so that they do not leak into other tests.
        try:
            for cmd in cmds:
                try:
                    special = cmd['special']
                except KeyError:
                    pass
                else:
                    if special == 'copy newfile.txt to test.txt':
                        shutil.copy(newtestfname, testfname)
                    elif special == 'change newfile.txt':
                        with open(newtestfname, 'w') as fp:
                            fp.write('some new contents')
                    elif special == 'verify store object cnt':
                        with open(storefname, 'rb') as fp:
                            pasn1obj = pasn1.loads(fp.read())
                            objcnt = len(pasn1obj['objects'])
                            self.assertEqual(objcnt, cmd['count'])
                    elif special == 'set hostid':
                        hostidpatch = mock.patch(__name__ + '.hostuuid')
                        hostidpatch.start().return_value = uuid.uuid4()
                        patches.append(hostidpatch)
                    else: # pragma: no cover
                        raise ValueError('unhandled special: %s' %
                            repr(special))

                    continue

                with self.subTest(file=f, title=cmd['title']), \
                    mock.patch('os.path.expanduser',
                    side_effect=expandusermock) as eu, \
                    mock.patch('sys.stdout', io.StringIO()) as stdout, \
                    mock.patch('sys.stderr', io.StringIO()) as stderr, \
                    mock.patch('sys.argv', [ 'progname', ] +
                    cmd['cmd']) as argv:
                    with self.assertRaises(SystemExit) as cm:
                        main()

                        # XXX - Minor hack till other tests fixed
                        sys.exit(0)

                    # with the correct output
                    self.maxDiff = None
                    outre = cmd.get('stdout_re')
                    if outre:
                        self.assertRegex(stdout.getvalue(), outre)
                    else:
                        self.assertEqual(stdout.getvalue(),
                            cmd.get('stdout', ''))
                    self.assertEqual(stderr.getvalue(),
                        cmd.get('stderr', ''))

                    self.assertEqual(cm.exception.code,
                        cmd.get('exit', 0))
        finally:
            patches.reverse()
            for i in patches:
                i.stop()

    def test_cmds(self):
        # sorted so that the files are run in a consistent order
        cmds = sorted(self.fixtures.glob('cmd.*.json'))

        for i in cmds:
            os.chdir(self.tempdir)

            self.run_command_file(i)

    def test_main(self):
        # Test the main runner, this is only testing things that are
        # specific to running the program, like where the store is
        # created.

        # setup object store
        storefname = os.path.join(self.tempdir, 'storefname')
        identfname = os.path.join(self.tempdir, 'identfname')

        # XXX part of the problem
        shutil.copy(os.path.join('fixtures', 'sample.data.pasn1'),
            storefname)

        # setup path mapping
        def expandusermock(arg):
            if arg == '~/.medashare_store.pasn1':
                return storefname
            elif arg == '~/.medashare_identity.pasn1':
                return identfname

        # setup test fname
        testfname = os.path.join(self.tempdir, 'test.txt')
        newtestfname = os.path.join(self.tempdir, 'newfile.txt')

        with mock.patch('os.path.expanduser', side_effect=expandusermock) \
            as eu, mock.patch('medashare.cli.open') as op:
            # that when opening the store and identity fails
            op.side_effect = FileNotFoundError

            # and there is no identity
            with mock.patch('sys.stderr', io.StringIO()) as stderr, \
                mock.patch('sys.argv', [ 'progname', 'list',
                'afile' ]) as argv:
                with self.assertRaises(SystemExit) as cm:
                    main()

                # that it fails
                self.assertEqual(cm.exception.code, 1)

                # with the correct error message
                self.assertEqual(stderr.getvalue(),
                    'ERROR: Identity not created, create w/ -g.\n')

        with mock.patch('os.path.expanduser', side_effect=expandusermock) \
            as eu:
            # that generating a new identity
            with mock.patch('sys.stdout', io.StringIO()) as stdout, \
                mock.patch('sys.argv', [ 'progname', 'genident',
                'name=A Test User' ]) as argv:
                main()

                # does not output anything
                self.assertEqual(stdout.getvalue(), '')

                # looks up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

                # and that the identity
                persona = Persona.load(identfname)
                pident = persona.get_identity()

                # has the correct name
                self.assertEqual(pident.name, 'A Test User')

            # that when generating an identity when one already exists
            with mock.patch('sys.stderr', io.StringIO()) as stderr, \
                mock.patch('sys.argv', [ 'progname', 'genident',
                'name=A Test User' ]) as argv:
                # that it exits
                with self.assertRaises(SystemExit) as cm:
                    main()

                # with error code 1
                self.assertEqual(cm.exception.code, 1)

                # and outputs an error message
                self.assertEqual(stderr.getvalue(),
                    'Error: Identity already created.\n')

                # and looked up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

            # that when updating the identity
            with mock.patch('sys.stdout', io.StringIO()) as stdout, \
                mock.patch('sys.argv', [ 'progname', 'ident',
                'name=Changed Name' ]) as argv:
                main()

                # it doesn't output anything
                self.assertEqual(stdout.getvalue(), '')

                # and looked up the correct file
                eu.assert_called_with('~/.medashare_identity.pasn1')

                npersona = Persona.load(identfname)
                nident = npersona.get_identity()

                # and has the new name
                self.assertEqual(nident.name, 'Changed Name')

                # and has the same old uuid
                self.assertEqual(nident.uuid, pident.uuid)

                # and that the modified date has changed
                self.assertNotEqual(pident.modified, nident.modified)

                # and that the old Persona can verify the new one
                self.assertTrue(persona.verify(nident))

        orig_open = open
        with mock.patch('os.path.expanduser', side_effect=expandusermock) \
            as eu, mock.patch('medashare.cli.open') as op:
            # that when the store fails
            def open_repl(fname, mode):
                #print('or:', repr(fname), repr(mode), file=sys.stderr)
                self.assertIn(mode, ('rb', 'wb'))

                if fname == identfname or mode == 'wb':
                    return orig_open(fname, mode)

                #print('foo:', repr(fname), repr(mode), file=sys.stderr)
                raise FileNotFoundError

            op.side_effect = open_repl

            # and there is no store
            with mock.patch('sys.stderr', io.StringIO()) as stderr, \
                mock.patch('sys.argv', [ 'progname', 'list',
                'foo', ]) as argv:
                # that it exits
                with self.assertRaises(SystemExit) as cm:
                    main()

                # with error code 1
                self.assertEqual(cm.exception.code, 1)

                # and outputs an error message
                self.assertEqual(stderr.getvalue(),
                    'ERROR: file not found: \'foo\'\n')