feat: add ChannelFinder server interfaces (#4293) · googleapis/java-spanner@0b7a32e

1+

/*

2+

* Copyright 2026 Google LLC

3+

*

4+

* Licensed under the Apache License, Version 2.0 (the "License");

5+

* you may not use this file except in compliance with the License.

6+

* You may obtain a copy of the License at

7+

*

8+

* http://www.apache.org/licenses/LICENSE-2.0

9+

*

10+

* Unless required by applicable law or agreed to in writing, software

11+

* distributed under the License is distributed on an "AS IS" BASIS,

12+

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

13+

* See the License for the specific language governing permissions and

14+

* limitations under the License.

15+

*/

16+17+

package com.google.cloud.spanner.spi.v1;

18+19+

import com.google.api.core.InternalApi;

20+

import com.google.api.gax.grpc.GrpcTransportChannel;

21+

import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;

22+

import com.google.api.gax.rpc.TransportChannelProvider;

23+

import com.google.cloud.spanner.ErrorCode;

24+

import com.google.cloud.spanner.SpannerExceptionFactory;

25+

import com.google.common.annotations.VisibleForTesting;

26+

import io.grpc.ConnectivityState;

27+

import io.grpc.ManagedChannel;

28+

import java.io.IOException;

29+

import java.util.Map;

30+

import java.util.concurrent.ConcurrentHashMap;

31+

import java.util.concurrent.TimeUnit;

32+

import java.util.concurrent.atomic.AtomicBoolean;

33+34+

/**

35+

* gRPC implementation of {@link ChannelEndpointCache}.

36+

*

37+

* <p>This cache creates and caches gRPC channels per address. It uses {@link

38+

* InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same

39+

* configuration but different endpoints, avoiding race conditions.

40+

*/

41+

@InternalApi

42+

class GrpcChannelEndpointCache implements ChannelEndpointCache {

43+44+

/** Timeout for graceful channel shutdown. */

45+

private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;

46+47+

private final InstantiatingGrpcChannelProvider baseProvider;

48+

private final Map<String, GrpcChannelEndpoint> servers = new ConcurrentHashMap<>();

49+

private final GrpcChannelEndpoint defaultEndpoint;

50+

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

51+52+

/**

53+

* Creates a new cache with the given channel provider.

54+

*

55+

* @param channelProvider the base provider used to create channels. New channels for different

56+

* endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}.

57+

* @throws IOException if the default channel cannot be created

58+

*/

59+

public GrpcChannelEndpointCache(InstantiatingGrpcChannelProvider channelProvider)

60+

throws IOException {

61+

this.baseProvider = channelProvider;

62+

String defaultEndpoint = channelProvider.getEndpoint();

63+

this.defaultEndpoint = new GrpcChannelEndpoint(defaultEndpoint, channelProvider);

64+

this.servers.put(defaultEndpoint, this.defaultEndpoint);

65+

}

66+67+

@Override

68+

public ChannelEndpoint defaultChannel() {

69+

return defaultEndpoint;

70+

}

71+72+

@Override

73+

public ChannelEndpoint get(String address) {

74+

if (isShutdown.get()) {

75+

throw SpannerExceptionFactory.newSpannerException(

76+

ErrorCode.FAILED_PRECONDITION, "ChannelEndpointCache has been shut down");

77+

}

78+79+

return servers.computeIfAbsent(

80+

address,

81+

addr -> {

82+

try {

83+

// Create a new provider with the same config but different endpoint.

84+

// This is thread-safe as withEndpoint() returns a new provider instance.

85+

TransportChannelProvider newProvider = baseProvider.withEndpoint(addr);

86+

return new GrpcChannelEndpoint(addr, newProvider);

87+

} catch (IOException e) {

88+

throw SpannerExceptionFactory.newSpannerException(

89+

ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e);

90+

}

91+

});

92+

}

93+94+

@Override

95+

public void evict(String address) {

96+

if (defaultEndpoint.getAddress().equals(address)) {

97+

return;

98+

}

99+

GrpcChannelEndpoint server = servers.remove(address);

100+

if (server != null) {

101+

shutdownChannel(server, false);

102+

}

103+

}

104+105+

@Override

106+

public void shutdown() {

107+

if (!isShutdown.compareAndSet(false, true)) {

108+

return;

109+

}

110+

for (GrpcChannelEndpoint server : servers.values()) {

111+

shutdownChannel(server, true);

112+

}

113+

servers.clear();

114+

}

115+116+

/**

117+

* Shuts down a server's channel.

118+

*

119+

* <p>First attempts a graceful shutdown. When awaitTermination is true, waits for in-flight RPCs

120+

* to complete and forces shutdown on timeout.

121+

*/

122+

private void shutdownChannel(GrpcChannelEndpoint server, boolean awaitTermination) {

123+

ManagedChannel channel = server.getChannel();

124+

if (channel.isShutdown()) {

125+

return;

126+

}

127+128+

channel.shutdown();

129+

if (!awaitTermination) {

130+

return;

131+

}

132+

try {

133+

if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {

134+

channel.shutdownNow();

135+

}

136+

} catch (InterruptedException e) {

137+

channel.shutdownNow();

138+

Thread.currentThread().interrupt();

139+

}

140+

}

141+142+

/** gRPC implementation of {@link ChannelEndpoint}. */

143+

static class GrpcChannelEndpoint implements ChannelEndpoint {

144+

private final String address;

145+

private final ManagedChannel channel;

146+147+

/**

148+

* Creates a server from a channel provider.

149+

*

150+

* @param address the server address

151+

* @param provider the channel provider (must be a gRPC provider)

152+

* @throws IOException if the channel cannot be created

153+

*/

154+

GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException {

155+

this.address = address;

156+

TransportChannelProvider readyProvider = provider;

157+

if (provider.needsHeaders()) {

158+

readyProvider = provider.withHeaders(java.util.Collections.emptyMap());

159+

}

160+

GrpcTransportChannel transportChannel =

161+

(GrpcTransportChannel) readyProvider.getTransportChannel();

162+

this.channel = (ManagedChannel) transportChannel.getChannel();

163+

}

164+165+

/**

166+

* Creates a server with an existing channel. Primarily for testing.

167+

*

168+

* @param address the server address

169+

* @param channel the managed channel

170+

*/

171+

@VisibleForTesting

172+

GrpcChannelEndpoint(String address, ManagedChannel channel) {

173+

this.address = address;

174+

this.channel = channel;

175+

}

176+177+

@Override

178+

public String getAddress() {

179+

return address;

180+

}

181+182+

@Override

183+

public boolean isHealthy() {

184+

if (channel.isShutdown() || channel.isTerminated()) {

185+

return false;

186+

}

187+

// Check connectivity state without triggering a connection attempt.

188+

// Some channel implementations don't support getState(), in which case

189+

// we assume the channel is healthy if it's not shutdown/terminated.

190+

try {

191+

ConnectivityState state = channel.getState(false);

192+

return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE;

193+

} catch (UnsupportedOperationException ignore) {

194+

return true;

195+

}

196+

}

197+198+

@Override

199+

public ManagedChannel getChannel() {

200+

return channel;

201+

}

202+

}

203+

}