Skip to content

Commit b71a1d9

Browse files
authored
3.x: Add eager truncation to bounded replay() to avoid item retention (#6532)
* 3.x: Add eager truncation to bounded replay() to avoid item retention * Those eager tests are in their separate files already
1 parent 46b4ac8 commit b71a1d9

File tree

11 files changed

+5083
-102
lines changed

11 files changed

+5083
-102
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 300 additions & 10 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 262 additions & 10 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,16 @@ public static Supplier> replaySupplier(final Flowable
197197
return new ReplaySupplier<T>(parent);
198198
}
199199

200-
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize) {
201-
return new BufferedReplaySupplier<T>(parent, bufferSize);
200+
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, boolean eagerTruncate) {
201+
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
202202
}
203203

204-
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
205-
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler);
204+
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
205+
return new BufferedTimedReplay<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
206206
}
207207

208-
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
209-
return new TimedReplay<T>(parent, time, unit, scheduler);
208+
public static <T> Supplier<ConnectableFlowable<T>> replaySupplier(final Flowable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
209+
return new TimedReplay<T>(parent, time, unit, scheduler, eagerTruncate);
210210
}
211211

212212
public static <T, R> Function<Flowable<T>, Publisher<R>> replayFunction(final Functionsuper Flowable<T>, ? extends Publisher<R>> selector, final Scheduler scheduler) {
@@ -240,7 +240,8 @@ public static Function>, Publisher
240240
}
241241

242242
static final class ReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
243-
private final Flowable<T> parent;
243+
244+
final Flowable<T> parent;
244245

245246
ReplaySupplier(Flowable<T> parent) {
246247
this.parent = parent;
@@ -253,38 +254,46 @@ public ConnectableFlowable get() {
253254
}
254255

255256
static final class BufferedReplaySupplier<T> implements Supplier<ConnectableFlowable<T>> {
256-
private final Flowable<T> parent;
257-
private final int bufferSize;
258257

259-
BufferedReplaySupplier(Flowable<T> parent, int bufferSize) {
258+
final Flowable<T> parent;
259+
260+
final int bufferSize;
261+
262+
final boolean eagerTruncate;
263+
264+
BufferedReplaySupplier(Flowable<T> parent, int bufferSize, boolean eagerTruncate) {
260265
this.parent = parent;
261266
this.bufferSize = bufferSize;
267+
this.eagerTruncate = eagerTruncate;
262268
}
263269

264270
@Override
265271
public ConnectableFlowable<T> get() {
266-
return parent.replay(bufferSize);
272+
return parent.replay(bufferSize, eagerTruncate);
267273
}
268274
}
269275

270276
static final class BufferedTimedReplay<T> implements Supplier<ConnectableFlowable<T>> {
271-
private final Flowable<T> parent;
272-
private final int bufferSize;
273-
private final long time;
274-
private final TimeUnit unit;
275-
private final Scheduler scheduler;
277+
final Flowable<T> parent;
278+
final int bufferSize;
279+
final long time;
280+
final TimeUnit unit;
281+
final Scheduler scheduler;
282+
283+
final boolean eagerTruncate;
276284

277-
BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
285+
BufferedTimedReplay(Flowable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
278286
this.parent = parent;
279287
this.bufferSize = bufferSize;
280288
this.time = time;
281289
this.unit = unit;
282290
this.scheduler = scheduler;
291+
this.eagerTruncate = eagerTruncate;
283292
}
284293

285294
@Override
286295
public ConnectableFlowable<T> get() {
287-
return parent.replay(bufferSize, time, unit, scheduler);
296+
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
288297
}
289298
}
290299

@@ -294,16 +303,19 @@ static final class TimedReplay implements Supplier> {
294303
private final TimeUnit unit;
295304
private final Scheduler scheduler;
296305

297-
TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
306+
final boolean eagerTruncate;
307+
308+
TimedReplay(Flowable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
298309
this.parent = parent;
299310
this.time = time;
300311
this.unit = unit;
301312
this.scheduler = scheduler;
313+
this.eagerTruncate = eagerTruncate;
302314
}
303315

304316
@Override
305317
public ConnectableFlowable<T> get() {
306-
return parent.replay(time, unit, scheduler);
318+
return parent.replay(time, unit, scheduler, eagerTruncate);
307319
}
308320
}
309321

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@ public static ConnectableFlowable createFrom(Flowable source
8989
* @param the value type
9090
* @param source the source Flowable to use
9191
* @param bufferSize the maximum number of elements to hold
92+
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
9293
* @return the new ConnectableObservable instance
9394
*/
9495
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
95-
final int bufferSize) {
96+
final int bufferSize, boolean eagerTruncate) {
9697
if (bufferSize == Integer.MAX_VALUE) {
9798
return createFrom(source);
9899
}
99-
return create(source, new ReplayBufferTask<T>(bufferSize));
100+
return create(source, new ReplayBufferSupplier<T>(bufferSize, eagerTruncate));
100101
}
101102

102103
/**
@@ -106,11 +107,12 @@ public static ConnectableFlowable create(Flowable source,
106107
* @param maxAge the maximum age of entries
107108
* @param unit the unit of measure of the age amount
108109
* @param scheduler the target scheduler providing the current time
110+
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
109111
* @return the new ConnectableObservable instance
110112
*/
111113
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
112-
long maxAge, TimeUnit unit, Scheduler scheduler) {
113-
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE);
114+
long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
115+
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE, eagerTruncate);
114116
}
115117

116118
/**
@@ -121,11 +123,12 @@ public static ConnectableFlowable create(Flowable source,
121123
* @param unit the unit of measure of the age amount
122124
* @param scheduler the target scheduler providing the current time
123125
* @param bufferSize the maximum number of elements to hold
126+
* @param eagerTruncate if true, the head reference is refreshed to avoid unwanted item retention
124127
* @return the new ConnectableFlowable instance
125128
*/
126129
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
127-
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
128-
return create(source, new ScheduledReplayBufferTask<T>(bufferSize, maxAge, unit, scheduler));
130+
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize, boolean eagerTruncate) {
131+
return create(source, new ScheduledReplayBufferSupplier<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate));
129132
}
130133

131134
/**
@@ -731,12 +734,15 @@ static class BoundedReplayBuffer extends AtomicReference implements Rep
731734

732735
private static final long serialVersionUID = 2346567790059478686L;
733736

737+
final boolean eagerTruncate;
738+
734739
Node tail;
735740
int size;
736741

737742
long index;
738743

739-
BoundedReplayBuffer() {
744+
BoundedReplayBuffer(boolean eagerTruncate) {
745+
this.eagerTruncate = eagerTruncate;
740746
Node n = new Node(null, 0);
741747
tail = n;
742748
set(n);
@@ -780,6 +786,15 @@ final void removeFirst() {
780786
* @param n the Node instance to set as first
781787
*/
782788
final void setFirst(Node n) {
789+
if (eagerTruncate) {
790+
Node m = new Node(null, n.index);
791+
Node nextNode = n.get();
792+
if (nextNode == null) {
793+
tail = m;
794+
}
795+
m.lazySet(nextNode);
796+
n = m;
797+
}
783798
set(n);
784799
}
785800

@@ -962,7 +977,8 @@ static final class SizeBoundReplayBuffer extends BoundedReplayBuffer {
962977
private static final long serialVersionUID = -5898283885385201806L;
963978

964979
final int limit;
965-
SizeBoundReplayBuffer(int limit) {
980+
SizeBoundReplayBuffer(int limit, boolean eagerTruncate) {
981+
super(eagerTruncate);
966982
this.limit = limit;
967983
}
968984

@@ -989,7 +1005,8 @@ static final class SizeAndTimeBoundReplayBuffer extends BoundedReplayBuffer
9891005
final long maxAge;
9901006
final TimeUnit unit;
9911007
final int limit;
992-
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler) {
1008+
SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
1009+
super(eagerTruncate);
9931010
this.scheduler = scheduler;
9941011
this.limit = limit;
9951012
this.maxAge = maxAge;
@@ -1168,35 +1185,42 @@ protected void subscribeActual(Subscriber s) {
11681185
}
11691186
}
11701187

1171-
static final class ReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
1172-
private final int bufferSize;
1188+
static final class ReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {
11731189

1174-
ReplayBufferTask(int bufferSize) {
1190+
final int bufferSize;
1191+
1192+
final boolean eagerTruncate;
1193+
1194+
ReplayBufferSupplier(int bufferSize, boolean eagerTruncate) {
11751195
this.bufferSize = bufferSize;
1196+
this.eagerTruncate = eagerTruncate;
11761197
}
11771198

11781199
@Override
11791200
public ReplayBuffer<T> get() {
1180-
return new SizeBoundReplayBuffer<T>(bufferSize);
1201+
return new SizeBoundReplayBuffer<T>(bufferSize, eagerTruncate);
11811202
}
11821203
}
11831204

1184-
static final class ScheduledReplayBufferTask<T> implements Supplier<ReplayBuffer<T>> {
1205+
static final class ScheduledReplayBufferSupplier<T> implements Supplier<ReplayBuffer<T>> {
11851206
private final int bufferSize;
11861207
private final long maxAge;
11871208
private final TimeUnit unit;
11881209
private final Scheduler scheduler;
11891210

1190-
ScheduledReplayBufferTask(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
1211+
final boolean eagerTruncate;
1212+
1213+
ScheduledReplayBufferSupplier(int bufferSize, long maxAge, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
11911214
this.bufferSize = bufferSize;
11921215
this.maxAge = maxAge;
11931216
this.unit = unit;
11941217
this.scheduler = scheduler;
1218+
this.eagerTruncate = eagerTruncate;
11951219
}
11961220

11971221
@Override
11981222
public ReplayBuffer<T> get() {
1199-
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler);
1223+
return new SizeAndTimeBoundReplayBuffer<T>(bufferSize, maxAge, unit, scheduler, eagerTruncate);
12001224
}
12011225
}
12021226

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,16 @@ public static Supplier> replaySupplier(final Observ
202202
return new ReplaySupplier<T>(parent);
203203
}
204204

205-
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize) {
206-
return new BufferedReplaySupplier<T>(parent, bufferSize);
205+
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, boolean eagerTruncate) {
206+
return new BufferedReplaySupplier<T>(parent, bufferSize, eagerTruncate);
207207
}
208208

209-
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
210-
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler);
209+
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
210+
return new BufferedTimedReplaySupplier<T>(parent, bufferSize, time, unit, scheduler, eagerTruncate);
211211
}
212212

213-
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler) {
214-
return new TimedReplayCallable<T>(parent, time, unit, scheduler);
213+
public static <T> Supplier<ConnectableObservable<T>> replaySupplier(final Observable<T> parent, final long time, final TimeUnit unit, final Scheduler scheduler, boolean eagerTruncate) {
214+
return new TimedReplayCallable<T>(parent, time, unit, scheduler, eagerTruncate);
215215
}
216216

217217
public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction(final Functionsuper Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler) {
@@ -250,57 +250,66 @@ public ConnectableObservable get() {
250250
}
251251

252252
static final class BufferedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
253-
private final Observable<T> parent;
254-
private final int bufferSize;
253+
final Observable<T> parent;
254+
final int bufferSize;
255+
256+
final boolean eagerTruncate;
255257

256-
BufferedReplaySupplier(Observable<T> parent, int bufferSize) {
258+
BufferedReplaySupplier(Observable<T> parent, int bufferSize, boolean eagerTruncate) {
257259
this.parent = parent;
258260
this.bufferSize = bufferSize;
261+
this.eagerTruncate = eagerTruncate;
259262
}
260263

261264
@Override
262265
public ConnectableObservable<T> get() {
263-
return parent.replay(bufferSize);
266+
return parent.replay(bufferSize, eagerTruncate);
264267
}
265268
}
266269

267270
static final class BufferedTimedReplaySupplier<T> implements Supplier<ConnectableObservable<T>> {
268-
private final Observable<T> parent;
269-
private final int bufferSize;
270-
private final long time;
271-
private final TimeUnit unit;
272-
private final Scheduler scheduler;
271+
final Observable<T> parent;
272+
final int bufferSize;
273+
final long time;
274+
final TimeUnit unit;
275+
final Scheduler scheduler;
273276

274-
BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
277+
final boolean eagerTruncate;
278+
279+
BufferedTimedReplaySupplier(Observable<T> parent, int bufferSize, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
275280
this.parent = parent;
276281
this.bufferSize = bufferSize;
277282
this.time = time;
278283
this.unit = unit;
279284
this.scheduler = scheduler;
285+
this.eagerTruncate = eagerTruncate;
280286
}
281287

282288
@Override
283289
public ConnectableObservable<T> get() {
284-
return parent.replay(bufferSize, time, unit, scheduler);
290+
return parent.replay(bufferSize, time, unit, scheduler, eagerTruncate);
285291
}
286292
}
287293

288294
static final class TimedReplayCallable<T> implements Supplier<ConnectableObservable<T>> {
289-
private final Observable<T> parent;
290-
private final long time;
291-
private final TimeUnit unit;
292-
private final Scheduler scheduler;
295+
final Observable<T> parent;
296+
final long time;
297+
final TimeUnit unit;
298+
final Scheduler scheduler;
299+
300+
final boolean eagerTruncate;
293301

294-
TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler) {
302+
TimedReplayCallable(Observable<T> parent, long time, TimeUnit unit, Scheduler scheduler, boolean eagerTruncate) {
295303
this.parent = parent;
296304
this.time = time;
297305
this.unit = unit;
298306
this.scheduler = scheduler;
307+
this.eagerTruncate = eagerTruncate;
299308
}
300309

301310
@Override
302311
public ConnectableObservable<T> get() {
303-
return parent.replay(time, unit, scheduler);
312+
return parent.replay(time, unit, scheduler, eagerTruncate);
304313
}
305314
}
306315

0 commit comments

Comments
 (0)