From 5a27da467b8ca786ee78eda69b4d0d72689d87ed Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Sat, 1 Feb 2020 14:05:37 -0800 Subject: [PATCH] create the default cache direcotry, and initalize it.. also, currently always reread the cache directory, but this needs to be fixed.. --- casimport/__init__.py | 81 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 3 deletions(-) diff --git a/casimport/__init__.py b/casimport/__init__.py index ce3c3b0..b58be4c 100644 --- a/casimport/__init__.py +++ b/casimport/__init__.py @@ -22,20 +22,35 @@ # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. +import contextlib import glob import hashlib +import importlib import os.path +import pathlib +import shutil import sys +import tempfile from importlib.abc import MetaPathFinder, Loader from importlib.machinery import ModuleSpec +@contextlib.contextmanager +def tempset(obj, key, value): + try: + oldvalue = obj[key] + obj[key] = value + yield + finally: + obj[key] = oldvalue + class FileDirCAS(object): def __init__(self, path): - self._path = path + self._path = pathlib.Path(path) self._hashes = {} - for i in glob.glob(os.path.join(path, '*.py')): + def refresh_dir(self): + for i in glob.glob(os.path.join(self._path, '*.py')): _, hash = self.read_hash_file(i) self._hashes[hash] = i @@ -50,6 +65,8 @@ class FileDirCAS(object): return False def exec_module(self, hash, module): + self.refresh_dir() + parts = hash.split('_', 2) fname = self._hashes[parts[2]] @@ -93,6 +110,8 @@ class CASFinder(MetaPathFinder, Loader): for l in self._loaders: ispkg = l.is_package(parts[1]) break + else: + return None ms = ModuleSpec(fullname, self, is_package=True, loader_state=(parts[1], l)) @@ -109,25 +128,56 @@ class CASFinder(MetaPathFinder, Loader): hash, load = module.__spec__.loader_state load.exec_module(hash, module) +def defaultinit(casf): + cachedir = pathlib.Path.home() / '.casimport_cache' + cachedir.mkdir(exist_ok=True) + + casf.register(FileDirCAS(cachedir)) + # The global version _casfinder = CASFinder() +defaultinit(_casfinder) import unittest class Test(unittest.TestCase): def setUp(self): + # clear out the default casfinder if there is one self.old_meta_path = sys.meta_path sys.meta_path = [ x for x in sys.meta_path if not isinstance(x, CASFinder) ] + # setup temporary directory + d = pathlib.Path(os.path.realpath(tempfile.mkdtemp())) + self.basetempdir = d + self.tempdir = d / 'subdir' + self.tempdir.mkdir() + + self.fixtures = pathlib.Path(__file__).parent.parent / 'fixtures' + def tearDown(self): + # restore environment sys.meta_path = self.old_meta_path + importlib.invalidate_caches() + + # clean up sys.modules + [ sys.modules.pop(x) for x in list(sys.modules.keys()) if + x == 'cas' or x.startswith('cas.') ] + + shutil.rmtree(self.basetempdir) + self.tempdir = None + + def test_filedircas_limit_refresh(self): + # XXX - only refresh when the dir has changed, and each + # file has changed + pass + def test_casimport(self): # That a CASFinder f = CASFinder() # when registering the fixtures directory - f.register(FileDirCAS(os.path.join(os.path.dirname(__file__), '..', 'fixtures'))) + f.register(FileDirCAS(self.fixtures)) # can import the function from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello @@ -145,6 +195,31 @@ class Test(unittest.TestCase): # and when disconnected as second time, nothing happens f.disconnect() + def test_defaultinit(self): + temphome = self.tempdir / 'home' + temphome.mkdir() + cachedir = temphome / '.casimport_cache' + + with tempset(os.environ, 'HOME', str(temphome)): + with CASFinder() as f: + # Setup the defaults + defaultinit(f) + + # that the cache got created + self.assertTrue(cachedir.is_dir()) + + # and that when hello.py is copied to the cache + shutil.copy(self.fixtures / 'hello.py', cachedir) + + # it can be imported + from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello + + with CASFinder() as f: + defaultinit(f) + + # and that a new CASFinder can still find it + from cas.v1_f_330884aa2febb5e19fb7194ec6a69ed11dd3d77122f1a5175ee93e73cf0161c3 import hello + def test_multiplecas(self): # that once we have one with CASFinder() as f: