xds: Convert CdsLb to XdsDepManager · grpc/grpc-java@297ab05

@@ -18,6 +18,7 @@

18181919

import static com.google.common.base.Preconditions.checkArgument;

2020

import static com.google.common.base.Preconditions.checkNotNull;

21+

import static com.google.common.base.Preconditions.checkState;

2122

import static io.grpc.xds.client.XdsClient.ResourceUpdate;

22232324

import com.google.common.annotations.VisibleForTesting;

@@ -34,8 +35,6 @@

3435

import io.grpc.xds.client.XdsClient;

3536

import io.grpc.xds.client.XdsClient.ResourceWatcher;

3637

import io.grpc.xds.client.XdsResourceType;

37-

import java.io.Closeable;

38-

import java.io.IOException;

3938

import java.util.Collections;

4039

import java.util.HashMap;

4140

import java.util.HashSet;

@@ -56,39 +55,43 @@

5655

final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {

5756

public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();

5857

public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();

59-

private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++

58+

private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37

6059

private final String listenerName;

6160

private final XdsClient xdsClient;

62-

private final XdsConfigWatcher xdsConfigWatcher;

6361

private final SynchronizationContext syncContext;

6462

private final String dataPlaneAuthority;

63+

private XdsConfigWatcher xdsConfigWatcher;

65646665

private StatusOr<XdsConfig> lastUpdate = null;

6766

private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();

6867

private final Set<ClusterSubscription> subscriptions = new HashSet<>();

696870-

XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,

69+

XdsDependencyManager(XdsClient xdsClient,

7170

SynchronizationContext syncContext, String dataPlaneAuthority,

7271

String listenerName, NameResolver.Args nameResolverArgs,

7372

ScheduledExecutorService scheduler) {

7473

this.listenerName = checkNotNull(listenerName, "listenerName");

7574

this.xdsClient = checkNotNull(xdsClient, "xdsClient");

76-

this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");

7775

this.syncContext = checkNotNull(syncContext, "syncContext");

7876

this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");

7977

checkNotNull(nameResolverArgs, "nameResolverArgs");

8078

checkNotNull(scheduler, "scheduler");

81-82-

// start the ball rolling

83-

syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));

8479

}

85808681

public static String toContextStr(String typeName, String resourceName) {

8782

return typeName + " resource " + resourceName;

8883

}

898485+

public void start(XdsConfigWatcher xdsConfigWatcher) {

86+

checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");

87+

this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");

88+

// start the ball rolling

89+

syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));

90+

}

91+9092

@Override

91-

public Closeable subscribeToCluster(String clusterName) {

93+

public XdsConfig.Subscription subscribeToCluster(String clusterName) {

94+

checkState(this.xdsConfigWatcher != null, "dep manager must first be started");

9295

checkNotNull(clusterName, "clusterName");

9396

ClusterSubscription subscription = new ClusterSubscription(clusterName);

9497

@@ -291,10 +294,17 @@ private static void addConfigForCluster(

291294

addConfigForCluster(clusters, childCluster, ancestors, tracer);

292295

StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);

293296

if (!config.hasValue()) {

294-

clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(

295-

"Unable to get leaves for " + clusterName + ": "

296-

+ config.getStatus().getDescription())));

297-

return;

297+

// gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not

298+

// exist the policy should report that it is in TRANSIENT_FAILURE. If any of the

299+

// watchers reports a transient ADS stream error, the policy should report that it is in

300+

// TRANSIENT_FAILURE if it has never passed a config to its child.

301+

//

302+

// But there's currently disagreement about whether that is actually what we want, and

303+

// that was not originally implemented in gRPC Java. So we're keeping Java's old

304+

// behavior for now and only failing the "leaves" (which is a bit arbitrary for a

305+

// cycle).

306+

leafNames.add(childCluster);

307+

continue;

298308

}

299309

XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();

300310

if (children instanceof AggregateConfig) {

@@ -325,6 +335,11 @@ private static void addConfigForCluster(

325335

default:

326336

throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());

327337

}

338+

if (clusters.containsKey(clusterName)) {

339+

// If a cycle is detected, we'll have detected it while recursing, so now there will be a key

340+

// present. We don't want to overwrite it with a non-error value.

341+

return;

342+

}

328343

clusters.put(clusterName, StatusOr.fromValue(

329344

new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));

330345

}

@@ -406,7 +421,7 @@ public interface XdsConfigWatcher {

406421

void onUpdate(StatusOr<XdsConfig> config);

407422

}

408423409-

private final class ClusterSubscription implements Closeable {

424+

private final class ClusterSubscription implements XdsConfig.Subscription {

410425

private final String clusterName;

411426

boolean closed; // Accessed from syncContext

412427

@@ -419,7 +434,7 @@ String getClusterName() {

419434

}

420435421436

@Override

422-

public void close() throws IOException {

437+

public void close() {

423438

releaseSubscription(this);

424439

}

425440

}