network.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Internal network layer helper methods."""
  15. import select
  16. import struct
  17. from pymongo import helpers, message
  18. from pymongo.errors import AutoReconnect
  19. _UNPACK_INT = struct.Struct("<i").unpack
  20. def command(sock, dbname, spec, slave_ok, is_mongos, read_preference,
  21. codec_options, check=True, allowable_errors=None):
  22. """Execute a command over the socket, or raise socket.error.
  23. :Parameters:
  24. - `sock`: a raw socket instance
  25. - `dbname`: name of the database on which to run the command
  26. - `spec`: a command document as a dict, SON, or mapping object
  27. - `slave_ok`: whether to set the SlaveOkay wire protocol bit
  28. - `is_mongos`: are we connected to a mongos?
  29. - `read_preference`: a read preference
  30. - `codec_options`: a CodecOptions instance
  31. - `check`: raise OperationFailure if there are errors
  32. - `allowable_errors`: errors to ignore if `check` is True
  33. """
  34. ns = dbname + '.$cmd'
  35. flags = 4 if slave_ok else 0
  36. if is_mongos:
  37. spec = message._maybe_add_read_preference(spec, read_preference)
  38. request_id, msg, _ = message.query(flags, ns, 0, -1, spec,
  39. None, codec_options)
  40. sock.sendall(msg)
  41. response = receive_message(sock, 1, request_id)
  42. unpacked = helpers._unpack_response(response, codec_options=codec_options)
  43. response_doc = unpacked['data'][0]
  44. msg = "command %s on namespace %s failed: %%s" % (
  45. repr(spec).replace("%", "%%"), ns)
  46. if check:
  47. helpers._check_command_response(response_doc, msg, allowable_errors)
  48. return response_doc
  49. def receive_message(sock, operation, request_id):
  50. """Receive a raw BSON message or raise socket.error."""
  51. header = _receive_data_on_socket(sock, 16)
  52. length = _UNPACK_INT(header[:4])[0]
  53. # No request_id for exhaust cursor "getMore".
  54. if request_id is not None:
  55. response_id = _UNPACK_INT(header[8:12])[0]
  56. assert request_id == response_id, "ids don't match %r %r" % (
  57. request_id, response_id)
  58. assert operation == _UNPACK_INT(header[12:])[0]
  59. return _receive_data_on_socket(sock, length - 16)
  60. def _receive_data_on_socket(sock, length):
  61. msg = b""
  62. while length:
  63. chunk = sock.recv(length)
  64. if chunk == b"":
  65. raise AutoReconnect("connection closed")
  66. length -= len(chunk)
  67. msg += chunk
  68. return msg
  69. def socket_closed(sock):
  70. """Return True if we know socket has been closed, False otherwise.
  71. """
  72. try:
  73. rd, _, _ = select.select([sock], [], [], 0)
  74. # Any exception here is equally bad (select.error, ValueError, etc.).
  75. except:
  76. return True
  77. return len(rd) > 0