Have EDS resource parse the additional addresses from envoy message (… · grpc/grpc-java@38f968f
@@ -20,11 +20,14 @@
20202121import com.google.common.annotations.VisibleForTesting;
2222import com.google.common.base.MoreObjects;
23-import com.google.common.collect.ImmutableList;
2423import com.google.protobuf.Message;
24+import io.envoyproxy.envoy.config.core.v3.Address;
25+import io.envoyproxy.envoy.config.core.v3.HealthStatus;
2526import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
27+import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
2628import io.envoyproxy.envoy.type.v3.FractionalPercent;
2729import io.grpc.EquivalentAddressGroup;
30+import io.grpc.internal.GrpcUtil;
2831import io.grpc.xds.Endpoints.DropOverload;
2932import io.grpc.xds.Endpoints.LocalityLbEndpoints;
3033import io.grpc.xds.XdsEndpointResource.EdsUpdate;
@@ -46,6 +49,8 @@
4649class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
4750static final String ADS_TYPE_URL_EDS =
4851"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
52+static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
53+"grpc.experimental.xdsDualstackEndpoints";
49545055private static final XdsEndpointResource instance = new XdsEndpointResource();
5156@@ -95,6 +100,10 @@ protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceI
95100return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage);
96101 }
97102103+private static boolean isEnabledXdsDualStack() {
104+return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);
105+ }
106+98107private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
99108throws ResourceInvalidException {
100109Map<Integer, Set<Locality>> priorities = new HashMap<>();
@@ -190,22 +199,30 @@ static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
190199if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
191200return StructOrError.fromError("LbEndpoint with no endpoint/address");
192201 }
193-io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress =
194-endpoint.getEndpoint().getAddress().getSocketAddress();
195-InetSocketAddress addr =
196-new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
197-boolean isHealthy =
198-endpoint.getHealthStatus() == io.envoyproxy.envoy.config.core.v3.HealthStatus.HEALTHY
199- || endpoint.getHealthStatus()
200- == io.envoyproxy.envoy.config.core.v3.HealthStatus.UNKNOWN;
202+List<java.net.SocketAddress> addresses = new ArrayList<>();
203+addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
204+if (isEnabledXdsDualStack()) {
205+for (Endpoint.AdditionalAddress additionalAddress
206+ : endpoint.getEndpoint().getAdditionalAddressesList()) {
207+addresses.add(getInetSocketAddress(additionalAddress.getAddress()));
208+ }
209+ }
210+boolean isHealthy = (endpoint.getHealthStatus() == HealthStatus.HEALTHY)
211+ || (endpoint.getHealthStatus() == HealthStatus.UNKNOWN);
201212endpoints.add(Endpoints.LbEndpoint.create(
202-new EquivalentAddressGroup(ImmutableList.<java.net.SocketAddress>of(addr)),
213+new EquivalentAddressGroup(addresses),
203214endpoint.getLoadBalancingWeight().getValue(), isHealthy));
204215 }
205216return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
206217endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
207218 }
208219220+private static InetSocketAddress getInetSocketAddress(Address address) {
221+io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = address.getSocketAddress();
222+223+return new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
224+ }
225+209226static final class EdsUpdate implements ResourceUpdate {
210227final String clusterName;
211228final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;