Skip to content

Commit b4b809d

Browse files
authored
fix: remove expired ack_ids (#787)
1 parent f3ebbae commit b4b809d

File tree

2 files changed

+66
-27
lines changed

2 files changed

+66
-27
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,9 @@ def _get_initial_request(
988988
# Return the initial request.
989989
return request
990990

991-
def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float):
991+
def _send_lease_modacks(
992+
self, ack_ids: Iterable[str], ack_deadline: float
993+
) -> List[str]:
992994
exactly_once_enabled = False
993995
with self._exactly_once_enabled_lock:
994996
exactly_once_enabled = self._exactly_once_enabled
@@ -1002,22 +1004,27 @@ def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float):
10021004
assert self._dispatcher is not None
10031005
self._dispatcher.modify_ack_deadline(items)
10041006

1007+
expired_ack_ids = []
10051008
for req in items:
10061009
try:
10071010
assert req.future is not None
10081011
req.future.result()
1009-
except AcknowledgeError:
1012+
except AcknowledgeError as ack_error:
10101013
_LOGGER.warning(
10111014
"AcknowledgeError when lease-modacking a message.",
10121015
exc_info=True,
10131016
)
1017+
if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
1018+
expired_ack_ids.append(req.ack_id)
1019+
return expired_ack_ids
10141020
else:
10151021
items = [
10161022
requests.ModAckRequest(ack_id, self.ack_deadline, None)
10171023
for ack_id in ack_ids
10181024
]
10191025
assert self._dispatcher is not None
10201026
self._dispatcher.modify_ack_deadline(items)
1027+
return []
10211028

10221029
def _exactly_once_delivery_enabled(self) -> bool:
10231030
"""Whether exactly-once delivery is enabled for the subscription."""
@@ -1071,28 +1078,32 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
10711078
# modack the messages we received, as this tells the server that we've
10721079
# received them.
10731080
ack_id_gen = (message.ack_id for message in received_messages)
1074-
self._send_lease_modacks(ack_id_gen, self.ack_deadline)
1081+
expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline))
10751082

10761083
with self._pause_resume_lock:
10771084
assert self._scheduler is not None
10781085
assert self._leaser is not None
10791086

10801087
for received_message in received_messages:
1081-
message = google.cloud.pubsub_v1.subscriber.message.Message(
1082-
received_message.message,
1083-
received_message.ack_id,
1084-
received_message.delivery_attempt,
1085-
self._scheduler.queue,
1086-
self._exactly_once_delivery_enabled,
1087-
)
1088-
self._messages_on_hold.put(message)
1089-
self._on_hold_bytes += message.size
1090-
req = requests.LeaseRequest(
1091-
ack_id=message.ack_id,
1092-
byte_size=message.size,
1093-
ordering_key=message.ordering_key,
1094-
)
1095-
self._leaser.add([req])
1088+
if (
1089+
not self._exactly_once_delivery_enabled()
1090+
or received_message.ack_id not in expired_ack_ids
1091+
):
1092+
message = google.cloud.pubsub_v1.subscriber.message.Message(
1093+
received_message.message,
1094+
received_message.ack_id,
1095+
received_message.delivery_attempt,
1096+
self._scheduler.queue,
1097+
self._exactly_once_delivery_enabled,
1098+
)
1099+
self._messages_on_hold.put(message)
1100+
self._on_hold_bytes += message.size
1101+
req = requests.LeaseRequest(
1102+
ack_id=message.ack_id,
1103+
byte_size=message.size,
1104+
ordering_key=message.ordering_key,
1105+
)
1106+
self._leaser.add([req])
10961107

10971108
self._maybe_release_messages()
10981109

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,6 +1125,7 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
11251125
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
11261126
)
11271127
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
1128+
11281129
manager = make_manager()
11291130

11301131
with mock.patch.object(
@@ -1852,11 +1853,18 @@ def test__on_response_exactly_once_immediate_modacks_fail():
18521853
def complete_futures_with_error(*args, **kwargs):
18531854
modack_requests = args[0]
18541855
for req in modack_requests:
1855-
req.future.set_exception(
1856-
subscriber_exceptions.AcknowledgeError(
1857-
subscriber_exceptions.AcknowledgeStatus.SUCCESS, None
1856+
if req.ack_id == "fack":
1857+
req.future.set_exception(
1858+
subscriber_exceptions.AcknowledgeError(
1859+
subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID, None
1860+
)
1861+
)
1862+
else:
1863+
req.future.set_exception(
1864+
subscriber_exceptions.AcknowledgeError(
1865+
subscriber_exceptions.AcknowledgeStatus.SUCCESS, None
1866+
)
18581867
)
1859-
)
18601868

18611869
dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error
18621870

@@ -1866,19 +1874,39 @@ def complete_futures_with_error(*args, **kwargs):
18661874
gapic_types.ReceivedMessage(
18671875
ack_id="fack",
18681876
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
1869-
)
1877+
),
1878+
gapic_types.ReceivedMessage(
1879+
ack_id="good",
1880+
message=gapic_types.PubsubMessage(data=b"foo", message_id="2"),
1881+
),
18701882
],
18711883
subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties(
18721884
exactly_once_delivery_enabled=True
18731885
),
18741886
)
18751887

1876-
# adjust message bookkeeping in leaser
1877-
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
1888+
# Actually run the method and prove that modack and schedule are called in
1889+
# the expected way.
1890+
1891+
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10)
18781892

1879-
# exactly_once should be enabled
18801893
manager._on_response(response)
1881-
# exceptions are logged, but otherwise no effect
1894+
1895+
# The second message should be scheduled, and not the first.
1896+
1897+
schedule_calls = scheduler.schedule.mock_calls
1898+
assert len(schedule_calls) == 1
1899+
call_args = schedule_calls[0][1]
1900+
assert call_args[0] == mock.sentinel.callback
1901+
assert isinstance(call_args[1], message.Message)
1902+
assert call_args[1].message_id == "2"
1903+
1904+
assert manager._messages_on_hold.size == 0
1905+
# No messages available
1906+
assert manager._messages_on_hold.get() is None
1907+
1908+
# do not add message
1909+
assert manager.load == 0.001
18821910

18831911

18841912
def test__should_recover_true():

0 commit comments

Comments
 (0)