helpers.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # Copyright 2009-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Bits and pieces used by the driver that don't really fit elsewhere."""
  15. import collections
  16. import struct
  17. from pymongo.message import _Query
  18. import bson
  19. import pymongo
  20. from bson.codec_options import CodecOptions
  21. from bson.py3compat import itervalues, string_type, iteritems, u
  22. from bson.son import SON
  23. from pymongo.errors import (CursorNotFound,
  24. DuplicateKeyError,
  25. ExecutionTimeout,
  26. NotMasterError,
  27. OperationFailure,
  28. WriteError,
  29. WriteConcernError,
  30. WTimeoutError)
  31. _UUNDER = u("_")
  32. def _gen_index_name(keys):
  33. """Generate an index name from the set of fields it is over."""
  34. return _UUNDER.join(["%s_%s" % item for item in keys])
  35. def _index_list(key_or_list, direction=None):
  36. """Helper to generate a list of (key, direction) pairs.
  37. Takes such a list, or a single key, or a single key and direction.
  38. """
  39. if direction is not None:
  40. return [(key_or_list, direction)]
  41. else:
  42. if isinstance(key_or_list, string_type):
  43. return [(key_or_list, pymongo.ASCENDING)]
  44. elif not isinstance(key_or_list, (list, tuple)):
  45. raise TypeError("if no direction is specified, "
  46. "key_or_list must be an instance of list")
  47. return key_or_list
  48. def _index_document(index_list):
  49. """Helper to generate an index specifying document.
  50. Takes a list of (key, direction) pairs.
  51. """
  52. if isinstance(index_list, collections.Mapping):
  53. raise TypeError("passing a dict to sort/create_index/hint is not "
  54. "allowed - use a list of tuples instead. did you "
  55. "mean %r?" % list(iteritems(index_list)))
  56. elif not isinstance(index_list, (list, tuple)):
  57. raise TypeError("must use a list of (key, direction) pairs, "
  58. "not: " + repr(index_list))
  59. if not len(index_list):
  60. raise ValueError("key_or_list must not be the empty list")
  61. index = SON()
  62. for (key, value) in index_list:
  63. if not isinstance(key, string_type):
  64. raise TypeError("first item in each key pair must be a string")
  65. if not isinstance(value, (string_type, int, collections.Mapping)):
  66. raise TypeError("second item in each key pair must be 1, -1, "
  67. "'2d', 'geoHaystack', or another valid MongoDB "
  68. "index specifier.")
  69. index[key] = value
  70. return index
  71. def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
  72. """Unpack a response from the database.
  73. Check the response for errors and unpack, returning a dictionary
  74. containing the response data.
  75. Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
  76. OperationFailure.
  77. :Parameters:
  78. - `response`: byte string as returned from the database
  79. - `cursor_id` (optional): cursor_id we sent to get this response -
  80. used for raising an informative exception when we get cursor id not
  81. valid at server response
  82. - `codec_options` (optional): an instance of
  83. :class:`~bson.codec_options.CodecOptions`
  84. """
  85. response_flag = struct.unpack("<i", response[:4])[0]
  86. if response_flag & 1:
  87. # Shouldn't get this response if we aren't doing a getMore
  88. assert cursor_id is not None
  89. raise CursorNotFound("cursor id '%s' not valid at server" %
  90. cursor_id)
  91. elif response_flag & 2:
  92. error_object = bson.BSON(response[20:]).decode()
  93. if error_object["$err"].startswith("not master"):
  94. raise NotMasterError(error_object["$err"])
  95. elif error_object.get("code") == 50:
  96. raise ExecutionTimeout(error_object.get("$err"),
  97. error_object.get("code"),
  98. error_object)
  99. raise OperationFailure("database error: %s" %
  100. error_object.get("$err"),
  101. error_object.get("code"),
  102. error_object)
  103. result = {}
  104. result["cursor_id"] = struct.unpack("<q", response[4:12])[0]
  105. result["starting_from"] = struct.unpack("<i", response[12:16])[0]
  106. result["number_returned"] = struct.unpack("<i", response[16:20])[0]
  107. result["data"] = bson.decode_all(response[20:], codec_options)
  108. assert len(result["data"]) == result["number_returned"]
  109. return result
  110. def _check_command_response(response, msg=None, allowable_errors=None):
  111. """Check the response to a command for errors.
  112. """
  113. if "ok" not in response:
  114. # Server didn't recognize our message as a command.
  115. raise OperationFailure(response.get("$err"),
  116. response.get("code"),
  117. response)
  118. # TODO: remove, this is moving to _check_gle_response
  119. if response.get("wtimeout", False):
  120. # MongoDB versions before 1.8.0 return the error message in an "errmsg"
  121. # field. If "errmsg" exists "err" will also exist set to None, so we
  122. # have to check for "errmsg" first.
  123. raise WTimeoutError(response.get("errmsg", response.get("err")),
  124. response.get("code"),
  125. response)
  126. if not response["ok"]:
  127. details = response
  128. # Mongos returns the error details in a 'raw' object
  129. # for some errors.
  130. if "raw" in response:
  131. for shard in itervalues(response["raw"]):
  132. # Grab the first non-empty raw error from a shard.
  133. if shard.get("errmsg") and not shard.get("ok"):
  134. details = shard
  135. break
  136. errmsg = details["errmsg"]
  137. if allowable_errors is None or errmsg not in allowable_errors:
  138. # Server is "not master" or "recovering"
  139. if (errmsg.startswith("not master")
  140. or errmsg.startswith("node is recovering")):
  141. raise NotMasterError(errmsg)
  142. # Server assertion failures
  143. if errmsg == "db assertion failure":
  144. errmsg = ("db assertion failure, assertion: '%s'" %
  145. details.get("assertion", ""))
  146. raise OperationFailure(errmsg,
  147. details.get("assertionCode"),
  148. response)
  149. # Other errors
  150. code = details.get("code")
  151. # findAndModify with upsert can raise duplicate key error
  152. if code in (11000, 11001, 12582):
  153. raise DuplicateKeyError(errmsg, code, response)
  154. elif code == 50:
  155. raise ExecutionTimeout(errmsg, code, response)
  156. msg = msg or "%s"
  157. raise OperationFailure(msg % errmsg, code, response)
  158. def _check_gle_response(response):
  159. """Return getlasterror response as a dict, or raise OperationFailure."""
  160. response = _unpack_response(response)
  161. assert response["number_returned"] == 1
  162. result = response["data"][0]
  163. # Did getlasterror itself fail?
  164. _check_command_response(result)
  165. if result.get("wtimeout", False):
  166. # MongoDB versions before 1.8.0 return the error message in an "errmsg"
  167. # field. If "errmsg" exists "err" will also exist set to None, so we
  168. # have to check for "errmsg" first.
  169. raise WTimeoutError(result.get("errmsg", result.get("err")),
  170. result.get("code"),
  171. result)
  172. error_msg = result.get("err", "")
  173. if error_msg is None:
  174. return result
  175. if error_msg.startswith("not master"):
  176. raise NotMasterError(error_msg)
  177. details = result
  178. # mongos returns the error code in an error object for some errors.
  179. if "errObjects" in result:
  180. for errobj in result["errObjects"]:
  181. if errobj.get("err") == error_msg:
  182. details = errobj
  183. break
  184. code = details.get("code")
  185. if code in (11000, 11001, 12582):
  186. raise DuplicateKeyError(details["err"], code, result)
  187. raise OperationFailure(details["err"], code, result)
  188. def _first_batch(sock_info, namespace, query,
  189. limit, slave_ok, codec_options, read_preference):
  190. """Simple query helper for retrieving a first (and possibly only) batch."""
  191. query = _Query(
  192. 0, namespace, 0, limit, query, None, codec_options, read_preference)
  193. request_id, msg, max_doc_size = query.get_message(slave_ok,
  194. sock_info.is_mongos)
  195. sock_info.send_message(msg, max_doc_size)
  196. response = sock_info.receive_message(1, request_id)
  197. return _unpack_response(response, None, codec_options)
  198. def _check_write_command_response(results):
  199. """Backward compatibility helper for write command error handling.
  200. """
  201. errors = [res for res in results
  202. if "writeErrors" in res[1] or "writeConcernError" in res[1]]
  203. if errors:
  204. # If multiple batches had errors
  205. # raise from the last batch.
  206. offset, result = errors[-1]
  207. # Prefer write errors over write concern errors
  208. write_errors = result.get("writeErrors")
  209. if write_errors:
  210. # If the last batch had multiple errors only report
  211. # the last error to emulate continue_on_error.
  212. error = write_errors[-1]
  213. error["index"] += offset
  214. if error.get("code") == 11000:
  215. raise DuplicateKeyError(error.get("errmsg"), 11000, error)
  216. raise WriteError(error.get("errmsg"), error.get("code"), error)
  217. else:
  218. error = result["writeConcernError"]
  219. if "errInfo" in error and error["errInfo"].get('wtimeout'):
  220. # Make sure we raise WTimeoutError
  221. raise WTimeoutError(
  222. error.get("errmsg"), error.get("code"), error)
  223. raise WriteConcernError(
  224. error.get("errmsg"), error.get("code"), error)
  225. def _fields_list_to_dict(fields, option_name):
  226. """Takes a list of field names and returns a matching dictionary.
  227. ["a", "b"] becomes {"a": 1, "b": 1}
  228. and
  229. ["a.b.c", "d", "a.c"] becomes {"a.b.c": 1, "d": 1, "a.c": 1}
  230. """
  231. if isinstance(fields, collections.Mapping):
  232. return fields
  233. elif isinstance(fields, list):
  234. as_dict = {}
  235. for field in fields:
  236. if not isinstance(field, string_type):
  237. raise TypeError("%s must be a list of key names, each an "
  238. "instance of %s" % (option_name,
  239. string_type.__name__))
  240. as_dict[field] = 1
  241. return as_dict
  242. else:
  243. raise TypeError("%s must be a mapping or "
  244. "list of key names" % (option_name,))