@@ -533,46 +533,63 @@ func (it *messageIterator) handleKeepAlives() {
533
533
it .checkDrained ()
534
534
}
535
535
536
- // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
537
- // enabled, we'll retry these messages for a short duration in a goroutine.
538
- func (it * messageIterator ) sendAck (m map [string ]* AckResult ) {
536
+ type ackFunc = func (ctx context.Context , subName string , ackIds []string ) error
537
+ type ackRecordStat = func (ctx context.Context , toSend []string )
538
+ type retryAckFunc = func (toRetry map [string ]* ipubsub.AckResult )
539
+
540
+ func (it * messageIterator ) sendAckWithFunc (m map [string ]* AckResult , ackFunc ackFunc , retryAckFunc retryAckFunc , ackRecordStat ackRecordStat ) {
539
541
ackIDs := make ([]string , 0 , len (m ))
540
542
for k := range m {
541
543
ackIDs = append (ackIDs , k )
542
544
}
543
545
it .eoMu .RLock ()
544
546
exactlyOnceDelivery := it .enableExactlyOnceDelivery
545
547
it .eoMu .RUnlock ()
548
+ batches := makeBatches (ackIDs , ackIDBatchSize )
549
+ wg := sync.WaitGroup {}
550
+
551
+ for _ , batch := range batches {
552
+ wg .Add (1 )
553
+ go func (toSend []string ) {
554
+ defer wg .Done ()
555
+ ackRecordStat (it .ctx , toSend )
556
+ // Use context.Background() as the call's context, not it.ctx. We don't
557
+ // want to cancel this RPC when the iterator is stopped.
558
+ cctx , cancel2 := context .WithTimeout (context .Background (), 60 * time .Second )
559
+ defer cancel2 ()
560
+ err := ackFunc (cctx , it .subName , toSend )
561
+ if exactlyOnceDelivery {
562
+ resultsByAckID := make (map [string ]* AckResult )
563
+ for _ , ackID := range toSend {
564
+ resultsByAckID [ackID ] = m [ackID ]
565
+ }
546
566
547
- var toSend []string
548
- for len (ackIDs ) > 0 {
549
- toSend , ackIDs = splitRequestIDs (ackIDs , ackIDBatchSize )
567
+ st , md := extractMetadata (err )
568
+ _ , toRetry := processResults (st , resultsByAckID , md )
569
+ if len (toRetry ) > 0 {
570
+ // Retry modacks/nacks in a separate goroutine.
571
+ go func () {
572
+ retryAckFunc (toRetry )
573
+ }()
574
+ }
575
+ }
576
+ }(batch )
577
+ }
578
+ wg .Wait ()
579
+ }
550
580
551
- recordStat (it .ctx , AckCount , int64 (len (toSend )))
552
- addAcks (toSend )
553
- // Use context.Background() as the call's context, not it.ctx. We don't
554
- // want to cancel this RPC when the iterator is stopped.
555
- cctx2 , cancel2 := context .WithTimeout (context .Background (), 60 * time .Second )
556
- defer cancel2 ()
557
- err := it .subc .Acknowledge (cctx2 , & pb.AcknowledgeRequest {
581
+ // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
582
+ // enabled, we'll retry these messages for a short duration in a goroutine.
583
+ func (it * messageIterator ) sendAck (m map [string ]* AckResult ) {
584
+ it .sendAckWithFunc (m , func (ctx context.Context , subName string , ackIds []string ) error {
585
+ return it .subc .Acknowledge (ctx , & pb.AcknowledgeRequest {
558
586
Subscription : it .subName ,
559
- AckIds : toSend ,
587
+ AckIds : ackIds ,
560
588
})
561
- if exactlyOnceDelivery {
562
- resultsByAckID := make (map [string ]* AckResult )
563
- for _ , ackID := range toSend {
564
- resultsByAckID [ackID ] = m [ackID ]
565
- }
566
- st , md := extractMetadata (err )
567
- _ , toRetry := processResults (st , resultsByAckID , md )
568
- if len (toRetry ) > 0 {
569
- // Retry acks in a separate goroutine.
570
- go func () {
571
- it .retryAcks (toRetry )
572
- }()
573
- }
574
- }
575
- }
589
+ }, it .retryAcks , func (ctx context.Context , toSend []string ) {
590
+ recordStat (it .ctx , AckCount , int64 (len (toSend )))
591
+ addAcks (toSend )
592
+ })
576
593
}
577
594
578
595
// sendModAck is used to extend the lease of messages or nack them.
@@ -583,47 +600,22 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
583
600
// enabled, we retry it in a separate goroutine for a short duration.
584
601
func (it * messageIterator ) sendModAck (m map [string ]* AckResult , deadline time.Duration , logOnInvalid bool ) {
585
602
deadlineSec := int32 (deadline / time .Second )
586
- ackIDs := make ([]string , 0 , len (m ))
587
- for k := range m {
588
- ackIDs = append (ackIDs , k )
589
- }
590
- it .eoMu .RLock ()
591
- exactlyOnceDelivery := it .enableExactlyOnceDelivery
592
- it .eoMu .RUnlock ()
593
- var toSend []string
594
- for len (ackIDs ) > 0 {
595
- toSend , ackIDs = splitRequestIDs (ackIDs , ackIDBatchSize )
603
+ it .sendAckWithFunc (m , func (ctx context.Context , subName string , ackIds []string ) error {
604
+ return it .subc .ModifyAckDeadline (ctx , & pb.ModifyAckDeadlineRequest {
605
+ Subscription : it .subName ,
606
+ AckDeadlineSeconds : deadlineSec ,
607
+ AckIds : ackIds ,
608
+ })
609
+ }, func (toRetry map [string ]* ipubsub.AckResult ) {
610
+ it .retryModAcks (toRetry , deadlineSec , logOnInvalid )
611
+ }, func (ctx context.Context , toSend []string ) {
596
612
if deadline == 0 {
597
613
recordStat (it .ctx , NackCount , int64 (len (toSend )))
598
614
} else {
599
615
recordStat (it .ctx , ModAckCount , int64 (len (toSend )))
600
616
}
601
617
addModAcks (toSend , deadlineSec )
602
- // Use context.Background() as the call's context, not it.ctx. We don't
603
- // want to cancel this RPC when the iterator is stopped.
604
- cctx , cancel2 := context .WithTimeout (context .Background (), 60 * time .Second )
605
- defer cancel2 ()
606
- err := it .subc .ModifyAckDeadline (cctx , & pb.ModifyAckDeadlineRequest {
607
- Subscription : it .subName ,
608
- AckDeadlineSeconds : deadlineSec ,
609
- AckIds : toSend ,
610
- })
611
- if exactlyOnceDelivery {
612
- resultsByAckID := make (map [string ]* AckResult )
613
- for _ , ackID := range toSend {
614
- resultsByAckID [ackID ] = m [ackID ]
615
- }
616
-
617
- st , md := extractMetadata (err )
618
- _ , toRetry := processResults (st , resultsByAckID , md )
619
- if len (toRetry ) > 0 {
620
- // Retry modacks/nacks in a separate goroutine.
621
- go func () {
622
- it .retryModAcks (toRetry , deadlineSec , logOnInvalid )
623
- }()
624
- }
625
- }
626
- }
618
+ })
627
619
}
628
620
629
621
// retryAcks retries the ack RPC with backoff. This must be called in a goroutine
@@ -751,13 +743,20 @@ func calcFieldSizeInt(fields ...int) int {
751
743
return overhead
752
744
}
753
745
754
- // splitRequestIDs takes a slice of ackIDs and returns two slices such that the first
755
- // ackID slice can be used in a request where the payload does not exceed ackIDBatchSize.
756
- func splitRequestIDs (ids []string , maxBatchSize int ) (prefix , remainder []string ) {
757
- if len (ids ) < maxBatchSize {
758
- return ids , []string {}
746
+ // makeBatches takes a slice of ackIDs and returns a slice of ackID batches.
747
+ // Each ackID batch can be used in a request where the payload does not exceed maxBatchSize.
748
+ func makeBatches (ids []string , maxBatchSize int ) [][]string {
749
+ var batches [][]string
750
+ for len (ids ) > 0 {
751
+ if len (ids ) < maxBatchSize {
752
+ batches = append (batches , ids )
753
+ ids = []string {}
754
+ } else {
755
+ batches = append (batches , ids [:maxBatchSize ])
756
+ ids = ids [maxBatchSize :]
757
+ }
759
758
}
760
- return ids [: maxBatchSize ], ids [ maxBatchSize :]
759
+ return batches
761
760
}
762
761
763
762
// The deadline to ack is derived from a percentile distribution based
0 commit comments