log.py 9.2 KB


  1. """Logging
  2. """
  3. import sys
  4. import os
  5. import logging
  6. from pip import backwardcompat
  7. from pip._vendor import colorama, pkg_resources
  8. def _color_wrap(*colors):
  9. def wrapped(inp):
  10. return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
  11. return wrapped
  12. def should_color(consumer, environ, std=(sys.stdout, sys.stderr)):
  13. real_consumer = (consumer if not isinstance(consumer, colorama.AnsiToWin32)
  14. else consumer.wrapped)
  15. # If consumer isn't stdout or stderr we shouldn't colorize it
  16. if real_consumer not in std:
  17. return False
  18. # If consumer is a tty we should color it
  19. if hasattr(real_consumer, "isatty") and real_consumer.isatty():
  20. return True
  21. # If we have an ASNI term we should color it
  22. if environ.get("TERM") == "ANSI":
  23. return True
  24. # If anything else we should not color it
  25. return False
  26. def should_warn(current_version, removal_version):
  27. # Our Significant digits on versions is 2, so remove everything but the
  28. # first two places.
  29. current_version = ".".join(current_version.split(".")[:2])
  30. removal_version = ".".join(removal_version.split(".")[:2])
  31. # Our warning threshold is one minor version before removal, so we
  32. # decrement the minor version by one
  33. major, minor = removal_version.split(".")
  34. minor = str(int(minor) - 1)
  35. warn_version = ".".join([major, minor])
  36. # Test if our current_version should be a warn
  37. return (pkg_resources.parse_version(current_version)
  38. < pkg_resources.parse_version(warn_version))
  39. class Logger(object):
  40. """
  41. Logging object for use in command-line script. Allows ranges of
  42. levels, to avoid some redundancy of displayed information.
  43. """
  44. VERBOSE_DEBUG = logging.DEBUG - 1
  45. DEBUG = logging.DEBUG
  46. INFO = logging.INFO
  47. NOTIFY = (logging.INFO + logging.WARN) / 2
  48. WARN = WARNING = logging.WARN
  49. ERROR = logging.ERROR
  50. FATAL = logging.FATAL
  51. LEVELS = [VERBOSE_DEBUG, DEBUG, INFO, NOTIFY, WARN, ERROR, FATAL]
  52. COLORS = {
  53. WARN: _color_wrap(colorama.Fore.YELLOW),
  54. ERROR: _color_wrap(colorama.Fore.RED),
  55. FATAL: _color_wrap(colorama.Fore.RED),
  56. }
  57. def __init__(self):
  58. self.consumers = []
  59. self.indent = 0
  60. self.explicit_levels = False
  61. self.in_progress = None
  62. self.in_progress_hanging = False
  63. def add_consumers(self, *consumers):
  64. if sys.platform.startswith("win"):
  65. for level, consumer in consumers:
  66. if hasattr(consumer, "write"):
  67. self.consumers.append(
  68. (level, colorama.AnsiToWin32(consumer)),
  69. )
  70. else:
  71. self.consumers.append((level, consumer))
  72. else:
  73. self.consumers.extend(consumers)
  74. def debug(self, msg, *args, **kw):
  75. self.log(self.DEBUG, msg, *args, **kw)
  76. def info(self, msg, *args, **kw):
  77. self.log(self.INFO, msg, *args, **kw)
  78. def notify(self, msg, *args, **kw):
  79. self.log(self.NOTIFY, msg, *args, **kw)
  80. def warn(self, msg, *args, **kw):
  81. self.log(self.WARN, msg, *args, **kw)
  82. def error(self, msg, *args, **kw):
  83. self.log(self.ERROR, msg, *args, **kw)
  84. def fatal(self, msg, *args, **kw):
  85. self.log(self.FATAL, msg, *args, **kw)
  86. def deprecated(self, removal_version, msg, *args, **kwargs):
  87. """
  88. Logs deprecation message which is log level WARN if the
  89. ``removal_version`` is > 1 minor release away and log level ERROR
  90. otherwise.
  91. removal_version should be the version that the deprecated feature is
  92. expected to be removed in, so something that will not exist in
  93. version 1.7, but will in 1.6 would have a removal_version of 1.7.
  94. """
  95. from pip import __version__
  96. if should_warn(__version__, removal_version):
  97. self.warn(msg, *args, **kwargs)
  98. else:
  99. self.error(msg, *args, **kwargs)
  100. def log(self, level, msg, *args, **kw):
  101. if args:
  102. if kw:
  103. raise TypeError(
  104. "You may give positional or keyword arguments, not both")
  105. args = args or kw
  106. # render
  107. if args:
  108. rendered = msg % args
  109. else:
  110. rendered = msg
  111. rendered = ' ' * self.indent + rendered
  112. if self.explicit_levels:
  113. ## FIXME: should this be a name, not a level number?
  114. rendered = '%02i %s' % (level, rendered)
  115. for consumer_level, consumer in self.consumers:
  116. if self.level_matches(level, consumer_level):
  117. if (self.in_progress_hanging
  118. and consumer in (sys.stdout, sys.stderr)):
  119. self.in_progress_hanging = False
  120. sys.stdout.write('\n')
  121. sys.stdout.flush()
  122. if hasattr(consumer, 'write'):
  123. write_content = rendered + '\n'
  124. if should_color(consumer, os.environ):
  125. # We are printing to stdout or stderr and it supports
  126. # colors so render our text colored
  127. colorizer = self.COLORS.get(level, lambda x: x)
  128. write_content = colorizer(write_content)
  129. consumer.write(write_content)
  130. if hasattr(consumer, 'flush'):
  131. consumer.flush()
  132. else:
  133. consumer(rendered)
  134. def _show_progress(self):
  135. """Should we display download progress?"""
  136. return (self.stdout_level_matches(self.NOTIFY) and sys.stdout.isatty())
  137. def start_progress(self, msg):
  138. assert not self.in_progress, (
  139. "Tried to start_progress(%r) while in_progress %r"
  140. % (msg, self.in_progress))
  141. if self._show_progress():
  142. sys.stdout.write(' ' * self.indent + msg)
  143. sys.stdout.flush()
  144. self.in_progress_hanging = True
  145. else:
  146. self.in_progress_hanging = False
  147. self.in_progress = msg
  148. self.last_message = None
  149. def end_progress(self, msg='done.'):
  150. assert self.in_progress, (
  151. "Tried to end_progress without start_progress")
  152. if self._show_progress():
  153. if not self.in_progress_hanging:
  154. # Some message has been printed out since start_progress
  155. sys.stdout.write('...' + self.in_progress + msg + '\n')
  156. sys.stdout.flush()
  157. else:
  158. # These erase any messages shown with show_progress (besides .'s)
  159. logger.show_progress('')
  160. logger.show_progress('')
  161. sys.stdout.write(msg + '\n')
  162. sys.stdout.flush()
  163. self.in_progress = None
  164. self.in_progress_hanging = False
  165. def show_progress(self, message=None):
  166. """If we are in a progress scope, and no log messages have been
  167. shown, write out another '.'"""
  168. if self.in_progress_hanging:
  169. if message is None:
  170. sys.stdout.write('.')
  171. sys.stdout.flush()
  172. else:
  173. if self.last_message:
  174. padding = ' ' * max(0, len(self.last_message) - len(message))
  175. else:
  176. padding = ''
  177. sys.stdout.write('\r%s%s%s%s' %
  178. (' ' * self.indent, self.in_progress, message, padding))
  179. sys.stdout.flush()
  180. self.last_message = message
  181. def stdout_level_matches(self, level):
  182. """Returns true if a message at this level will go to stdout"""
  183. return self.level_matches(level, self._stdout_level())
  184. def _stdout_level(self):
  185. """Returns the level that stdout runs at"""
  186. for level, consumer in self.consumers:
  187. if consumer is sys.stdout:
  188. return level
  189. return self.FATAL
  190. def level_matches(self, level, consumer_level):
  191. """
  192. >>> l = Logger()
  193. >>> l.level_matches(3, 4)
  194. False
  195. >>> l.level_matches(3, 2)
  196. True
  197. >>> l.level_matches(slice(None, 3), 3)
  198. False
  199. >>> l.level_matches(slice(None, 3), 2)
  200. True
  201. >>> l.level_matches(slice(1, 3), 1)
  202. True
  203. >>> l.level_matches(slice(2, 3), 1)
  204. False
  205. """
  206. if isinstance(level, slice):
  207. start, stop = level.start, level.stop
  208. if start is not None and start > consumer_level:
  209. return False
  210. if stop is not None or stop <= consumer_level:
  211. return False
  212. return True
  213. else:
  214. return level >= consumer_level
  215. @classmethod
  216. def level_for_integer(cls, level):
  217. levels = cls.LEVELS
  218. if level < 0:
  219. return levels[0]
  220. if level >= len(levels):
  221. return levels[-1]
  222. return levels[level]
  223. def move_stdout_to_stderr(self):
  224. to_remove = []
  225. to_add = []
  226. for consumer_level, consumer in self.consumers:
  227. if consumer == sys.stdout:
  228. to_remove.append((consumer_level, consumer))
  229. to_add.append((consumer_level, sys.stderr))
  230. for item in to_remove:
  231. self.consumers.remove(item)
  232. self.consumers.extend(to_add)
  233. logger = Logger()