# archive_util.py (6.4 KB)
  1. """Utilities for extracting common archive formats"""
  2. __all__ = [
  3. "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
  4. "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
  5. ]
  6. import zipfile, tarfile, os, shutil, posixpath
  7. from pkg_resources import ensure_directory
  8. from distutils.errors import DistutilsError
  9. class UnrecognizedFormat(DistutilsError):
  10. """Couldn't recognize the archive type"""
  11. def default_filter(src,dst):
  12. """The default progress/filter callback; returns True for all files"""
  13. return dst
  14. def unpack_archive(filename, extract_dir, progress_filter=default_filter,
  15. drivers=None
  16. ):
  17. """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
  18. `progress_filter` is a function taking two arguments: a source path
  19. internal to the archive ('/'-separated), and a filesystem path where it
  20. will be extracted. The callback must return the desired extract path
  21. (which may be the same as the one passed in), or else ``None`` to skip
  22. that file or directory. The callback can thus be used to report on the
  23. progress of the extraction, as well as to filter the items extracted or
  24. alter their extraction paths.
  25. `drivers`, if supplied, must be a non-empty sequence of functions with the
  26. same signature as this function (minus the `drivers` argument), that raise
  27. ``UnrecognizedFormat`` if they do not support extracting the designated
  28. archive type. The `drivers` are tried in sequence until one is found that
  29. does not raise an error, or until all are exhausted (in which case
  30. ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
  31. drivers, the module's ``extraction_drivers`` constant will be used, which
  32. means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
  33. order.
  34. """
  35. for driver in drivers or extraction_drivers:
  36. try:
  37. driver(filename, extract_dir, progress_filter)
  38. except UnrecognizedFormat:
  39. continue
  40. else:
  41. return
  42. else:
  43. raise UnrecognizedFormat(
  44. "Not a recognized archive type: %s" % filename
  45. )
  46. def unpack_directory(filename, extract_dir, progress_filter=default_filter):
  47. """"Unpack" a directory, using the same interface as for archives
  48. Raises ``UnrecognizedFormat`` if `filename` is not a directory
  49. """
  50. if not os.path.isdir(filename):
  51. raise UnrecognizedFormat("%s is not a directory" % (filename,))
  52. paths = {filename:('',extract_dir)}
  53. for base, dirs, files in os.walk(filename):
  54. src,dst = paths[base]
  55. for d in dirs:
  56. paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d)
  57. for f in files:
  58. name = src+f
  59. target = os.path.join(dst,f)
  60. target = progress_filter(src+f, target)
  61. if not target:
  62. continue # skip non-files
  63. ensure_directory(target)
  64. f = os.path.join(base,f)
  65. shutil.copyfile(f, target)
  66. shutil.copystat(f, target)
  67. def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
  68. """Unpack zip `filename` to `extract_dir`
  69. Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
  70. by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
  71. of the `progress_filter` argument.
  72. """
  73. if not zipfile.is_zipfile(filename):
  74. raise UnrecognizedFormat("%s is not a zip file" % (filename,))
  75. z = zipfile.ZipFile(filename)
  76. try:
  77. for info in z.infolist():
  78. name = info.filename
  79. # don't extract absolute paths or ones with .. in them
  80. if name.startswith('/') or '..' in name.split('/'):
  81. continue
  82. target = os.path.join(extract_dir, *name.split('/'))
  83. target = progress_filter(name, target)
  84. if not target:
  85. continue
  86. if name.endswith('/'):
  87. # directory
  88. ensure_directory(target)
  89. else:
  90. # file
  91. ensure_directory(target)
  92. data = z.read(info.filename)
  93. f = open(target,'wb')
  94. try:
  95. f.write(data)
  96. finally:
  97. f.close()
  98. del data
  99. unix_attributes = info.external_attr >> 16
  100. if unix_attributes:
  101. os.chmod(target, unix_attributes)
  102. finally:
  103. z.close()
  104. def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
  105. """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
  106. Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
  107. by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
  108. of the `progress_filter` argument.
  109. """
  110. try:
  111. tarobj = tarfile.open(filename)
  112. except tarfile.TarError:
  113. raise UnrecognizedFormat(
  114. "%s is not a compressed or uncompressed tar file" % (filename,)
  115. )
  116. try:
  117. tarobj.chown = lambda *args: None # don't do any chowning!
  118. for member in tarobj:
  119. name = member.name
  120. # don't extract absolute paths or ones with .. in them
  121. if not name.startswith('/') and '..' not in name.split('/'):
  122. prelim_dst = os.path.join(extract_dir, *name.split('/'))
  123. # resolve any links and to extract the link targets as normal files
  124. while member is not None and (member.islnk() or member.issym()):
  125. linkpath = member.linkname
  126. if member.issym():
  127. linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
  128. linkpath = posixpath.normpath(linkpath)
  129. member = tarobj._getmember(linkpath)
  130. if member is not None and (member.isfile() or member.isdir()):
  131. final_dst = progress_filter(name, prelim_dst)
  132. if final_dst:
  133. if final_dst.endswith(os.sep):
  134. final_dst = final_dst[:-1]
  135. try:
  136. tarobj._extract_member(member, final_dst) # XXX Ugh
  137. except tarfile.ExtractError:
  138. pass # chown/chmod/mkfifo/mknode/makedev failed
  139. return True
  140. finally:
  141. tarobj.close()
  142. extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile