# Copyright 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. """The bulk write operations interface.
  15. .. versionadded:: 2.7
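
For illustration only, a minimal sketch of the builder flow (collection
and field names are hypothetical)::

    bulk = db.coll.initialize_ordered_bulk_op()
    bulk.insert({'a': 1})
    bulk.find({'a': 1}).update_one({'$set': {'b': 2}})
    result = bulk.execute()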
  16. """
  17. from bson.objectid import ObjectId
  18. from bson.py3compat import u
  19. from bson.son import SON
  20. from pymongo.common import (validate_is_mapping,
  21. validate_is_mutable_mapping,
  22. validate_ok_for_replace,
  23. validate_ok_for_update)
  24. from pymongo.errors import (BulkWriteError,
  25. DocumentTooLarge,
  26. InvalidOperation,
  27. OperationFailure)
  28. from pymongo.message import (_INSERT, _UPDATE, _DELETE,
  29. _do_batched_write_command)
  30. from pymongo.write_concern import WriteConcern

_DELETE_ALL = 0
_DELETE_ONE = 1

# For backwards compatibility. See MongoDB src/mongo/base/error_codes.err
_BAD_VALUE = 2
_UNKNOWN_ERROR = 8
_WRITE_CONCERN_ERROR = 64

_COMMANDS = ('insert', 'update', 'delete')

# These string literals are used when we create fake server return
# documents client side. We use unicode literals in python 2.x to
# match the actual return values from the server.
_UID = u("_id")
_UCODE = u("code")
_UERRMSG = u("errmsg")
_UINDEX = u("index")
_UOP = u("op")


class _Run(object):
    """Represents a batch of write operations.
  48. """
  49. def __init__(self, op_type):
  50. """Initialize a new Run object.
  51. """
  52. self.op_type = op_type
  53. self.index_map = []
  54. self.ops = []
  55. def index(self, idx):
  56. """Get the original index of an operation in this run.
  57. :Parameters:
  58. - `idx`: The Run index that maps to the original index.
  59. """
  60. return self.index_map[idx]
  61. def add(self, original_index, operation):
  62. """Add an operation to this Run instance.
  63. :Parameters:
  64. - `original_index`: The original index of this operation
  65. within a larger bulk operation.
  66. - `operation`: The operation document.
  67. """
  68. self.index_map.append(original_index)
  69. self.ops.append(operation)


def _make_error(index, code, errmsg, operation):
    """Create and return an error document.
    """
    return {
        _UINDEX: index,
        _UCODE: code,
        _UERRMSG: errmsg,
        _UOP: operation
    }


def _merge_legacy(run, full_result, result, index):
    """Merge a result from a legacy opcode into the full results.
  81. """
  82. affected = result.get('n', 0)
  83. errmsg = result.get("errmsg", result.get("err", ""))
  84. if errmsg:
  85. # wtimeout is not considered a hard failure in
  86. # MongoDB 2.6 so don't treat it like one here.
  87. if result.get("wtimeout"):
  88. error_doc = {'errmsg': errmsg, 'code': _WRITE_CONCERN_ERROR}
  89. full_result['writeConcernErrors'].append(error_doc)
  90. else:
  91. code = result.get("code", _UNKNOWN_ERROR)
  92. error = _make_error(run.index(index), code, errmsg, run.ops[index])
  93. if "errInfo" in result:
  94. error["errInfo"] = result["errInfo"]
  95. full_result["writeErrors"].append(error)
  96. return
  97. if run.op_type == _INSERT:
  98. full_result['nInserted'] += 1
  99. elif run.op_type == _UPDATE:
  100. if "upserted" in result:
  101. doc = {_UINDEX: run.index(index), _UID: result["upserted"]}
  102. full_result["upserted"].append(doc)
  103. full_result['nUpserted'] += affected
  104. # Versions of MongoDB before 2.6 don't return the _id for an
  105. # upsert if _id is not an ObjectId.
  106. elif result.get("updatedExisting") == False and affected == 1:
  107. op = run.ops[index]
  108. # If _id is in both the update document *and* the query spec
  109. # the update document _id takes precedence.
  110. _id = op['u'].get('_id', op['q'].get('_id'))
  111. doc = {_UINDEX: run.index(index), _UID: _id}
  112. full_result["upserted"].append(doc)
  113. full_result['nUpserted'] += affected
  114. else:
  115. full_result['nMatched'] += affected
  116. elif run.op_type == _DELETE:
  117. full_result['nRemoved'] += affected


def _merge_command(run, full_result, results):
    """Merge a group of results from write commands into the full result.
  120. """
  121. for offset, result in results:
  122. affected = result.get("n", 0)
  123. if run.op_type == _INSERT:
  124. full_result["nInserted"] += affected
  125. elif run.op_type == _DELETE:
  126. full_result["nRemoved"] += affected
  127. elif run.op_type == _UPDATE:
  128. upserted = result.get("upserted")
  129. if upserted:
  130. if isinstance(upserted, list):
  131. n_upserted = len(upserted)
  132. for doc in upserted:
  133. doc["index"] = run.index(doc["index"] + offset)
  134. full_result["upserted"].extend(upserted)
  135. else:
  136. n_upserted = 1
  137. index = run.index(offset)
  138. doc = {_UINDEX: index, _UID: upserted}
  139. full_result["upserted"].append(doc)
  140. full_result["nUpserted"] += n_upserted
  141. full_result["nMatched"] += (affected - n_upserted)
  142. else:
  143. full_result["nMatched"] += affected
  144. n_modified = result.get("nModified")
  145. # SERVER-13001 - in a mixed sharded cluster a call to
  146. # update could return nModified (>= 2.6) or not (<= 2.4).
  147. # If any call does not return nModified we can't report
  148. # a valid final count so omit the field completely.
  149. if n_modified is not None and "nModified" in full_result:
  150. full_result["nModified"] += n_modified
  151. else:
  152. full_result.pop("nModified", None)
  153. write_errors = result.get("writeErrors")
  154. if write_errors:
  155. for doc in write_errors:
  156. idx = doc["index"] + offset
  157. doc["index"] = run.index(idx)
  158. # Add the failed operation to the error document.
  159. doc[_UOP] = run.ops[idx]
  160. full_result["writeErrors"].extend(write_errors)
  161. wc_error = result.get("writeConcernError")
  162. if wc_error:
  163. full_result["writeConcernErrors"].append(wc_error)


class _Bulk(object):
    """The private guts of the bulk write API.
    """
    def __init__(self, collection, ordered):
        """Initialize a _Bulk instance.
        """
        self.collection = collection
        self.ordered = ordered
        self.ops = []
        self.name = "%s.%s" % (collection.database.name, collection.name)
        self.namespace = collection.database.name + '.$cmd'
        self.executed = False

    def add_insert(self, document):
        """Add an insert document to the list of ops.
        """
        validate_is_mutable_mapping("document", document)
        # Generate ObjectId client side.
        if '_id' not in document:
            document['_id'] = ObjectId()
        self.ops.append((_INSERT, document))

    def add_update(self, selector, update, multi=False, upsert=False):
        """Create an update document and add it to the list of ops.
        """
        validate_ok_for_update(update)
        cmd = SON([('q', selector), ('u', update),
                   ('multi', multi), ('upsert', upsert)])
        self.ops.append((_UPDATE, cmd))

    def add_replace(self, selector, replacement, upsert=False):
        """Create a replace document and add it to the list of ops.
        """
        validate_ok_for_replace(replacement)
        cmd = SON([('q', selector), ('u', replacement),
                   ('multi', False), ('upsert', upsert)])
        self.ops.append((_UPDATE, cmd))

    def add_delete(self, selector, limit):
        """Create a delete document and add it to the list of ops.
        """
        cmd = SON([('q', selector), ('limit', limit)])
        self.ops.append((_DELETE, cmd))

    def gen_ordered(self):
        """Generate batches of operations, batched by type of
        operation, in the order **provided**.
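
        For illustration, hypothetical ops ``[insert, insert, update,
        insert]`` yield three runs in order::

            _Run(_INSERT)  # original indexes 0, 1
            _Run(_UPDATE)  # original index 2
            _Run(_INSERT)  # original index 3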
  206. """
  207. run = None
  208. for idx, (op_type, operation) in enumerate(self.ops):
  209. if run is None:
  210. run = _Run(op_type)
  211. elif run.op_type != op_type:
  212. yield run
  213. run = _Run(op_type)
  214. run.add(idx, operation)
  215. yield run
  216. def gen_unordered(self):
  217. """Generate batches of operations, batched by type of
  218. operation, in arbitrary order.
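
        For illustration, the same hypothetical ops collapse into two
        runs, one per operation type::

            _Run(_INSERT)  # original indexes 0, 1, 3
            _Run(_UPDATE)  # original index 2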
  219. """
  220. operations = [_Run(_INSERT), _Run(_UPDATE), _Run(_DELETE)]
  221. for idx, (op_type, operation) in enumerate(self.ops):
  222. operations[op_type].add(idx, operation)
  223. for run in operations:
  224. if run.ops:
  225. yield run
  226. def execute_command(self, sock_info, generator, write_concern):
  227. """Execute using write commands.
  228. """
  229. # nModified is only reported for write commands, not legacy ops.
  230. full_result = {
  231. "writeErrors": [],
  232. "writeConcernErrors": [],
  233. "nInserted": 0,
  234. "nUpserted": 0,
  235. "nMatched": 0,
  236. "nModified": 0,
  237. "nRemoved": 0,
  238. "upserted": [],
  239. }
  240. for run in generator:
  241. cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
  242. ('ordered', self.ordered)])
  243. if write_concern.document:
  244. cmd['writeConcern'] = write_concern.document
  245. results = _do_batched_write_command(
  246. self.namespace, run.op_type, cmd,
  247. run.ops, True, self.collection.codec_options, sock_info)
  248. _merge_command(run, full_result, results)
  249. # We're supposed to continue if errors are
  250. # at the write concern level (e.g. wtimeout)
  251. if self.ordered and full_result['writeErrors']:
  252. break
  253. if full_result["writeErrors"] or full_result["writeConcernErrors"]:
  254. if full_result['writeErrors']:
  255. full_result['writeErrors'].sort(
  256. key=lambda error: error['index'])
  257. raise BulkWriteError(full_result)
  258. return full_result

    def execute_no_results(self, sock_info, generator):
        """Execute all operations, returning no results (w=0).
        """
        coll = self.collection
        # If ordered is True we have to send GLE or use write
        # commands so we can abort on the first error.
        write_concern = WriteConcern(w=int(self.ordered))
        for run in generator:
            try:
                if run.op_type == _INSERT:
                    coll._insert(sock_info,
                                 run.ops,
                                 self.ordered,
                                 write_concern=write_concern)
                elif run.op_type == _UPDATE:
                    for operation in run.ops:
                        doc = operation['u']
                        check_keys = True
                        if doc and next(iter(doc)).startswith('$'):
                            check_keys = False
                        coll._update(sock_info,
                                     operation['q'],
                                     doc,
                                     operation['upsert'],
                                     check_keys,
                                     operation['multi'],
                                     write_concern=write_concern)
                else:
                    for operation in run.ops:
                        coll._delete(sock_info,
                                     operation['q'],
                                     not operation['limit'],
                                     write_concern)
            except OperationFailure:
                if self.ordered:
                    break

    def execute_legacy(self, sock_info, generator, write_concern):
        """Execute using legacy wire protocol ops.
        """
        coll = self.collection
        full_result = {
            "writeErrors": [],
            "writeConcernErrors": [],
            "nInserted": 0,
            "nUpserted": 0,
            "nMatched": 0,
            "nRemoved": 0,
            "upserted": [],
        }
        stop = False
        for run in generator:
            for idx, operation in enumerate(run.ops):
                try:
                    # To do per-operation reporting we have to do ops one
                    # at a time. That means the performance of bulk insert
                    # will be slower here than calling Collection.insert()
                    if run.op_type == _INSERT:
                        coll._insert(sock_info,
                                     operation,
                                     write_concern=write_concern)
                        result = {}
                    elif run.op_type == _UPDATE:
                        doc = operation['u']
                        check_keys = True
                        if doc and next(iter(doc)).startswith('$'):
                            check_keys = False
                        result = coll._update(sock_info,
                                              operation['q'],
                                              doc,
                                              operation['upsert'],
                                              check_keys,
                                              operation['multi'],
                                              write_concern=write_concern)
                    else:
                        result = coll._delete(sock_info,
                                              operation['q'],
                                              not operation['limit'],
                                              write_concern)
                    _merge_legacy(run, full_result, result, idx)
                except DocumentTooLarge as exc:
                    # MongoDB 2.6 uses error code 2 for "too large".
                    error = _make_error(
                        run.index(idx), _BAD_VALUE, str(exc), operation)
                    full_result['writeErrors'].append(error)
                    if self.ordered:
                        stop = True
                        break
                except OperationFailure as exc:
                    if not exc.details:
                        # Some error not related to the write operation
                        # (e.g. kerberos failure). Re-raise immediately.
                        raise
                    _merge_legacy(run, full_result, exc.details, idx)

                # We're supposed to continue if errors are
                # at the write concern level (e.g. wtimeout)
                if self.ordered and full_result["writeErrors"]:
                    stop = True
                    break
            if stop:
                break

        if full_result["writeErrors"] or full_result['writeConcernErrors']:
            if full_result['writeErrors']:
                full_result['writeErrors'].sort(
                    key=lambda error: error['index'])
            raise BulkWriteError(full_result)
        return full_result

    def execute(self, write_concern):
        """Execute operations.
        """
        if not self.ops:
            raise InvalidOperation('No operations to execute')
        if self.executed:
            raise InvalidOperation('Bulk operations can '
                                   'only be executed once.')
        self.executed = True
        write_concern = (WriteConcern(**write_concern) if
                         write_concern else self.collection.write_concern)

        if self.ordered:
            generator = self.gen_ordered()
        else:
            generator = self.gen_unordered()

        client = self.collection.database.client
        with client._socket_for_writes() as sock_info:
            if not write_concern.acknowledged:
                self.execute_no_results(sock_info, generator)
            elif sock_info.max_wire_version > 1:
                return self.execute_command(
                    sock_info, generator, write_concern)
            else:
                return self.execute_legacy(
                    sock_info, generator, write_concern)


class BulkUpsertOperation(object):
    """An interface for adding upsert operations.
    """
    __slots__ = ('__selector', '__bulk')

    def __init__(self, selector, bulk):
        self.__selector = selector
        self.__bulk = bulk

    def update_one(self, update):
        """Update one document matching the selector.

        :Parameters:
          - `update` (dict): the update operations to apply
        """
        self.__bulk.add_update(self.__selector,
                               update, multi=False, upsert=True)

    def update(self, update):
        """Update all documents matching the selector.

        :Parameters:
          - `update` (dict): the update operations to apply
        """
        self.__bulk.add_update(self.__selector,
                               update, multi=True, upsert=True)

    def replace_one(self, replacement):
        """Replace one entire document matching the selector criteria.

        :Parameters:
          - `replacement` (dict): the replacement document
        """
        self.__bulk.add_replace(self.__selector, replacement, upsert=True)


class BulkWriteOperation(object):
    """An interface for adding update or remove operations.
    """
    __slots__ = ('__selector', '__bulk')

    def __init__(self, selector, bulk):
        self.__selector = selector
        self.__bulk = bulk

    def update_one(self, update):
        """Update one document matching the selector criteria.

        :Parameters:
          - `update` (dict): the update operations to apply
        """
        self.__bulk.add_update(self.__selector, update, multi=False)

    def update(self, update):
        """Update all documents matching the selector criteria.

        :Parameters:
          - `update` (dict): the update operations to apply
        """
        self.__bulk.add_update(self.__selector, update, multi=True)

    def replace_one(self, replacement):
        """Replace one entire document matching the selector criteria.

        :Parameters:
          - `replacement` (dict): the replacement document
        """
        self.__bulk.add_replace(self.__selector, replacement)

    def remove_one(self):
        """Remove a single document matching the selector criteria.
        """
        self.__bulk.add_delete(self.__selector, _DELETE_ONE)

    def remove(self):
        """Remove all documents matching the selector criteria.
        """
        self.__bulk.add_delete(self.__selector, _DELETE_ALL)

    def upsert(self):
        """Specify that all chained update operations should be
        upserts.

        :Returns:
          - A :class:`BulkUpsertOperation` instance, used to add
            update operations to this bulk operation.
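
        For illustration (``bulk`` and field names hypothetical)::

            bulk.find({'_id': 7}).upsert().update_one({'$set': {'x': 1}})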
  454. """
  455. return BulkUpsertOperation(self.__selector, self.__bulk)


class BulkOperationBuilder(object):
    """An interface for executing a batch of write operations.
  458. """
  459. __slots__ = '__bulk'
  460. def __init__(self, collection, ordered=True):
  461. """Initialize a new BulkOperationBuilder instance.
  462. :Parameters:
  463. - `collection`: A :class:`~pymongo.collection.Collection` instance.
  464. - `ordered` (optional): If ``True`` all operations will be executed
  465. serially, in the order provided, and the entire execution will
  466. abort on the first error. If ``False`` operations will be executed
  467. in arbitrary order (possibly in parallel on the server), reporting
  468. any errors that occurred after attempting all operations. Defaults
  469. to ``True``.
  470. """
  471. self.__bulk = _Bulk(collection, ordered)
  472. def find(self, selector):
  473. """Specify selection criteria for bulk operations.
  474. :Parameters:
  475. - `selector` (dict): the selection criteria for update
  476. and remove operations.
  477. :Returns:
  478. - A :class:`BulkWriteOperation` instance, used to add
  479. update and remove operations to this bulk operation.
  480. """
  481. validate_is_mapping("selector", selector)
  482. return BulkWriteOperation(selector, self.__bulk)
  483. def insert(self, document):
  484. """Insert a single document.
  485. :Parameters:
  486. - `document` (dict): the document to insert
  487. """
  488. self.__bulk.add_insert(document)
  489. def execute(self, write_concern=None):
  490. """Execute all provided operations.
  491. :Parameters:
  492. - write_concern (optional): the write concern for this bulk
  493. execution.
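
        For illustration, ``write_concern`` is a mapping of
        :class:`~pymongo.write_concern.WriteConcern` options (values
        hypothetical)::

            result = bulk.execute({'w': 2, 'wtimeout': 1000})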
  494. """
  495. if write_concern is not None:
  496. validate_is_mapping("write_concern", write_concern)
  497. return self.__bulk.execute(write_concern)