connection: release request ids after failed sends #874
Conversation
32e8008 to
856efaa
Compare
| try: | ||
| msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor, | ||
| allow_beta_protocol_version=self.allow_beta_protocol_version) | ||
|
|
||
| if self._is_checksumming_enabled: | ||
| buffer = io.BytesIO() | ||
| self._segment_codec.encode(buffer, msg) | ||
| msg = buffer.getvalue() | ||
| if self._is_checksumming_enabled: | ||
| buffer = io.BytesIO() | ||
| self._segment_codec.encode(buffer, msg) | ||
| msg = buffer.getvalue() | ||
|
|
||
| self.push(msg) | ||
| self.push(msg) | ||
| except Exception: | ||
| self._requests.pop(request_id, None) | ||
| raise | ||
| return len(msg) | ||
|
|
||
| def wait_for_response(self, msg, timeout=None, **kwargs): | ||
| return self.wait_for_responses(msg, timeout=timeout, **kwargs)[0] | ||
|
|
||
| def wait_for_responses(self, *msgs, **kwargs): | ||
| """ | ||
| Returns a list of (success, response) tuples. If success | ||
| is False, response will be an Exception. Otherwise, response | ||
| will be the normal query response. | ||
|
|
||
| If fail_on_error was left as True and one of the requests | ||
| failed, the corresponding Exception will be raised. | ||
| """ | ||
| if self.is_closed or self.is_defunct: | ||
| msg = "Connection %s is already closed" % (self,) | ||
| if self.last_error: | ||
| msg += ": %s" % (self.last_error,) | ||
| raise ConnectionShutdown(msg) | ||
| timeout = kwargs.get('timeout') | ||
| original_timeout = timeout # preserve for exception reporting | ||
| fail_on_error = kwargs.get('fail_on_error', True) | ||
| waiter = ResponseWaiter(self, len(msgs), fail_on_error) | ||
|
|
||
| # busy wait for sufficient space on the connection | ||
| messages_sent = 0 | ||
| while True: | ||
| needed = len(msgs) - messages_sent | ||
| with self.lock: | ||
| available = min(needed, self.max_request_id - self.in_flight + 1) | ||
| request_ids = [self.get_request_id() for _ in range(available)] | ||
| self.in_flight += available | ||
|
|
||
| for i, request_id in enumerate(request_ids): | ||
| self.send_msg(msgs[messages_sent + i], | ||
| request_id, | ||
| partial(waiter.got_response, index=messages_sent + i)) | ||
| try: | ||
| self.send_msg(msgs[messages_sent + i], | ||
| request_id, | ||
| partial(waiter.got_response, index=messages_sent + i)) | ||
| except Exception: | ||
| unsent_request_ids = request_ids[i:] | ||
| with self.lock: | ||
| self.in_flight -= len(unsent_request_ids) | ||
| self.request_ids.extend(unsent_request_ids) | ||
| raise | ||
| messages_sent += available |
There was a problem hiding this comment.
There are now multiple PRs regarding request_ids, in_flight etc.
It is incredible that we ever need to worry about this stuff.
Why is it even the responsibility of the caller to adjust those values?
`Connection` should have a method for sending requests. This method should be responsible for managing in_flight, request_ids and other state of `Connection`. Callers should never worry about that.
This is the only sane solution, and anything else will just require fixing callsites forever.
There was a problem hiding this comment.
And yes, I know this is code in connection. But you also have PRs for e.g. heartbeats. Heartbeats should never need to touch this stuff.
There was a problem hiding this comment.
Acknowledged. I removed the async keyspace cleanup from this branch, so this PR is now scoped to the concrete send-failure leak only. The broader connection-level helper/refactor can stay as a separate follow-up.
|
As I said on the heartbeat PR - maybe the |
Fixes #873. Reclaims request ids when send_msg fails and covers the async keyspace path with unit tests.