Updated the documentation of the CacheMap and also some refactoring by bbakerman · Pull Request #244 · graphql-java/java-dataloader

Expand Up @@ -49,7 +49,7 @@ @Internal class DataLoaderHelper<K, V> {
static class LoaderQueueEntry<K, V> { private static class LoaderQueueEntry<K, V> {
final K key; final CompletableFuture<V> value; Expand Down Expand Up @@ -155,11 +155,8 @@ CompletableFuture<V> load(K key, Object loadContext) { try { CompletableFuture<V> cachedFuture = futureCache.get(cacheKey); if (cachedFuture != null) { // We already have a promise for this key, no need to check value cache or queue up load stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); cachedFuture.whenComplete(ctx::onCompleted); return cachedFuture; // We already have a promise for this key, no need to check value cache or queue this load return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } catch (Exception ignored) { } Expand All @@ -170,11 +167,8 @@ CompletableFuture<V> load(K key, Object loadContext) { if (futureCachingEnabled) { CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); cachedFuture.whenComplete(ctx::onCompleted); return cachedFuture; // another thread was faster and created a matching CF ... hence this is really a cache hit and we are done return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } addEntryToLoaderQueue(key, loadCallFuture, loadContext); Expand All @@ -186,12 +180,9 @@ CompletableFuture<V> load(K key, Object loadContext) { CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { // another thread was faster and the loader was invoked twice with the same key // we are disregarding the resul of our dispatch call and use the already cached value // we are disregarding the result of our dispatch call and use the already cached value // meaning this is a cache hit and we are done stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); cachedFuture.whenComplete(ctx::onCompleted); return cachedFuture; return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } } Expand All @@ -201,6 +192,13 @@ CompletableFuture<V> load(K key, Object loadContext) { return loadCallFuture; }
private CompletableFuture<V> incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext<Object> ctx, K key, Object loadContext, CompletableFuture<V> cachedFuture) { stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); cachedFuture.whenComplete(ctx::onCompleted); return cachedFuture; }
private void addEntryToLoaderQueue(K key, CompletableFuture<V> future, Object loadContext) { while (true) { LoaderQueueEntry<K, V> prev = loaderQueue.get(); Expand All @@ -223,6 +221,7 @@ Object getCacheKeyWithContext(K key, Object context) { loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; }
@SuppressWarnings("unchecked") DispatchResult<V> dispatch() { DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));
Expand All @@ -232,6 +231,8 @@ DispatchResult<V> dispatch() { while (true) { loaderQueueEntryHead = loaderQueue.get(); if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) { // one or more threads competed and this one won. This thread holds // the loader queue root in the local variable loaderQueueEntryHead break; } } Expand Down