Xds: Aggregate cluster fixes (A75) (#12186) · grpc/grpc-java@7e982e4
@@ -18,10 +18,10 @@
18181919import static com.google.common.base.Preconditions.checkNotNull;
2020import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21+import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
2122import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
23+import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
222423-import com.google.common.annotations.VisibleForTesting;
24-import com.google.common.collect.ImmutableList;
2525import com.google.errorprone.annotations.CheckReturnValue;
2626import io.grpc.InternalLogId;
2727import io.grpc.LoadBalancer;
@@ -33,6 +33,7 @@
3333import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
3434import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
3535import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
36+import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
3637import io.grpc.xds.XdsClusterResource.CdsUpdate;
3738import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
3839import io.grpc.xds.XdsConfig.Subscription;
@@ -41,10 +42,11 @@
4142import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
4243import io.grpc.xds.client.XdsLogger;
4344import io.grpc.xds.client.XdsLogger.XdsLogLevel;
44-import java.util.ArrayList;
4545import java.util.Arrays;
4646import java.util.Collections;
47+import java.util.HashMap;
4748import java.util.List;
49+import java.util.Map;
48504951/**
5052 * Load balancer for cds_experimental LB policy. One instance per top-level cluster.
@@ -55,19 +57,15 @@ final class CdsLoadBalancer2 extends LoadBalancer {
5557private final XdsLogger logger;
5658private final Helper helper;
5759private final LoadBalancerRegistry lbRegistry;
60+private GracefulSwitchLoadBalancer delegate;
5861// Following fields are effectively final.
5962private String clusterName;
6063private Subscription clusterSubscription;
61-private LoadBalancer childLb;
626463-CdsLoadBalancer2(Helper helper) {
64-this(helper, LoadBalancerRegistry.getDefaultRegistry());
65- }
66-67-@VisibleForTesting
6865CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
6966this.helper = checkNotNull(helper, "helper");
7067this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
68+this.delegate = new GracefulSwitchLoadBalancer(helper);
7169logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
7270logger.log(XdsLogLevel.INFO, "Created");
7371 }
@@ -91,7 +89,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9189if (clusterSubscription == null) {
9290// Should be impossible, because XdsDependencyManager wouldn't have generated this
9391return fail(Status.INTERNAL.withDescription(
94-errorPrefix() + "Unable to find non-dynamic root cluster"));
92+errorPrefix() + "Unable to find non-dynamic cluster"));
9593 }
9694// The dynamic cluster must not have loaded yet
9795return Status.OK;
@@ -100,42 +98,25 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
10098return fail(clusterConfigOr.getStatus());
10199 }
102100XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
103-List<String> leafNames;
104-if (clusterConfig.getChildren() instanceof AggregateConfig) {
105-leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
106- } else if (clusterConfig.getChildren() instanceof EndpointConfig) {
107-leafNames = ImmutableList.of(clusterName);
108- } else {
109-return fail(Status.INTERNAL.withDescription(
110-errorPrefix() + "Unexpected cluster children type: "
111- + clusterConfig.getChildren().getClass()));
112- }
113-if (leafNames.isEmpty()) {
114-// Should be impossible, because XdsClusterResource validated this
115-return fail(Status.UNAVAILABLE.withDescription(
116-errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
117- }
118101119-Status noneFoundError = Status.INTERNAL
120- .withDescription(errorPrefix() + "No leaves and no error; this is a bug");
121-List<DiscoveryMechanism> instances = new ArrayList<>();
122-for (String leafName : leafNames) {
123-StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
124-if (!leafConfigOr.hasValue()) {
125-noneFoundError = leafConfigOr.getStatus();
126-continue;
127- }
128-if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
129-noneFoundError = Status.INTERNAL.withDescription(
130-errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
131- + leafConfigOr.getValue().getChildren().getClass());
132-continue;
102+NameResolver.ConfigOrError configOrError;
103+Object gracefulConfig;
104+if (clusterConfig.getChildren() instanceof EndpointConfig) {
105+// The LB policy config is provided in service_config.proto/JSON format.
106+configOrError =
107+GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
108+Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
109+lbRegistry);
110+if (configOrError.getError() != null) {
111+// Should be impossible, because XdsClusterResource validated this
112+return fail(Status.INTERNAL.withDescription(
113+errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
133114 }
134-CdsUpdate result = leafConfigOr.getValue().getClusterResource();
115+CdsUpdate result = clusterConfig.getClusterResource();
135116DiscoveryMechanism instance;
136117if (result.clusterType() == ClusterType.EDS) {
137118instance = DiscoveryMechanism.forEds(
138-leafName,
119+clusterName,
139120result.edsServiceName(),
140121result.lrsServerInfo(),
141122result.maxConcurrentRequests(),
@@ -144,45 +125,49 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
144125result.outlierDetection());
145126 } else {
146127instance = DiscoveryMechanism.forLogicalDns(
147-leafName,
128+clusterName,
148129result.dnsHostName(),
149130result.lrsServerInfo(),
150131result.maxConcurrentRequests(),
151132result.upstreamTlsContext(),
152133result.filterMetadata());
153134 }
154-instances.add(instance);
155- }
156-if (instances.isEmpty()) {
157-return fail(noneFoundError);
158- }
159-160-// The LB policy config is provided in service_config.proto/JSON format.
161-NameResolver.ConfigOrError configOrError =
162-GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
163-Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
164-if (configOrError.getError() != null) {
165-// Should be impossible, because XdsClusterResource validated this
135+gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
136+lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME),
137+new ClusterResolverConfig(
138+instance,
139+configOrError.getConfig(),
140+clusterConfig.getClusterResource().isHttp11ProxyAvailable()));
141+ } else if (clusterConfig.getChildren() instanceof AggregateConfig) {
142+Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
143+List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
144+for (String childCluster: leafClusters) {
145+priorityChildConfigs.put(childCluster,
146+new PriorityChildConfig(
147+GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
148+lbRegistry.getProvider(CDS_POLICY_NAME),
149+new CdsConfig(childCluster)),
150+false));
151+ }
152+gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
153+lbRegistry.getProvider(PRIORITY_POLICY_NAME),
154+new PriorityLoadBalancerProvider.PriorityLbConfig(
155+Collections.unmodifiableMap(priorityChildConfigs), leafClusters));
156+ } else {
166157return fail(Status.INTERNAL.withDescription(
167-errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
158+errorPrefix() + "Unexpected cluster children type: "
159+ + clusterConfig.getChildren().getClass()));
168160 }
169161170-ClusterResolverConfig config = new ClusterResolverConfig(
171-Collections.unmodifiableList(instances),
172-configOrError.getConfig(),
173-clusterConfig.getClusterResource().isHttp11ProxyAvailable());
174-if (childLb == null) {
175-childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
176- }
177-return childLb.acceptResolvedAddresses(
178-resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
162+return delegate.acceptResolvedAddresses(
163+resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());
179164 }
180165181166@Override
182167public void handleNameResolutionError(Status error) {
183168logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
184-if (childLb != null) {
185-childLb.handleNameResolutionError(error);
169+if (delegate != null) {
170+delegate.handleNameResolutionError(error);
186171 } else {
187172helper.updateBalancingState(
188173TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
@@ -192,10 +177,8 @@ public void handleNameResolutionError(Status error) {
192177@Override
193178public void shutdown() {
194179logger.log(XdsLogLevel.INFO, "Shutdown");
195-if (childLb != null) {
196-childLb.shutdown();
197-childLb = null;
198- }
180+delegate.shutdown();
181+delegate = new GracefulSwitchLoadBalancer(helper);
199182if (clusterSubscription != null) {
200183clusterSubscription.close();
201184clusterSubscription = null;
@@ -204,10 +187,7 @@ public void shutdown() {
204187205188@CheckReturnValue // don't forget to return up the stack after the fail call
206189private Status fail(Status error) {
207-if (childLb != null) {
208-childLb.shutdown();
209-childLb = null;
210- }
190+delegate.shutdown();
211191helper.updateBalancingState(
212192TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
213193return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter