Have EDS resource parse the additional addresses from envoy message (… · grpc/grpc-java@38f968f

@@ -20,11 +20,14 @@

20202121

import com.google.common.annotations.VisibleForTesting;

2222

import com.google.common.base.MoreObjects;

23-

import com.google.common.collect.ImmutableList;

2423

import com.google.protobuf.Message;

24+

import io.envoyproxy.envoy.config.core.v3.Address;

25+

import io.envoyproxy.envoy.config.core.v3.HealthStatus;

2526

import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;

27+

import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;

2628

import io.envoyproxy.envoy.type.v3.FractionalPercent;

2729

import io.grpc.EquivalentAddressGroup;

30+

import io.grpc.internal.GrpcUtil;

2831

import io.grpc.xds.Endpoints.DropOverload;

2932

import io.grpc.xds.Endpoints.LocalityLbEndpoints;

3033

import io.grpc.xds.XdsEndpointResource.EdsUpdate;

@@ -46,6 +49,8 @@

4649

class XdsEndpointResource extends XdsResourceType<EdsUpdate> {

4750

static final String ADS_TYPE_URL_EDS =

4851

"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";

52+

static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =

53+

"grpc.experimental.xdsDualstackEndpoints";

49545055

private static final XdsEndpointResource instance = new XdsEndpointResource();

5156

@@ -95,6 +100,10 @@ protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceI

95100

return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage);

96101

}

97102103+

private static boolean isEnabledXdsDualStack() {

104+

return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);

105+

}

106+98107

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)

99108

throws ResourceInvalidException {

100109

Map<Integer, Set<Locality>> priorities = new HashMap<>();

@@ -190,22 +199,30 @@ static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(

190199

if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {

191200

return StructOrError.fromError("LbEndpoint with no endpoint/address");

192201

}

193-

io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress =

194-

endpoint.getEndpoint().getAddress().getSocketAddress();

195-

InetSocketAddress addr =

196-

new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());

197-

boolean isHealthy =

198-

endpoint.getHealthStatus() == io.envoyproxy.envoy.config.core.v3.HealthStatus.HEALTHY

199-

|| endpoint.getHealthStatus()

200-

== io.envoyproxy.envoy.config.core.v3.HealthStatus.UNKNOWN;

202+

List<java.net.SocketAddress> addresses = new ArrayList<>();

203+

addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));

204+

if (isEnabledXdsDualStack()) {

205+

for (Endpoint.AdditionalAddress additionalAddress

206+

: endpoint.getEndpoint().getAdditionalAddressesList()) {

207+

addresses.add(getInetSocketAddress(additionalAddress.getAddress()));

208+

}

209+

}

210+

boolean isHealthy = (endpoint.getHealthStatus() == HealthStatus.HEALTHY)

211+

|| (endpoint.getHealthStatus() == HealthStatus.UNKNOWN);

201212

endpoints.add(Endpoints.LbEndpoint.create(

202-

new EquivalentAddressGroup(ImmutableList.<java.net.SocketAddress>of(addr)),

213+

new EquivalentAddressGroup(addresses),

203214

endpoint.getLoadBalancingWeight().getValue(), isHealthy));

204215

}

205216

return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(

206217

endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));

207218

}

208219220+

private static InetSocketAddress getInetSocketAddress(Address address) {

221+

io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = address.getSocketAddress();

222+223+

return new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());

224+

}

225+209226

static final class EdsUpdate implements ResourceUpdate {

210227

final String clusterName;

211228

final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;