# Copyright 2009-2015 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tools for creating `messages `_ to be sent to MongoDB. .. note:: This module is for internal use and is generally not needed by application developers. """ import random import struct import bson from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.py3compat import b, StringIO from bson.son import SON try: from pymongo import _cmessage _use_c = True except ImportError: _use_c = False from pymongo.errors import DocumentTooLarge, InvalidOperation, OperationFailure from pymongo.read_preferences import ReadPreference MAX_INT32 = 2147483647 MIN_INT32 = -2147483648 _INSERT = 0 _UPDATE = 1 _DELETE = 2 _EMPTY = b'' _BSONOBJ = b'\x03' _ZERO_8 = b'\x00' _ZERO_16 = b'\x00\x00' _ZERO_32 = b'\x00\x00\x00\x00' _ZERO_64 = b'\x00\x00\x00\x00\x00\x00\x00\x00' _SKIPLIM = b'\x00\x00\x00\x00\xff\xff\xff\xff' _OP_MAP = { _INSERT: b'\x04documents\x00\x00\x00\x00\x00', _UPDATE: b'\x04updates\x00\x00\x00\x00\x00', _DELETE: b'\x04deletes\x00\x00\x00\x00\x00', } def _maybe_add_read_preference(spec, read_preference): """Add $readPreference to spec when appropriate.""" mode = read_preference.mode tag_sets = read_preference.tag_sets # Only add $readPreference if it's something other than primary to avoid # problems with mongos versions that don't support read preferences. Also, # for maximum backwards compatibility, don't add $readPreference for # secondaryPreferred unless tags are in use (setting the slaveOkay bit # has the same effect). if mode and ( mode != ReadPreference.SECONDARY_PREFERRED.mode or tag_sets != [{}]): if "$query" not in spec: spec = SON([("$query", spec)]) spec["$readPreference"] = read_preference.document return spec class _Query(object): """A query operation.""" __slots__ = ('flags', 'ns', 'ntoskip', 'ntoreturn', 'spec', 'fields', 'codec_options', 'read_preference') def __init__(self, flags, ns, ntoskip, ntoreturn, spec, fields, codec_options, read_preference): self.flags = flags self.ns = ns self.ntoskip = ntoskip self.ntoreturn = ntoreturn self.spec = spec self.fields = fields self.codec_options = codec_options self.read_preference = read_preference def get_message(self, set_slave_ok, is_mongos): """Get a query message, possibly setting the slaveOk bit.""" if is_mongos: spec = _maybe_add_read_preference(self.spec, self.read_preference) else: spec = self.spec if set_slave_ok: # Set the slaveOk bit. flags = self.flags | 4 else: flags = self.flags return query(flags, self.ns, self.ntoskip, self.ntoreturn, spec, self.fields, self.codec_options) class _GetMore(object): """A getmore operation.""" __slots__ = ('ns', 'ntoreturn', 'cursor_id') def __init__(self, ns, ntoreturn, cursor_id): self.ns = ns self.ntoreturn = ntoreturn self.cursor_id = cursor_id def get_message(self, dummy0, dummy1): """Get a getmore message.""" return get_more(self.ns, self.ntoreturn, self.cursor_id) def __last_error(namespace, args): """Data to send to do a lastError. """ cmd = SON([("getlasterror", 1)]) cmd.update(args) splitns = namespace.split('.', 1) return query(0, splitns[0] + '.$cmd', 0, -1, cmd, None, DEFAULT_CODEC_OPTIONS) def __pack_message(operation, data): """Takes message data and adds a message header based on the operation. Returns the resultant message string. """ request_id = random.randint(MIN_INT32, MAX_INT32) message = struct.pack(" sock_info.max_bson_size) message_length += encoded_length if message_length < sock_info.max_message_size and not too_large: data.write(encoded) has_docs = True continue if has_docs: # We have enough data, send this message. try: request_id, msg = _insert_message(data.getvalue(), send_safe) sock_info.legacy_write(request_id, msg, 0, send_safe) # Exception type could be OperationFailure or a subtype # (e.g. DuplicateKeyError) except OperationFailure as exc: # Like it says, continue on error... if continue_on_error: # Store exception details to re-raise after the final batch. last_error = exc # With unacknowledged writes just return at the first error. elif not safe: return # With acknowledged writes raise immediately. else: raise if too_large: raise DocumentTooLarge("BSON document too large (%d bytes)" " - the connected server supports" " BSON document sizes up to %d" " bytes." % (encoded_length, sock_info.max_bson_size)) message_length = begin_loc + encoded_length data.seek(begin_loc) data.truncate() data.write(encoded) if not has_docs: raise InvalidOperation("cannot do an empty bulk insert") request_id, msg = _insert_message(data.getvalue(), safe) sock_info.legacy_write(request_id, msg, 0, safe) # Re-raise any exception stored due to continue_on_error if last_error is not None: raise last_error if _use_c: _do_batched_insert = _cmessage._do_batched_insert def _do_batched_write_command(namespace, operation, command, docs, check_keys, opts, sock_info): """Execute a batch of insert, update, or delete commands. """ max_bson_size = sock_info.max_bson_size max_write_batch_size = sock_info.max_write_batch_size # Max BSON object size + 16k - 2 bytes for ending NUL bytes. # Server guarantees there is enough room: SERVER-10643. max_cmd_size = max_bson_size + 16382 ordered = command.get('ordered', True) buf = StringIO() # Save space for message length and request id buf.write(_ZERO_64) # responseTo, opCode buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00") # No options buf.write(_ZERO_32) # Namespace as C string buf.write(b(namespace)) buf.write(_ZERO_8) # Skip: 0, Limit: -1 buf.write(_SKIPLIM) # Where to write command document length command_start = buf.tell() buf.write(bson.BSON.encode(command)) # Start of payload buf.seek(-1, 2) # Work around some Jython weirdness. buf.truncate() try: buf.write(_OP_MAP[operation]) except KeyError: raise InvalidOperation('Unknown command') if operation in (_UPDATE, _DELETE): check_keys = False # Where to write list document length list_start = buf.tell() - 4 def send_message(): """Finalize and send the current OP_QUERY message. """ # Close list and command documents buf.write(_ZERO_16) # Write document lengths and request id length = buf.tell() buf.seek(list_start) buf.write(struct.pack('= max_cmd_size enough_documents = (idx >= max_write_batch_size) if enough_data or enough_documents: if not idx: if operation == _INSERT: raise DocumentTooLarge("BSON document too large (%d bytes)" " - the connected server supports" " BSON document sizes up to %d" " bytes." % (len(value), max_bson_size)) # There's nothing intelligent we can say # about size for update and remove raise DocumentTooLarge("command document too large") result = send_message() results.append((idx_offset, result)) if ordered and "writeErrors" in result: return results # Truncate back to the start of list elements buf.seek(list_start + 4) buf.truncate() idx_offset += idx idx = 0 key = b'0' buf.write(_BSONOBJ) buf.write(key) buf.write(_ZERO_8) buf.write(value) idx += 1 if not has_docs: raise InvalidOperation("cannot do an empty bulk write") results.append((idx_offset, send_message())) return results if _use_c: _do_batched_write_command = _cmessage._do_batched_write_command