Xds: Aggregate cluster fixes (A75) (#12186) · grpc/grpc-java@7e982e4

@@ -18,10 +18,10 @@

18181919

import static com.google.common.base.Preconditions.checkNotNull;

2020

import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

21+

import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;

2122

import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;

23+

import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;

222423-

import com.google.common.annotations.VisibleForTesting;

24-

import com.google.common.collect.ImmutableList;

2525

import com.google.errorprone.annotations.CheckReturnValue;

2626

import io.grpc.InternalLogId;

2727

import io.grpc.LoadBalancer;

@@ -33,6 +33,7 @@

3333

import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;

3434

import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;

3535

import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;

36+

import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;

3637

import io.grpc.xds.XdsClusterResource.CdsUpdate;

3738

import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;

3839

import io.grpc.xds.XdsConfig.Subscription;

@@ -41,10 +42,11 @@

4142

import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;

4243

import io.grpc.xds.client.XdsLogger;

4344

import io.grpc.xds.client.XdsLogger.XdsLogLevel;

44-

import java.util.ArrayList;

4545

import java.util.Arrays;

4646

import java.util.Collections;

47+

import java.util.HashMap;

4748

import java.util.List;

49+

import java.util.Map;

48504951

/**

5052

* Load balancer for cds_experimental LB policy. One instance per top-level cluster.

@@ -55,19 +57,15 @@ final class CdsLoadBalancer2 extends LoadBalancer {

5557

private final XdsLogger logger;

5658

private final Helper helper;

5759

private final LoadBalancerRegistry lbRegistry;

60+

private GracefulSwitchLoadBalancer delegate;

5861

// Following fields are effectively final.

5962

private String clusterName;

6063

private Subscription clusterSubscription;

61-

private LoadBalancer childLb;

626463-

CdsLoadBalancer2(Helper helper) {

64-

this(helper, LoadBalancerRegistry.getDefaultRegistry());

65-

}

66-67-

@VisibleForTesting

6865

CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {

6966

this.helper = checkNotNull(helper, "helper");

7067

this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");

68+

this.delegate = new GracefulSwitchLoadBalancer(helper);

7169

logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));

7270

logger.log(XdsLogLevel.INFO, "Created");

7371

}

@@ -91,7 +89,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

9189

if (clusterSubscription == null) {

9290

// Should be impossible, because XdsDependencyManager wouldn't have generated this

9391

return fail(Status.INTERNAL.withDescription(

94-

errorPrefix() + "Unable to find non-dynamic root cluster"));

92+

errorPrefix() + "Unable to find non-dynamic cluster"));

9593

}

9694

// The dynamic cluster must not have loaded yet

9795

return Status.OK;

@@ -100,42 +98,25 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

10098

return fail(clusterConfigOr.getStatus());

10199

}

102100

XdsClusterConfig clusterConfig = clusterConfigOr.getValue();

103-

List<String> leafNames;

104-

if (clusterConfig.getChildren() instanceof AggregateConfig) {

105-

leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();

106-

} else if (clusterConfig.getChildren() instanceof EndpointConfig) {

107-

leafNames = ImmutableList.of(clusterName);

108-

} else {

109-

return fail(Status.INTERNAL.withDescription(

110-

errorPrefix() + "Unexpected cluster children type: "

111-

+ clusterConfig.getChildren().getClass()));

112-

}

113-

if (leafNames.isEmpty()) {

114-

// Should be impossible, because XdsClusterResource validated this

115-

return fail(Status.UNAVAILABLE.withDescription(

116-

errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));

117-

}

118101119-

Status noneFoundError = Status.INTERNAL

120-

.withDescription(errorPrefix() + "No leaves and no error; this is a bug");

121-

List<DiscoveryMechanism> instances = new ArrayList<>();

122-

for (String leafName : leafNames) {

123-

StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);

124-

if (!leafConfigOr.hasValue()) {

125-

noneFoundError = leafConfigOr.getStatus();

126-

continue;

127-

}

128-

if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {

129-

noneFoundError = Status.INTERNAL.withDescription(

130-

errorPrefix() + "Unexpected child " + leafName + " cluster children type: "

131-

+ leafConfigOr.getValue().getChildren().getClass());

132-

continue;

102+

NameResolver.ConfigOrError configOrError;

103+

Object gracefulConfig;

104+

if (clusterConfig.getChildren() instanceof EndpointConfig) {

105+

// The LB policy config is provided in service_config.proto/JSON format.

106+

configOrError =

107+

GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(

108+

Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),

109+

lbRegistry);

110+

if (configOrError.getError() != null) {

111+

// Should be impossible, because XdsClusterResource validated this

112+

return fail(Status.INTERNAL.withDescription(

113+

errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));

133114

}

134-

CdsUpdate result = leafConfigOr.getValue().getClusterResource();

115+

CdsUpdate result = clusterConfig.getClusterResource();

135116

DiscoveryMechanism instance;

136117

if (result.clusterType() == ClusterType.EDS) {

137118

instance = DiscoveryMechanism.forEds(

138-

leafName,

119+

clusterName,

139120

result.edsServiceName(),

140121

result.lrsServerInfo(),

141122

result.maxConcurrentRequests(),

@@ -144,45 +125,49 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

144125

result.outlierDetection());

145126

} else {

146127

instance = DiscoveryMechanism.forLogicalDns(

147-

leafName,

128+

clusterName,

148129

result.dnsHostName(),

149130

result.lrsServerInfo(),

150131

result.maxConcurrentRequests(),

151132

result.upstreamTlsContext(),

152133

result.filterMetadata());

153134

}

154-

instances.add(instance);

155-

}

156-

if (instances.isEmpty()) {

157-

return fail(noneFoundError);

158-

}

159-160-

// The LB policy config is provided in service_config.proto/JSON format.

161-

NameResolver.ConfigOrError configOrError =

162-

GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(

163-

Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);

164-

if (configOrError.getError() != null) {

165-

// Should be impossible, because XdsClusterResource validated this

135+

gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(

136+

lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME),

137+

new ClusterResolverConfig(

138+

instance,

139+

configOrError.getConfig(),

140+

clusterConfig.getClusterResource().isHttp11ProxyAvailable()));

141+

} else if (clusterConfig.getChildren() instanceof AggregateConfig) {

142+

Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();

143+

List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();

144+

for (String childCluster: leafClusters) {

145+

priorityChildConfigs.put(childCluster,

146+

new PriorityChildConfig(

147+

GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(

148+

lbRegistry.getProvider(CDS_POLICY_NAME),

149+

new CdsConfig(childCluster)),

150+

false));

151+

}

152+

gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(

153+

lbRegistry.getProvider(PRIORITY_POLICY_NAME),

154+

new PriorityLoadBalancerProvider.PriorityLbConfig(

155+

Collections.unmodifiableMap(priorityChildConfigs), leafClusters));

156+

} else {

166157

return fail(Status.INTERNAL.withDescription(

167-

errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));

158+

errorPrefix() + "Unexpected cluster children type: "

159+

+ clusterConfig.getChildren().getClass()));

168160

}

169161170-

ClusterResolverConfig config = new ClusterResolverConfig(

171-

Collections.unmodifiableList(instances),

172-

configOrError.getConfig(),

173-

clusterConfig.getClusterResource().isHttp11ProxyAvailable());

174-

if (childLb == null) {

175-

childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);

176-

}

177-

return childLb.acceptResolvedAddresses(

178-

resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());

162+

return delegate.acceptResolvedAddresses(

163+

resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build());

179164

}

180165181166

@Override

182167

public void handleNameResolutionError(Status error) {

183168

logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);

184-

if (childLb != null) {

185-

childLb.handleNameResolutionError(error);

169+

if (delegate != null) {

170+

delegate.handleNameResolutionError(error);

186171

} else {

187172

helper.updateBalancingState(

188173

TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));

@@ -192,10 +177,8 @@ public void handleNameResolutionError(Status error) {

192177

@Override

193178

public void shutdown() {

194179

logger.log(XdsLogLevel.INFO, "Shutdown");

195-

if (childLb != null) {

196-

childLb.shutdown();

197-

childLb = null;

198-

}

180+

delegate.shutdown();

181+

delegate = new GracefulSwitchLoadBalancer(helper);

199182

if (clusterSubscription != null) {

200183

clusterSubscription.close();

201184

clusterSubscription = null;

@@ -204,10 +187,7 @@ public void shutdown() {

204187205188

@CheckReturnValue // don't forget to return up the stack after the fail call

206189

private Status fail(Status error) {

207-

if (childLb != null) {

208-

childLb.shutdown();

209-

childLb = null;

210-

}

190+

delegate.shutdown();

211191

helper.updateBalancingState(

212192

TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));

213193

return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter