Clean up ReactiveSupport by hantsy · Pull Request #3908 · graphql-java/graphql-java
Expand Up
@@ -2,9 +2,8 @@
import graphql.DuckTyped; import graphql.Internal; import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription;
import java.util.Objects; import java.util.concurrent.CompletableFuture; Expand All @@ -26,17 +25,11 @@ public static Object fetchedObject(Object fetchedObject) { return flowPublisherToCF((Flow.Publisher<?>) fetchedObject); } if (fetchedObject instanceof Publisher) { return reactivePublisherToCF((Publisher<?>) fetchedObject); return flowPublisherToCF(FlowAdapters.toFlowPublisher((Publisher<?>) fetchedObject)); } return fetchedObject; }
private static CompletableFuture<Object> reactivePublisherToCF(Publisher<?> publisher) { ReactivePublisherToCompletableFuture<Object> cf = new ReactivePublisherToCompletableFuture<>(); publisher.subscribe(cf); return cf; }
private static CompletableFuture<Object> flowPublisherToCF(Flow.Publisher<?> publisher) { FlowPublisherToCompletableFuture<Object> cf = new FlowPublisherToCompletableFuture<>(); publisher.subscribe(cf); Expand Down Expand Up @@ -116,39 +109,6 @@ void onCompleteImpl() { } }
private static class ReactivePublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Subscription> implements Subscriber<T> {
@Override void doSubscriptionCancel(Subscription subscription) { subscription.cancel(); }
@Override void doSubscriptionRequest(Subscription subscription, long n) { subscription.request(n); }
@Override public void onSubscribe(Subscription s) { onSubscribeImpl(s); }
@Override public void onNext(T t) { onNextImpl(t); }
@Override public void onError(Throwable t) { onErrorImpl(t); }
@Override public void onComplete() { onCompleteImpl(); } }
private static class FlowPublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Flow.Subscription> implements Flow.Subscriber<T> {
@Override Expand Down
import graphql.DuckTyped; import graphql.Internal; import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription;
import java.util.Objects; import java.util.concurrent.CompletableFuture; Expand All @@ -26,17 +25,11 @@ public static Object fetchedObject(Object fetchedObject) { return flowPublisherToCF((Flow.Publisher<?>) fetchedObject); } if (fetchedObject instanceof Publisher) { return reactivePublisherToCF((Publisher<?>) fetchedObject); return flowPublisherToCF(FlowAdapters.toFlowPublisher((Publisher<?>) fetchedObject)); } return fetchedObject; }
private static CompletableFuture<Object> reactivePublisherToCF(Publisher<?> publisher) { ReactivePublisherToCompletableFuture<Object> cf = new ReactivePublisherToCompletableFuture<>(); publisher.subscribe(cf); return cf; }
private static CompletableFuture<Object> flowPublisherToCF(Flow.Publisher<?> publisher) { FlowPublisherToCompletableFuture<Object> cf = new FlowPublisherToCompletableFuture<>(); publisher.subscribe(cf); Expand Down Expand Up @@ -116,39 +109,6 @@ void onCompleteImpl() { } }
private static class ReactivePublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Subscription> implements Subscriber<T> {
@Override void doSubscriptionCancel(Subscription subscription) { subscription.cancel(); }
@Override void doSubscriptionRequest(Subscription subscription, long n) { subscription.request(n); }
@Override public void onSubscribe(Subscription s) { onSubscribeImpl(s); }
@Override public void onNext(T t) { onNextImpl(t); }
@Override public void onError(Throwable t) { onErrorImpl(t); }
@Override public void onComplete() { onCompleteImpl(); } }
private static class FlowPublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Flow.Subscription> implements Flow.Subscriber<T> {
@Override Expand Down