Skip to content

Commit f067af3

Browse files
authored
fix: remove suboptimal logic in leasing behavior (#816)
* fix: subtract time spent leasing from max snooze value * Revert "fix: subtract time spent leasing from max snooze value" This reverts commit 01f7ff4. * fix: remove suboptimal list operations in leasing * remove typing * add default_deadline as separate argument to send_unary_modack * remove unused import * fix test_streaming_pull_manager * fix test_streaming_pull_manager lint * drop expired_ack_ids from lease management * add return value to _send_lease_modacks in unit tests * remove unused import * addressing comments * fix comment * fix modify_deadline_seconds generator * fix modify_deadline_seconds generator * fix subscripting in streaming_pull_manager * fix mypy checks * fix mypy checks * fix lint
1 parent 10cfc05 commit f067af3

File tree

6 files changed

+228
-56
lines changed

6 files changed

+228
-56
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,11 @@ def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
319319
self._manager.leaser.add(items)
320320
self._manager.maybe_pause_consumer()
321321

322-
def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
322+
def modify_ack_deadline(
323+
self,
324+
items: Sequence[requests.ModAckRequest],
325+
default_deadline: Optional[float] = None,
326+
) -> None:
323327
"""Modify the ack deadline for the given messages.
324328
325329
Args:
@@ -337,16 +341,28 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
337341
req.ack_id: req
338342
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
339343
}
340-
# no further work needs to be done for `requests_to_retry`
341-
requests_completed, requests_to_retry = self._manager.send_unary_modack(
342-
modify_deadline_ack_ids=list(
343-
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
344-
),
345-
modify_deadline_seconds=list(
346-
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
347-
),
348-
ack_reqs_dict=ack_reqs_dict,
349-
)
344+
requests_to_retry: List[requests.ModAckRequest]
345+
if default_deadline is None:
346+
# no further work needs to be done for `requests_to_retry`
347+
_, requests_to_retry = self._manager.send_unary_modack(
348+
modify_deadline_ack_ids=list(
349+
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
350+
),
351+
modify_deadline_seconds=list(
352+
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
353+
),
354+
ack_reqs_dict=ack_reqs_dict,
355+
default_deadline=None,
356+
)
357+
else:
358+
_, requests_to_retry = self._manager.send_unary_modack(
359+
modify_deadline_ack_ids=itertools.islice(
360+
ack_ids_gen, _ACK_IDS_BATCH_SIZE
361+
),
362+
modify_deadline_seconds=None,
363+
ack_reqs_dict=ack_reqs_dict,
364+
default_deadline=default_deadline,
365+
)
350366
assert (
351367
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
352368
), "Too many requests to be retried."

google/cloud/pubsub_v1/subscriber/_protocol/leaser.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ def maintain_leases(self) -> None:
187187
# We do not actually call `modify_ack_deadline` over and over
188188
# because it is more efficient to make a single request.
189189
ack_ids = leased_messages.keys()
190+
expired_ack_ids = set()
190191
if ack_ids:
191192
_LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))
192193

@@ -197,8 +198,25 @@ def maintain_leases(self) -> None:
197198
# is inactive.
198199
assert self._manager.dispatcher is not None
199200
ack_id_gen = (ack_id for ack_id in ack_ids)
200-
self._manager._send_lease_modacks(ack_id_gen, deadline)
201+
expired_ack_ids = self._manager._send_lease_modacks(
202+
ack_id_gen, deadline
203+
)
201204

205+
start_time = time.time()
206+
# If exactly once delivery is enabled, we should drop all expired ack_ids from lease management.
207+
if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids):
208+
assert self._manager.dispatcher is not None
209+
self._manager.dispatcher.drop(
210+
[
211+
requests.DropRequest(
212+
ack_id,
213+
leased_messages.get(ack_id).size, # type: ignore
214+
leased_messages.get(ack_id).ordering_key, # type: ignore
215+
)
216+
for ack_id in expired_ack_ids
217+
if ack_id in leased_messages
218+
]
219+
)
202220
# Now wait an appropriate period of time and do this again.
203221
#
204222
# We determine the appropriate period of time based on a random
@@ -208,7 +226,10 @@ def maintain_leases(self) -> None:
208226
# This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server.
209227
# This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
210228
# where there are many clients.
211-
snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9)
229+
# If we spent any time iterating over expired acks, we should subtract this from the deadline.
230+
snooze = random.uniform(
231+
_MAX_BATCH_LATENCY, (deadline * 0.9 - (time.time() - start_time))
232+
)
212233
_LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
213234
self._stop_event.wait(timeout=snooze)
214235

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import logging
2121
import threading
2222
import typing
23-
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple
23+
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
2424
import uuid
2525

2626
import grpc # type: ignore
@@ -686,30 +686,44 @@ def send_unary_ack(
686686
return requests_completed, requests_to_retry
687687

688688
def send_unary_modack(
689-
self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict
689+
self,
690+
modify_deadline_ack_ids,
691+
modify_deadline_seconds,
692+
ack_reqs_dict,
693+
default_deadline=None,
690694
) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
691695
"""Send a request using a separate unary request instead of over the stream.
692696
693697
If a RetryError occurs, the manager shutdown is triggered, and the
694698
error is re-raised.
695699
"""
696700
assert modify_deadline_ack_ids
701+
# Either we have a generator or a single deadline.
702+
assert modify_deadline_seconds is None or default_deadline is None
697703

698704
error_status = None
699705
modack_errors_dict = None
700706
try:
701-
# Send ack_ids with the same deadline seconds together.
702-
deadline_to_ack_ids = collections.defaultdict(list)
703-
704-
for n, ack_id in enumerate(modify_deadline_ack_ids):
705-
deadline = modify_deadline_seconds[n]
706-
deadline_to_ack_ids[deadline].append(ack_id)
707-
708-
for deadline, ack_ids in deadline_to_ack_ids.items():
707+
if default_deadline is None:
708+
# Send ack_ids with the same deadline seconds together.
709+
deadline_to_ack_ids = collections.defaultdict(list)
710+
711+
for n, ack_id in enumerate(modify_deadline_ack_ids):
712+
deadline = modify_deadline_seconds[n]
713+
deadline_to_ack_ids[deadline].append(ack_id)
714+
715+
for deadline, ack_ids in deadline_to_ack_ids.items():
716+
self._client.modify_ack_deadline(
717+
subscription=self._subscription,
718+
ack_ids=ack_ids,
719+
ack_deadline_seconds=deadline,
720+
)
721+
else:
722+
# We can send all requests with the default deadline.
709723
self._client.modify_ack_deadline(
710724
subscription=self._subscription,
711-
ack_ids=ack_ids,
712-
ack_deadline_seconds=deadline,
725+
ack_ids=modify_deadline_ack_ids,
726+
ack_deadline_seconds=default_deadline,
713727
)
714728
except exceptions.GoogleAPICallError as exc:
715729
_LOGGER.debug(
@@ -990,21 +1004,20 @@ def _get_initial_request(
9901004

9911005
def _send_lease_modacks(
9921006
self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True
993-
) -> List[str]:
1007+
) -> Set[str]:
9941008
exactly_once_enabled = False
9951009
with self._exactly_once_enabled_lock:
9961010
exactly_once_enabled = self._exactly_once_enabled
9971011
if exactly_once_enabled:
998-
items = []
999-
for ack_id in ack_ids:
1000-
future = futures.Future()
1001-
request = requests.ModAckRequest(ack_id, ack_deadline, future)
1002-
items.append(request)
1012+
items = [
1013+
requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
1014+
for ack_id in ack_ids
1015+
]
10031016

10041017
assert self._dispatcher is not None
1005-
self._dispatcher.modify_ack_deadline(items)
1018+
self._dispatcher.modify_ack_deadline(items, ack_deadline)
10061019

1007-
expired_ack_ids = []
1020+
expired_ack_ids = set()
10081021
for req in items:
10091022
try:
10101023
assert req.future is not None
@@ -1019,16 +1032,16 @@ def _send_lease_modacks(
10191032
exc_info=True,
10201033
)
10211034
if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
1022-
expired_ack_ids.append(req.ack_id)
1035+
expired_ack_ids.add(req.ack_id)
10231036
return expired_ack_ids
10241037
else:
10251038
items = [
10261039
requests.ModAckRequest(ack_id, self.ack_deadline, None)
10271040
for ack_id in ack_ids
10281041
]
10291042
assert self._dispatcher is not None
1030-
self._dispatcher.modify_ack_deadline(items)
1031-
return []
1043+
self._dispatcher.modify_ack_deadline(items, ack_deadline)
1044+
return set()
10321045

10331046
def _exactly_once_delivery_enabled(self) -> bool:
10341047
"""Whether exactly-once delivery is enabled for the subscription."""
@@ -1082,10 +1095,8 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
10821095
# modack the messages we received, as this tells the server that we've
10831096
# received them.
10841097
ack_id_gen = (message.ack_id for message in received_messages)
1085-
expired_ack_ids = set(
1086-
self._send_lease_modacks(
1087-
ack_id_gen, self.ack_deadline, warn_on_invalid=False
1088-
)
1098+
expired_ack_ids = self._send_lease_modacks(
1099+
ack_id_gen, self.ack_deadline, warn_on_invalid=False
10891100
)
10901101

10911102
with self._pause_resume_lock:

tests/unit/pubsub_v1/subscriber/test_dispatcher.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -645,16 +645,20 @@ def test_nack():
645645
]
646646
manager.send_unary_modack.return_value = (items, [])
647647
dispatcher_.nack(items)
648+
calls = manager.send_unary_modack.call_args_list
649+
assert len(calls) == 1
648650

649-
manager.send_unary_modack.assert_called_once_with(
650-
modify_deadline_ack_ids=["ack_id_string"],
651-
modify_deadline_seconds=[0],
652-
ack_reqs_dict={
651+
for call in calls:
652+
modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"]
653+
assert list(modify_deadline_ack_ids) == ["ack_id_string"]
654+
modify_deadline_seconds = call[1]["modify_deadline_seconds"]
655+
assert list(modify_deadline_seconds) == [0]
656+
ack_reqs_dict = call[1]["ack_reqs_dict"]
657+
assert ack_reqs_dict == {
653658
"ack_id_string": requests.ModAckRequest(
654659
ack_id="ack_id_string", seconds=0, future=None
655660
)
656-
},
657-
)
661+
}
658662

659663

660664
def test_modify_ack_deadline():
@@ -666,12 +670,16 @@ def test_modify_ack_deadline():
666670
items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60, future=None)]
667671
manager.send_unary_modack.return_value = (items, [])
668672
dispatcher_.modify_ack_deadline(items)
673+
calls = manager.send_unary_modack.call_args_list
674+
assert len(calls) == 1
669675

670-
manager.send_unary_modack.assert_called_once_with(
671-
modify_deadline_ack_ids=["ack_id_string"],
672-
modify_deadline_seconds=[60],
673-
ack_reqs_dict={"ack_id_string": items[0]},
674-
)
676+
for call in calls:
677+
modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"]
678+
assert list(modify_deadline_ack_ids) == ["ack_id_string"]
679+
modify_deadline_seconds = call[1]["modify_deadline_seconds"]
680+
assert list(modify_deadline_seconds) == [60]
681+
ack_reqs_dict = call[1]["ack_reqs_dict"]
682+
assert ack_reqs_dict == {"ack_id_string": items[0]}
675683

676684

677685
def test_modify_ack_deadline_splitting_large_payload():
@@ -695,14 +703,47 @@ def test_modify_ack_deadline_splitting_large_payload():
695703
sent_ack_ids = collections.Counter()
696704

697705
for call in calls:
698-
modack_ackids = call[1]["modify_deadline_ack_ids"]
706+
modack_ackids = list(call[1]["modify_deadline_ack_ids"])
699707
assert len(modack_ackids) <= dispatcher._ACK_IDS_BATCH_SIZE
700708
sent_ack_ids.update(modack_ackids)
701709

702710
assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed
703711
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once
704712

705713

714+
def test_modify_ack_deadline_splitting_large_payload_with_default_deadline():
715+
manager = mock.create_autospec(
716+
streaming_pull_manager.StreamingPullManager, instance=True
717+
)
718+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
719+
720+
items = [
721+
# use realistic lengths for ACK IDs (max 176 bytes)
722+
requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60, future=None)
723+
for i in range(5001)
724+
]
725+
manager.send_unary_modack.return_value = (items, [])
726+
dispatcher_.modify_ack_deadline(items, 60)
727+
728+
calls = manager.send_unary_modack.call_args_list
729+
assert len(calls) == 6
730+
731+
all_ack_ids = {item.ack_id for item in items}
732+
sent_ack_ids = collections.Counter()
733+
734+
for call in calls:
735+
modack_ackids = list(call[1]["modify_deadline_ack_ids"])
736+
modack_deadline_seconds = call[1]["modify_deadline_seconds"]
737+
default_deadline = call[1]["default_deadline"]
738+
assert len(list(modack_ackids)) <= dispatcher._ACK_IDS_BATCH_SIZE
739+
assert modack_deadline_seconds is None
740+
assert default_deadline == 60
741+
sent_ack_ids.update(modack_ackids)
742+
743+
assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed
744+
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once
745+
746+
706747
@mock.patch("threading.Thread", autospec=True)
707748
def test_start(thread):
708749
manager = mock.create_autospec(

0 commit comments

Comments
 (0)