server.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Copyright 2009-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Communicate with one MongoDB server in a topology."""
  15. import contextlib
  16. from pymongo.response import Response, ExhaustResponse
  17. from pymongo.server_type import SERVER_TYPE
  18. class Server(object):
  19. def __init__(self, server_description, pool, monitor):
  20. """Represent one MongoDB server."""
  21. self._description = server_description
  22. self._pool = pool
  23. self._monitor = monitor
  24. def open(self):
  25. """Start monitoring, or restart after a fork.
  26. Multiple calls have no effect.
  27. """
  28. self._monitor.open()
  29. def reset(self):
  30. """Clear the connection pool."""
  31. self.pool.reset()
  32. def close(self):
  33. """Clear the connection pool and stop the monitor.
  34. Reconnect with open().
  35. """
  36. self._monitor.close()
  37. self._pool.reset()
  38. def request_check(self):
  39. """Check the server's state soon."""
  40. self._monitor.request_check()
  41. def send_message(self, message, all_credentials):
  42. """Send an unacknowledged message to MongoDB.
  43. Can raise ConnectionFailure.
  44. :Parameters:
  45. - `message`: (request_id, data).
  46. - `all_credentials`: dict, maps auth source to MongoCredential.
  47. """
  48. request_id, data, max_doc_size = self._split_message(message)
  49. with self.get_socket(all_credentials) as sock_info:
  50. sock_info.send_message(data, max_doc_size)
  51. def send_message_with_response(
  52. self,
  53. operation,
  54. set_slave_okay,
  55. all_credentials,
  56. exhaust=False):
  57. """Send a message to MongoDB and return a Response object.
  58. Can raise ConnectionFailure.
  59. :Parameters:
  60. - `operation`: A _Query or _GetMore object.
  61. - `set_slave_okay`: Pass to operation.get_message.
  62. - `all_credentials`: dict, maps auth source to MongoCredential.
  63. - `exhaust` (optional): If True, the socket used stays checked out.
  64. It is returned along with its Pool in the Response.
  65. """
  66. with self.get_socket(all_credentials, exhaust) as sock_info:
  67. message = operation.get_message(
  68. set_slave_okay, sock_info.is_mongos)
  69. request_id, data, max_doc_size = self._split_message(message)
  70. sock_info.send_message(data, max_doc_size)
  71. response_data = sock_info.receive_message(1, request_id)
  72. if exhaust:
  73. return ExhaustResponse(
  74. data=response_data,
  75. address=self._description.address,
  76. socket_info=sock_info,
  77. pool=self._pool)
  78. else:
  79. return Response(
  80. data=response_data,
  81. address=self._description.address)
  82. @contextlib.contextmanager
  83. def get_socket(self, all_credentials, checkout=False):
  84. with self.pool.get_socket(all_credentials, checkout) as sock_info:
  85. yield sock_info
  86. @property
  87. def description(self):
  88. return self._description
  89. @description.setter
  90. def description(self, server_description):
  91. assert server_description.address == self._description.address
  92. self._description = server_description
  93. @property
  94. def pool(self):
  95. return self._pool
  96. def _split_message(self, message):
  97. """Return request_id, data, max_doc_size.
  98. :Parameters:
  99. - `message`: (request_id, data, max_doc_size) or (request_id, data)
  100. """
  101. if len(message) == 3:
  102. return message
  103. else:
  104. # get_more and kill_cursors messages don't include BSON documents.
  105. request_id, data = message
  106. return request_id, data, 0
  107. def __str__(self):
  108. d = self._description
  109. return '<Server "%s:%s" %s>' % (
  110. d.address[0], d.address[1],
  111. SERVER_TYPE._fields[d.server_type])