Skip to content

Commit d786d22

Browse files
authored
fix: handle receipt modAck and lease extensions with exactly-once delivery correctly (#1709)
* fix: don't try to lease messages with failed acks in exactly-once delivery * fix: don't pass messages to clients or start leasing in exactly-once delivery if the receipt modAck fails * fix: handle receipt modAcks and lease extensions properly with exactly-once delivery enabled * fix: permanently fail any ack/modAck/nack that fails once under exactly-once delivery
1 parent 0eb1ca8 commit d786d22

File tree

4 files changed

+401
-8
lines changed

4 files changed

+401
-8
lines changed

src/lease-manager.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import {EventEmitter} from 'events';
18-
import {Message, Subscriber} from './subscriber';
18+
import {AckError, Message, Subscriber} from './subscriber';
1919
import {defaultOptions} from './default-options';
2020

2121
export interface FlowControlOptions {
@@ -257,7 +257,16 @@ export class LeaseManager extends EventEmitter {
257257
const lifespan = (Date.now() - message.received) / (60 * 1000);
258258

259259
if (lifespan < this._options.maxExtensionMinutes!) {
260-
message.modAck(deadline);
260+
if (this._subscriber.isExactlyOnceDelivery) {
261+
message.modAckWithResponse(deadline).catch(e => {
262+
// In the case of a permanent failure (temporary failures are retried),
263+
// we need to stop trying to lease-manage the message.
264+
message.ackFailed(e as AckError);
265+
this.remove(message);
266+
});
267+
} else {
268+
message.modAck(deadline);
269+
}
261270
} else {
262271
this.remove(message);
263272
}

src/subscriber.ts

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ export class Message {
103103
private _handled: boolean;
104104
private _length: number;
105105
private _subscriber: Subscriber;
106+
private _ackFailed?: AckError;
107+
106108
/**
107109
* @hideconstructor
108110
*
@@ -194,6 +196,16 @@ export class Message {
194196
return this._length;
195197
}
196198

199+
/**
200+
* Sets this message's exactly once delivery acks to permanent failure. This is
201+
* meant for internal library use only.
202+
*
203+
* @private
204+
*/
205+
ackFailed(error: AckError): void {
206+
this._ackFailed = error;
207+
}
208+
197209
/**
198210
* Acknowledges the message.
199211
*
@@ -228,9 +240,18 @@ export class Message {
228240
return AckResponses.Success;
229241
}
230242

243+
if (this._ackFailed) {
244+
throw this._ackFailed;
245+
}
246+
231247
if (!this._handled) {
232248
this._handled = true;
233-
return await this._subscriber.ackWithResponse(this);
249+
try {
250+
return await this._subscriber.ackWithResponse(this);
251+
} catch (e) {
252+
this.ackFailed(e as AckError);
253+
throw e;
254+
}
234255
} else {
235256
return AckResponses.Invalid;
236257
}
@@ -261,8 +282,17 @@ export class Message {
261282
return AckResponses.Success;
262283
}
263284

285+
if (this._ackFailed) {
286+
throw this._ackFailed;
287+
}
288+
264289
if (!this._handled) {
265-
return await this._subscriber.modAckWithResponse(this, deadline);
290+
try {
291+
return await this._subscriber.modAckWithResponse(this, deadline);
292+
} catch (e) {
293+
this.ackFailed(e as AckError);
294+
throw e;
295+
}
266296
} else {
267297
return AckResponses.Invalid;
268298
}
@@ -303,9 +333,18 @@ export class Message {
303333
return AckResponses.Success;
304334
}
305335

336+
if (this._ackFailed) {
337+
throw this._ackFailed;
338+
}
339+
306340
if (!this._handled) {
307341
this._handled = true;
308-
return await this._subscriber.nackWithResponse(this);
342+
try {
343+
return await this._subscriber.nackWithResponse(this);
344+
} catch (e) {
345+
this.ackFailed(e as AckError);
346+
throw e;
347+
}
309348
} else {
310349
return AckResponses.Invalid;
311350
}
@@ -824,8 +863,23 @@ export class Subscriber extends EventEmitter {
824863
const span: Span | undefined = this._constructSpan(message);
825864

826865
if (this.isOpen) {
827-
message.modAck(this.ackDeadline);
828-
this._inventory.add(message);
866+
if (this.isExactlyOnceDelivery) {
867+
// For exactly-once delivery, we must validate that we got a valid
868+
// lease on the message before actually leasing it.
869+
message
870+
.modAckWithResponse(this.ackDeadline)
871+
.then(() => {
872+
this._inventory.add(message);
873+
})
874+
.catch(() => {
875+
// Temporary failures will retry, so if an error reaches us
876+
// here, that means a permanent failure. Silently drop these.
877+
this._discardMessage(message);
878+
});
879+
} else {
880+
message.modAck(this.ackDeadline);
881+
this._inventory.add(message);
882+
}
829883
} else {
830884
message.nack();
831885
}
@@ -835,6 +889,11 @@ export class Subscriber extends EventEmitter {
835889
}
836890
}
837891

892+
// Internal: This is here to provide a hook for unit testing, at least for now.
893+
private _discardMessage(message: Message): void {
894+
message;
895+
}
896+
838897
/**
839898
* Returns a promise that will resolve once all pending requests have settled.
840899
*

test/lease-manager.ts

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,16 @@ import {describe, it, before, beforeEach, afterEach} from 'mocha';
1919
import {EventEmitter} from 'events';
2020
import * as proxyquire from 'proxyquire';
2121
import * as sinon from 'sinon';
22+
import * as defer from 'p-defer';
2223

2324
import * as leaseTypes from '../src/lease-manager';
24-
import {Message, Subscriber} from '../src/subscriber';
25+
import {
26+
AckError,
27+
AckResponse,
28+
AckResponses,
29+
Message,
30+
Subscriber,
31+
} from '../src/subscriber';
2532
import {defaultOptions} from '../src/default-options';
2633

2734
const FREE_MEM = 9376387072;
@@ -34,6 +41,10 @@ class FakeSubscriber extends EventEmitter {
3441
isOpen = true;
3542
modAckLatency = 2000;
3643
async modAck(): Promise<void> {}
44+
async modAckWithResponse(): Promise<AckResponse> {
45+
return AckResponses.Success;
46+
}
47+
isExactlyOnceDelivery = false;
3748
}
3849

3950
class FakeMessage {
@@ -43,6 +54,21 @@ class FakeMessage {
4354
this.received = Date.now();
4455
}
4556
modAck(): void {}
57+
async modAckWithResponse(): Promise<AckResponse> {
58+
return AckResponses.Success;
59+
}
60+
ackFailed() {}
61+
}
62+
63+
interface LeaseManagerInternals {
64+
_extendDeadlines(): void;
65+
_messages: Set<Message>;
66+
_isLeasing: boolean;
67+
_scheduleExtension(): void;
68+
}
69+
70+
function getLMInternals(mgr: leaseTypes.LeaseManager): LeaseManagerInternals {
71+
return mgr as unknown as LeaseManagerInternals;
4672
}
4773

4874
describe('LeaseManager', () => {
@@ -207,6 +233,18 @@ describe('LeaseManager', () => {
207233
assert.strictEqual(stub.callCount, 1);
208234
});
209235

236+
it('should schedule a lease extension for exactly-once delivery', () => {
237+
const message = new FakeMessage() as {} as Message;
238+
const stub = sandbox
239+
.stub(message, 'modAck')
240+
.withArgs(subscriber.ackDeadline);
241+
242+
leaseManager.add(message);
243+
clock.tick(expectedTimeout);
244+
245+
assert.strictEqual(stub.callCount, 1);
246+
});
247+
210248
it('should not schedule a lease extension if already in progress', () => {
211249
const messages = [new FakeMessage(), new FakeMessage()];
212250
const stubs = messages.map(message => sandbox.stub(message, 'modAck'));
@@ -274,6 +312,32 @@ describe('LeaseManager', () => {
274312
assert.strictEqual(deadline, subscriber.ackDeadline);
275313
});
276314

315+
it('should remove and ackFailed any messages that fail to ack', done => {
316+
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;
317+
318+
leaseManager.setOptions({
319+
maxExtensionMinutes: 600,
320+
});
321+
322+
const goodMessage = new FakeMessage();
323+
324+
const removeStub = sandbox.stub(leaseManager, 'remove');
325+
const mawrStub = sandbox
326+
.stub(goodMessage, 'modAckWithResponse')
327+
.rejects(new AckError(AckResponses.Invalid));
328+
const failed = sandbox.stub(goodMessage, 'ackFailed');
329+
330+
removeStub.callsFake(() => {
331+
assert.strictEqual(mawrStub.callCount, 1);
332+
assert.strictEqual(removeStub.callCount, 1);
333+
assert.strictEqual(failed.callCount, 1);
334+
done();
335+
});
336+
337+
leaseManager.add(goodMessage as {} as Message);
338+
clock.tick(halfway * 2 + 1);
339+
});
340+
277341
it('should continuously extend the deadlines', () => {
278342
const message = new FakeMessage();
279343
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -473,4 +537,86 @@ describe('LeaseManager', () => {
473537
assert.strictEqual(leaseManager.isFull(), true);
474538
});
475539
});
540+
541+
describe('deadline extension', () => {
542+
beforeEach(() => {
543+
sandbox.useFakeTimers();
544+
});
545+
afterEach(() => {
546+
sandbox.clock.restore();
547+
});
548+
549+
it('calls regular modAck periodically w/o exactly-once', () => {
550+
const lmi = getLMInternals(leaseManager);
551+
const msg = new Message(subscriber, {
552+
ackId: 'ackack',
553+
message: {data: ''},
554+
deliveryAttempt: 0,
555+
});
556+
sandbox.clock.tick(1);
557+
558+
const maStub = sandbox.stub(msg, 'modAck');
559+
560+
lmi._messages.add(msg);
561+
lmi._extendDeadlines();
562+
563+
assert.ok(maStub.calledOnce);
564+
});
565+
566+
it('calls modAckWithResponse periodically w/exactly-once, successful', async () => {
567+
const lmi = getLMInternals(leaseManager);
568+
const msg = new Message(subscriber, {
569+
ackId: 'ackack',
570+
message: {data: ''},
571+
deliveryAttempt: 0,
572+
});
573+
sandbox.clock.tick(1);
574+
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;
575+
576+
const done = defer();
577+
sandbox.stub(msg, 'modAck').callsFake(() => {
578+
console.error('oops we did it wrong');
579+
});
580+
581+
const maStub = sandbox.stub(msg, 'modAckWithResponse');
582+
maStub.callsFake(async () => {
583+
done.resolve();
584+
return AckResponses.Success;
585+
});
586+
587+
lmi._messages.add(msg);
588+
lmi._extendDeadlines();
589+
590+
await done.promise;
591+
assert.ok(maStub.calledOnce);
592+
});
593+
594+
it('calls modAckWithResponse periodically w/exactly-once, failure', async () => {
595+
const lmi = getLMInternals(leaseManager);
596+
const msg = new Message(subscriber, {
597+
ackId: 'ackack',
598+
message: {data: ''},
599+
deliveryAttempt: 0,
600+
});
601+
sandbox.clock.tick(1);
602+
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;
603+
604+
const done = defer();
605+
606+
const maStub = sandbox.stub(msg, 'modAckWithResponse');
607+
maStub.callsFake(async () => {
608+
done.resolve();
609+
throw new AckError(AckResponses.Invalid);
610+
});
611+
const rmStub = sandbox.stub(leaseManager, 'remove');
612+
613+
lmi._messages.add(msg);
614+
lmi._extendDeadlines();
615+
616+
await done.promise;
617+
618+
assert.ok(maStub.calledOnce);
619+
assert.ok(rmStub.calledOnce);
620+
});
621+
});
476622
});

0 commit comments

Comments
 (0)