|
- # Copyright 2020 John-Mark Gurney.
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions
- # are met:
- # 1. Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- # SUCH DAMAGE.
-
- import contextlib
- import glob
- import hashlib
- import importlib
- import os.path
- import pathlib
- import shutil
- import sys
- import tempfile
-
- from importlib.abc import MetaPathFinder, Loader
- from importlib.machinery import ModuleSpec
-
@contextlib.contextmanager
def tempset(obj, key, value):
    '''A context (with) manager for temporarily changing the value of
    an item in a dictionary (or other mutable mapping), and restoring
    it after the with block.

    If key is not present in obj when the block is entered, it is
    removed again when the block exits, so obj is left the way it
    was found.  The previous state is restored even when the with
    block raises.

    Arguments:
    obj -- the mapping to modify, e.g. a dict or os.environ
    key -- the key whose value is temporarily replaced
    value -- the value of obj[key] for the duration of the block

    Example usage:
    ```
    d = dict(a=5, b=10)
    with tempset(d, 'a', 15):
        print(repr(d['a']))
    print(repr(d['a']))
    ```
    '''

    # A unique sentinel, so that a stored None is not mistaken for
    # a missing key.
    missing = object()

    # Capture the old state before the try/finally: if this fails
    # nothing has been modified, so there is nothing to restore.
    try:
        oldvalue = obj[key]
    except KeyError:
        oldvalue = missing

    obj[key] = value
    try:
        yield
    finally:
        if oldvalue is missing:
            # the key did not exist before, so take it out again
            obj.pop(key, None)
        else:
            obj[key] = oldvalue
-
class FileDirCAS(object):
    '''A file loader for CAS that operates on a directory.  It looks
    at the *.py files in the directory, caches their hashes, and
    loads them upon request.'''

    def __init__(self, path):
        '''path is the directory that is searched for modules.'''

        self._path = pathlib.Path(path)

        # maps hex SHA-256 digest -> name of a file with that content
        self._hashes = {}

    def refresh_dir(self):
        '''Internal method to refresh the internal cache of
        hashes.

        Every *.py file in the directory is read and hashed on each
        call.  Entries for files that have since been removed or
        modified are not dropped here; exec_module verifies the
        content again before running it.'''

        for fname in glob.glob(os.path.join(self._path, '*.py')):
            _, digest = self.read_hash_file(fname)
            self._hashes[digest] = fname

    @staticmethod
    def read_hash_file(fname):
        '''Helper function that will read the file at fname, and
        return the tuple of its contents (as bytes) and its hash
        (the hex encoded SHA-256 digest of the contents).'''

        with open(fname, 'rb') as fp:
            data = fp.read()

        return data, hashlib.sha256(data).hexdigest()

    def is_package(self, hash):
        '''Decode the provided hash, and decide if it's a package
        or not.

        The hash is not examined yet: this always returns False.'''

        return False

    def exec_module(self, hash, module):
        '''Given the hash and module, load the code associated
        with the hash, and exec it in the module's context.

        hash is split on '_' into three fields, e.g.
        v1_f_<hex SHA-256 digest>, and only the third field, the
        digest, is used to locate the file.

        Raises IndexError if hash has fewer than three fields,
        KeyError if no file in the directory has that digest, and
        ValueError if the file's content no longer matches the
        digest it was indexed under.'''

        self.refresh_dir()

        digest = hash.split('_', 2)[2]
        fname = self._hashes[digest]

        data, fhash = self.read_hash_file(fname)

        # The cached entry may be stale, or the file may have been
        # changed since the directory was scanned.
        if fhash != digest:
            raise ValueError('file no longer matches hash on disk')

        # Compile with the real file name so that tracebacks point
        # at the file instead of '<string>'.
        code = compile(data, fname, 'exec')
        exec(code, module.__dict__)
-
class CASFinder(MetaPathFinder, Loader):
    '''Overall class for using Content Addressable Storage to load
    Python modules into your code.  It contains code to dispatch to
    the various loaders to attempt to load the hash.

    Creating an instance appends it to sys.meta_path.  It serves
    the cas package and the modules directly inside of it, named
    cas.<hash>; every other module name is left to the rest of the
    import system.'''

    # name of the top level package that this finder serves
    _namespace = 'cas'

    def __init__(self):
        self._loaders = []

        if any(isinstance(x, self.__class__) for x in sys.meta_path):
            raise RuntimeError('cannot register more than one CASFinder')

        sys.meta_path.append(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    def disconnect(self):
        '''Disconnect this Finder from being used to load modules.

        As this claims an entire namespace, only the first loaded
        one will work, and any others will be hidden until the
        first one is disconnected.

        This can be used w/ a with block to automatically
        disconnect when no longer needed.  This is mostly useful
        for testing.

        Disconnecting a finder that is not connected does
        nothing.'''

        try:
            sys.meta_path.remove(self)
        except ValueError:
            # already disconnected
            pass

    def register(self, loader):
        '''Register a loader w/ this finder.  This will attempt
        to load the hash passed to it.  It is also (currently)
        responsible for executing the code in the module.

        loader must provide is_package(hash) and
        exec_module(hash, module).'''

        self._loaders.append(loader)

    # MetaPathFinder methods
    def find_spec(self, fullname, path, target=None):
        '''Return the ModuleSpec for the cas package or for a module
        directly inside of it.

        Returns None for every other name, and when no loader has
        been registered, so that the import system reports the
        module as missing in the usual way.'''

        if fullname == self._namespace:
            # the top level package has no code of its own
            return ModuleSpec(fullname, self, is_package=True)

        parts = fullname.split('.')
        if len(parts) != 2 or parts[0] != self._namespace:
            # Not in our namespace.  Claiming it would turn every
            # missing module into a bogus empty package.
            return None

        if not self._loaders:
            return None

        # XXX - loaders cannot yet be asked whether they have a
        # hash, so the first registered loader is always used.
        loader = self._loaders[0]
        ispkg = loader.is_package(parts[1])

        return ModuleSpec(fullname, self, is_package=ispkg,
            loader_state=(parts[1], loader))

    def invalidate_caches(self):
        '''There are no caches to invalidate.'''

        return None

    # Loader methods
    def exec_module(self, module):
        '''Execute module.  The cas package itself is empty; any
        other module is handed, along with its hash, to the loader
        that find_spec stored in the spec's loader_state.'''

        if module.__name__ == self._namespace:
            return

        modhash, loader = module.__spec__.loader_state
        loader.exec_module(modhash, module)
-
def defaultinit(casf):
    '''Register the default loader with the finder casf.

    The default loader is a FileDirCAS over the directory
    .casimport_cache in the user's home directory, which is created
    if it does not already exist.'''

    home = pathlib.Path.home()
    cache = home.joinpath('.casimport_cache')
    cache.mkdir(exist_ok=True)

    loader = FileDirCAS(cache)
    casf.register(loader)

# The module wide finder: it is created, and given the default
# loader, as soon as this module is imported.
_casfinder = CASFinder()
defaultinit(_casfinder)
-
- import unittest
-
class Test(unittest.TestCase):
    '''Tests for tempset, FileDirCAS, CASFinder and defaultinit.

    Each test runs with any existing CASFinder taken out of
    sys.meta_path and with a fresh temporary directory; tearDown
    puts sys.meta_path back and forgets any imported cas modules.'''

    @staticmethod
    def _purge_cas_modules():
        '''Remove the cas package and everything inside of it from
        sys.modules, so that the next import has to go through a
        finder again.'''

        for name in list(sys.modules):
            if name == 'cas' or name.startswith('cas.'):
                del sys.modules[name]

    def setUp(self):
        # clear out the default casfinder if there is one
        self.old_meta_path = sys.meta_path
        sys.meta_path = [ x for x in sys.meta_path if not isinstance(x, CASFinder) ]

        # setup temporary directory
        d = pathlib.Path(os.path.realpath(tempfile.mkdtemp()))
        self.basetempdir = d
        self.tempdir = d / 'subdir'
        self.tempdir.mkdir()

        self.fixtures = pathlib.Path(__file__).parent.parent / 'fixtures'

    def tearDown(self):
        # restore environment
        sys.meta_path = self.old_meta_path

        importlib.invalidate_caches()

        # clean up sys.modules
        self._purge_cas_modules()

        shutil.rmtree(self.basetempdir)
        self.tempdir = None

    def test_filedircas_limit_refresh(self):
        # XXX - only refresh when the dir has changed, and each
        # file has changed
        pass

    def test_casimport(self):
        # That a CASFinder
        f = CASFinder()

        # make sure that we can't import anything at first
        with self.assertRaises(ImportError):
            import cas.v1_f_2398472398

        # when registering the fixtures directory
        f.register(FileDirCAS(self.fixtures))

        # can import the function
        from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

        name = 'Olof'
        # and run the code
        self.assertEqual(hello(name), 'hello ' + name)

        # and when finished, can disconnect
        f.disconnect()

        # and is no longer in the meta_path
        self.assertNotIn(f, sys.meta_path)

        # and when disconnected a second time, nothing happens
        f.disconnect()

    def test_defaultinit(self):
        temphome = self.tempdir / 'home'
        temphome.mkdir()
        cachedir = temphome / '.casimport_cache'

        with tempset(os.environ, 'HOME', str(temphome)):
            with CASFinder() as f:
                # Setup the defaults
                defaultinit(f)

                # that the cache got created
                self.assertTrue(cachedir.is_dir())

                # and that when hello.py is copied to the cache
                shutil.copy(self.fixtures / 'hello.py', cachedir)

                # it can be imported
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

            # Forget what was imported above, otherwise the import
            # below is answered from sys.modules and never reaches
            # the new finder.
            self._purge_cas_modules()

            with CASFinder() as f:
                defaultinit(f)

                # and that a new CASFinder can still find it
                from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello

    def test_multiplecas(self):
        # that once we have one
        with CASFinder():
            # if we try to create a second, it fails
            self.assertRaises(RuntimeError, CASFinder)
|