| @@ -11,6 +11,7 @@ from unittest import mock | |||
| from .hostid import hostuuid | |||
| import base64 | |||
| import base58 | |||
| import copy | |||
| import datetime | |||
| @@ -61,7 +62,23 @@ def _makeuuid(s): | |||
| if isinstance(s, uuid.UUID): | |||
| return s | |||
| return uuid.UUID(bytes=s) | |||
| if isinstance(s, bytes): | |||
| return uuid.UUID(bytes=s) | |||
| else: | |||
| return uuid.UUID(s) | |||
| def _makedatetime(s): | |||
| if isinstance(s, datetime.datetime): | |||
| return s | |||
| return datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%S.%fZ').replace( | |||
| tzinfo=datetime.timezone.utc) | |||
| def _makebytes(s): | |||
| if isinstance(s, bytes): | |||
| return s | |||
| return base64.urlsafe_b64decode(s) | |||
| # XXX - known issue, store is not atomic/safe, overwrites in place instead of | |||
| # renames | |||
| @@ -84,8 +101,10 @@ class MDBase(object): | |||
| # to get the correct type | |||
| _instance_properties = { | |||
| 'uuid': _makeuuid, | |||
| 'modified': _makedatetime, | |||
| 'created_by_ref': _makeuuid, | |||
| #'parent_refs': lambda x: [ _makeuuid(y) for y in x ], | |||
| 'sig': _makebytes, | |||
| } | |||
| # Override on a per subclass basis | |||
| @@ -197,12 +216,20 @@ class MDBase(object): | |||
| return [ (k, v) for k, v in self._obj.items() if | |||
| not skipcommon or k not in self._common_names ] | |||
| def encode(self): | |||
| return _asn1coder.dumps(self) | |||
| def encode(self, meth='asn1'): | |||
| if meth == 'asn1': | |||
| return _asn1coder.dumps(self) | |||
| return _jsonencoder.encode(self._obj) | |||
| @classmethod | |||
| def decode(cls, s): | |||
| return cls.create_obj(_asn1coder.loads(s)) | |||
| def decode(cls, s, meth='asn1'): | |||
| if meth == 'asn1': | |||
| obj = _asn1coder.loads(s) | |||
| else: | |||
| obj = json.loads(s) | |||
| return cls.create_obj(obj) | |||
| class MetaData(MDBase): | |||
| _type = 'metadata' | |||
| @@ -237,6 +264,20 @@ class CanonicalCoder(pasn1.ASN1DictCoder): | |||
| _asn1coder = CanonicalCoder(coerce=_trytodict) | |||
| class _JSONEncoder(json.JSONEncoder): | |||
| def default(self, o): | |||
| if isinstance(o, uuid.UUID): | |||
| return str(o) | |||
| elif isinstance(o, datetime.datetime): | |||
| o = o.astimezone(datetime.timezone.utc) | |||
| return o.strftime('%Y-%m-%dT%H:%M:%S.%fZ') | |||
| elif isinstance(o, bytes): | |||
| return base64.urlsafe_b64encode(o).decode('US-ASCII') | |||
| return json.JSONEncoder.default(self, o) | |||
| _jsonencoder = _JSONEncoder() | |||
| class Persona(object): | |||
| '''The object that represents a persona, or identity. It will | |||
| create the proper identity object, serialize for saving keys, | |||
| @@ -546,6 +587,7 @@ class FileObject(MDBase): | |||
| _class_instance_properties = { | |||
| 'hostid': _makeuuid, | |||
| 'id': _makeuuid, | |||
| 'mtime': _makedatetime, | |||
| } | |||
| @staticmethod | |||
| @@ -892,6 +934,15 @@ class _TestCases(unittest.TestCase): | |||
| # make sure the file's id is still a UUID | |||
| self.assertIsInstance(a.id, uuid.UUID) | |||
| # That it can be encoded to json | |||
| jsfo = a.encode('json') | |||
| # that it can be decoded from json | |||
| jsloadedfo = MDBase.decode(jsfo, 'json') | |||
| # and that it is equal | |||
| self.assertEqual(jsloadedfo, a) | |||
| def test_mdbase(self): | |||
| self.assertRaises(ValueError, MDBase, created_by_ref='') | |||
| self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' }) | |||
| @@ -929,7 +980,7 @@ class _TestCases(unittest.TestCase): | |||
| self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ]) | |||
| # that providing a value from common property | |||
| fvalue = 'fakesig' | |||
| fvalue = b'fakesig' | |||
| md3 = md.new_version(('sig', fvalue)) | |||
| # gets set directly, and is not a list | |||
| @@ -964,6 +1015,29 @@ class _TestCases(unittest.TestCase): | |||
| # and has the length of 16 | |||
| self.assertEqual(len(eobj['uuid']), 16) | |||
| # and that json can be used to encode | |||
| js = obj.encode('json') | |||
| # and that it is valid json | |||
| jsobj = json.loads(js) | |||
| # and that it can be decoded | |||
| jsdecobj = MDBase.decode(js, 'json') | |||
| # and that it matches | |||
| self.assertEqual(jsdecobj, obj) | |||
| for key, inval in [ | |||
| ('modified', '2022-08-19T01:27:34.258676'), | |||
| ('modified', '2022-08-19T01:27:34Z'), | |||
| ('modified', '2022-08-19T01:27:34.258676+00:00'), | |||
| ('uuid', 'z5336176-8086-4c21-984f-fda60ddaa172'), | |||
| ('uuid', '05336176-8086-421-984f-fda60ddaa172'), | |||
| ]: | |||
| jsobj['modified'] = inval | |||
| jstest = json.dumps(jsobj) | |||
| self.assertRaises(ValueError, MDBase.decode, jstest, 'json') | |||
| def test_mdbase_wrong_type(self): | |||
| # that created_by_ref can be passed by kw | |||
| obj = MetaData(created_by_ref=self.created_by_ref) | |||