Skip to content

Commit 22a87c6

Browse files
fix: Message ordering fix for #1889 (#1903)
* chore: change assignees for issues and PRs to michaelpri10 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Revert PR#1807 and use a LinkedHasMap in the MessageDispatcher * fix: Make processedReceivedMessages thread-safe * fix: Only synchronize on the outstandingReceipts object in the MessageDispatcher --------- Co-authored-by: Owl Bot
1 parent 77f3c26 commit 22a87c6

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.ArrayList;
3232
import java.util.HashMap;
3333
import java.util.Iterator;
34+
import java.util.LinkedHashMap;
3435
import java.util.List;
3536
import java.util.Map;
3637
import java.util.Map.Entry;
@@ -92,8 +93,8 @@ class MessageDispatcher {
9293
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
9394
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
9495
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
95-
private final ConcurrentMap<String, ReceiptCompleteData> outstandingReceipts =
96-
new ConcurrentHashMap<String, ReceiptCompleteData>();
96+
private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
97+
new LinkedHashMap<String, ReceiptCompleteData>();
9798
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
9899
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
99100
private final Lock jobLock;
@@ -397,7 +398,9 @@ void processReceivedMessages(List messages) {
397398
if (this.exactlyOnceDeliveryEnabled.get()) {
398399
// For exactly once deliveries we don't add to outstanding batch because we first
399400
// process the receipt modack. If that is successful then we process the message.
400-
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
401+
synchronized (outstandingReceipts) {
402+
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
403+
}
401404
} else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
402405
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
403406
// previously-mapped element.
@@ -417,33 +420,36 @@ void processReceivedMessages(List messages) {
417420
}
418421

419422
void notifyAckSuccess(AckRequestData ackRequestData) {
420-
421-
if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
422-
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
423-
List<OutstandingMessage> outstandingBatch = new ArrayList<>();
424-
425-
for (Iterator<Entry<String, ReceiptCompleteData>> it =
426-
outstandingReceipts.entrySet().iterator();
427-
it.hasNext(); ) {
428-
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
429-
// If receipt is complete then add to outstandingBatch to process the batch
430-
if (receipt.getValue().isReceiptComplete()) {
431-
it.remove();
432-
if (pendingMessages.putIfAbsent(
433-
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
434-
== null) {
435-
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
423+
synchronized (outstandingReceipts) {
424+
if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
425+
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
426+
List<OutstandingMessage> outstandingBatch = new ArrayList<>();
427+
428+
for (Iterator<Entry<String, ReceiptCompleteData>> it =
429+
outstandingReceipts.entrySet().iterator();
430+
it.hasNext(); ) {
431+
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
432+
// If receipt is complete then add to outstandingBatch to process the batch
433+
if (receipt.getValue().isReceiptComplete()) {
434+
it.remove();
435+
if (pendingMessages.putIfAbsent(
436+
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
437+
== null) {
438+
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
439+
}
440+
} else {
441+
break;
436442
}
437-
} else {
438-
break;
439443
}
444+
processBatch(outstandingBatch);
440445
}
441-
processBatch(outstandingBatch);
442446
}
443447
}
444448

445449
void notifyAckFailed(AckRequestData ackRequestData) {
446-
outstandingReceipts.remove(ackRequestData.getAckId());
450+
synchronized (outstandingReceipts) {
451+
outstandingReceipts.remove(ackRequestData.getAckId());
452+
}
447453
}
448454

449455
private void processBatch(List<OutstandingMessage> batch) {

0 commit comments

Comments
 (0)