command_cursor.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # Copyright 2014-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """CommandCursor class to iterate over command results."""
  15. from collections import deque
  16. from bson.py3compat import integer_types
  17. from pymongo import helpers
  18. from pymongo.errors import AutoReconnect, CursorNotFound, NotMasterError
  19. from pymongo.message import _GetMore
  20. class CommandCursor(object):
  21. """A cursor / iterator over command cursors.
  22. """
  23. def __init__(self, collection, cursor_info, address, retrieved=0):
  24. """Create a new command cursor.
  25. """
  26. self.__collection = collection
  27. self.__id = cursor_info['id']
  28. self.__address = address
  29. self.__data = deque(cursor_info['firstBatch'])
  30. self.__retrieved = retrieved
  31. self.__batch_size = 0
  32. self.__killed = (self.__id == 0)
  33. if "ns" in cursor_info:
  34. self.__ns = cursor_info["ns"]
  35. else:
  36. self.__ns = collection.full_name
  37. def __del__(self):
  38. if self.__id and not self.__killed:
  39. self.__die()
  40. def __die(self):
  41. """Closes this cursor.
  42. """
  43. if self.__id and not self.__killed:
  44. self.__collection.database.client.close_cursor(self.__id,
  45. self.__address)
  46. self.__killed = True
  47. def close(self):
  48. """Explicitly close / kill this cursor. Required for PyPy, Jython and
  49. other Python implementations that don't use reference counting
  50. garbage collection.
  51. """
  52. self.__die()
  53. def batch_size(self, batch_size):
  54. """Limits the number of documents returned in one batch. Each batch
  55. requires a round trip to the server. It can be adjusted to optimize
  56. performance and limit data transfer.
  57. .. note:: batch_size can not override MongoDB's internal limits on the
  58. amount of data it will return to the client in a single batch (i.e
  59. if you set batch size to 1,000,000,000, MongoDB will currently only
  60. return 4-16MB of results per batch).
  61. Raises :exc:`TypeError` if `batch_size` is not an integer.
  62. Raises :exc:`ValueError` if `batch_size` is less than ``0``.
  63. :Parameters:
  64. - `batch_size`: The size of each batch of results requested.
  65. """
  66. if not isinstance(batch_size, integer_types):
  67. raise TypeError("batch_size must be an integer")
  68. if batch_size < 0:
  69. raise ValueError("batch_size must be >= 0")
  70. self.__batch_size = batch_size == 1 and 2 or batch_size
  71. return self
  72. def __send_message(self, operation):
  73. """Send a getmore message and handle the response.
  74. """
  75. client = self.__collection.database.client
  76. try:
  77. response = client._send_message_with_response(
  78. operation, address=self.__address)
  79. except AutoReconnect:
  80. # Don't try to send kill cursors on another socket
  81. # or to another server. It can cause a _pinValue
  82. # assertion on some server releases if we get here
  83. # due to a socket timeout.
  84. self.__killed = True
  85. raise
  86. try:
  87. doc = helpers._unpack_response(response.data,
  88. self.__id,
  89. self.__collection.codec_options)
  90. except CursorNotFound:
  91. self.__killed = True
  92. raise
  93. except NotMasterError:
  94. # Don't send kill cursors to another server after a "not master"
  95. # error. It's completely pointless.
  96. self.__killed = True
  97. client._reset_server_and_request_check(self.address)
  98. raise
  99. self.__id = doc["cursor_id"]
  100. if self.__id == 0:
  101. self.__killed = True
  102. assert doc["starting_from"] == self.__retrieved, (
  103. "Result batch started from %s, expected %s" % (
  104. doc['starting_from'], self.__retrieved))
  105. self.__retrieved += doc["number_returned"]
  106. self.__data = deque(doc["data"])
  107. def _refresh(self):
  108. """Refreshes the cursor with more data from the server.
  109. Returns the length of self.__data after refresh. Will exit early if
  110. self.__data is already non-empty. Raises OperationFailure when the
  111. cursor cannot be refreshed due to an error on the query.
  112. """
  113. if len(self.__data) or self.__killed:
  114. return len(self.__data)
  115. if self.__id: # Get More
  116. self.__send_message(
  117. _GetMore(self.__ns, self.__batch_size, self.__id))
  118. else: # Cursor id is zero nothing else to return
  119. self.__killed = True
  120. return len(self.__data)
  121. @property
  122. def alive(self):
  123. """Does this cursor have the potential to return more data?
  124. Even if :attr:`alive` is ``True``, :meth:`next` can raise
  125. :exc:`StopIteration`. Best to use a for loop::
  126. for doc in collection.aggregate(pipeline):
  127. print(doc)
  128. .. note:: :attr:`alive` can be True while iterating a cursor from
  129. a failed server. In this case :attr:`alive` will return False after
  130. :meth:`next` fails to retrieve the next batch of results from the
  131. server.
  132. """
  133. return bool(len(self.__data) or (not self.__killed))
  134. @property
  135. def cursor_id(self):
  136. """Returns the id of the cursor."""
  137. return self.__id
  138. @property
  139. def address(self):
  140. """The (host, port) of the server used, or None.
  141. .. versionadded:: 3.0
  142. """
  143. return self.__address
  144. def __iter__(self):
  145. return self
  146. def next(self):
  147. """Advance the cursor."""
  148. if len(self.__data) or self._refresh():
  149. coll = self.__collection
  150. return coll.database._fix_outgoing(self.__data.popleft(), coll)
  151. else:
  152. raise StopIteration
  153. __next__ = next
  154. def __enter__(self):
  155. return self
  156. def __exit__(self, exc_type, exc_val, exc_tb):
  157. self.__die()