# Copyright 2020 John-Mark Gurney. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. 
import contextlib
import functools
import glob
import hashlib
import importlib.resources
import mock
import os.path
import pathlib
import shutil
import sys
import tempfile
# urllib.parse is used throughout; import it explicitly instead of
# relying on urllib.request importing it as a side effect.
import urllib.parse
import urllib.request

from importlib.abc import MetaPathFinder, Loader
from importlib.machinery import ModuleSpec


def _printanyexc(f):  # pragma: no cover
    '''Decorator that prints any exception raised by the wrapped
    function (and then re-raises it).  Useful when debugging code
    called from machinery that swallows tracebacks, such as the
    import system.'''

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            import traceback
            traceback.print_exc()

            raise

    return wrapper


@contextlib.contextmanager
def tempset(obj, key, value):
    '''A context (with) manager for changing the value of an item in a
    dictionary, and restoring it after the with block.

    The key must already exist in ``obj``; a missing key raises
    KeyError before anything is modified.

    Example usage:
    ```
    d = dict(a=5, b=10)
    with tempset(d, 'a', 15):
        print(repr(d['a']))
    print(repr(d['a']))
    ```
    '''

    # Look up the old value BEFORE entering the try block: doing it
    # inside would leave oldvalue unbound on a missing key, and the
    # finally clause would then mask the KeyError with an
    # UnboundLocalError.
    oldvalue = obj[key]

    try:
        obj[key] = value

        yield
    finally:
        obj[key] = oldvalue


@contextlib.contextmanager
def tempattrset(obj, key, value):
    '''A context (with) manager for changing the value of an attribute
    of an object, and restoring it after the with block.  If the
    attribute does not exist beforehand, it will be deleted afterward.

    Example usage:
    ```
    with tempattrset(someobj, 'a', 15):
        print(repr(someobj.a))
    print(repr(someobj.a))
    ```
    '''

    try:
        dodelattr = False
        if hasattr(obj, key):
            oldvalue = getattr(obj, key)
        else:
            # attribute was absent; remember to remove it on exit
            dodelattr = True

        setattr(obj, key, value)

        yield
    finally:
        if not dodelattr:
            setattr(obj, key, oldvalue)
        else:
            delattr(obj, key)


class HTTPSCAS(object):
    '''CAS loader that fetches data from plain https URLs.'''

    def fetch_data(self, url):
        '''Fetch and return the body of ``url`` (a parsed URL, as
        returned by urllib.parse.urlparse).

        Raises ValueError for any scheme other than https, and
        RuntimeError when the server does not return a 2xx status.'''

        if url.scheme != 'https':
            raise ValueError('cannot handle scheme %s' %
                repr(url.scheme))

        url = urllib.parse.urlunparse(url)

        with urllib.request.urlopen(url) as req:
            if req.status // 100 != 2:
                raise RuntimeError('bad fetch')

            return req.read()


class IPFSCAS(object):
    '''CAS loader that fetches ipfs:// URLs via a public https
    gateway.  The data is content addressed, so any gateway that
    returns the correct bytes will do.'''

    # alternative gateway: 'gateway.ipfs.io'
    gwhost = 'cloudflare-ipfs.com'

    def make_url(self, url):
        '''Map an ipfs:// URL (CID in the netloc) to the equivalent
        https URL on the configured gateway.'''

        return urllib.parse.urlunparse(('https', self.gwhost,
            '/ipfs/' + url.netloc) + ('', ) * 3)

    def fetch_data(self, url):
        '''Fetch and return the data for the ipfs URL ``url`` (a
        parsed URL) through the https gateway.

        Raises ValueError for any scheme other than ipfs, and
        RuntimeError when the gateway does not return a 2xx status.'''

        if url.scheme != 'ipfs':
            raise ValueError('cannot handle scheme %s' %
                repr(url.scheme))

        gwurl = self.make_url(url)

        with urllib.request.urlopen(gwurl) as req:
            if req.status // 100 != 2:
                raise RuntimeError('bad fetch')

            return req.read()


class FileDirCAS(object):
    '''A file loader for CAS that operates on a directory.

    It looks at files, caches their hash, and loads them upon
    request.'''

    def __init__(self, path):
        self._path = pathlib.Path(path)
        # maps hex sha256 digest -> file name
        self._hashes = {}

    def refresh_dir(self):
        '''Internal method to refresh the internal cache of hashes
        from the *.py files in the directory.'''

        for i in glob.glob(os.path.join(self._path, '*.py')):
            _, hash = self.read_hash_file(i)
            self._hashes[hash] = i

    @staticmethod
    def read_hash_file(fname):
        '''Helper function that will read the file at fname, and
        return the tuple of its contents and its (hex sha256)
        hash.'''

        with open(fname, 'rb') as fp:
            data = fp.read()
            hash = hashlib.sha256(data).hexdigest()

            return data, hash

    def is_package(self, hash):
        '''Decode the provided hash, and decide if it's a package or
        not.  Currently packages are not supported, so this always
        returns False.'''

        return False

    def fetch_data(self, url):
        '''Given the URL (must be a hash URL), return the code for
        it.

        Raises ValueError when the URL is not a sha256 hash URL, or
        when the on-disk file no longer matches its cached hash;
        raises KeyError when the hash is unknown.'''

        self.refresh_dir()

        hashurl = url

        if hashurl.scheme != 'hash' or hashurl.netloc != 'sha256':
            raise ValueError('invalid hash url')

        # path is '/<hexdigest>'; strip the leading slash
        hash = hashurl.path[1:]
        fname = self._hashes[hash]

        data, fhash = self.read_hash_file(fname)
        if fhash != hash:
            raise ValueError('file no longer matches hash on disk')

        return data


class CASFinder(MetaPathFinder, Loader):
    '''Overall class for using Content Addressable Storage to load
    Python modules into your code.

    It contains code to dispatch to the various loaders to attempt
    to load the hash.'''

    def __init__(self):
        self._loaders = []
        self._aliases = {}

        # Only one CASFinder may own the 'cas' namespace at a time.
        if [ x for x in sys.meta_path if
            isinstance(x, self.__class__) ]:
            raise RuntimeError(
                'cannot register more than one CASFinder')

        sys.meta_path.append(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    def load_mod_aliases(self, name):
        '''Load the aliases from the module with the passed in name
        (read from the cas_aliases.txt resource packaged with that
        module).'''

        aliases = importlib.resources.read_text(sys.modules[name],
            'cas_aliases.txt')
        self._aliases.update(self._parsealiases(aliases))

    @staticmethod
    def _makebasichashurl(url):
        '''Strip params, query and fragment from a hash URL (string
        or already-parsed), returning the canonical string form.'''

        try:
            hashurl = urllib.parse.urlparse(url)
        except AttributeError:
            # already a parsed URL
            hashurl = url

        return urllib.parse.urlunparse(hashurl[:3] + ('', '', ''))

    @classmethod
    def _parsealiases(cls, data):
        '''Parse alias file data (lines of "<name> <url>") into a
        dict mapping each name to its list of URLs, and each basic
        hash URL to the list of non-hash alternates for the same
        content.'''

        ret = {}

        lines = data.split('\n')
        for i in lines:
            if not i:
                continue

            name, hash = i.split()
            ret.setdefault(name, []).append(hash)

        # split out the hashes
        for items in list(ret.values()):
            lst = [ x for x in items if
                not x.startswith('hash://') ]
            for h in [ x for x in items if
                x.startswith('hash://') ]:
                h = cls._makebasichashurl(h)
                ret[h] = lst

        return ret

    def disconnect(self):
        '''Disconnect this Finder from being used to load modules.

        As this claims an entire namespace, only the first loaded
        one will work, and any others will be hidden until the
        first one is disconnected.

        This can be used w/ a with block to automatically
        disconnect when no longer needed.  This is mostly useful
        for testing.'''

        try:
            sys.meta_path.remove(self)
        except ValueError:
            # already disconnected; a second disconnect is a no-op
            pass

    def register(self, loader):
        '''Register a loader w/ this finder.  This will attempt to
        load the hash passed to it.

        It is also (currently) responsible for executing the code
        in the module.'''

        self._loaders.append(loader)

    # MetaPathFinder methods
    def find_spec(self, fullname, path, target=None):
        if path is None:
            # the top-level 'cas' namespace package itself
            ms = ModuleSpec(fullname, self, is_package=True)
        else:
            # module names look like cas.v1_f_<hexhash> (a file
            # hash) or cas.v1_a_<name> (an alias)
            parts = fullname.split('.')
            ver, typ, arg = parts[1].split('_')
            if typ == 'f':
                # make hash url:
                hashurl = ('hash://sha256/%s' %
                    bytes.fromhex(arg).hex())
                hashurl = urllib.parse.urlparse(hashurl)
                for l in self._loaders:
                    # ask the first loader whether this is a package
                    ispkg = l.is_package(hashurl)
                    break
                else:
                    # no loaders registered, cannot satisfy import
                    return None
            else:
                # an alias; find a hash URL to fetch by
                ispkg = False
                for i in self._aliases[arg]:
                    hashurl = urllib.parse.urlparse(i)
                    if hashurl.scheme == 'hash':
                        break
                else:
                    raise ValueError(
                        'unable to find base hash url for alias %s' %
                        repr(arg))

            ms = ModuleSpec(fullname, self, is_package=ispkg,
                loader_state=(hashurl,))

        return ms

    def invalidate_caches(self):
        return None

    # Loader methods
    def exec_module(self, module):
        if module.__name__ == 'cas':
            # the namespace package itself has no code to run
            pass
        else:
            (url,) = module.__spec__.loader_state
            # try each loader against the primary URL first
            for load in self._loaders:
                try:
                    data = load.fetch_data(url)
                    break
                except Exception:
                    # best effort: fall through to the next loader
                    pass
            else:
                # primary URL failed everywhere; try each alternate
                # URL registered for the same content hash
                for url in self._aliases[
                    self._makebasichashurl(url)]:
                    url = urllib.parse.urlparse(url)
                    for load in self._loaders:
                        try:
                            data = load.fetch_data(url)
                            break
                        except Exception:
                            pass
                    else:
                        continue
                    break
                else:
                    raise ValueError(
                        'unable to find loader for url %s' %
                        repr(urllib.parse.urlunparse(url)))

            exec(data, module.__dict__)


def defaultinit(casf):
    '''Register the default set of loaders on casf: the local cache
    directory (~/.casimport_cache, created if needed), then the IPFS
    gateway loader, then the plain https loader.'''

    cachedir = pathlib.Path.home() / '.casimport_cache'
    cachedir.mkdir(exist_ok=True)
    casf.register(FileDirCAS(cachedir))
    casf.register(IPFSCAS())
    casf.register(HTTPSCAS())


# The global version
_casfinder = CASFinder()
load_mod_aliases = _casfinder.load_mod_aliases
defaultinit(_casfinder)

import unittest

class TestHelpers(unittest.TestCase):
    def test_testset(self):
        origobj = object()
        d = dict(a=origobj, b=10)

        # that when we temporarily set it
        with tempset(d, 'a', 15):
            # the new value is there
            self.assertEqual(d['a'], 15)

        # and that the original object is restored
        self.assertIs(d['a'], origobj)

    def test_testattrset(self):
        class TestObj(object):
            pass

        testobj = TestObj()

        # that when we temporarily set it
        with tempattrset(testobj, 'a', 15):
            # the new value is there
            self.assertEqual(testobj.a, 15)

        # and that there is no object
        self.assertFalse(hasattr(testobj, 'a'))

        origobj = object()
        newobj = object()
        testobj.b = origobj

        # that when we temporarily set it
        with tempattrset(testobj, 'b', newobj):
            # the new value is there
            self.assertIs(testobj.b, newobj)

        # and the original value is restored
        self.assertIs(testobj.b, origobj)

class Test(unittest.TestCase):
    def setUp(self):
        # clear out the default casfinder if there is one
        self.old_meta_path = sys.meta_path
        sys.meta_path = [ x for x in sys.meta_path if
            not isinstance(x, CASFinder) ]

        # setup temporary directory
        d = pathlib.Path(os.path.realpath(tempfile.mkdtemp()))
        self.basetempdir = d
        self.tempdir = d / 'subdir'
        self.tempdir.mkdir()

        self.fixtures = \
            pathlib.Path(__file__).parent.parent / 'fixtures'

    def tearDown(self):
        # restore environment
        sys.meta_path = self.old_meta_path

        importlib.invalidate_caches()

        # clean up sys.modules
        [ sys.modules.pop(x) for x in list(sys.modules.keys()) if
            x == 'cas' or x.startswith('cas.') ]

        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_filedircas_limit_refresh(self):
        # XXX - only refresh when the dir has changed, and each
        # file has changed
        pass

    def test_casimport(self):
        # That a CASFinder
        f = CASFinder()

        # make sure that we can't import anything at first
        with self.assertRaises(ImportError):
            import cas.v1_f_2398472398

        # when registering the fixtures directory
        f.register(FileDirCAS(self.fixtures))

        # can import the function
        from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

        name = 'Olof'

        # and run the code
        self.assertEqual(hello(name), 'hello ' + name)

        # and when finished, can disconnect
        f.disconnect()

        # and is no longer in the meta_path
        self.assertNotIn(f, sys.meta_path)

        # and when disconnected a second time, nothing happens
        f.disconnect()

    def test_defaultinit(self):
        temphome = self.tempdir / 'home'
        temphome.mkdir()

        cachedir = temphome / '.casimport_cache'

        with tempset(os.environ, 'HOME', str(temphome)):
            with CASFinder() as f:
                # Setup the defaults
                defaultinit(f)

                # that the cache got created
                self.assertTrue(cachedir.is_dir())

                # and that when hello.py is copied to the cache
                shutil.copy(self.fixtures / 'hello.py',
                    cachedir)

                # it can be imported
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

                # and that the second loader is the IPFSCAS
                self.assertIsInstance(f._loaders[1], IPFSCAS)

                # and that the third loader is the HTTPSCAS
                self.assertIsInstance(f._loaders[2], HTTPSCAS)

            with CASFinder() as f:
                defaultinit(f)

                # and that a new CASFinder can still find it
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

    def test_multiplecas(self):
        # that once we have one
        with CASFinder() as f:
            # if we try to create a second, it fails
            self.assertRaises(RuntimeError, CASFinder)

    def test_parsealiases(self):
        with open(self.fixtures / 'randpkg' /
            'cas_aliases.txt') as fp:
            aliasdata = fp.read()

        res = CASFinder._parsealiases(aliasdata)

        self.assertEqual(res, {
            'hello': [
                'hash://sha256/330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3?type=text/x-python',
                'ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym',
                'https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py',
            ],
            'hash://sha256/330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3': [
                'ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym',
                'https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py',
            ],
        })

    def test_aliasimports(self):
        # setup the cache
        temphome = self.tempdir / 'home'
        temphome.mkdir()

        cachedir = temphome / '.casimport_cache'

        # add the test module's path
        fixdir = str(self.fixtures)
        sys.path.append(fixdir)

        with tempset(os.environ, 'HOME', str(temphome)):
            try:
                with CASFinder() as f, \
                    tempattrset(sys.modules[__name__],
                        'load_mod_aliases',
                        f.load_mod_aliases):
                    defaultinit(f)

                    # and that hello.py is in the cache
                    shutil.copy(self.fixtures / 'hello.py',
                        cachedir)

                    # that the import is successful
                    import randpkg

                    # and pulled in the method
                    self.assertTrue(hasattr(randpkg, 'hello'))

                del sys.modules['randpkg']
            finally:
                sys.path.remove(fixdir)

    def test_aliasipfsimports(self):
        # add the test module's path
        fixdir = str(self.fixtures)
        sys.path.append(fixdir)

        # that a fake ipfsloader
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            fakedata = fp.read()

        def fakeload(url, fd=fakedata):
            if url.scheme != 'ipfs' or url.netloc != \
                'bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym':
                raise ValueError

            return fd

        fakeipfsloader = mock.MagicMock()
        fakeipfsloader.fetch_data = fakeload

        try:
            with CASFinder() as f, \
                tempattrset(sys.modules[__name__],
                    'load_mod_aliases', f.load_mod_aliases):
                f.register(fakeipfsloader)

                # that the import is successful
                import randpkg

                # and pulled in the method
                self.assertTrue(hasattr(randpkg, 'hello'))
        finally:
            sys.path.remove(fixdir)

    @mock.patch('urllib.request.urlopen')
    def test_ipfscasloader(self, uomock):
        # prep return test data
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            ipfsdata = fp.read()

        # that the ipfs CAS loader
        ipfs = IPFSCAS()

        # that the request is successfull
        uomock.return_value.__enter__.return_value.status = 200

        # and returns the correct data
        uomock.return_value.__enter__.return_value.read.return_value = ipfsdata

        # that when called
        hashurl = urllib.parse.urlparse('ipfs://bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym')
        data = ipfs.fetch_data(hashurl)

        # it opens the correct url
        uomock.assert_called_with('https://cloudflare-ipfs.com/ipfs/bafkreibtbcckul7lwxqz7nyzj3dknhwrdxj5o4jc6gsroxxjhzz46albym')

        # and returns the correct data
        self.assertEqual(data, ipfsdata)

        with self.assertRaises(ValueError):
            # that a hash url fails
            ipfs.fetch_data(urllib.parse.urlparse('hash://sha256/asldfkj'))

        # that when the request fails
        uomock.return_value.__enter__.return_value.status = 400

        # it raises a RuntimeError
        with self.assertRaises(RuntimeError):
            ipfs.fetch_data(hashurl)

    # Note: mostly copied from above, test_ipfscasloader
    @mock.patch('urllib.request.urlopen')
    def test_httpscasloader(self, uomock):
        # prep return test data
        with open(self.fixtures / 'hello.py') as fp:
            # that returns the correct data
            httpsdata = fp.read()

        # that the https CAS loader
        httpsldr = HTTPSCAS()

        # that the request is successfull
        uomock.return_value.__enter__.return_value.status = 200

        # and returns the correct data
        uomock.return_value.__enter__.return_value.read.return_value = httpsdata

        # that when called
        hashurl = urllib.parse.urlparse('https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py')
        data = httpsldr.fetch_data(hashurl)

        # it opens the correct url
        uomock.assert_called_with('https://www.funkthat.com/gitea/jmg/casimport/raw/commit/753e64f53c73d9d1afc4d8a617edb9d3542dcea2/fixtures/hello.py')

        # and returns the correct data
        self.assertEqual(data, httpsdata)

        with self.assertRaises(ValueError):
            # that a hash url fails
            httpsldr.fetch_data(urllib.parse.urlparse('hash://sha256/asldfkj'))

        # that when the request fails
        uomock.return_value.__enter__.return_value.status = 400

        # it raises a RuntimeError
        with self.assertRaises(RuntimeError):
            httpsldr.fetch_data(hashurl)

    def test_overlappingaliases(self):
        # make sure that an aliases file is consistent and does not
        # override other urls.  That is that any hashes are
        # consistent, and that they have at least one root hash that
        # is the same, and will be used for fetching.
        #
        # Likely will also have to deal w/ an issue where two
        # aliases share sha256, and a third shares sha512, which in
        # this case, BOTH hashse have to be checked.
        pass

    def test_loaderpriority(self):
        # XXX - write test to allow you to specify the priority of
        # a loader, to ensure that cache stays at top.
        # Maybe also think of a way to say local/remote, because
        # some loaders may be "more local" than others, like using
        # a local ipfs gateway makes more sense than hitting a
        # public gateway
        pass

    def test_filecorruption(self):
        cachedir = self.tempdir / 'cachedir'
        cachedir.mkdir()

        # that an existing file
        shutil.copy(self.fixtures / 'hello.py', cachedir)

        # is in the cache
        fdcas = FileDirCAS(cachedir)

        # that when refresh is surpressed
        fdcas.refresh_dir = lambda: None

        # and has a bogus hash
        fdcas._hashes['0000'] = cachedir / 'hello.py'

        # that when read raises an exception
        with self.assertRaises(ValueError):
            fdcas.fetch_data(urllib.parse.urlparse('hash://sha256/0000'))

        # that when passed an invalid url
        with self.assertRaises(ValueError):
            fdcas.fetch_data(urllib.parse.urlparse('https://sha256/0000'))