WebClient (Reactor) attributes on Netty channel do not clear after connection release

WebClient, when used with the Reactor Netty HttpClient and a pooled connection provider, stores the WebClient attributes map as a channel attribute on the underlying NioSocketChannel. This attribute is never cleared; it is only overwritten by subsequent requests. As a result, if requests on that channel occur only periodically, the attributes map stays reachable well past the response. These attributes should be released and cleared when the channel is returned to the pool.

WebClient, once the request is executed in AbstractClientHttpRequest.doCommit, calls applyAttributes.
This invokes org.springframework.http.client.reactive.ReactorClientHttpRequest.applyAttributes (reactive version) that saves the entire attributes map into the Netty channel: https://github.com/spring-projects/spring-framework/blob/main/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java#L148

The following test demonstrates this behavior. This creates a WebClient instance with reactor, netty, and pooled connection provider. After a request is made, that connection is returned to the pool. However, the attributes remain set and are never cleared. If these types of requests are executed across large clusters with large pools and large attributes are set, this can lead to memory issues, especially in a batch style environment where connections are bursty and sit idle for a moment.

This was tested against version 7.0.2 of Spring Framework, but the issue also exists in 6.x, where we initially discovered it. We validated against the 7.x series to verify that it had not been fixed in a newer version.

package webclient.bug.report;

import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Attribute;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.internal.shaded.reactor.pool.Pool;
import reactor.netty.resources.ConnectionProvider;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Deque;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;

/**
 * Reproducer showing that WebClient request attributes stay attached to a pooled
 * Reactor Netty channel after the connection has been released back to the pool.
 *
 * <p>The test performs a real HTTP request against {@code https://lorem-api.com}, so it
 * requires network access. It then walks the client's internals via reflection down to the
 * idle pooled channel and asserts that no WebClient attributes map is still stored on it.
 */
public class BugReproducerTest {

    /** Name of the request attribute that the test filter adds to every request. */
    static final String ATTR_ORIGINAL_REQUEST = "original.request";

    /**
     * Executes one request, waits for the connection to be returned to the pool, and then
     * verifies that the pooled channel no longer carries the request attributes.
     */
    @Test
    public void testBug() {
        // build the client instance
        WebClient client = buildWebClient();

        // make a request, blocking until completion
        String result = client.get().uri("https://lorem-api.com/api/lorem")
                .retrieve().bodyToMono(String.class)
                .block();
        System.out.println(result);

        // wait for connection to return to pool
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            // restore the interrupt flag and abort: the pool state is unknown at this point
            Thread.currentThread().interrupt();
            Assertions.fail("Interrupted while waiting for the connection to return to the pool", ex);
        }

        // verify the request attributes are not set
        assertNoRequestAttributes(client);
    }

    /**
     * Builds a WebClient backed by a Reactor Netty {@link HttpClient} with a pooled
     * {@link ConnectionProvider}, plus a filter that stores the original request as a
     * request attribute under {@link #ATTR_ORIGINAL_REQUEST}.
     *
     * @return the configured WebClient
     */
    static WebClient buildWebClient() {
        // setup a pool to ensure connections reused
        ConnectionProvider pool = ConnectionProvider.builder("test")
                .maxIdleTime(Duration.ofMinutes(10))
                .maxLifeTime(Duration.ofMinutes(60))
                .maxConnections(50)
                .lifo()
                .build();

        // build the reactor http client
        HttpClient httpClient = HttpClient.create(pool);
        ClientHttpConnector httpConnector = new ReactorClientHttpConnector(httpClient);
        ExchangeFunction exchangeFunction = ExchangeFunctions.create(httpConnector);

        // add a filter that populates a request attribute
        ExchangeFilterFunction filter = (request, next) -> {
            ClientRequest updated = ClientRequest.from(request)
                    .attribute(ATTR_ORIGINAL_REQUEST, request)
                    .build();
            return next.exchange(updated);
        };

        // build the webclient instance
        return WebClient.builder()
                .filter(filter)
                .exchangeFunction(exchangeFunction)
                .build();
    }

    /**
     * Navigates reflectively from the given client to the first idle pooled channel and
     * asserts that no WebClient attributes map is stored on it.
     *
     * @param client the WebClient whose connection pool is inspected
     */
    static void assertNoRequestAttributes(WebClient client) {
        // WebClient.exchangeFunction -> DefaultExchangeFunction
        // DefaultExchangeFunction.connector -> ReactorClientHttpConnector
        // ReactorClientHttpConnector.httpClient -> HttpClientConnect
        // HttpClientConnect.config -> HttpClientConfig
        // HttpClientConfig.connectionProvider -> HttpConnectionProvider
        // HttpConnectionProvider.http1ConnectionProvider -> DefaultPooledConnectionProvider
        // DefaultPooledConnectionProvider.channelPools -> Map<PoolKey, SimpleDequePool>
        // SimpleDequePool.idleResources -> ConcurrentLinkedDeque<SimpleDequePool$QueuePooledRef>
        // SimpleDequePool$QueuePooledRef.poolable -> DefaultPooledConnectionProvider$PooledConnection
        // DefaultPooledConnectionProvider$PooledConnection.channel -> NioSocketChannel
        // NioSocketChannel.attributes -> DefaultAttributeMap
        //    org.springframework.http.client.reactive.ReactorClientHttpRequest.ATTRIBUTES -> WebClient attributes map

        ExchangeFunction exchangeFunction = getField(client, "exchangeFunction", ExchangeFunction.class);
        Assertions.assertNotNull(exchangeFunction);

        ClientHttpConnector clientHttpConnector = getField(exchangeFunction, "connector", ClientHttpConnector.class);
        Assertions.assertNotNull(clientHttpConnector);

        HttpClient httpClient = getField(clientHttpConnector, "httpClient", HttpClient.class);
        Assertions.assertNotNull(httpClient);

        HttpClientConfig httpConfig = getField(httpClient, "config", HttpClientConfig.class);
        Assertions.assertNotNull(httpConfig);

        ConnectionProvider connectionProvider = getField(httpConfig, "connectionProvider", ConnectionProvider.class);
        Assertions.assertNotNull(connectionProvider);

        ConnectionProvider httpConnectionProvider = getField(connectionProvider, "http1ConnectionProvider", ConnectionProvider.class);
        Assertions.assertNotNull(httpConnectionProvider);

        Map<?, ?> channelPools = getField(httpConnectionProvider, "channelPools", ConcurrentMap.class);
        Assertions.assertNotNull(channelPools);
        // fail with a clear message instead of NoSuchElementException from iterator().next()
        Assertions.assertFalse(channelPools.isEmpty(), "No channel pool was created for the request");

        Pool<?> channelPool = (Pool<?>) channelPools.values().iterator().next();
        Assertions.assertNotNull(channelPool);

        Queue<?> resources = getField(channelPool, "idleResources", Deque.class);
        Assertions.assertNotNull(resources);
        Assertions.assertFalse(resources.isEmpty(), "No idle connection was returned to the pool");

        Object resource = resources.iterator().next();
        Assertions.assertNotNull(resource);

        Connection connection = getField(resource, "poolable", Connection.class);
        Assertions.assertNotNull(connection);

        NioSocketChannel channel = getField(connection, "channel", NioSocketChannel.class);
        Assertions.assertNotNull(channel);

        // Channel.attr(..) never returns null (it creates the Attribute on demand), so the
        // meaningful check is whether the Attribute still holds the WebClient attributes map.
        Attribute<Map<String, Object>> attribute = channel.attr(ReactorClientHttpConnector.ATTRIBUTES_KEY);
        Assertions.assertNull(attribute.get(),
                "WebClient request attributes are still set on the idle pooled channel");
    }

    /**
     * Reads a (possibly private) field from the given object via reflection.
     *
     * @param object the object to read from
     * @param name the field name, looked up on the object's class via {@link ReflectionUtils#findField}
     * @param type the expected type of the field value
     * @param <T> the expected type of the field value
     * @return the field value cast to {@code type}; may be {@code null}
     * @throws NullPointerException if no field with the given name is found
     * @throws ClassCastException if the field value is not an instance of {@code type}
     */
    static <T> T getField(Object object, String name, Class<T> type) {
        Field field = ReflectionUtils.findField(object.getClass(), name);
        Objects.requireNonNull(field,
                () -> "No field '" + name + "' found on " + object.getClass().getName());
        ReflectionUtils.makeAccessible(field);
        return type.cast(ReflectionUtils.getField(field, object));
    }
}