| @@ -27,16 +27,13 @@ | |||
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |||
| import os, unittest, tempfile, random, string, sys | |||
| import zipfile | |||
| import io | |||
| import shutil | |||
| import zipfile | |||
| from libarchive import Archive, is_archive_name, is_archive | |||
| from libarchive.zip import is_zipfile, ZipFile, ZipEntry | |||
| TMPDIR = tempfile.mkdtemp(suffix='.python-libarchive') | |||
| ZIPFILE = 'test.zip' | |||
| ZIPPATH = os.path.join(TMPDIR, ZIPFILE) | |||
| FILENAMES = [ | |||
| 'test1.txt', | |||
| 'foo', | |||
| @@ -44,19 +41,31 @@ FILENAMES = [ | |||
| #'álért.txt', | |||
| ] | |||
| class MakeTempMixIn: | |||
| def setUp(self): | |||
| self.TMPDIR = tempfile.mkdtemp(suffix='.python-libarchive') | |||
| self.ZIPFILE = 'test.zip' | |||
| self.ZIPPATH = os.path.join(self.TMPDIR, self.ZIPFILE) | |||
| def make_temp_files(): | |||
| if not os.path.exists(ZIPPATH): | |||
| for name in FILENAMES: | |||
| with open(os.path.join(TMPDIR, name), 'w') as f: | |||
| f.write(''.join(random.sample(string.ascii_letters, 10))) | |||
| def tearDown(self): | |||
| shutil.rmtree(self.TMPDIR) | |||
| self.TMPDIR = None | |||
| self.ZIPFILE = None | |||
| self.ZIPPATH = None | |||
| def make_temp_archive(): | |||
| make_temp_files() | |||
| with zipfile.ZipFile(ZIPPATH, mode="w") as z: | |||
| for name in FILENAMES: | |||
| z.write(os.path.join(TMPDIR, name), arcname=name) | |||
| def make_temp_files(self): | |||
| if not os.path.exists(self.ZIPPATH): | |||
| for name in FILENAMES: | |||
| with open(os.path.join(self.TMPDIR, name), 'w') as f: | |||
| f.write(''.join(random.sample(string.ascii_letters, 10))) | |||
| def make_temp_archive(self): | |||
| self.make_temp_files() | |||
| with zipfile.ZipFile(self.ZIPPATH, mode="w") as z: | |||
| for name in FILENAMES: | |||
| z.write(os.path.join(self.TMPDIR, name), arcname=name) | |||
| class TestIsArchiveName(unittest.TestCase): | |||
| @@ -72,14 +81,18 @@ class TestIsArchiveName(unittest.TestCase): | |||
| self.assertEqual(is_archive_name('foo.rpm'), 'cpio') | |||
| class TestIsArchiveZip(unittest.TestCase): | |||
| class TestIsArchiveZip(unittest.TestCase, MakeTempMixIn): | |||
| def setUp(self): | |||
| make_temp_archive() | |||
| MakeTempMixIn.setUp(self) | |||
| self.make_temp_archive() | |||
| def tearDown(self): | |||
| MakeTempMixIn.tearDown(self) | |||
| def test_zip(self): | |||
| self.assertEqual(is_archive(ZIPPATH), True) | |||
| self.assertEqual(is_archive(ZIPPATH, formats=('zip',)), True) | |||
| self.assertEqual(is_archive(ZIPPATH, formats=('tar',)), False) | |||
| self.assertEqual(is_archive(self.ZIPPATH), True) | |||
| self.assertEqual(is_archive(self.ZIPPATH, formats=('zip',)), True) | |||
| self.assertEqual(is_archive(self.ZIPPATH, formats=('tar',)), False) | |||
| class TestIsArchiveTar(unittest.TestCase): | |||
| @@ -89,17 +102,19 @@ class TestIsArchiveTar(unittest.TestCase): | |||
| # TODO: incorporate tests from: | |||
| # http://hg.python.org/cpython/file/a6e1d926cd98/Lib/test/test_zipfile.py | |||
| class TestZipRead(unittest.TestCase): | |||
| class TestZipRead(unittest.TestCase, MakeTempMixIn): | |||
| def setUp(self): | |||
| make_temp_archive() | |||
| self.f = open(ZIPPATH, mode='r') | |||
| MakeTempMixIn.setUp(self) | |||
| self.make_temp_archive() | |||
| self.f = open(self.ZIPPATH, mode='r') | |||
| def tearDown(self): | |||
| self.f.close() | |||
| MakeTempMixIn.tearDown(self) | |||
| def test_iszipfile(self): | |||
| self.assertEqual(is_zipfile('/dev/null'), False) | |||
| self.assertEqual(is_zipfile(ZIPPATH), True) | |||
| self.assertEqual(is_zipfile(self.ZIPPATH), True) | |||
| def test_iterate(self): | |||
| z = ZipFile(self.f, 'r') | |||
| @@ -158,18 +173,20 @@ class TestZipRead(unittest.TestCase): | |||
| pass | |||
| class TestZipWrite(unittest.TestCase): | |||
| class TestZipWrite(unittest.TestCase, MakeTempMixIn): | |||
| def setUp(self): | |||
| make_temp_files() | |||
| self.f = open(ZIPPATH, mode='w') | |||
| MakeTempMixIn.setUp(self) | |||
| self.make_temp_files() | |||
| self.f = open(self.ZIPPATH, mode='w') | |||
| def tearDown(self): | |||
| self.f.close() | |||
| MakeTempMixIn.tearDown(self) | |||
| def test_writepath(self): | |||
| z = ZipFile(self.f, 'w') | |||
| for fname in FILENAMES: | |||
| with open(os.path.join(TMPDIR, fname), 'r') as f: | |||
| with open(os.path.join(self.TMPDIR, fname), 'r') as f: | |||
| z.writepath(f) | |||
| z.close() | |||
| @@ -182,7 +199,7 @@ class TestZipWrite(unittest.TestCase): | |||
| z.close() | |||
| self.f.close() | |||
| f = open(ZIPPATH, mode='r') | |||
| f = open(self.ZIPPATH, mode='r') | |||
| z = ZipFile(f, 'r') | |||
| entries = z.infolist() | |||
| @@ -195,7 +212,7 @@ class TestZipWrite(unittest.TestCase): | |||
| def test_writestream(self): | |||
| z = ZipFile(self.f, 'w') | |||
| for fname in FILENAMES: | |||
| full_path = os.path.join(TMPDIR, fname) | |||
| full_path = os.path.join(self.TMPDIR, fname) | |||
| i = open(full_path) | |||
| o = z.writestream(fname) | |||
| while True: | |||
| @@ -210,7 +227,7 @@ class TestZipWrite(unittest.TestCase): | |||
| def test_writestream_unbuffered(self): | |||
| z = ZipFile(self.f, 'w') | |||
| for fname in FILENAMES: | |||
| full_path = os.path.join(TMPDIR, fname) | |||
| full_path = os.path.join(self.TMPDIR, fname) | |||
| i = open(full_path) | |||
| o = z.writestream(fname, os.path.getsize(full_path)) | |||
| while True: | |||
| @@ -257,72 +274,75 @@ ITEM_NAME='test.txt' | |||
| ZIP1_PWD='pwd' | |||
| ZIP2_PWD='12345' | |||
| def create_file_from_content(): | |||
| with open(ZIPPATH, mode='wb') as f: | |||
| f.write(base64.b64decode(ZIP_CONTENT)) | |||
| class TestProtectedReading(unittest.TestCase, MakeTempMixIn): | |||
| def create_file_from_content(self): | |||
| with open(self.ZIPPATH, mode='wb') as f: | |||
| f.write(base64.b64decode(ZIP_CONTENT)) | |||
| def create_protected_zip(): | |||
| z = ZipFile(ZIPPATH, mode='w', password=ZIP2_PWD) | |||
| z.writestr(ITEM_NAME, ITEM_CONTENT) | |||
| z.close() | |||
| class TestProtectedReading(unittest.TestCase): | |||
| def setUp(self): | |||
| create_file_from_content() | |||
| MakeTempMixIn.setUp(self) | |||
| self.create_file_from_content() | |||
| def tearDown(self): | |||
| os.remove(ZIPPATH) | |||
| MakeTempMixIn.tearDown(self) | |||
| def test_read_with_password(self): | |||
| z = ZipFile(ZIPPATH, 'r', password=ZIP1_PWD) | |||
| z = ZipFile(self.ZIPPATH, 'r', password=ZIP1_PWD) | |||
| self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8')) | |||
| z.close() | |||
| def test_read_without_password(self): | |||
| z = ZipFile(ZIPPATH, 'r') | |||
| z = ZipFile(self.ZIPPATH, 'r') | |||
| self.assertRaises(RuntimeError, z.read, ITEM_NAME) | |||
| z.close() | |||
| def test_read_with_wrong_password(self): | |||
| z = ZipFile(ZIPPATH, 'r', password='wrong') | |||
| z = ZipFile(self.ZIPPATH, 'r', password='wrong') | |||
| self.assertRaises(RuntimeError, z.read, ITEM_NAME) | |||
| z.close() | |||
| class TestProtectedWriting(unittest.TestCase): | |||
| class TestProtectedWriting(unittest.TestCase, MakeTempMixIn): | |||
| def create_protected_zip(self): | |||
| z = ZipFile(self.ZIPPATH, mode='w', password=ZIP2_PWD) | |||
| z.writestr(ITEM_NAME, ITEM_CONTENT) | |||
| z.close() | |||
| def setUp(self): | |||
| create_protected_zip() | |||
| MakeTempMixIn.setUp(self) | |||
| self.create_protected_zip() | |||
| def tearDown(self): | |||
| os.remove(ZIPPATH) | |||
| MakeTempMixIn.tearDown(self) | |||
| def test_read_with_password(self): | |||
| z = ZipFile(ZIPPATH, 'r', password=ZIP2_PWD) | |||
| z = ZipFile(self.ZIPPATH, 'r', password=ZIP2_PWD) | |||
| self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8')) | |||
| z.close() | |||
| def test_read_without_password(self): | |||
| z = ZipFile(ZIPPATH, 'r') | |||
| z = ZipFile(self.ZIPPATH, 'r') | |||
| self.assertRaises(RuntimeError, z.read, ITEM_NAME) | |||
| z.close() | |||
| def test_read_with_wrong_password(self): | |||
| z = ZipFile(ZIPPATH, 'r', password='wrong') | |||
| z = ZipFile(self.ZIPPATH, 'r', password='wrong') | |||
| self.assertRaises(RuntimeError, z.read, ITEM_NAME) | |||
| z.close() | |||
| def test_read_with_password_list(self): | |||
| z = ZipFile(ZIPPATH, 'r', password=[ZIP1_PWD, ZIP2_PWD]) | |||
| z = ZipFile(self.ZIPPATH, 'r', password=[ZIP1_PWD, ZIP2_PWD]) | |||
| self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8')) | |||
| z.close() | |||
| class TestHighLevelAPI(unittest.TestCase): | |||
| class TestHighLevelAPI(unittest.TestCase, MakeTempMixIn): | |||
| def setUp(self): | |||
| make_temp_archive() | |||
| MakeTempMixIn.setUp(self) | |||
| self.make_temp_archive() | |||
| def tearDown(self): | |||
| MakeTempMixIn.tearDown(self) | |||
| def _test_listing_content(self, f): | |||
| """Test helper capturing file paths while iterating the archive.""" | |||
| @@ -335,16 +355,16 @@ class TestHighLevelAPI(unittest.TestCase): | |||
| def test_open_by_name(self): | |||
| """Test an archive opened directly by name.""" | |||
| self._test_listing_content(ZIPPATH) | |||
| self._test_listing_content(self.ZIPPATH) | |||
| def test_open_by_named_fobj(self): | |||
| """Test an archive using a file-like object opened by name.""" | |||
| with open(ZIPPATH, 'rb') as f: | |||
| with open(self.ZIPPATH, 'rb') as f: | |||
| self._test_listing_content(f) | |||
| def test_open_by_unnamed_fobj(self): | |||
| """Test an archive using file-like object opened by fileno().""" | |||
| with open(ZIPPATH, 'rb') as zf: | |||
| with open(self.ZIPPATH, 'rb') as zf: | |||
| with io.FileIO(zf.fileno(), mode='r', closefd=False) as f: | |||
| self._test_listing_content(f) | |||