Concurrent subscription to ReplaySubject leads to bogus onNext(NotificationLite) signals
As far as I know, the Reactive Streams specification imposes no restrictions on when, or from which thread(s), a subscriber may subscribe to a source. However, if a subscriber subscribes to a ReplaySubject while the subject's upstream is concurrently issuing a legal sequence of signals to that same ReplaySubject, the ReplaySubject can erroneously emit an onNext signal carrying an internal NotificationLite value rather than an item of the declared type.
See the following repro:
    @Test
    public void normal() throws Exception {
        for (int i = 1; i <= 1000000; i++) {
            Subject<String> source = ReplaySubject.create();
            Subject<String> sink = PublishSubject.create();
            TestObserver<String> observer = sink.test();

            Schedulers.computation().scheduleDirect(() -> {
                // issue signals to the source in adherence to the reactive streams specification
                source.onSubscribe(Disposable.empty());
                source.onNext("hello");
                source.onNext("world");
                source.onComplete();
            });

            Schedulers.computation().scheduleDirect(() -> {
                // connect the source to the sink in parallel with the signals issued to the source
                // note the cast() operator, which is here to detect non-String escapees
                source.cast(String.class).subscribe(sink);
            });

            observer.await().assertValues("hello", "world").assertComplete();
        }
    }
When I run this, I get the following as output:
ReplaySubjectTest > normal() FAILED
java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
at joepembe.ReplaySubjectTest.normal(ReplaySubjectTest.java:57)
Caused by:
java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
at java.base/java.lang.Class.cast(Class.java:3606)
at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
at joepembe.ReplaySubjectTest.lambda$normal$3(ReplaySubjectTest.java:55)
The same behavior occurs even when using SerializedSubject (not that I would expect anything different to occur, as SerializedSubject merely forwards the subscribe() invocation to the "real" subject):
    for (int i = 1; i <= 1000000; i++) {
        Subject<String> source = ReplaySubject.<String>create().toSerialized();
        Subject<String> sink = PublishSubject.<String>create().toSerialized();
        TestObserver<String> observer = sink.test();

        Schedulers.computation().scheduleDirect(() -> {
            // issue signals to the source in adherence to the reactive streams specification
            source.onSubscribe(Disposable.empty());
            source.onNext("hello");
            source.onNext("world");
            source.onComplete();
        });

        Schedulers.computation().scheduleDirect(() -> {
            // connect the source to the sink in parallel with the signals issued to the source
            // note the cast() operator, which is here to detect non-String escapees
            source.cast(String.class).subscribe(sink);
        });

        observer.await().assertValues("hello", "world").assertComplete();
    }
Yields:
ReplaySubjectTest > serialized() FAILED
java.lang.AssertionError: Not completed (latch = 0, values = 2, errors = 1, completions = 0)
at io.reactivex.rxjava3.observers.BaseTestConsumer.fail(BaseTestConsumer.java:128)
at io.reactivex.rxjava3.observers.BaseTestConsumer.assertComplete(BaseTestConsumer.java:181)
at joepembe.ReplaySubjectTest.serialized(ReplaySubjectTest.java:33)
Caused by:
java.lang.ClassCastException: Cannot cast io.reactivex.rxjava3.internal.util.NotificationLite to java.lang.String
at java.base/java.lang.Class.cast(Class.java:3606)
at io.reactivex.rxjava3.internal.functions.Functions$CastToClass.apply(Functions.java:235)
at io.reactivex.rxjava3.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:58)
at io.reactivex.rxjava3.subjects.ReplaySubject$UnboundedReplayBuffer.replay(ReplaySubject.java:791)
at io.reactivex.rxjava3.subjects.ReplaySubject.subscribeActual(ReplaySubject.java:344)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
at io.reactivex.rxjava3.subjects.SerializedSubject.subscribeActual(SerializedSubject.java:49)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13263)
at joepembe.ReplaySubjectTest.lambda$serialized$1(ReplaySubjectTest.java:31)
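On the point above that SerializedSubject merely forwards subscribe() to the "real" subject: the following is a minimal, RxJava-free sketch of why a serializing wrapper would not be expected to protect against this race. Everything in it (ToySubject, ToySerialized, Probe, runProbe) is a hypothetical, heavily simplified stand-in and not RxJava's actual implementation. The only behavior it models is the one visible in the stack trace: signals go through a lock, while subscribe() is passed straight through to the wrapped subject.

```java
import java.util.function.Consumer;

public class SerializedForwardingSketch {

    /** Hypothetical, heavily simplified subject interface (not RxJava's). */
    interface ToySubject<T> {
        void onNext(T value);
        void subscribe(Consumer<T> observer);
    }

    /** Toy wrapper: serializes onNext with a lock, forwards subscribe untouched. */
    static final class ToySerialized<T> implements ToySubject<T> {
        private final ToySubject<T> actual;

        ToySerialized(ToySubject<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T value) {
            // assumption: signals are serialized by holding the wrapper's monitor
            synchronized (this) {
                actual.onNext(value);
            }
        }

        @Override
        public void subscribe(Consumer<T> observer) {
            // no lock taken: the call goes straight to the wrapped subject
            actual.subscribe(observer);
        }
    }

    /** Probe subject that records whether the wrapper's lock is held during each call. */
    static final class Probe implements ToySubject<String> {
        Object wrapper;
        boolean onNextUnderLock;
        boolean subscribeUnderLock;

        @Override
        public void onNext(String value) {
            onNextUnderLock = Thread.holdsLock(wrapper);
        }

        @Override
        public void subscribe(Consumer<String> observer) {
            subscribeUnderLock = Thread.holdsLock(wrapper);
        }
    }

    /** Drives one onNext and one subscribe through the wrapper and returns the probe. */
    static Probe runProbe() {
        Probe probe = new Probe();
        ToySerialized<String> serialized = new ToySerialized<>(probe);
        probe.wrapper = serialized;
        serialized.onNext("hello");
        serialized.subscribe(v -> { });
        return probe;
    }

    public static void main(String[] args) {
        Probe p = runProbe();
        System.out.println("onNext under lock: " + p.onNextUnderLock);
        System.out.println("subscribe under lock: " + p.subscribeUnderLock);
    }
}
```

In this toy model the probe sees onNext arrive with the wrapper's lock held and subscribe arrive without it, so a subscribe() call can still overlap with a signal being delivered to the wrapped subject. If the real wrapper behaves similarly in this respect, any fix would presumably have to live in ReplaySubject itself rather than in the serializing wrapper.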