MetaData Sharing
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

168 lines
3.8 KiB

  1. from . import bencode
  2. from hashlib import sha1
  3. import importlib.resources
  4. import itertools
  5. import os
  6. import pathlib
  7. import shutil
  8. import sys
  9. import tempfile
  10. import unittest
  11. class Storage:
  12. def __init__(self, rootpath, files, piecelen, encoding='us-ascii'):
  13. self._rootpath = pathlib.Path(rootpath)
  14. self._files = files
  15. self._piecelen = piecelen
  16. self._encoding = encoding
  17. self._buildindex()
  18. def _buildindex(self):
  19. self._index = []
  20. files = iter(self._files)
  21. left = 0
  22. curfile = None
  23. while True:
  24. if curfile is None or curfileoff == curfile['length']:
  25. # next file
  26. try:
  27. curfile = next(files)
  28. fname = pathlib.Path(
  29. *(x.decode(self._encoding) for x in
  30. curfile['path']))
  31. curfilepath = self._rootpath / fname
  32. except StopIteration:
  33. break
  34. curfileoff = 0
  35. if left == 0:
  36. current = []
  37. self._index.append(current)
  38. left = self._piecelen
  39. sz = min(curfile['length'] - curfileoff, left)
  40. current.append(dict(file=curfilepath, fname=fname,
  41. offset=curfileoff, size=sz))
  42. curfileoff += sz
  43. left -= sz
  44. def apply_piece(self, idx, fun):
  45. for i in self._index[idx]:
  46. with open(i['file'], 'rb') as fp:
  47. fp.seek(i['offset'])
  48. fun(fp.read(i['size']))
  49. def validate(torrent, basedir):
  50. info = torrent['info']
  51. basedir = pathlib.Path(basedir)
  52. try:
  53. encoding = torrent['encoding'].decode('us-ascii')
  54. except KeyError:
  55. encoding = 'us-ascii'
  56. torrentdir = basedir / info['name'].decode(encoding)
  57. stor = Storage(torrentdir, info['files'], info['piece length'], encoding)
  58. pieces = info['pieces']
  59. for num, i in enumerate(pieces[x:x+20] for x in range(0, len(pieces),
  60. 20)):
  61. hash = sha1()
  62. stor.apply_piece(num, hash.update)
  63. if hash.digest() != i:
  64. raise ValueError
  65. class _TestCases(unittest.TestCase):
  66. dirname = 'somedir'
  67. origfiledata = {
  68. 'filea.txt': b'foo\n',
  69. 'fileb.txt': b'bar\n',
  70. 'filec.txt': b'bleha\n',
  71. 'filed.txt': b'somehow\n',
  72. 'filee.txt': b'nowab\n',
  73. 'filef/filef.txt': b'\n',
  74. }
  75. def setUp(self):
  76. d = pathlib.Path(tempfile.mkdtemp()).resolve()
  77. tor = importlib.resources.files(__name__)
  78. tor = tor / 'fixtures' / 'somedir.torrent'
  79. with tor.open('rb') as fp:
  80. self.torrent = bencode.bdecode(fp.read())
  81. self.basetempdir = d
  82. self.oldcwd = os.getcwd()
  83. os.chdir(d)
  84. def tearDown(self):
  85. shutil.rmtree(self.basetempdir)
  86. os.chdir(self.oldcwd)
  87. @staticmethod
  88. def make_files(dname, fdict):
  89. dname = pathlib.Path(dname)
  90. for k, v in fdict.items():
  91. k = dname / pathlib.PurePosixPath(k)
  92. k.parent.mkdir(parents=True, exist_ok=True)
  93. with open(k, 'wb') as fp:
  94. fp.write(v)
  95. def test_completeverif(self):
  96. sd = self.basetempdir / self.dirname
  97. sd.mkdir()
  98. self.make_files(sd, self.origfiledata)
  99. validate(self.torrent, self.basetempdir)
  100. # encoded names
  101. sd = self.basetempdir / 'thai'
  102. sd.mkdir()
  103. self.make_files(sd, { 'thai - สวัสดี.txt': b'hello\n'
  104. })
  105. tor = importlib.resources.files(__name__)
  106. tor = tor / 'fixtures' / 'thai.torrent'
  107. with tor.open('rb') as fp:
  108. torrent = bencode.bdecode(fp.read())
  109. validate(torrent, self.basetempdir)
  110. def test_verification(self):
  111. # Testing for "missing" files
  112. # piece size 2 (aka 4 bytes)
  113. # empty file of 4 bytes 'foo\n'
  114. # complete file of 4 bytes 'bar\n'
  115. # partial missing file, 6 bytes, last two correct 'bleha\n'
  116. # complete file of 8 bytes (multiple pieces) 'somehow\n'
  117. # partial missing file, starting w/ 2 bytes, length 6 'nowab\n'
  118. # complete file (length 1) '\n'
  119. missingfiles = self.origfiledata.copy()
  120. missingfiles['filea.txt'] = b''
  121. missingfiles['filec.txt'] = b'\x00\x00\x00\x00a\n'
  122. missingfiles['filee.txt'] = b'no'
  123. sd = self.basetempdir / self.dirname
  124. sd.mkdir()
  125. self.make_files(sd, missingfiles)
  126. self.assertRaises(ValueError, validate, self.torrent, self.basetempdir)