message.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. # Copyright 2009-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tools for creating `messages
  15. <http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
  16. MongoDB.
  17. .. note:: This module is for internal use and is generally not needed by
  18. application developers.
  19. """
  20. import random
  21. import struct
  22. import bson
  23. from bson.codec_options import DEFAULT_CODEC_OPTIONS
  24. from bson.py3compat import b, StringIO
  25. from bson.son import SON
  26. try:
  27. from pymongo import _cmessage
  28. _use_c = True
  29. except ImportError:
  30. _use_c = False
  31. from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure
  32. from pymongo.read_preferences import ReadPreference
  33. MAX_INT32 = 2147483647
  34. MIN_INT32 = -2147483648
  35. _INSERT = 0
  36. _UPDATE = 1
  37. _DELETE = 2
  38. _EMPTY = b''
  39. _BSONOBJ = b'\x03'
  40. _ZERO_8 = b'\x00'
  41. _ZERO_16 = b'\x00\x00'
  42. _ZERO_32 = b'\x00\x00\x00\x00'
  43. _ZERO_64 = b'\x00\x00\x00\x00\x00\x00\x00\x00'
  44. _SKIPLIM = b'\x00\x00\x00\x00\xff\xff\xff\xff'
  45. _OP_MAP = {
  46. _INSERT: b'\x04documents\x00\x00\x00\x00\x00',
  47. _UPDATE: b'\x04updates\x00\x00\x00\x00\x00',
  48. _DELETE: b'\x04deletes\x00\x00\x00\x00\x00',
  49. }
  50. def _maybe_add_read_preference(spec, read_preference):
  51. """Add $readPreference to spec when appropriate."""
  52. mode = read_preference.mode
  53. tag_sets = read_preference.tag_sets
  54. # Only add $readPreference if it's something other than primary to avoid
  55. # problems with mongos versions that don't support read preferences. Also,
  56. # for maximum backwards compatibility, don't add $readPreference for
  57. # secondaryPreferred unless tags are in use (setting the slaveOkay bit
  58. # has the same effect).
  59. if mode and (
  60. mode != ReadPreference.SECONDARY_PREFERRED.mode or tag_sets != [{}]):
  61. if "$query" not in spec:
  62. spec = SON([("$query", spec)])
  63. spec["$readPreference"] = read_preference.document
  64. return spec
  65. class _Query(object):
  66. """A query operation."""
  67. __slots__ = ('flags', 'ns', 'ntoskip', 'ntoreturn',
  68. 'spec', 'fields', 'codec_options', 'read_preference')
  69. def __init__(self, flags, ns, ntoskip, ntoreturn,
  70. spec, fields, codec_options, read_preference):
  71. self.flags = flags
  72. self.ns = ns
  73. self.ntoskip = ntoskip
  74. self.ntoreturn = ntoreturn
  75. self.spec = spec
  76. self.fields = fields
  77. self.codec_options = codec_options
  78. self.read_preference = read_preference
  79. def get_message(self, set_slave_ok, is_mongos):
  80. """Get a query message, possibly setting the slaveOk bit."""
  81. if is_mongos:
  82. spec = _maybe_add_read_preference(self.spec, self.read_preference)
  83. else:
  84. spec = self.spec
  85. if set_slave_ok:
  86. # Set the slaveOk bit.
  87. flags = self.flags | 4
  88. else:
  89. flags = self.flags
  90. return query(flags, self.ns, self.ntoskip,
  91. self.ntoreturn, spec, self.fields, self.codec_options)
  92. class _GetMore(object):
  93. """A getmore operation."""
  94. __slots__ = ('ns', 'ntoreturn', 'cursor_id')
  95. def __init__(self, ns, ntoreturn, cursor_id):
  96. self.ns = ns
  97. self.ntoreturn = ntoreturn
  98. self.cursor_id = cursor_id
  99. def get_message(self, dummy0, dummy1):
  100. """Get a getmore message."""
  101. return get_more(self.ns, self.ntoreturn, self.cursor_id)
  def __last_error(namespace, args):
      """Build a getlasterror command message for `namespace`'s database.

      `args` is merged (via ``update``) into ``{"getlasterror": 1}``. The
      command targets ``<db>.$cmd``, where ``<db>`` is the part of
      `namespace` before the first '.'. Returns the
      ``(request_id, message, max_bson_size)`` tuple produced by `query`.
      """
      command = SON([("getlasterror", 1)])
      command.update(args)
      database = namespace.partition('.')[0]
      return query(0, database + '.$cmd', 0, -1, command,
                   None, DEFAULT_CODEC_OPTIONS)
  def __pack_message(operation, data):
      """Prefix `data` with a 16-byte message header for `operation`.

      The header holds four little-endian int32 fields: the total message
      length, a random request id, a zero responseTo, and the opcode.
      Returns ``(request_id, header + data)``.
      """
      request_id = random.randint(MIN_INT32, MAX_INT32)
      header = struct.pack("<iiii", 16 + len(data), request_id, 0, operation)
      return request_id, header + data
  def insert(collection_name, docs, check_keys,
             safe, last_error_args, continue_on_error, opts):
      """Get an **insert** message.

      Used by the Bulk API to insert into pre-2.6 servers. Collection.insert
      uses _do_batched_insert.

      Returns ``(request_id, message, max_bson_size)``, where `max_bson_size`
      is the size of the largest encoded document. When `safe` is true, a
      getlasterror command is appended to the message and `request_id` is
      that command's id.

      :Raises:
        - InvalidOperation: if `docs` is empty.
      """
      # Bit 0 of the OP_INSERT flags is ContinueOnError.
      flags = 1 if continue_on_error else 0
      prefix = struct.pack("<i", flags) + bson._make_c_string(collection_name)
      encoded = [bson.BSON.encode(doc, check_keys, opts) for doc in docs]
      if not encoded:
          raise InvalidOperation("cannot do an empty bulk insert")
      largest = max(len(chunk) for chunk in encoded)
      request_id, message = __pack_message(2002, prefix + b"".join(encoded))
      if safe:
          request_id, gle_message, _ = __last_error(collection_name,
                                                    last_error_args)
          message += gle_message
      return request_id, message, largest


  if _use_c:
      insert = _cmessage._insert_message
  def update(collection_name, upsert, multi,
             spec, doc, safe, last_error_args, check_keys, opts):
      """Get an **update** message.

      Returns ``(request_id, message, size)``, where `size` is the length of
      the encoded update document `doc`. When `safe` is true, a getlasterror
      command is appended and `request_id` is that command's id.
      """
      # OP_UPDATE flags: bit 0 is Upsert, bit 1 is MultiUpdate.
      flags = (1 if upsert else 0) | (2 if multi else 0)
      parts = [
          _ZERO_32,
          bson._make_c_string(collection_name),
          struct.pack("<i", flags),
          bson.BSON.encode(spec, False, opts),
      ]
      encoded_doc = bson.BSON.encode(doc, check_keys, opts)
      parts.append(encoded_doc)
      request_id, message = __pack_message(2001, b"".join(parts))
      if safe:
          request_id, gle_message, _ = __last_error(collection_name,
                                                    last_error_args)
          message += gle_message
      return request_id, message, len(encoded_doc)


  if _use_c:
      update = _cmessage._update_message
  def query(options, collection_name, num_to_skip,
            num_to_return, query, field_selector, opts):
      """Get a **query** message.

      `options` is the OP_QUERY flags word (packed unsigned). `query` is the
      query document and `field_selector`, when not None, the field selector
      document. Returns ``(request_id, message, max_bson_size)``, where
      `max_bson_size` is the largest encoded document size.
      """
      chunks = [
          struct.pack("<I", options),
          bson._make_c_string(collection_name),
          struct.pack("<ii", num_to_skip, num_to_return),
      ]
      encoded_query = bson.BSON.encode(query, False, opts)
      chunks.append(encoded_query)
      largest = len(encoded_query)
      if field_selector is not None:
          encoded_fields = bson.BSON.encode(field_selector, False, opts)
          chunks.append(encoded_fields)
          largest = max(len(encoded_fields), largest)
      request_id, message = __pack_message(2004, b"".join(chunks))
      return request_id, message, largest


  if _use_c:
      query = _cmessage._query_message
  def get_more(collection_name, num_to_return, cursor_id):
      """Get a **getMore** message.

      The body is a reserved zero int32, the collection name as a C string,
      numberToReturn (int32), and the cursor id (int64). Returns
      ``(request_id, message)``.
      """
      body = b"".join([
          _ZERO_32,
          bson._make_c_string(collection_name),
          struct.pack("<iq", num_to_return, cursor_id),
      ])
      return __pack_message(2005, body)


  if _use_c:
      get_more = _cmessage._get_more_message
  def delete(collection_name, spec, safe,
             last_error_args, opts, flags=0):
      """Get a **delete** message.

      `opts` is a CodecOptions. `flags` is a bit vector that may contain
      the SingleRemove flag or not:
      http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-delete

      Returns ``(request_id, message, size)``, where `size` is the length of
      the encoded selector `spec`. When `safe` is true, a getlasterror
      command is appended and `request_id` is that command's id.
      """
      header = (_ZERO_32 + bson._make_c_string(collection_name) +
                struct.pack("<I", flags))
      encoded_spec = bson.BSON.encode(spec, False, opts)
      request_id, message = __pack_message(2006, header + encoded_spec)
      if safe:
          request_id, gle_message, _ = __last_error(collection_name,
                                                    last_error_args)
          message += gle_message
      return request_id, message, len(encoded_spec)
  def kill_cursors(cursor_ids):
      """Get a **killCursors** message.

      `cursor_ids` must support ``len()`` and iteration; each id is packed
      as a little-endian int64. Returns ``(request_id, message)``.
      """
      # Reserved zero int32, then the number of cursor ids (int32).
      prefix = _ZERO_32 + struct.pack("<i", len(cursor_ids))
      packed_ids = b"".join(struct.pack("<q", cursor_id)
                            for cursor_id in cursor_ids)
      return __pack_message(2007, prefix + packed_ids)
  def _do_batched_insert(collection_name, docs, check_keys,
                         safe, last_error_args, continue_on_error, opts,
                         sock_info):
      """Insert `docs` using multiple batches.

      Documents are encoded one at a time and appended to an OP_INSERT body.
      When the next document would push the body to
      ``sock_info.max_message_size``, the accumulated batch is sent with
      ``sock_info.legacy_write`` and a new batch is started with that
      document.

      Intermediate batches carry a getlasterror command whenever `safe` is
      true or `continue_on_error` is false, so that a failure can stop the
      insert. The final batch carries one only when `safe` is true.

      Error handling for a failing intermediate batch:
        - with `continue_on_error`, the error is remembered and the last one
          is raised after the final batch has been sent;
        - without it, for unacknowledged writes (`safe` false) the function
          returns at the first failure;
        - otherwise the error is raised immediately.

      :Raises:
        - DocumentTooLarge: a document exceeds ``sock_info.max_bson_size``.
        - InvalidOperation: `docs` is empty.
        - OperationFailure (or a subclass), as described above; presumably
          raised by ``sock_info.legacy_write`` (verify against SocketInfo).
      """
      def _insert_message(insert_message, send_safe):
          """Build the insert message with header and GLE.
          """
          request_id, final_message = __pack_message(2002, insert_message)
          if send_safe:
              request_id, error_message, _ = __last_error(collection_name,
                                                          last_error_args)
              final_message += error_message
          return request_id, final_message

      # Intermediate batches need a getlasterror reply to notice failures
      # whenever the insert must stop at the first error.
      send_safe = safe or not continue_on_error
      last_error = None
      data = StringIO()
      data.write(struct.pack("<i", int(continue_on_error)))
      data.write(bson._make_c_string(collection_name))
      # Offset where the documents of every batch begin.
      message_length = begin_loc = data.tell()
      has_docs = False
      for doc in docs:
          encoded = bson.BSON.encode(doc, check_keys, opts)
          encoded_length = len(encoded)
          too_large = (encoded_length > sock_info.max_bson_size)

          message_length += encoded_length
          if message_length < sock_info.max_message_size and not too_large:
              data.write(encoded)
              has_docs = True
              continue

          if has_docs:
              # We have enough data, send this message.
              try:
                  request_id, msg = _insert_message(data.getvalue(), send_safe)
                  sock_info.legacy_write(request_id, msg, 0, send_safe)
              # Exception type could be OperationFailure or a subtype
              # (e.g. DuplicateKeyError)
              except OperationFailure as exc:
                  # Like it says, continue on error...
                  if continue_on_error:
                      # Store exception details to re-raise after the final batch.
                      last_error = exc
                  # With unacknowledged writes just return at the first error.
                  elif not safe:
                      return
                  # With acknowledged writes raise immediately.
                  else:
                      raise

          if too_large:
              raise DocumentTooLarge("BSON document too large (%d bytes)"
                                     " - the connected server supports"
                                     " BSON document sizes up to %d"
                                     " bytes." %
                                     (encoded_length, sock_info.max_bson_size))

          # Start the next batch with the current document.
          message_length = begin_loc + encoded_length
          data.seek(begin_loc)
          data.truncate()
          data.write(encoded)
          # The buffer now holds `encoded`, so there is something to send.
          # Without this, a first document that alone reaches
          # max_message_size was silently overwritten by the next document,
          # or reported as an empty bulk insert.
          has_docs = True

      if not has_docs:
          raise InvalidOperation("cannot do an empty bulk insert")

      request_id, msg = _insert_message(data.getvalue(), safe)
      sock_info.legacy_write(request_id, msg, 0, safe)

      # Re-raise any exception stored due to continue_on_error
      if last_error is not None:
          raise last_error


  if _use_c:
      _do_batched_insert = _cmessage._do_batched_insert
  def _do_batched_write_command(namespace, operation, command,
                                docs, check_keys, opts, sock_info):
      """Execute a batch of insert, update, or delete commands.

      `command` is encoded into an OP_QUERY against `namespace`. The encoded
      `docs` are appended to it as an array field (``documents``,
      ``updates`` or ``deletes``, chosen by `operation`).

      Documents are split across as many messages as needed so that:
        - each message stays within the room the server guarantees for a
          command (``sock_info.max_bson_size`` plus 16k, SERVER-10643);
        - no message holds more than ``sock_info.max_write_batch_size``
          documents.

      Returns a list of ``(index_offset, result)`` pairs, one per message
      sent. `index_offset` is the position in `docs` of that batch's first
      document and `result` is the value returned by
      ``sock_info.write_command``. For ordered writes (``command['ordered']``
      missing or true), no further batches are sent once a result contains
      ``writeErrors``.

      :Raises:
        - InvalidOperation: `operation` is unknown, or `docs` is empty.
        - DocumentTooLarge: a single document cannot fit in a batch even on
          its own.
      """
      max_bson_size = sock_info.max_bson_size
      max_write_batch_size = sock_info.max_write_batch_size
      # Max BSON object size + 16k - 2 bytes for ending NUL bytes.
      # Server guarantees there is enough room: SERVER-10643.
      max_cmd_size = max_bson_size + 16382

      ordered = command.get('ordered', True)

      buf = StringIO()
      # Save space for message length and request id
      buf.write(_ZERO_64)
      # responseTo, opCode
      buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00")
      # No options
      buf.write(_ZERO_32)
      # Namespace as C string
      buf.write(b(namespace))
      buf.write(_ZERO_8)
      # Skip: 0, Limit: -1
      buf.write(_SKIPLIM)

      # Where to write command document length
      command_start = buf.tell()
      buf.write(bson.BSON.encode(command))

      # Start of payload: drop the command document's trailing NUL so the
      # array field can be appended inside it.
      buf.seek(-1, 2)
      # Work around some Jython weirdness.
      buf.truncate()
      try:
          buf.write(_OP_MAP[operation])
      except KeyError:
          raise InvalidOperation('Unknown command')

      if operation in (_UPDATE, _DELETE):
          check_keys = False

      # Where to write list document length
      list_start = buf.tell() - 4
      # Where the first array element of every batch is written.
      first_element = list_start + 4

      def send_message():
          """Finalize and send the current OP_QUERY message.
          """
          # Close list and command documents
          buf.write(_ZERO_16)
          # Write document lengths and request id. The list document ends
          # one byte before the command document's closing NUL.
          length = buf.tell()
          buf.seek(list_start)
          buf.write(struct.pack('<i', length - list_start - 1))
          buf.seek(command_start)
          buf.write(struct.pack('<i', length - command_start))
          buf.seek(4)
          request_id = random.randint(MIN_INT32, MAX_INT32)
          buf.write(struct.pack('<i', request_id))
          buf.seek(0)
          buf.write(struct.pack('<i', length))
          return sock_info.write_command(request_id, buf.getvalue())

      def too_large_error(size):
          """Return the DocumentTooLarge for a document of `size` bytes."""
          if operation == _INSERT:
              return DocumentTooLarge("BSON document too large (%d bytes)"
                                      " - the connected server supports"
                                      " BSON document sizes up to %d"
                                      " bytes." % (size, max_bson_size))
          # There's nothing intelligent we can say
          # about size for update and remove
          return DocumentTooLarge("command document too large")

      # If there are multiple batches we'll
      # merge results in the caller.
      results = []
      idx = 0
      idx_offset = 0
      has_docs = False
      for doc in docs:
          has_docs = True
          # Encode the current operation
          key = b(str(idx))
          value = bson.BSON.encode(doc, check_keys, opts)
          # Reject a document that would not fit even as the only element
          # (key b'0') of a fresh batch, before anything else is flushed.
          # Each element costs a type byte + key + key NUL + document. For
          # the first document this is exactly the enough_data test below;
          # previously only that position was checked, so an oversized
          # document following a flush went out as an over-limit command.
          if first_element + len(b'0') + len(value) + 2 >= max_cmd_size:
              raise too_large_error(len(value))
          # Send a batch?
          enough_data = (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size
          enough_documents = (idx >= max_write_batch_size)
          if enough_data or enough_documents:
              if not idx:
                  # Only reachable when max_write_batch_size < 1.
                  raise too_large_error(len(value))
              result = send_message()
              results.append((idx_offset, result))
              if ordered and "writeErrors" in result:
                  return results
              # Truncate back to the start of list elements
              buf.seek(first_element)
              buf.truncate()
              idx_offset += idx
              idx = 0
              key = b'0'
          buf.write(_BSONOBJ)
          buf.write(key)
          buf.write(_ZERO_8)
          buf.write(value)
          idx += 1

      if not has_docs:
          raise InvalidOperation("cannot do an empty bulk write")

      results.append((idx_offset, send_message()))
      return results


  if _use_c:
      _do_batched_write_command = _cmessage._do_batched_write_command