feat: add ChannelFinder server interfaces (#4293) · googleapis/java-spanner@0b7a32e
1+/*
2+ * Copyright 2026 Google LLC
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+17+package com.google.cloud.spanner.spi.v1;
18+19+import com.google.api.core.InternalApi;
20+import com.google.api.gax.grpc.GrpcTransportChannel;
21+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
22+import com.google.api.gax.rpc.TransportChannelProvider;
23+import com.google.cloud.spanner.ErrorCode;
24+import com.google.cloud.spanner.SpannerExceptionFactory;
25+import com.google.common.annotations.VisibleForTesting;
26+import io.grpc.ConnectivityState;
27+import io.grpc.ManagedChannel;
28+import java.io.IOException;
29+import java.util.Map;
30+import java.util.concurrent.ConcurrentHashMap;
31+import java.util.concurrent.TimeUnit;
32+import java.util.concurrent.atomic.AtomicBoolean;
33+34+/**
35+ * gRPC implementation of {@link ChannelEndpointCache}.
36+ *
37+ * <p>This cache creates and caches gRPC channels per address. It uses {@link
38+ * InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same
39+ * configuration but different endpoints, avoiding race conditions.
40+ */
41+@InternalApi
42+class GrpcChannelEndpointCache implements ChannelEndpointCache {
43+44+/** Timeout for graceful channel shutdown. */
45+private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
46+47+private final InstantiatingGrpcChannelProvider baseProvider;
48+private final Map<String, GrpcChannelEndpoint> servers = new ConcurrentHashMap<>();
49+private final GrpcChannelEndpoint defaultEndpoint;
50+private final AtomicBoolean isShutdown = new AtomicBoolean(false);
51+52+/**
53+ * Creates a new cache with the given channel provider.
54+ *
55+ * @param channelProvider the base provider used to create channels. New channels for different
56+ * endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}.
57+ * @throws IOException if the default channel cannot be created
58+ */
59+public GrpcChannelEndpointCache(InstantiatingGrpcChannelProvider channelProvider)
60+throws IOException {
61+this.baseProvider = channelProvider;
62+String defaultEndpoint = channelProvider.getEndpoint();
63+this.defaultEndpoint = new GrpcChannelEndpoint(defaultEndpoint, channelProvider);
64+this.servers.put(defaultEndpoint, this.defaultEndpoint);
65+ }
66+67+@Override
68+public ChannelEndpoint defaultChannel() {
69+return defaultEndpoint;
70+ }
71+72+@Override
73+public ChannelEndpoint get(String address) {
74+if (isShutdown.get()) {
75+throw SpannerExceptionFactory.newSpannerException(
76+ErrorCode.FAILED_PRECONDITION, "ChannelEndpointCache has been shut down");
77+ }
78+79+return servers.computeIfAbsent(
80+address,
81+addr -> {
82+try {
83+// Create a new provider with the same config but different endpoint.
84+// This is thread-safe as withEndpoint() returns a new provider instance.
85+TransportChannelProvider newProvider = baseProvider.withEndpoint(addr);
86+return new GrpcChannelEndpoint(addr, newProvider);
87+ } catch (IOException e) {
88+throw SpannerExceptionFactory.newSpannerException(
89+ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e);
90+ }
91+ });
92+ }
93+94+@Override
95+public void evict(String address) {
96+if (defaultEndpoint.getAddress().equals(address)) {
97+return;
98+ }
99+GrpcChannelEndpoint server = servers.remove(address);
100+if (server != null) {
101+shutdownChannel(server, false);
102+ }
103+ }
104+105+@Override
106+public void shutdown() {
107+if (!isShutdown.compareAndSet(false, true)) {
108+return;
109+ }
110+for (GrpcChannelEndpoint server : servers.values()) {
111+shutdownChannel(server, true);
112+ }
113+servers.clear();
114+ }
115+116+/**
117+ * Shuts down a server's channel.
118+ *
119+ * <p>First attempts a graceful shutdown. When awaitTermination is true, waits for in-flight RPCs
120+ * to complete and forces shutdown on timeout.
121+ */
122+private void shutdownChannel(GrpcChannelEndpoint server, boolean awaitTermination) {
123+ManagedChannel channel = server.getChannel();
124+if (channel.isShutdown()) {
125+return;
126+ }
127+128+channel.shutdown();
129+if (!awaitTermination) {
130+return;
131+ }
132+try {
133+if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
134+channel.shutdownNow();
135+ }
136+ } catch (InterruptedException e) {
137+channel.shutdownNow();
138+Thread.currentThread().interrupt();
139+ }
140+ }
141+142+/** gRPC implementation of {@link ChannelEndpoint}. */
143+static class GrpcChannelEndpoint implements ChannelEndpoint {
144+private final String address;
145+private final ManagedChannel channel;
146+147+/**
148+ * Creates a server from a channel provider.
149+ *
150+ * @param address the server address
151+ * @param provider the channel provider (must be a gRPC provider)
152+ * @throws IOException if the channel cannot be created
153+ */
154+GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException {
155+this.address = address;
156+TransportChannelProvider readyProvider = provider;
157+if (provider.needsHeaders()) {
158+readyProvider = provider.withHeaders(java.util.Collections.emptyMap());
159+ }
160+GrpcTransportChannel transportChannel =
161+ (GrpcTransportChannel) readyProvider.getTransportChannel();
162+this.channel = (ManagedChannel) transportChannel.getChannel();
163+ }
164+165+/**
166+ * Creates a server with an existing channel. Primarily for testing.
167+ *
168+ * @param address the server address
169+ * @param channel the managed channel
170+ */
171+@VisibleForTesting
172+GrpcChannelEndpoint(String address, ManagedChannel channel) {
173+this.address = address;
174+this.channel = channel;
175+ }
176+177+@Override
178+public String getAddress() {
179+return address;
180+ }
181+182+@Override
183+public boolean isHealthy() {
184+if (channel.isShutdown() || channel.isTerminated()) {
185+return false;
186+ }
187+// Check connectivity state without triggering a connection attempt.
188+// Some channel implementations don't support getState(), in which case
189+// we assume the channel is healthy if it's not shutdown/terminated.
190+try {
191+ConnectivityState state = channel.getState(false);
192+return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE;
193+ } catch (UnsupportedOperationException ignore) {
194+return true;
195+ }
196+ }
197+198+@Override
199+public ManagedChannel getChannel() {
200+return channel;
201+ }
202+ }
203+}