Fix Netty event loop resource leak on MongoClient close. by vbabanin · Pull Request #1646 · mongodb/mongo-java-driver
Expand Up
@@ -33,20 +33,20 @@
import com.mongodb.client.MongoIterable;
import com.mongodb.client.SynchronousContextProvider;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.SocketSettings;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
Expand All
@@ -59,8 +59,6 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.client.internal.Crypts.createCrypt;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation;
Expand All
@@ -75,14 +73,22 @@ public final class MongoClientImpl implements MongoClient {
private final MongoDriverInformation mongoDriverInformation;
private final MongoClusterImpl delegate;
private final AtomicBoolean closed;
private final AutoCloseable externalResourceCloser;
public MongoClientImpl(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) { this(createCluster(settings, mongoDriverInformation), mongoDriverInformation, settings, null); public MongoClientImpl(final Cluster cluster, final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation, @Nullable final AutoCloseable externalResourceCloser) { this(cluster, mongoDriverInformation, settings, externalResourceCloser, null); }
public MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation, private MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation, final MongoClientSettings settings, @Nullable final OperationExecutor operationExecutor) { @Nullable final AutoCloseable externalResourceCloser, @Nullable final OperationExecutor operationExecutor) {
this.externalResourceCloser = externalResourceCloser; this.settings = notNull("settings", settings); this.mongoDriverInformation = mongoDriverInformation; AutoEncryptionSettings autoEncryptionSettings = settings.getAutoEncryptionSettings(); Expand Down Expand Up @@ -114,6 +120,13 @@ public void close() { } delegate.getServerSessionPool().close(); delegate.getCluster().close(); if (externalResourceCloser != null) { try { externalResourceCloser.close(); } catch (Exception e) { LOGGER.warn("Exception closing resource", e); } } } }
Expand Down Expand Up @@ -287,21 +300,24 @@ public ClientBulkWriteResult bulkWrite( }
private static Cluster createCluster(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) { @Nullable final MongoDriverInformation mongoDriverInformation, final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory) { notNull("settings", settings); return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(), settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(), TimeoutSettings.create(settings), getStreamFactory(settings, false), TimeoutSettings.createHeartbeatSettings(settings), getStreamFactory(settings, true), TimeoutSettings.create(settings), streamFactory, TimeoutSettings.createHeartbeatSettings(settings), heartbeatStreamFactory, settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()), settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(), settings.getDnsClient()); }
private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { private static StreamFactory getStreamFactory( final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, final boolean isHeartbeat) { SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); return getSyncStreamFactory(settings, inetAddressResolver, socketSettings); return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); }
public Cluster getCluster() { Expand Down
public MongoClientImpl(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) { this(createCluster(settings, mongoDriverInformation), mongoDriverInformation, settings, null); public MongoClientImpl(final Cluster cluster, final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation, @Nullable final AutoCloseable externalResourceCloser) { this(cluster, mongoDriverInformation, settings, externalResourceCloser, null); }
public MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation, private MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation, final MongoClientSettings settings, @Nullable final OperationExecutor operationExecutor) { @Nullable final AutoCloseable externalResourceCloser, @Nullable final OperationExecutor operationExecutor) {
this.externalResourceCloser = externalResourceCloser; this.settings = notNull("settings", settings); this.mongoDriverInformation = mongoDriverInformation; AutoEncryptionSettings autoEncryptionSettings = settings.getAutoEncryptionSettings(); Expand Down Expand Up @@ -114,6 +120,13 @@ public void close() { } delegate.getServerSessionPool().close(); delegate.getCluster().close(); if (externalResourceCloser != null) { try { externalResourceCloser.close(); } catch (Exception e) { LOGGER.warn("Exception closing resource", e); } } } }
Expand Down Expand Up @@ -287,21 +300,24 @@ public ClientBulkWriteResult bulkWrite( }
private static Cluster createCluster(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) { @Nullable final MongoDriverInformation mongoDriverInformation, final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory) { notNull("settings", settings); return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(), settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(), TimeoutSettings.create(settings), getStreamFactory(settings, false), TimeoutSettings.createHeartbeatSettings(settings), getStreamFactory(settings, true), TimeoutSettings.create(settings), streamFactory, TimeoutSettings.createHeartbeatSettings(settings), heartbeatStreamFactory, settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()), settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(), settings.getDnsClient()); }
private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { private static StreamFactory getStreamFactory( final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, final boolean isHeartbeat) { SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); return getSyncStreamFactory(settings, inetAddressResolver, socketSettings); return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); }
public Cluster getCluster() { Expand Down