|
- # Copyright 2020 John-Mark Gurney.
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions
- # are met:
- # 1. Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- # SUCH DAMAGE.
-
- import contextlib
- import filecmp
- import functools
- import glob
- import hashlib
- import importlib.resources
- import mock
- import os.path
- import pathlib
- import shutil
- import sys
- import tempfile
- import urllib.request
-
- from importlib.abc import MetaPathFinder, Loader
- from importlib.machinery import ModuleSpec
-
- __author__ = 'John-Mark Gurney'
- __copyright__ = 'Copyright 2020 John-Mark Gurney. All rights reserved.'
- __license__ = '2-clause BSD license'
- __version__ = '0.1.0.dev'
-
- def _printanyexc(f): # pragma: no cover
- '''Prints any exception that gets raised by the wrapped function.'''
-
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- try:
- return f(*args, **kwargs)
- except Exception:
- import traceback
-
- traceback.print_exc()
-
- raise
-
- return wrapper
-
@contextlib.contextmanager
def tempset(obj, key, value):
    '''A context (with) manager for changing the value of an item in a
    dictionary, and restoring it after the with block.

    If the key does not exist beforehand, it is removed again
    afterward (mirroring what tempattrset does for attributes).

    Example usage:
    ```
    d = dict(a=5, b=10)
    with tempset(d, 'a', 15):
        print(repr(d['a']))
    print(repr(d['a']))
    ```
    '''

    # Private sentinel so that None (or any other value) can be a
    # legitimate previous value.
    missing = object()

    # Do the lookup outside of the try/finally: if it fails for some
    # other reason, there is nothing to restore yet and the original
    # error must not be replaced by a failure in the cleanup code.
    try:
        oldvalue = obj[key]
    except KeyError:
        oldvalue = missing

    obj[key] = value
    try:
        yield
    finally:
        if oldvalue is missing:
            # the with block may already have removed the key itself
            try:
                del obj[key]
            except KeyError:
                pass
        else:
            obj[key] = oldvalue
-
@contextlib.contextmanager
def tempattrset(obj, key, value):
    '''A context (with) manager for changing the value of an attribute
    of an object, and restoring it after the with block.

    If the attribute does not exist, it will be deleted afterward.

    Example usage:
    ```
    with tempattrset(someobj, 'a', 15):
        print(repr(someobj.a))
    print(repr(someobj.a))
    ```
    '''

    # Capture the previous state and install the new value before
    # entering the try/finally: if setattr itself fails, nothing was
    # changed, so running the restore code would only raise a second,
    # misleading error on top of the real one.
    hadattr = hasattr(obj, key)
    if hadattr:
        oldvalue = getattr(obj, key)

    setattr(obj, key, value)
    try:
        yield
    finally:
        if hadattr:
            setattr(obj, key, oldvalue)
        else:
            delattr(obj, key)
-
def urlfetch(url):
    '''Open url with urllib and return the body of the response.

    Raises RuntimeError('bad fetch') when the status of the response
    is not in the 2xx class.'''

    with urllib.request.urlopen(url) as response:
        statusclass = response.status // 100
        if statusclass != 2:
            raise RuntimeError('bad fetch')

        body = response.read()

    return body
-
class HTTPSCAS(object):
    '''Loader that retrieves data for https URLs via urlfetch.'''

    def fetch_data(self, url):
        '''Return the data at url, which must be an already parsed
        URL (as returned by urllib.parse.urlparse).

        Raises ValueError if the scheme of url is not https.'''

        scheme = url.scheme
        if scheme == 'https':
            return urlfetch(urllib.parse.urlunparse(url))

        raise ValueError('cannot handle scheme %s' %
            repr(scheme))
-
class IPFSCAS(object):
    '''Loader that retrieves data for ipfs URLs by going through an
    https IPFS gateway.'''

    # Host name of the gateway.  Note that the second assignment
    # overrides the first, so cloudflare-ipfs.com is what gets used.
    gwhost = 'gateway.ipfs.io'
    gwhost = 'cloudflare-ipfs.com'

    def make_url(self, url):
        '''Translate the parsed ipfs url into the https URL (a string)
        of the same content on the gateway.  The netloc part of url is
        used as the path component below /ipfs/.'''

        gwpath = '/ipfs/' + url.netloc
        pieces = ('https', self.gwhost, gwpath, '', '', '')

        return urllib.parse.urlunparse(pieces)

    def fetch_data(self, url):
        '''Return the data for url, which must be an already parsed
        URL (as returned by urllib.parse.urlparse).

        Raises ValueError if the scheme of url is not ipfs.'''

        scheme = url.scheme
        if scheme == 'ipfs':
            return urlfetch(self.make_url(url))

        raise ValueError('cannot handle scheme %s' %
            repr(scheme))
-
class FileDirCAS(object):
    '''A file loader for CAS that operates on a directory.  It looks
    at files, caches their hash, and loads them upon request.'''

    def __init__(self, path):
        '''Use the directory path as the store.  The directory is
        created if it does not exist yet (its parent must exist).'''

        self._path = pathlib.Path(path)
        self._path.mkdir(exist_ok=True)

        # maps hex sha256 digest -> file name that had that digest
        self._hashes = {}

    def refresh_dir(self):
        '''Internal method to refresh the internal cache of
        hashes.

        Every *.py file in the directory is read and hashed.  Entries
        are only added or overwritten, never removed.'''

        for fname in glob.glob(os.path.join(self._path, '*.py')):
            _, digest = self.read_hash_file(fname)
            self._hashes[digest] = fname

    @staticmethod
    def read_hash_file(fname):
        '''Helper function that will read the file at fname, and
        return the tuple of its contents (bytes) and its hash (the
        hex encoded sha256 digest of the contents).'''

        with open(fname, 'rb') as fp:
            data = fp.read()

        return data, hashlib.sha256(data).hexdigest()

    def is_package(self, hash):
        '''Decode the provided hash, and decide if it's a package
        or not.

        Currently nothing is ever considered a package, so this
        always returns False.'''

        return False

    def fetch_data(self, url):
        '''Given the URL (must be a parsed hash URL, that is
        hash://sha256/<hex digest>), return the code for it.

        Raises ValueError if url is not a sha256 hash URL, or if the
        file changed on disk and no longer matches the hash.  Raises
        KeyError if no file with that hash is in the directory.'''

        # Validate first: the finder offers every URL (ipfs, https,
        # ...) to every loader, and there is no point in re-hashing
        # the whole directory for a URL that gets rejected anyway.
        if url.scheme != 'hash' or url.netloc != 'sha256':
            raise ValueError('invalid hash url')

        self.refresh_dir()

        # strip the leading / of the path component
        wanted = url.path[1:]
        fname = self._hashes[wanted]

        # The cache entry may be stale, so check what is actually on
        # disk right now.
        data, actual = self.read_hash_file(fname)

        if actual != wanted:
            raise ValueError('file no longer matches hash on disk')

        return data
-
class CASFinder(MetaPathFinder, Loader):
    '''Overall class for using Content Addressable Storage to load
    Python modules into your code.  It contains code to dispatch to
    the various loaders to attempt to load the hash.

    Modules are imported from the cas namespace, either directly by
    hash (cas.v1_f_<hex sha256>) or via an alias (cas.v1_a_<name>)
    that was loaded with load_aliases/load_mod_aliases.'''

    def __init__(self):
        '''Create the finder and install it at the end of
        sys.meta_path.

        Raises RuntimeError if a CASFinder is already installed.'''

        self._loaders = []

        # maps both alias name -> list of urls, and basic hash url ->
        # list of alternate (non-hash) urls, see _parsealiases
        self._aliases = {}

        if any(isinstance(x, self.__class__) for x in sys.meta_path):
            raise RuntimeError(
                'cannot register more than one CASFinder')

        sys.meta_path.append(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    def load_aliases(self, data):
        '''Parse data (the text of an aliases file) and merge the
        result into the known aliases.'''

        self._aliases.update(self._parsealiases(data))

    def load_mod_aliases(self, name):
        '''Load the aliases from the module with the passed in name.

        The module must already be imported, and the aliases are read
        from its cas_aliases.txt resource.'''

        aliases = importlib.resources.read_text(sys.modules[name],
            'cas_aliases.txt')
        self.load_aliases(aliases)

    @staticmethod
    def _makebasichashurl(url):
        '''Return url, which is either a string or an already parsed
        URL, as a string reduced to scheme, netloc and path (params,
        query and fragment are dropped).'''

        # Check the type explicitly instead of depending on urlparse
        # raising AttributeError when handed a parsed URL.
        if isinstance(url, str):
            url = urllib.parse.urlparse(url)

        return urllib.parse.urlunparse(tuple(url[:3]) + ('', '', ''))

    @classmethod
    def _parsealiases(cls, data):
        '''Parse the text of an aliases file.

        Each non-blank line is "<name> <url>".  The returned
        dictionary maps each name to the list of all its urls (in file
        order), and additionally maps the basic form of each hash://
        url to the list of the non-hash urls of the same name.

        Raises ValueError if a non-blank line does not consist of
        exactly two whitespace separated fields.'''

        ret = {}

        for line in data.split('\n'):
            # skip empty lines, and lines of only white space (which
            # would otherwise fail to unpack below)
            if not line.strip():
                continue

            name, url = line.split()
            ret.setdefault(name, []).append(url)

        # split out the hashes
        # (iterate over a copy, as ret gets added to in the loop)
        for items in list(ret.values()):
            others = [ x for x in items if
                not x.startswith('hash://') ]
            for h in [ x for x in items if
                x.startswith('hash://') ]:
                ret[cls._makebasichashurl(h)] = others

        return ret

    def disconnect(self):
        '''Disconnect this Finder from being used to load modules.

        As this claims an entire namespace, only the first loaded
        one will work, and any others will be hidden until the
        first one is disconnected.

        This can be used w/ a with block to automatically
        disconnect when no longer needed.  This is mostly useful
        for testing.'''

        # disconnecting more than once is not an error
        try:
            sys.meta_path.remove(self)
        except ValueError:
            pass

    def register(self, loader):
        '''Register a loader w/ this finder.  This will attempt
        to load the hash passed to it.  It is also (currently)
        responsible for executing the code in the module.

        A loader is an object with a fetch_data(url) method, taking a
        parsed URL.  Loaders are tried in registration order.'''

        self._loaders.append(loader)

    # MetaPathFinder methods
    def find_spec(self, fullname, path, target=None):
        '''Return the ModuleSpec for the cas package itself or for
        one of its cas.v1_f_<hash>/cas.v1_a_<alias> modules.

        Returns None (so that the import fails w/ the normal
        ImportError) for anything outside of the cas namespace, for
        names that cannot be decoded, for unknown aliases and when no
        loader is registered.

        Raises ValueError if an alias has no hash url.'''

        parts = fullname.split('.')

        # Only the cas namespace is ours.  Claiming anything else
        # breaks unrelated imports, e.g. code that probes for an
        # optional module and expects an ImportError.
        if parts[0] != 'cas':
            return None

        if path is None:
            # the top level package
            return ModuleSpec(fullname, self, is_package=True)

        # <version>_<type>_<argument>; the argument (an alias name)
        # may itself contain underscores.  The version is currently
        # not looked at.
        try:
            ver, typ, arg = parts[1].split('_', 2)
        except (IndexError, ValueError):
            return None

        if typ == 'f':
            # make hash url, normalizing the hex digits on the way
            try:
                digest = bytes.fromhex(arg).hex()
            except ValueError:
                return None

            hashurl = urllib.parse.urlparse(
                'hash://sha256/%s' % digest)

            # Nothing can be fetched w/o a loader.  (Loaders used
            # to be asked is_package here, but the answer was never
            # used, and not every loader has that method.)
            if not self._loaders:
                return None
        else:
            # an alias
            try:
                urls = self._aliases[arg]
            except KeyError:
                return None

            for i in urls:
                hashurl = urllib.parse.urlparse(i)
                if hashurl.scheme == 'hash':
                    break
            else:
                raise ValueError('unable to find base hash url for alias %s' % repr(arg))

            # fix up the full name:
            # (so that the module is registered under its hash name,
            # and importing by alias and by hash yield the same
            # module object)
            # NOTE(review): assumed to apply to aliases only; for
            # an already normalized hash name it is a no-op anyway.
            fullname = 'cas.v1_f_%s' % hashurl.path[1:]

        return ModuleSpec(fullname, self, is_package=False,
            loader_state=(hashurl,))

    def invalidate_caches(self):
        return None

    # Loader methods
    def _fetch(self, url):
        '''Try each loader in turn for the parsed url.  Return the
        tuple (True, data) from the first loader that succeeds, or
        (False, None) if none does.'''

        for load in self._loaders:
            try:
                return True, load.fetch_data(url)
            except Exception:
                # Deliberate best effort: a loader failing (wrong
                # scheme, not in the cache, network error, ...) just
                # means that the next one gets its turn.
                continue

        return False, None

    def exec_module(self, module):
        '''Fetch the code for module and execute it in the module's
        namespace.  Nothing is done for the cas package itself.

        The hash url of the module is tried first, and if no loader
        can provide it, each of the alternate urls known for that
        hash.

        Raises ValueError if no loader was able to provide the
        data.'''

        if module.__name__ == 'cas':
            return

        (url,) = module.__spec__.loader_state

        found, data = self._fetch(url)
        if not found:
            # A hash w/o any known alternates is not an internal
            # error (KeyError), it simply cannot be loaded.
            alternates = self._aliases.get(
                self._makebasichashurl(url), ())
            for alt in alternates:
                found, data = self._fetch(
                    urllib.parse.urlparse(alt))
                if found:
                    break
            else:
                raise ValueError('unable to find loader for url %s' % repr(urllib.parse.urlunparse(url)))

        # NOTE(review): this executes whatever the loader returned.
        # Nothing in this class checks data against the hash in url,
        # so that is entirely up to the loader -- confirm that every
        # registered loader (in particular ones fetching over the
        # network) can be trusted to return the right content.
        exec(data, module.__dict__)
-
def defaultinit(casf):
    '''Apply the default setup to the finder casf.

    Makes sure that ~/.casimport and ~/.casimport/cache exist, copies
    the default.conf shipped w/ the casimport package to
    ~/.casimport/casimport.conf if there is no such file yet, and then
    registers, in this order, a FileDirCAS on the cache directory, an
    IPFSCAS and an HTTPSCAS loader.'''

    confdir = pathlib.Path.home() / '.casimport'
    confdir.mkdir(exist_ok=True)

    userconf = confdir / 'casimport.conf'
    if not userconf.exists():
        # seed the configuration from the packaged default
        import casimport
        with importlib.resources.path(casimport,
            'default.conf') as packaged:
            shutil.copy(packaged, userconf)

    cache = confdir / 'cache'
    cache.mkdir(parents=True, exist_ok=True)

    # the local cache goes first, then the network loaders
    for loader in (FileDirCAS(cache), IPFSCAS(), HTTPSCAS()):
        casf.register(loader)
-
# The global version
# Creating the finder has the side effect of installing it on
# sys.meta_path (see CASFinder.__init__), so simply importing this
# module enables imports from the cas namespace.
_casfinder = CASFinder()
# Module level shortcut to the global finder's load_mod_aliases.
load_mod_aliases = _casfinder.load_mod_aliases
# Creates the directories/config below the home directory if needed
# and registers the default loaders w/ the global finder.
defaultinit(_casfinder)
-
- import unittest
-
class TestHelpers(unittest.TestCase):
    '''Tests for the tempset and tempattrset context managers.'''

    def test_testset(self):
        sentinel = object()
        mapping = dict(a=sentinel, b=10)

        # inside of the with block the temporary value is visible
        with tempset(mapping, 'a', 15):
            self.assertEqual(mapping['a'], 15)

        # afterward the very same original object is back
        self.assertIs(mapping['a'], sentinel)

    def test_testattrset(self):
        class TestObj(object):
            pass

        target = TestObj()

        # an attribute that did not exist before is visible inside
        # of the with block
        with tempattrset(target, 'a', 15):
            self.assertEqual(target.a, 15)

        # and is gone again afterward
        self.assertFalse(hasattr(target, 'a'))

        before = object()
        during = object()
        target.b = before

        # an attribute that did exist is replaced inside of the
        # with block
        with tempattrset(target, 'b', during):
            self.assertIs(target.b, during)

        # and gets its original value back afterward
        self.assertIs(target.b, before)
-
class Test(unittest.TestCase):
    '''Tests for CASFinder and the loaders.

    Each test runs w/o any CASFinder on sys.meta_path (the global one
    is hidden by setUp and restored by tearDown), and w/ a fresh
    temporary directory in self.tempdir.  The fixtures directory is
    expected next to the package directory.'''

    def setUp(self):
        # clear out the default casfinder if there is one
        self.old_meta_path = sys.meta_path
        sys.meta_path = [ x for x in sys.meta_path if
            not isinstance(x, CASFinder) ]

        # setup temporary directory
        d = pathlib.Path(os.path.realpath(tempfile.mkdtemp()))
        self.basetempdir = d
        self.tempdir = d / 'subdir'
        self.tempdir.mkdir()

        self.fixtures = \
            pathlib.Path(__file__).parent.parent / 'fixtures'

    def tearDown(self):
        # restore environment
        sys.meta_path = self.old_meta_path

        importlib.invalidate_caches()

        # clean up sys.modules
        # (iterate over a copy of the keys, as entries get removed)
        for name in list(sys.modules.keys()):
            if name == 'cas' or name.startswith('cas.'):
                sys.modules.pop(name)

        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_filedircas(self):
        cachedir = self.tempdir / 'cache'

        # that creating the loader
        FileDirCAS(cachedir)

        # creates the directory
        self.assertTrue(cachedir.exists())

    def test_filedircas_limit_refresh(self):
        # XXX - only refresh when the dir has changed, and each
        # file has changed
        pass

    def test_casimport(self):
        # That a CASFinder
        f = CASFinder()

        # make sure that we can't import anything at first
        with self.assertRaises(ImportError):
            import cas.v1_f_2398472398

        # when registering the fixtures directory
        f.register(FileDirCAS(self.fixtures))

        # can import the function
        from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

        name = 'Olof'
        # and run the code
        self.assertEqual(hello(name), 'hello ' + name)

        # and when finished, can disconnect
        f.disconnect()

        # and is no longer in the meta_path
        self.assertNotIn(f, sys.meta_path)

        # and when disconnected as second time, nothing happens
        f.disconnect()

    def test_defaultinit(self):
        temphome = self.tempdir / 'home'
        temphome.mkdir()
        defcachedir = temphome / '.casimport' / 'cache'

        # testing w/ default config
        with tempset(os.environ, 'HOME', str(temphome)):
            with CASFinder() as f:
                # Setup the defaults
                defaultinit(f)

                # That the default.conf file got copied over.
                # (the result has to be asserted, filecmp.cmp
                # only returns it)
                self.assertTrue(filecmp.cmp(defcachedir.parent /
                    'casimport.conf',
                    pathlib.Path(__file__).parent / 'default.conf'))

                # that the cache got created
                self.assertTrue(defcachedir.is_dir())

                # and that when hello.py is copied to the cache
                shutil.copy(self.fixtures / 'hello.py', defcachedir)

                # it can be imported
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

                # and that the second loader is the IPFSCAS
                self.assertIsInstance(f._loaders[1], IPFSCAS)

                # and that the third loader is the HTTPSCAS
                self.assertIsInstance(f._loaders[2], HTTPSCAS)

            with CASFinder() as f:
                defaultinit(f)

                # and that a new CASFinder can still find it
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

    def test_multiplecas(self):
        # that once we have one
        with CASFinder() as f:
            # if we try to create a second, it fails
            self.assertRaises(RuntimeError, CASFinder)

    def test_parsealiases(self):
        with open(self.fixtures / 'randpkg' / 'cas_aliases.txt') as fp:
            aliasdata = fp.read()
        res = CASFinder._parsealiases(aliasdata)
        self.assertEqual(res, {
            'hello': [
                'hash://sha256/330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3?type=text/x-python',
                'ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym',
                'https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py',
            ],
            'hash://sha256/330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3': [
                'ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym',
                'https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py',
            ],
        })

    def test_aliasmulti(self):
        # setup the cache
        cachedir = self.tempdir / 'cache'
        cachedir.mkdir()

        with CASFinder() as f, \
            tempattrset(sys.modules[__name__],
            'load_mod_aliases', f.load_mod_aliases):
            f.register(FileDirCAS(cachedir))

            # and that hello.py is in the cache
            shutil.copy(self.fixtures / 'hello.py',
                cachedir)

            # and that the aliases are loaded
            with open(self.fixtures / 'randpkg' / 'cas_aliases.txt') as fp:
                f.load_aliases(fp.read())

            # that when we load the alias first
            from cas.v1_a_hello import hello as hello_alias

            # and then load the same module via hash
            from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello as hello_hash

            # they are the same
            self.assertIs(hello_alias, hello_hash)

    def test_aliasimports(self):
        # setup the cache
        cachedir = self.tempdir / 'cache'
        cachedir.mkdir()

        # add the test module's path
        fixdir = str(self.fixtures)
        sys.path.append(fixdir)

        try:
            with CASFinder() as f, \
                tempattrset(sys.modules[__name__],
                'load_mod_aliases', f.load_mod_aliases):
                f.register(FileDirCAS(cachedir))

                # and that hello.py is in the cache
                shutil.copy(self.fixtures / 'hello.py',
                    cachedir)

                self.assertNotIn('randpkg', sys.modules)

                # that the import is successful
                import randpkg

                # and pulled in the method
                self.assertTrue(hasattr(randpkg, 'hello'))

                del sys.modules['randpkg']
        finally:
            sys.path.remove(fixdir)

    def test_aliasipfsimports(self):
        # that a fake ipfsloader
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            fakedata = fp.read()

        def fakeload(url, fd=fakedata):
            if url.scheme != 'ipfs' or url.netloc != 'bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym':
                raise ValueError

            return fd

        fakeipfsloader = mock.MagicMock()
        fakeipfsloader.fetch_data = fakeload

        # add the test module's path
        # (done right before the try, so that it is always removed
        # again, even when the setup above fails)
        fixdir = str(self.fixtures)
        sys.path.append(fixdir)

        try:
            with CASFinder() as f, \
                tempattrset(sys.modules[__name__], 'load_mod_aliases',
                f.load_mod_aliases):

                f.register(fakeipfsloader)

                self.assertNotIn('randpkg', sys.modules)

                # that the import is successful
                import randpkg

                # and pulled in the method
                self.assertTrue(hasattr(randpkg, 'hello'))

                del sys.modules['randpkg']
        finally:
            sys.path.remove(fixdir)

    @mock.patch('urllib.request.urlopen')
    def test_ipfscasloader(self, uomock):
        # prep return test data
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            ipfsdata = fp.read()

        # that the ipfs CAS loader
        ipfs = IPFSCAS()

        # that the request is successful
        uomock.return_value.__enter__.return_value.status = 200

        # and returns the correct data
        uomock.return_value.__enter__.return_value.read.return_value = ipfsdata

        # that when called
        hashurl = urllib.parse.urlparse('ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym')
        data = ipfs.fetch_data(hashurl)

        # it opens the correct url
        uomock.assert_called_with('https://cloudflare-ipfs.com/ipfs/bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym')

        # and returns the correct data
        self.assertEqual(data, ipfsdata)

        with self.assertRaises(ValueError):
            # that a hash url fails
            ipfs.fetch_data(urllib.parse.urlparse('hash://sha256/asldfkj'))

        # that when the request fails
        uomock.return_value.__enter__.return_value.status = 400

        # it raises a RuntimeError
        with self.assertRaises(RuntimeError):
            ipfs.fetch_data(hashurl)

    # Note: mostly copied from above, test_ipfscasloader
    @mock.patch('urllib.request.urlopen')
    def test_httpscasloader(self, uomock):
        # prep return test data
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            httpsdata = fp.read()

        # that the https CAS loader
        httpsldr = HTTPSCAS()

        # that the request is successful
        uomock.return_value.__enter__.return_value.status = 200

        # and returns the correct data
        uomock.return_value.__enter__.return_value.read.return_value = httpsdata

        # that when called
        hashurl = urllib.parse.urlparse('https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py')
        data = httpsldr.fetch_data(hashurl)

        # it opens the correct url
        uomock.assert_called_with('https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py')

        # and returns the correct data
        self.assertEqual(data, httpsdata)

        with self.assertRaises(ValueError):
            # that a hash url fails
            httpsldr.fetch_data(urllib.parse.urlparse('hash://sha256/asldfkj'))

        # that when the request fails
        uomock.return_value.__enter__.return_value.status = 400

        # it raises a RuntimeError
        with self.assertRaises(RuntimeError):
            httpsldr.fetch_data(hashurl)

    @unittest.skip('todo')
    def test_overlappingaliases(self):
        # make sure that an aliases file is consistent and does not
        # override other urls.  That is that any hashes are
        # consistent, and that they have at least one root hash that
        # is the same, and will be used for fetching.
        #
        # Likely will also have to deal w/ an issue where two
        # aliases share sha256, and a third shares sha512, which in
        # this case, BOTH hashes have to be checked.
        pass

    @unittest.skip('todo')
    def test_loaderpriority(self):
        # XXX - write test to allow you to specify the priority of
        # a loader, to ensure that cache stays at top.
        # Maybe also think of a way to say local/remote, because
        # some loaders may be "more local" than others, like using
        # a local ipfs gateway makes more sense than hitting a
        # public gateway
        pass

    def test_filecorruption(self):
        cachedir = self.tempdir / 'cachedir'
        cachedir.mkdir()

        # that an existing file
        shutil.copy(self.fixtures / 'hello.py', cachedir)

        # is in the cache
        fdcas = FileDirCAS(cachedir)

        # that when refresh is suppressed
        fdcas.refresh_dir = lambda: None

        # and has a bogus hash
        fdcas._hashes['0000'] = cachedir / 'hello.py'

        # that when read raises an exception
        with self.assertRaises(ValueError):
            fdcas.fetch_data(urllib.parse.urlparse('hash://sha256/0000'))

        # that when passed an invalid url
        with self.assertRaises(ValueError):
            fdcas.fetch_data(urllib.parse.urlparse('https://sha256/0000'))
|