util.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. """Utility functions."""
  2. import sys
  3. import os
  4. import base64
  5. import json
  6. import hashlib
# Public API: the names exported by a star-import of this module.
__all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
           'to_json', 'from_json', 'matches_requirement']
  9. def urlsafe_b64encode(data):
  10. """urlsafe_b64encode without padding"""
  11. return base64.urlsafe_b64encode(data).rstrip(binary('='))
  12. def urlsafe_b64decode(data):
  13. """urlsafe_b64decode without padding"""
  14. pad = b'=' * (4 - (len(data) & 3))
  15. return base64.urlsafe_b64decode(data + pad)
  16. def to_json(o):
  17. '''Convert given data to JSON.'''
  18. return json.dumps(o, sort_keys=True)
  19. def from_json(j):
  20. '''Decode a JSON payload.'''
  21. return json.loads(j)
  22. def open_for_csv(name, mode):
  23. if sys.version_info[0] < 3:
  24. nl = {}
  25. bin = 'b'
  26. else:
  27. nl = { 'newline': '' }
  28. bin = ''
  29. return open(name, mode + bin, **nl)
  30. try:
  31. unicode
  32. def utf8(data):
  33. '''Utf-8 encode data.'''
  34. if isinstance(data, unicode):
  35. return data.encode('utf-8')
  36. return data
  37. except NameError:
  38. def utf8(data):
  39. '''Utf-8 encode data.'''
  40. if isinstance(data, str):
  41. return data.encode('utf-8')
  42. return data
  43. try:
  44. # For encoding ascii back and forth between bytestrings, as is repeatedly
  45. # necessary in JSON-based crypto under Python 3
  46. unicode
  47. def native(s):
  48. return s
  49. def binary(s):
  50. if isinstance(s, unicode):
  51. return s.encode('ascii')
  52. return s
  53. except NameError:
  54. def native(s):
  55. if isinstance(s, bytes):
  56. return s.decode('ascii')
  57. return s
  58. def binary(s):
  59. if isinstance(s, str):
  60. return s.encode('ascii')
  61. class HashingFile(object):
  62. def __init__(self, fd, hashtype='sha256'):
  63. self.fd = fd
  64. self.hashtype = hashtype
  65. self.hash = hashlib.new(hashtype)
  66. self.length = 0
  67. def write(self, data):
  68. self.hash.update(data)
  69. self.length += len(data)
  70. self.fd.write(data)
  71. def close(self):
  72. self.fd.close()
  73. def digest(self):
  74. if self.hashtype == 'md5':
  75. return self.hash.hexdigest()
  76. digest = self.hash.digest()
  77. return self.hashtype + '=' + native(urlsafe_b64encode(digest))
  78. if sys.platform == 'win32':
  79. import ctypes.wintypes
  80. # CSIDL_APPDATA for reference - not used here for compatibility with
  81. # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order
  82. csidl = dict(CSIDL_APPDATA=26, CSIDL_LOCAL_APPDATA=28,
  83. CSIDL_COMMON_APPDATA=35)
  84. def get_path(name):
  85. SHGFP_TYPE_CURRENT = 0
  86. buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
  87. ctypes.windll.shell32.SHGetFolderPathW(0, csidl[name], 0, SHGFP_TYPE_CURRENT, buf)
  88. return buf.value
  89. def save_config_path(*resource):
  90. appdata = get_path("CSIDL_LOCAL_APPDATA")
  91. path = os.path.join(appdata, *resource)
  92. if not os.path.isdir(path):
  93. os.makedirs(path)
  94. return path
  95. def load_config_paths(*resource):
  96. ids = ["CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"]
  97. for id in ids:
  98. base = get_path(id)
  99. path = os.path.join(base, *resource)
  100. if os.path.exists(path):
  101. yield path
  102. else:
  103. def save_config_path(*resource):
  104. import xdg.BaseDirectory
  105. return xdg.BaseDirectory.save_config_path(*resource)
  106. def load_config_paths(*resource):
  107. import xdg.BaseDirectory
  108. return xdg.BaseDirectory.load_config_paths(*resource)
  109. def matches_requirement(req, wheels):
  110. """List of wheels matching a requirement.
  111. :param req: The requirement to satisfy
  112. :param wheels: List of wheels to search.
  113. """
  114. try:
  115. from pkg_resources import Distribution, Requirement
  116. except ImportError:
  117. raise RuntimeError("Cannot use requirements without pkg_resources")
  118. req = Requirement.parse(req)
  119. selected = []
  120. for wf in wheels:
  121. f = wf.parsed_filename
  122. dist = Distribution(project_name=f.group("name"), version=f.group("ver"))
  123. if dist in req:
  124. selected.append(wf)
  125. return selected