WebClient (Reactor) attributes on Netty channel do not clear after connection release
WebClient, when used with the Reactor HttpClient and a pooled connection provider, stores the WebClient attributes map as a channel attribute on the underlying NioSocketChannel. This attribute is never cleared; it is only overwritten by subsequent requests on the same channel. As a result, if requests on that channel are only periodic, the attributes map (and everything it references) stays reachable long after the response has completed. These attributes should be released and cleared when the channel is returned to the pool.
WebClient, once the request is executed in AbstractClientHttpRequest.doCommit, calls applyAttributes.
This invokes org.springframework.http.client.reactive.ReactorClientHttpRequest.applyAttributes (reactive version) that saves the entire attributes map into the Netty channel: https://github.com/spring-projects/spring-framework/blob/main/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java#L148
The following test demonstrates this behavior. It creates a WebClient instance backed by Reactor Netty with a pooled connection provider. After a request is made, the connection is returned to the pool. However, the attributes remain set and are never cleared. If such requests are executed across large clusters with large pools and large attribute values, this can lead to memory issues, especially in batch-style environments where connections are bursty and then sit idle for a while.
This was tested against version 7.0.2 of Spring Framework, but the problem certainly also exists in 6.x, where we initially discovered it. We tested against the 7.x series to verify that it had not been fixed in a newer version.
package webclient.bug.report;

import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Attribute;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.internal.shaded.reactor.pool.Pool;
import reactor.netty.resources.ConnectionProvider;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Deque;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;

/**
 * Reproducer for WebClient request attributes remaining on a pooled Netty channel
 * after the connection has been released back to the pool.
 *
 * <p>The test performs one real HTTP request through a pooled Reactor Netty client,
 * waits for the connection to become idle, then walks the client's internals via
 * reflection to reach the idle channel and checks the value stored under
 * {@link ReactorClientHttpConnector#ATTRIBUTES_KEY}.
 */
public class BugReproducerTest {

    /** Name of the request attribute that the test filter adds to every request. */
    static final String ATTR_ORIGINAL_REQUEST = "original.request";

    /**
     * Issues a single blocking GET request and then asserts that the idle pooled
     * channel no longer carries the WebClient request attributes.
     */
    @Test
    public void testBug() {
        // build the client instance
        WebClient client = buildWebClient();

        // make a request, blocking until completion
        String result = client.get().uri("https://lorem-api.com/api/lorem")
                .retrieve().bodyToMono(String.class)
                .block();
        System.out.println(result);

        // wait for connection to return to pool
        try {
            Thread.sleep(2000);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and abort: continuing would inspect the pool too early.
            Thread.currentThread().interrupt();
            Assertions.fail("Interrupted while waiting for the connection to return to the pool", e);
        }

        // verify the request attributes are not set
        assertNoRequestAttributes(client);
    }

    /**
     * Builds a {@link WebClient} on top of a pooled Reactor Netty {@link HttpClient},
     * with a filter that stores the original request as a request attribute.
     *
     * @return the configured client
     */
    static WebClient buildWebClient() {
        // setup a pool to ensure connections reused
        ConnectionProvider pool = ConnectionProvider.builder("test")
                .maxIdleTime(Duration.ofMinutes(10))
                .maxLifeTime(Duration.ofMinutes(60))
                .maxConnections(50)
                .lifo()
                .build();

        // build the reactor http client
        HttpClient httpClient = HttpClient.create(pool);
        ClientHttpConnector httpConnector = new ReactorClientHttpConnector(httpClient);
        ExchangeFunction exchangeFunction = ExchangeFunctions.create(httpConnector);

        // add a filter that populates a request attribute
        ExchangeFilterFunction filter = (request, next) -> {
            ClientRequest updated = ClientRequest.from(request)
                    .attribute(ATTR_ORIGINAL_REQUEST, request)
                    .build();
            return next.exchange(updated);
        };

        // build the webclient instance
        return WebClient.builder()
                .filter(filter)
                .exchangeFunction(exchangeFunction)
                .build();
    }

    /**
     * Navigates from the given client down to the first idle pooled channel using
     * reflection and asserts that no request attributes map is stored on it.
     *
     * @param client the client whose connection pool is inspected
     */
    static void assertNoRequestAttributes(WebClient client) {
        // WebClient.exchangeFunction -> DefaultExchangeFunction
        // DefaultExchangeFunction.connector -> ReactorClientHttpConnector
        // ReactorClientHttpConnector.httpClient -> HttpClientConnect
        // HttpClientConnect.config -> HttpClientConfig
        // HttpClientConfig.connectionProvider -> HttpConnectionProvider
        // HttpConnectionProvider.http1ConnectionProvider -> DefaultPooledConnectionProvider
        // DefaultPooledConnectionProvider.channelPools -> Map<PoolKey, SimpleDequePool>
        // SimpleDequePool.idleResources -> ConcurrentLinkedDeque<SimpleDequePool$QueuePooledRef>
        // SimpleDequePool$QueuePooledRef.poolable -> DefaultPooledConnectionProvider$PooledConnection
        // DefaultPooledConnectionProvider$PooledConnection.channel -> NioSocketChannel
        // NioSocketChannel.attributes -> DefaultAttributeMap
        // org.springframework.http.client.reactive.ReactorClientHttpRequest.ATTRIBUTES -> WebClient attributes map
        ExchangeFunction exchangeFunction = getField(client, "exchangeFunction", ExchangeFunction.class);
        Assertions.assertNotNull(exchangeFunction);

        ClientHttpConnector clientHttpConnector = getField(exchangeFunction, "connector", ClientHttpConnector.class);
        Assertions.assertNotNull(clientHttpConnector);

        HttpClient httpClient = getField(clientHttpConnector, "httpClient", HttpClient.class);
        Assertions.assertNotNull(httpClient);

        HttpClientConfig httpConfig = getField(httpClient, "config", HttpClientConfig.class);
        Assertions.assertNotNull(httpConfig);

        ConnectionProvider connectionProvider = getField(httpConfig, "connectionProvider", ConnectionProvider.class);
        Assertions.assertNotNull(connectionProvider);

        ConnectionProvider httpConnectionProvider =
                getField(connectionProvider, "http1ConnectionProvider", ConnectionProvider.class);
        Assertions.assertNotNull(httpConnectionProvider);

        Map<?, ?> channelPools = getField(httpConnectionProvider, "channelPools", ConcurrentMap.class);
        Assertions.assertNotNull(channelPools);
        Assertions.assertFalse(channelPools.isEmpty(), "Expected at least one channel pool");

        Pool<?> channelPool = (Pool<?>) channelPools.values().iterator().next();
        Assertions.assertNotNull(channelPool);

        Queue<?> resources = getField(channelPool, "idleResources", Deque.class);
        Assertions.assertNotNull(resources);
        Assertions.assertFalse(resources.isEmpty(), "Expected at least one idle pooled connection");

        Object resource = resources.iterator().next();
        Assertions.assertNotNull(resource);

        Connection connection = getField(resource, "poolable", Connection.class);
        Assertions.assertNotNull(connection);

        NioSocketChannel channel = getField(connection, "channel", NioSocketChannel.class);
        Assertions.assertNotNull(channel);

        // Channel.attr(...) hands back an Attribute holder and is documented never to
        // return null, so the meaningful check is on the value stored in the holder.
        Attribute<Map<String, Object>> attribute = channel.attr(ReactorClientHttpConnector.ATTRIBUTES_KEY);
        Assertions.assertNull(attribute.get(),
                "WebClient request attributes are still set on the idle pooled channel");
    }

    /**
     * Reads a (possibly private) field from the given object via reflection.
     *
     * @param object the instance to read from
     * @param name the field name, looked up on the object's runtime class
     * @param type the expected type of the field value
     * @param <T> the expected value type
     * @return the field value cast to {@code type}; may be {@code null}
     * @throws NullPointerException if no field with that name is found
     * @throws ClassCastException if the value is not an instance of {@code type}
     */
    static <T> T getField(Object object, String name, Class<T> type) {
        Field field = ReflectionUtils.findField(object.getClass(), name);
        Objects.requireNonNull(field,
                () -> "No field '" + name + "' found on " + object.getClass().getName());
        ReflectionUtils.makeAccessible(field);
        return type.cast(ReflectionUtils.getField(field, object));
    }
}