Skip to content

Commit f96821f

Browse files
authored
3.x: Fix window (boundary, start/end) cancel and abandonment (#6762)
1 parent df30aa1 commit f96821f

12 files changed

+1056
-482
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17965,6 +17965,11 @@ public final Flowable> window(
1796517965
* Publisher.
1796617966
*

1796717967
*
17968+
*

17969+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
17970+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
17971+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
17972+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1796817973
*
1796917974
*
Backpressure:
1797017975
*
The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data
@@ -17995,6 +18000,11 @@ public final Flowable> window(Publisher boundaryIndicator) {
1799518000
* Publisher.
1799618001
*

1799718002
*
18003+
*

18004+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18005+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18006+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18007+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1799818008
*
1799918009
*
Backpressure:
1800018010
*
The outer Publisher of this operator does not support backpressure as it uses a {@code boundary} Publisher to control data
@@ -18031,6 +18041,11 @@ public final Flowable> window(Publisher boundaryIndicator, in
1803118041
* {@code closingSelector} emits an item.
1803218042
*

1803318043
*
18044+
*

18045+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18046+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18047+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18048+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1803418049
*
1803518050
*
Backpressure:
1803618051
*
The outer Publisher of this operator doesn't support backpressure because the emission of new
@@ -18068,6 +18083,11 @@ public final Flowable> window(
1806818083
* {@code closingSelector} emits an item.
1806918084
*

1807018085
*
18086+
*

18087+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
18088+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
18089+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
18090+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1807118091
*
1807218092
*
Backpressure:
1807318093
*
The outer Publisher of this operator doesn't support backpressure because the emission of new

src/main/java/io/reactivex/rxjava3/core/Observable.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14536,6 +14536,11 @@ public final Observable> window(long count, long skip, int bufferS
1453614536
* current window and propagates the notification from the source ObservableSource.
1453714537
*

1453814538
*
14539+
*

14540+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14541+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14542+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14543+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1453914544
*
1454014545
*
Scheduler:
1454114546
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14564,6 +14569,11 @@ public final Observable> window(long timespan, long timeskip, Time
1456414569
* current window and propagates the notification from the source ObservableSource.
1456514570
*

1456614571
*
14572+
*

14573+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14574+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14575+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14576+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1456714577
*
1456814578
*
Scheduler:
1456914579
*
You specify which {@link Scheduler} this operator will use.
@@ -14594,6 +14604,11 @@ public final Observable> window(long timespan, long timeskip, Time
1459414604
* current window and propagates the notification from the source ObservableSource.
1459514605
*

1459614606
*
14607+
*

14608+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14609+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14610+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14611+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1459714612
*
1459814613
*
Scheduler:
1459914614
*
You specify which {@link Scheduler} this operator will use.
@@ -14630,6 +14645,11 @@ public final Observable> window(long timespan, long timeskip, Time
1463014645
* ObservableSource emits the current window and propagates the notification from the source ObservableSource.
1463114646
*

1463214647
*
14648+
*

14649+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14650+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14651+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14652+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1463314653
*
1463414654
*
Scheduler:
1463514655
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14658,6 +14678,11 @@ public final Observable> window(long timespan, TimeUnit unit) {
1465814678
* emits the current window and propagates the notification from the source ObservableSource.
1465914679
*

1466014680
*
14681+
*

14682+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14683+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14684+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14685+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1466114686
*
1466214687
*
Scheduler:
1466314688
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14690,6 +14715,11 @@ public final Observable> window(long timespan, TimeUnit unit,
1469014715
* emits the current window and propagates the notification from the source ObservableSource.
1469114716
*

1469214717
*
14718+
*

14719+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14720+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14721+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14722+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1469314723
*
1469414724
*
Scheduler:
1469514725
*
This version of {@code window} operates by default on the {@code computation} {@link Scheduler}.
@@ -14723,6 +14753,11 @@ public final Observable> window(long timespan, TimeUnit unit,
1472314753
* ObservableSource emits the current window and propagates the notification from the source ObservableSource.
1472414754
*

1472514755
*
14756+
*

14757+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14758+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14759+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14760+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1472614761
*
1472714762
*
Scheduler:
1472814763
*
You specify which {@link Scheduler} this operator will use.
@@ -14754,6 +14789,11 @@ public final Observable> window(long timespan, TimeUnit unit,
1475414789
* current window and propagates the notification from the source ObservableSource.
1475514790
*

1475614791
*
14792+
*

14793+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14794+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14795+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14796+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1475714797
*
1475814798
*
Scheduler:
1475914799
*
You specify which {@link Scheduler} this operator will use.
@@ -14788,6 +14828,11 @@ public final Observable> window(long timespan, TimeUnit unit,
1478814828
* current window and propagates the notification from the source ObservableSource.
1478914829
*

1479014830
*
14831+
*

14832+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14833+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14834+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14835+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1479114836
*
1479214837
*
Scheduler:
1479314838
*
You specify which {@link Scheduler} this operator will use.
@@ -14824,6 +14869,11 @@ public final Observable> window(long timespan, TimeUnit unit,
1482414869
* current window and propagates the notification from the source ObservableSource.
1482514870
*

1482614871
*
14872+
*

14873+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14874+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14875+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14876+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1482714877
*
1482814878
*
Scheduler:
1482914879
*
You specify which {@link Scheduler} this operator will use.
@@ -14865,6 +14915,11 @@ public final Observable> window(
1486514915
* ObservableSource.
1486614916
*

1486714917
*
14918+
*

14919+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14920+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14921+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14922+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1486814923
*
1486914924
*
Scheduler:
1487014925
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14891,6 +14946,11 @@ public final Observable> window(ObservableSource boundary)
1489114946
* ObservableSource.
1489214947
*

1489314948
*
14949+
*

14950+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14951+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14952+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14953+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1489414954
*
1489514955
*
Scheduler:
1489614956
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14922,6 +14982,11 @@ public final Observable> window(ObservableSource boundary,
1492214982
* {@code closingIndicator} emits an item.
1492314983
*

1492414984
*
14985+
*

14986+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
14987+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
14988+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
14989+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1492514990
*
1492614991
*
Scheduler:
1492714992
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
@@ -14953,6 +15018,11 @@ public final Observable> window(
1495315018
* {@code closingIndicator} emits an item.
1495415019
*

1495515020
*
15021+
*

15022+
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
15023+
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
15024+
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
15025+
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
1495615026
*
1495715027
*
Scheduler:
1495815028
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,11 @@ void drain() {
240240

241241
if (emitted != requested.get()) {
242242
emitted++;
243-
downstream.onNext(w);
243+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(w);
244+
downstream.onNext(intercept);
245+
if (intercept.tryAbandon()) {
246+
w.onComplete();
247+
}
244248
} else {
245249
SubscriptionHelper.cancel(upstream);
246250
boundarySubscriber.dispose();

0 commit comments

Comments
 (0)