xds: Convert CdsLb to XdsDepManager · grpc/grpc-java@297ab05
@@ -18,6 +18,7 @@
18181919import static com.google.common.base.Preconditions.checkArgument;
2020import static com.google.common.base.Preconditions.checkNotNull;
21+import static com.google.common.base.Preconditions.checkState;
2122import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22232324import com.google.common.annotations.VisibleForTesting;
@@ -34,8 +35,6 @@
3435import io.grpc.xds.client.XdsClient;
3536import io.grpc.xds.client.XdsClient.ResourceWatcher;
3637import io.grpc.xds.client.XdsResourceType;
37-import java.io.Closeable;
38-import java.io.IOException;
3938import java.util.Collections;
4039import java.util.HashMap;
4140import java.util.HashSet;
@@ -56,39 +55,43 @@
5655final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
5756public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
5857public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
59-private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
58+private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
6059private final String listenerName;
6160private final XdsClient xdsClient;
62-private final XdsConfigWatcher xdsConfigWatcher;
6361private final SynchronizationContext syncContext;
6462private final String dataPlaneAuthority;
63+private XdsConfigWatcher xdsConfigWatcher;
65646665private StatusOr<XdsConfig> lastUpdate = null;
6766private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
6867private final Set<ClusterSubscription> subscriptions = new HashSet<>();
696870-XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
69+XdsDependencyManager(XdsClient xdsClient,
7170SynchronizationContext syncContext, String dataPlaneAuthority,
7271String listenerName, NameResolver.Args nameResolverArgs,
7372ScheduledExecutorService scheduler) {
7473this.listenerName = checkNotNull(listenerName, "listenerName");
7574this.xdsClient = checkNotNull(xdsClient, "xdsClient");
76-this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
7775this.syncContext = checkNotNull(syncContext, "syncContext");
7876this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
7977checkNotNull(nameResolverArgs, "nameResolverArgs");
8078checkNotNull(scheduler, "scheduler");
81-82-// start the ball rolling
83-syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
8479 }
85808681public static String toContextStr(String typeName, String resourceName) {
8782return typeName + " resource " + resourceName;
8883 }
898485+public void start(XdsConfigWatcher xdsConfigWatcher) {
86+checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
87+this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
88+// start the ball rolling
89+syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
90+ }
91+9092@Override
91-public Closeable subscribeToCluster(String clusterName) {
93+public XdsConfig.Subscription subscribeToCluster(String clusterName) {
94+checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
9295checkNotNull(clusterName, "clusterName");
9396ClusterSubscription subscription = new ClusterSubscription(clusterName);
9497@@ -291,10 +294,17 @@ private static void addConfigForCluster(
291294addConfigForCluster(clusters, childCluster, ancestors, tracer);
292295StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
293296if (!config.hasValue()) {
294-clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
295-"Unable to get leaves for " + clusterName + ": "
296- + config.getStatus().getDescription())));
297-return;
297+// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
298+// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
299+// watchers reports a transient ADS stream error, the policy should report that it is in
300+// TRANSIENT_FAILURE if it has never passed a config to its child.
301+//
302+// But there's currently disagreement about whether that is actually what we want, and
303+// that was not originally implemented in gRPC Java. So we're keeping Java's old
304+// behavior for now and only failing the "leaves" (which is a bit arbitrary for a
305+// cycle).
306+leafNames.add(childCluster);
307+continue;
298308 }
299309XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
300310if (children instanceof AggregateConfig) {
@@ -325,6 +335,11 @@ private static void addConfigForCluster(
325335default:
326336throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
327337 }
338+if (clusters.containsKey(clusterName)) {
339+// If a cycle is detected, we'll have detected it while recursing, so now there will be a key
340+// present. We don't want to overwrite it with a non-error value.
341+return;
342+ }
328343clusters.put(clusterName, StatusOr.fromValue(
329344new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
330345 }
@@ -406,7 +421,7 @@ public interface XdsConfigWatcher {
406421void onUpdate(StatusOr<XdsConfig> config);
407422 }
408423409-private final class ClusterSubscription implements Closeable {
424+private final class ClusterSubscription implements XdsConfig.Subscription {
410425private final String clusterName;
411426boolean closed; // Accessed from syncContext
412427@@ -419,7 +434,7 @@ String getClusterName() {
419434 }
420435421436@Override
422-public void close() throws IOException {
437+public void close() {
423438releaseSubscription(this);
424439 }
425440 }