MetaData Sharing
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

960 lines
28 KiB

  1. #!/usr/bin/env python
  2. #import pdb, sys; mypdb = pdb.Pdb(stdout=sys.stderr); mypdb.set_trace()
  3. from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, \
  4. Ed448PublicKey
  5. from cryptography.hazmat.primitives.serialization import Encoding, \
  6. PrivateFormat, PublicFormat, NoEncryption
  7. import copy
  8. import datetime
  9. import hashlib
  10. import mock
  11. import os.path
  12. import pasn1
  13. import shutil
  14. import string
  15. import tempfile
  16. import unittest
  17. import uuid
  18. from contextlib import nested
  19. # The UUID for the namespace representing the path to a file
  20. _NAMESPACE_MEDASHARE_PATH = uuid.UUID('f6f36b62-3770-4a68-bc3d-dc3e31e429e6')
  21. _defaulthash = 'sha512'
  22. _validhashes = set([ 'sha256', 'sha512' ])
  23. _hashlengths = { len(getattr(hashlib, x)().hexdigest()): x for x in _validhashes }
  24. def _iterdictlist(obj):
  25. itms = obj.items()
  26. itms.sort()
  27. for k, v in itms:
  28. if isinstance(v, list):
  29. v = v[:]
  30. v.sort()
  31. for i in v:
  32. yield k, i
  33. else:
  34. yield k, v
  35. def _makeuuid(s):
  36. if isinstance(s, uuid.UUID):
  37. return s
  38. return uuid.UUID(s)
  39. # XXX - known issue, store is not atomic/safe, overwrites in place instead of renames
  40. # XXX - add validation
  41. # XXX - how to add singletons
  42. class MDBase(object):
  43. '''This is a simple wrapper that turns a JSON object into a pythonesc
  44. object where attribute accesses work.'''
  45. _type = 'invalid'
  46. _generated_properties = {
  47. 'uuid': uuid.uuid4,
  48. 'modified': datetime.datetime.utcnow
  49. }
  50. # When decoding, the decoded value should be passed to this function
  51. # to get the correct type
  52. _instance_properties = {
  53. 'uuid': _makeuuid,
  54. 'created_by_ref': _makeuuid,
  55. }
  56. _common_properties = [ 'type', 'created_by_ref' ] # XXX - add lang?
  57. _common_optional = set(('overlay_refs', 'sig'))
  58. _common_names = set(_common_properties + _generated_properties.keys())
  59. def __init__(self, obj={}, **kwargs):
  60. obj = copy.deepcopy(obj)
  61. obj.update(kwargs)
  62. if self._type == MDBase._type:
  63. raise ValueError('call MDBase.create_obj instead so correct class is used.')
  64. if 'type' in obj and obj['type'] != self._type:
  65. raise ValueError(
  66. 'trying to create the wrong type of object, got: %s, expected: %s' %
  67. (`obj['type']`, `self._type`))
  68. if 'type' not in obj:
  69. obj['type'] = self._type
  70. for x in self._common_properties:
  71. if x not in obj:
  72. raise ValueError('common property %s not present' % `x`)
  73. for x, fun in self._instance_properties.iteritems():
  74. if x in obj:
  75. obj[x] = fun(obj[x])
  76. for x, fun in self._generated_properties.iteritems():
  77. if x not in obj:
  78. obj[x] = fun()
  79. self._obj = obj
  80. @classmethod
  81. def create_obj(cls, obj):
  82. '''Using obj as a base, create an instance of MDBase of the
  83. correct type.
  84. If the correct type is not found, a ValueError is raised.'''
  85. if isinstance(obj, cls):
  86. obj = obj._obj
  87. ty = obj['type']
  88. for i in MDBase.__subclasses__():
  89. if i._type == ty:
  90. return i(obj)
  91. else:
  92. raise ValueError('Unable to find class for type %s' %
  93. `ty`)
  94. def new_version(self, *args):
  95. '''For each k, v pari, add the property k as an additional one
  96. (or new one if first), with the value v.'''
  97. obj = copy.deepcopy(self._obj)
  98. common = self._common_names | self._common_optional
  99. for k, v in args:
  100. if k in common:
  101. obj[k] = v
  102. else:
  103. obj.setdefault(k, []).append(v)
  104. del obj['modified']
  105. return self.create_obj(obj)
  106. def __repr__(self): # pragma: no cover
  107. return '%s(%s)' % (self.__class__.__name__, `self._obj`)
  108. def __getattr__(self, k):
  109. return self._obj[k]
  110. def __setattr__(self, k, v):
  111. if k[0] == '_': # direct attribute
  112. self.__dict__[k] = v
  113. else:
  114. self._obj[k] = v
  115. def __getitem__(self, k):
  116. return self._obj[k]
  117. def __to_dict__(self):
  118. return self._obj
  119. def __eq__(self, o):
  120. return cmp(self._obj, o) == 0
  121. def __contains__(self, k):
  122. return k in self._obj
  123. def items(self, skipcommon=True):
  124. return [ (k, v) for k, v in self._obj.iteritems() if
  125. not skipcommon or k not in self._common_names ]
  126. def encode(self):
  127. return _asn1coder.dumps(self)
  128. @classmethod
  129. def decode(cls, s):
  130. return cls.create_obj(_asn1coder.loads(s))
class MetaData(MDBase):
    '''The MDBase subclass used for objects whose type is 'metadata'.'''

    _type = 'metadata'
  133. class Identity(MDBase):
  134. _type = 'identity'
  135. # Identites don't need a created by
  136. _common_properties = [ x for x in MDBase._common_properties if x !=
  137. 'created_by_ref' ]
  138. _common_optional = set([ x for x in MDBase._common_optional if x !=
  139. 'overlay_refs' ] + [ 'name', 'pubkey' ])
  140. _common_names = set(_common_properties + MDBase._generated_properties.keys())
  141. def _trytodict(o):
  142. if isinstance(o, uuid.UUID):
  143. return 'unicode', str(o)
  144. try:
  145. return 'dict', o.__to_dict__()
  146. except Exception: # pragma: no cover
  147. raise TypeError('unable to find __to_dict__ on %s: %s' % (type(o), `o`))
# The coder shared by the encode/decode and store/load routines in this
# file.  _trytodict is supplied as its coerce callback.
_asn1coder = pasn1.ASN1DictCoder(coerce=_trytodict)
  149. class Persona(object):
  150. '''The object that represents a persona, or identity. It will
  151. create the proper identity object, serialize for saving keys,
  152. create objects for that persona and other management.'''
  153. def __init__(self, identity=None, key=None):
  154. if identity is None:
  155. self._identity = Identity()
  156. else:
  157. self._identity = identity
  158. self._key = key
  159. self._pubkey = None
  160. if 'pubkey' in self._identity:
  161. pubkeybytes = self._identity.pubkey
  162. self._pubkey = Ed448PublicKey.from_public_bytes(pubkeybytes)
  163. self._created_by_ref = self._identity.uuid
  164. def get_identity(self):
  165. '''Return the Identity object for this Persona.'''
  166. return self._identity
  167. def new_version(self, *args):
  168. '''Update the Persona's Identity object.'''
  169. self._identity = self.sign(self._identity.new_version(*args))
  170. return self._identity
  171. def store(self, fname):
  172. '''Store the Persona to a file. If there is a private
  173. key associated w/ the Persona, it will be saved as well.'''
  174. with open(fname, 'w') as fp:
  175. obj = {
  176. 'identity': self._identity,
  177. }
  178. if self._key is not None:
  179. obj['key'] = \
  180. self._key.private_bytes(Encoding.Raw,
  181. PrivateFormat.Raw, NoEncryption())
  182. fp.write(_asn1coder.dumps(obj))
  183. @classmethod
  184. def load(cls, fname):
  185. '''Load the Persona from the provided file.'''
  186. with open(fname) as fp:
  187. objs = _asn1coder.loads(fp.read())
  188. kwargs = {}
  189. if 'key' in objs:
  190. kwargs['key'] = Ed448PrivateKey.from_private_bytes(objs['key'])
  191. return cls(Identity(objs['identity']), **kwargs)
  192. def generate_key(self):
  193. '''Generate a key for this Identity.
  194. Raises a RuntimeError if a key is already present.'''
  195. if self._key:
  196. raise RuntimeError('a key already exists')
  197. self._key = Ed448PrivateKey.generate()
  198. self._pubkey = self._key.public_key()
  199. pubkey = self._pubkey.public_bytes(Encoding.Raw,
  200. PublicFormat.Raw)
  201. self._identity = self.sign(self._identity.new_version(('pubkey',
  202. pubkey)))
  203. def _makesigbytes(self, obj):
  204. obj = dict(obj.items(False))
  205. try:
  206. del obj['sig']
  207. except KeyError:
  208. pass
  209. return _asn1coder.dumps(obj)
  210. def sign(self, obj):
  211. '''Takes the object, adds a signature, and returns the new
  212. object.'''
  213. sigbytes = self._makesigbytes(obj)
  214. sig = self._key.sign(sigbytes)
  215. newobj = MDBase.create_obj(obj)
  216. newobj.sig = sig
  217. return newobj
  218. def verify(self, obj):
  219. sigbytes = self._makesigbytes(obj)
  220. self._pubkey.verify(obj['sig'], sigbytes)
  221. return True
  222. def by_file(self, fname):
  223. '''Return a metadata object for the file named fname.'''
  224. fobj = FileObject.from_file(fname, self._created_by_ref)
  225. return self.sign(fobj)
  226. class ObjectStore(object):
  227. '''A container to store for the various Metadata objects.'''
  228. # The _uuids property contains both the UUIDv4 for objects, and
  229. # looking up the UUIDv5 for FileObjects.
  230. def __init__(self, created_by_ref):
  231. self._created_by_ref = created_by_ref
  232. self._uuids = {}
  233. self._hashes = {}
  234. @staticmethod
  235. def makehash(hashstr, strict=True):
  236. '''Take a hash string, and return a valid hash string from it.
  237. This makes sure that it is of the correct type and length.
  238. If strict is False, the function will detect the length and
  239. return a valid hash if one can be found.'''
  240. try:
  241. hash, value = hashstr.split(':')
  242. except ValueError:
  243. if strict:
  244. raise
  245. hash = _hashlengths[len(hashstr)]
  246. value = hashstr
  247. if strict and len(str(value).translate(None, string.hexdigits.lower())) != 0:
  248. raise ValueError('value has invalid hex digits (must be lower case)', value)
  249. if hash in _validhashes:
  250. return ':'.join((hash, value))
  251. raise ValueError
  252. def __len__(self):
  253. return len(self._uuids)
  254. def store(self, fname):
  255. '''Write out the objects in the store to the file named
  256. fname.'''
  257. with open(fname, 'w') as fp:
  258. obj = {
  259. 'created_by_ref': self._created_by_ref,
  260. 'objects': self._uuids.values(),
  261. }
  262. fp.write(_asn1coder.dumps(obj))
  263. def loadobj(self, obj):
  264. '''Load obj into the data store.'''
  265. obj = MDBase.create_obj(obj)
  266. self._uuids[obj.uuid] = obj
  267. for j in obj.hashes:
  268. h = self.makehash(j)
  269. self._hashes.setdefault(h, []).append(obj)
  270. @classmethod
  271. def load(cls, fname):
  272. '''Load objects from the provided file name.
  273. Basic validation will be done on the objects in the file.
  274. The objects will be accessible via other methods.'''
  275. with open(fname) as fp:
  276. objs = _asn1coder.loads(fp.read())
  277. obj = cls(objs['created_by_ref'])
  278. for i in objs['objects']:
  279. obj.loadobj(i)
  280. return obj
  281. def by_id(self, id):
  282. '''Look up an object by it's UUID.'''
  283. if not isinstance(id, uuid.UUID):
  284. uid = uuid.UUID(id)
  285. else:
  286. uid = id
  287. return self._uuids[uid]
  288. def by_hash(self, hash):
  289. '''Look up an object by it's hash value.'''
  290. h = self.makehash(hash, strict=False)
  291. return self._hashes[h]
  292. def by_file(self, fname):
  293. '''Return a metadata object for the file named fname.'''
  294. fid = FileObject.make_id(fname)
  295. try:
  296. fobj = self.by_id(fid)
  297. except KeyError:
  298. # unable to find it
  299. fobj = FileObject.from_file(fname, self._created_by_ref)
  300. self.loadobj(fobj)
  301. for i in fobj.hashes:
  302. j = self.by_hash(i)
  303. # Filter out non-metadata objects
  304. j = [ x for x in j if x.type == 'metadata' ]
  305. if j:
  306. return j
  307. else:
  308. raise KeyError('unable to find metadata for file')
  309. def _hashfile(fname):
  310. hash = getattr(hashlib, _defaulthash)()
  311. with open(fname) as fp:
  312. r = fp.read()
  313. hash.update(r)
  314. return '%s:%s' % (_defaulthash, hash.hexdigest())
  315. class FileObject(MDBase):
  316. _type = 'file'
  317. @staticmethod
  318. def make_id(fname):
  319. '''Take a local file name, and make the id for it. Note that
  320. converts from the local path separator to a forward slash so
  321. that it will be the same between Windows and Unix systems.'''
  322. fname = os.path.realpath(fname)
  323. return uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
  324. '/'.join(os.path.split(fname)))
  325. @classmethod
  326. def from_file(cls, filename, created_by_ref):
  327. s = os.stat(filename)
  328. obj = {
  329. 'dir': os.path.dirname(filename),
  330. 'created_by_ref': created_by_ref,
  331. 'filename': os.path.basename(filename),
  332. 'id': cls.make_id(filename),
  333. 'mtime': datetime.datetime.utcfromtimestamp(s.st_mtime),
  334. 'size': s.st_size,
  335. 'hashes': [ _hashfile(filename), ],
  336. }
  337. return cls(obj)
  338. def enumeratedir(_dir, created_by_ref):
  339. '''Enumerate all the files and directories (not recursive) in _dir.
  340. Returned is a list of FileObjects.'''
  341. return map(lambda x: FileObject.from_file(os.path.join(_dir, x), created_by_ref),
  342. os.listdir(_dir))
  343. def main():
  344. from optparse import OptionParser
  345. import sys
  346. parser = OptionParser()
  347. parser.add_option('-a', action='append', dest='add',
  348. default=[], help='add the arg as metadata for files, tag=value')
  349. parser.add_option('-d', action='append', dest='delete',
  350. default=[], help='delete the arg as metadata from files. Either specify tag, and all tags are removed, or specify tag=value and that specific tag will be removed.')
  351. parser.add_option('-g', action='store_true', dest='generateident',
  352. default=False, help='generate an identity')
  353. parser.add_option('-i', action='store_true', dest='updateident',
  354. default=False, help='update the identity')
  355. parser.add_option('-l', action='store_true', dest='list',
  356. default=False, help='list metadata')
  357. options, args = parser.parse_args()
  358. # this is shared between generateident and add
  359. addprops = map(lambda x: x.split('=', 1), options.add)
  360. if options.generateident or options.updateident:
  361. identfname = os.path.expanduser('~/.medashare_identity.pasn1')
  362. if options.generateident and os.path.exists(identfname):
  363. print >>sys.stderr, 'Error: Identity already created.'
  364. sys.exit(1)
  365. if options.generateident:
  366. persona = Persona()
  367. persona.generate_key()
  368. else:
  369. persona = Persona.load(identfname)
  370. persona.new_version(*addprops)
  371. persona.store(identfname)
  372. return
  373. storefname = os.path.expanduser('~/.medashare_store.pasn1')
  374. import sys
  375. #print >>sys.stderr, `storefname`
  376. objstr = ObjectStore.load(storefname)
  377. if options.list:
  378. for i in args:
  379. for j in objstr.by_file(i):
  380. #print >>sys.stderr, `j._obj`
  381. for k, v in _iterdictlist(j):
  382. print '%s:\t%s' % (k, v)
  383. elif options.add:
  384. for i in args:
  385. for j in objstr.by_file(i):
  386. nobj = j.new_version(*addprops)
  387. objstr.loadobj(nobj)
  388. elif options.delete:
  389. for i in args:
  390. for j in objstr.by_file(i):
  391. obj = j.__to_dict__()
  392. for k in options.delete:
  393. try:
  394. key, v = k.split('=', 1)
  395. obj[key].remove(v)
  396. except ValueError:
  397. del obj[k]
  398. nobj = MDBase.create_obj(obj)
  399. objstr.loadobj(nobj)
  400. else: # pragma: no cover
  401. raise NotImplementedError
  402. objstr.store(storefname)
# Run the command line interface when the file is executed as a script.
if __name__ == '__main__': # pragma: no cover
    main()
  405. class _TestCases(unittest.TestCase):
  406. def setUp(self):
  407. d = os.path.realpath(tempfile.mkdtemp())
  408. self.basetempdir = d
  409. self.tempdir = os.path.join(d, 'subdir')
  410. persona = Persona.load(os.path.join('fixtures', 'sample.persona.pasn1'))
  411. self.created_by_ref = persona.get_identity().uuid
  412. shutil.copytree(os.path.join('fixtures', 'testfiles'),
  413. self.tempdir)
  414. def tearDown(self):
  415. shutil.rmtree(self.basetempdir)
  416. self.tempdir = None
  417. def test_mdbase(self):
  418. self.assertRaises(ValueError, MDBase, created_by_ref='')
  419. self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'unknosldkfj' })
  420. self.assertRaises(ValueError, MDBase.create_obj, { 'type': 'metadata' })
  421. baseobj = {
  422. 'type': 'metadata',
  423. 'created_by_ref': self.created_by_ref,
  424. }
  425. origbase = copy.deepcopy(baseobj)
  426. # that when an MDBase object is created
  427. md = MDBase.create_obj(baseobj)
  428. # it doesn't modify the passed in object (when adding
  429. # generated properties)
  430. self.assertEqual(baseobj, origbase)
  431. # and it has the generated properties
  432. # Note: cannot mock the functions as they are already
  433. # referenced at creation time
  434. self.assertIn('uuid', md)
  435. self.assertIn('modified', md)
  436. # That you can create a new version using new_version
  437. md2 = md.new_version(('dc:creator', 'Jim Bob',))
  438. # that they are different
  439. self.assertNotEqual(md, md2)
  440. # and that the new modified time is different from the old
  441. self.assertNotEqual(md.modified, md2.modified)
  442. # and that the modification is present
  443. self.assertEqual(md2['dc:creator'], [ 'Jim Bob' ])
  444. # that providing a value from common property
  445. fvalue = 'fakesig'
  446. md3 = md.new_version(('sig', fvalue))
  447. # gets set directly, and is not a list
  448. self.assertEqual(md3.sig, fvalue)
  449. def test_mdbase_encode_decode(self):
  450. # that an object
  451. baseobj = {
  452. 'type': 'metadata',
  453. 'created_by_ref': self.created_by_ref,
  454. }
  455. obj = MDBase.create_obj(baseobj)
  456. # can be encoded
  457. coded = obj.encode()
  458. # and that the results can be decoded
  459. decobj = MDBase.decode(coded)
  460. # and that they are equal
  461. self.assertEqual(obj, decobj)
  462. def test_mdbase_wrong_type(self):
  463. # that created_by_ref can be passed by kw
  464. obj = MetaData(created_by_ref=self.created_by_ref)
  465. self.assertRaises(ValueError, FileObject, dict(obj.items(False)))
  466. def test_makehash(self):
  467. self.assertRaises(ValueError, ObjectStore.makehash, 'slkj')
  468. self.assertRaises(ValueError, ObjectStore.makehash, 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA')
  469. self.assertRaises(ValueError, ObjectStore.makehash, 'bogushash:9e0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ADA', strict=False)
  470. self.assertEqual(ObjectStore.makehash('cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', strict=False), 'sha512:cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
  471. self.assertEqual(ObjectStore.makehash('e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', strict=False), 'sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
  472. def test_enumeratedir(self):
  473. files = enumeratedir(self.tempdir, self.created_by_ref)
  474. ftest = files[0]
  475. fname = 'test.txt'
  476. # make sure that they are of type MDBase
  477. self.assertIsInstance(ftest, MDBase)
  478. oldid = ftest.id
  479. self.assertEqual(ftest.filename, fname)
  480. self.assertEqual(ftest.dir, self.tempdir)
  481. # XXX - do we add host information?
  482. self.assertEqual(ftest.id, uuid.uuid5(_NAMESPACE_MEDASHARE_PATH,
  483. '/'.join(os.path.split(self.tempdir) +
  484. ( fname, ))))
  485. self.assertEqual(ftest.mtime, datetime.datetime(2019, 5, 20, 21, 47, 36))
  486. self.assertEqual(ftest.size, 15)
  487. self.assertIn('sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f', ftest.hashes)
  488. # XXX - make sure works w/ relative dirs
  489. files = enumeratedir(os.path.relpath(self.tempdir),
  490. self.created_by_ref)
  491. self.assertEqual(oldid, files[0].id)
  492. def test_mdbaseoverlay(self):
  493. objst = ObjectStore(self.created_by_ref)
  494. # that a base object
  495. bid = uuid.uuid4()
  496. objst.loadobj({
  497. 'type': 'metadata',
  498. 'uuid': bid,
  499. 'modified': datetime.datetime(2019, 6, 10, 14, 3, 10),
  500. 'created_by_ref': self.created_by_ref,
  501. 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
  502. 'someprop': [ 'somevalue' ],
  503. 'lang': 'en',
  504. })
  505. # can have an overlay object
  506. oid = uuid.uuid4()
  507. dhash = 'sha256:a7c96262c21db9a06fd49e307d694fd95f624569f9b35bb3ffacd880440f9787'
  508. objst.loadobj({
  509. 'type': 'metadata',
  510. 'uuid': oid,
  511. 'modified': datetime.datetime(2019, 6, 10, 18, 3, 10),
  512. 'created_by_ref': self.created_by_ref,
  513. 'hashes': [ dhash ],
  514. 'overlay_refs': [ bid ],
  515. 'lang': 'en',
  516. })
  517. # and that when you get it's properties
  518. oobj = objst.by_id(oid)
  519. odict = dict(oobj.items())
  520. # that is has the overlays property
  521. self.assertEqual(odict['overlay_refs'], [ bid ])
  522. # that it doesn't have a common property
  523. self.assertNotIn('type', odict)
  524. # that when skipcommon is False
  525. odict = dict(oobj.items(False))
  526. # that it does have a common property
  527. self.assertIn('type', odict)
  528. def test_persona(self):
  529. # that a newly created persona
  530. persona = Persona()
  531. # has an identity object
  532. idobj = persona.get_identity()
  533. # that a key can be generated
  534. persona.generate_key()
  535. # that the pubkey property is present
  536. idobj = persona.get_identity()
  537. self.assertIsInstance(idobj['pubkey'], str)
  538. # and that there is a signature
  539. self.assertIsInstance(idobj['sig'], str)
  540. # that a second time, it raises an exception
  541. self.assertRaises(RuntimeError, persona.generate_key)
  542. # that a file object created by it
  543. testfname = os.path.join(self.tempdir, 'test.txt')
  544. testobj = persona.by_file(testfname)
  545. # has the correct created_by_ref
  546. self.assertEqual(testobj.created_by_ref, idobj.uuid)
  547. # and has a signature
  548. self.assertIn('sig', testobj)
  549. # that a persona created from the identity object
  550. vpersona = Persona(idobj)
  551. # can verify the sig
  552. self.assertTrue(vpersona.verify(testobj))
  553. # and that a bogus signature
  554. bogussig = 'somebogussig'
  555. bogusobj = MDBase.create_obj(testobj)
  556. bogusobj.sig = bogussig
  557. # fails to verify
  558. self.assertRaises(Exception, vpersona.verify, bogusobj)
  559. # and that a modified object
  560. otherobj = testobj.new_version(('customprop', 'value'))
  561. # fails to verify
  562. self.assertRaises(Exception, vpersona.verify, otherobj)
  563. # that a persona object can be written
  564. perpath = os.path.join(self.basetempdir, 'persona.pasn1')
  565. persona.store(perpath)
  566. # and that when loaded back
  567. loadpersona = Persona.load(perpath)
  568. # the new persona object can sign an object
  569. nvtestobj = loadpersona.sign(testobj.new_version())
  570. # and the old persona can verify it.
  571. self.assertTrue(vpersona.verify(nvtestobj))
  572. def test_objectstore(self):
  573. objst = ObjectStore.load(os.path.join('fixtures', 'sample.data.pasn1'))
  574. objst.loadobj({
  575. 'type': 'metadata',
  576. 'uuid': 'c9a1d1e2-3109-4efd-8948-577dc15e44e7',
  577. 'modified': datetime.datetime(2019, 5, 31, 14, 3, 10),
  578. 'created_by_ref': self.created_by_ref,
  579. 'hashes': [ 'sha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada' ],
  580. 'lang': 'en',
  581. })
  582. lst = objst.by_hash('91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada')
  583. self.assertEqual(len(lst), 2)
  584. byid = objst.by_id('3e466e06-45de-4ecc-84ba-2d2a3d970e96')
  585. self.assertIsInstance(byid, MetaData)
  586. self.assertIn(byid, lst)
  587. r = byid
  588. self.assertEqual(r.uuid, uuid.UUID('3e466e06-45de-4ecc-84ba-2d2a3d970e96'))
  589. self.assertEqual(r['dc:creator'], [ u'John-Mark Gurney' ])
  590. fname = 'testfile.pasn1'
  591. objst.store(fname)
  592. with open(fname) as fp:
  593. objs = _asn1coder.loads(fp.read())
  594. os.unlink(fname)
  595. self.assertEqual(len(objs), len(objst))
  596. self.assertEqual(objs['created_by_ref'], str(self.created_by_ref))
  597. for i in objs['objects']:
  598. i['created_by_ref'] = uuid.UUID(i['created_by_ref'])
  599. i['uuid'] = uuid.UUID(i['uuid'])
  600. self.assertEqual(objst.by_id(i['uuid']), i)
  601. testfname = os.path.join(self.tempdir, 'test.txt')
  602. self.assertEqual(objst.by_file(testfname), [ byid ])
  603. self.assertEqual(objst.by_file(testfname), [ byid ])
  604. self.assertRaises(KeyError, objst.by_file, '/dev/null')
  605. # XXX make sure that object store contains fileobject
  606. # Tests to add:
  607. # Non-duplicates when same metadata is located by multiple hashes.
  608. def test_main(self):
  609. # Test the main runner, this is only testing things that are
  610. # specific to running the program, like where the store is
  611. # created.
  612. # setup object store
  613. storefname = os.path.join(self.tempdir, 'storefname')
  614. identfname = os.path.join(self.tempdir, 'identfname')
  615. shutil.copy(os.path.join('fixtures', 'sample.data.pasn1'), storefname)
  616. # setup path mapping
  617. def expandusermock(arg):
  618. if arg == '~/.medashare_store.pasn1':
  619. return storefname
  620. elif arg == '~/.medashare_identity.pasn1':
  621. return identfname
  622. # setup test fname
  623. testfname = os.path.join(self.tempdir, 'test.txt')
  624. import sys
  625. import StringIO
  626. import itertools
  627. with mock.patch('os.path.expanduser', side_effect=expandusermock) \
  628. as eu:
  629. # that generating a new identity
  630. with nested(mock.patch('sys.stdout',
  631. StringIO.StringIO()), mock.patch('sys.argv',
  632. [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stdout, argv):
  633. main()
  634. # does not output anything
  635. self.assertEqual(stdout.getvalue(), '')
  636. # looks up the correct file
  637. eu.assert_called_with('~/.medashare_identity.pasn1')
  638. # and that the identity
  639. persona = Persona.load(identfname)
  640. pident = persona.get_identity()
  641. # has the correct name
  642. self.assertEqual(pident.name, 'A Test User')
  643. # that when generating an identity when one already exists
  644. with nested(mock.patch('sys.stderr',
  645. StringIO.StringIO()), mock.patch('sys.argv',
  646. [ 'progname', '-g', '-a', 'name=A Test User' ])) as (stderr, argv):
  647. # that it exits
  648. with self.assertRaises(SystemExit) as cm:
  649. main()
  650. # with error code 1
  651. self.assertEqual(cm.exception[0], 1)
  652. # and outputs an error message
  653. self.assertEqual(stderr.getvalue(),
  654. 'Error: Identity already created.\n')
  655. # and looked up the correct file
  656. eu.assert_called_with('~/.medashare_identity.pasn1')
  657. # that when updating the identity
  658. with nested(mock.patch('sys.stdout',
  659. StringIO.StringIO()), mock.patch('sys.argv',
  660. [ 'progname', '-i', '-a', 'name=Changed Name' ])) as (stdout, argv):
  661. main()
  662. # it doesn't output anything
  663. self.assertEqual(stdout.getvalue(), '')
  664. # and looked up the correct file
  665. eu.assert_called_with('~/.medashare_identity.pasn1')
  666. npersona = Persona.load(identfname)
  667. nident = npersona.get_identity()
  668. # and has the new name
  669. self.assertEqual(nident.name, 'Changed Name')
  670. # and has the same old uuid
  671. self.assertEqual(nident.uuid, pident.uuid)
  672. # and that the modified date has changed
  673. self.assertNotEqual(pident.modified, nident.modified)
  674. # and that the old Persona can verify the new one
  675. self.assertTrue(persona.verify(nident))
  676. with nested(mock.patch('sys.stdout',
  677. StringIO.StringIO()), mock.patch('sys.argv',
  678. [ 'progname', '-l', testfname ])) as (stdout, argv):
  679. main()
  680. self.assertEqual(stdout.getvalue(),
  681. 'dc:creator:\tJohn-Mark Gurney\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
  682. eu.assert_called_with('~/.medashare_store.pasn1')
  683. with nested(mock.patch('sys.stdout',
  684. StringIO.StringIO()), mock.patch('sys.argv',
  685. [ 'progname', '-a', 'dc:creator=Another user', '-a', 'foo=bar=baz', testfname ])) as (stdout, argv):
  686. main()
  687. with nested(mock.patch('sys.stdout',
  688. StringIO.StringIO()), mock.patch('sys.argv',
  689. [ 'progname', '-l', testfname ])) as (stdout, argv):
  690. main()
  691. self.assertEqual(stdout.getvalue(),
  692. 'dc:creator:\tAnother user\ndc:creator:\tJohn-Mark Gurney\nfoo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
  693. with nested(mock.patch('sys.stdout',
  694. StringIO.StringIO()), mock.patch('sys.argv',
  695. [ 'progname', '-d', 'dc:creator', testfname ])) as (stdout, argv):
  696. main()
  697. with nested(mock.patch('sys.stdout',
  698. StringIO.StringIO()), mock.patch('sys.argv',
  699. [ 'progname', '-l', testfname ])) as (stdout, argv):
  700. main()
  701. self.assertEqual(stdout.getvalue(),
  702. 'foo:\tbar=baz\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
  703. with nested(mock.patch('sys.stdout',
  704. StringIO.StringIO()), mock.patch('sys.argv',
  705. [ 'progname', '-a', 'foo=bleh', testfname ])) as (stdout, argv):
  706. main()
  707. with nested(mock.patch('sys.stdout',
  708. StringIO.StringIO()), mock.patch('sys.argv',
  709. [ 'progname', '-l', testfname ])) as (stdout, argv):
  710. main()
  711. self.assertEqual(stdout.getvalue(),
  712. 'foo:\tbar=baz\nfoo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')
  713. with nested(mock.patch('sys.stdout',
  714. StringIO.StringIO()), mock.patch('sys.argv',
  715. [ 'progname', '-d', 'foo=bar=baz', testfname ])) as (stdout, argv):
  716. main()
  717. with nested(mock.patch('sys.stdout',
  718. StringIO.StringIO()), mock.patch('sys.argv',
  719. [ 'progname', '-l', testfname ])) as (stdout, argv):
  720. main()
  721. self.assertEqual(stdout.getvalue(),
  722. 'foo:\tbleh\nhashes:\tsha256:91751cee0a1ab8414400238a761411daa29643ab4b8243e9a91649e25be53ada\nhashes:\tsha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f\nlang:\ten\n')