core,xds: Metrics recording in WRR LB (#11129) · grpc/grpc-java@06df25b

@@ -24,13 +24,17 @@

2424

import com.google.common.base.MoreObjects;

2525

import com.google.common.base.Preconditions;

2626

import com.google.common.collect.ImmutableList;

27+

import com.google.common.collect.Lists;

2728

import io.grpc.ConnectivityState;

2829

import io.grpc.ConnectivityStateInfo;

2930

import io.grpc.Deadline.Ticker;

31+

import io.grpc.DoubleHistogramMetricInstrument;

3032

import io.grpc.EquivalentAddressGroup;

3133

import io.grpc.ExperimentalApi;

3234

import io.grpc.LoadBalancer;

3335

import io.grpc.LoadBalancerProvider;

36+

import io.grpc.LongCounterMetricInstrument;

37+

import io.grpc.MetricInstrumentRegistry;

3438

import io.grpc.NameResolver;

3539

import io.grpc.Status;

3640

import io.grpc.SynchronizationContext;

@@ -57,12 +61,17 @@

5761

import java.util.logging.Logger;

58625963

/**

60-

* A {@link LoadBalancer} that provides weighted-round-robin load-balancing over

61-

* the {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are

64+

* A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the

65+

* {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are

6266

* determined by backend metrics using ORCA.

6367

*/

6468

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")

6569

final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {

70+71+

private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;

72+

private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;

73+

private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;

74+

private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;

6675

private static final Logger log = Logger.getLogger(

6776

WeightedRoundRobinLoadBalancer.class.getName());

6877

private WeightedRoundRobinLoadBalancerConfig config;

@@ -74,6 +83,31 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {

7483

private final long infTime;

7584

private final Ticker ticker;

768586+

// The metric instruments are only registered once and shared by all instances of this LB.

87+

static {

88+

MetricInstrumentRegistry metricInstrumentRegistry

89+

= MetricInstrumentRegistry.getDefaultRegistry();

90+

RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",

91+

"Number of scheduler updates in which there were not enough endpoints with valid "

92+

+ "weight, which caused the WRR policy to fall back to RR behavior", "update",

93+

Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true);

94+

ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(

95+

"grpc.lb.wrr.endpoint_weight_not_yet_usable",

96+

"Number of endpoints from each scheduler update that don't yet have usable weight "

97+

+ "information", "endpoint", Lists.newArrayList("grpc.target"),

98+

Lists.newArrayList("grpc.lb.locality"), true);

99+

ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(

100+

"grpc.lb.wrr.endpoint_weight_stale",

101+

"Number of endpoints from each scheduler update whose latest weight is older than the "

102+

+ "expiration period", "endpoint", Lists.newArrayList("grpc.target"),

103+

Lists.newArrayList("grpc.lb.locality"), true);

104+

ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(

105+

"grpc.lb.wrr.endpoint_weights", "The histogram buckets will be endpoint weight ranges.",

106+

"weight", Lists.newArrayList(), Lists.newArrayList("grpc.target"),

107+

Lists.newArrayList("grpc.lb.locality"),

108+

true);

109+

}

110+77111

/**
 * Creates a balancer from a channel helper and a time source.
 *
 * <p>The helper is first wrapped by {@link OrcaOobUtil#newOrcaReportingHelper} and then by a
 * {@code WrrHelper}; construction is delegated to the three-argument constructor together with
 * a freshly created {@link Random}.
 *
 * @param helper the helper to wrap
 * @param ticker the ticker forwarded unchanged to the delegated constructor
 */
public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
  this(
      new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)),
      ticker,
      new Random());
}

@@ -145,7 +179,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

145179

@Override

146180

public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {

147181

return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),

148-

config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);

182+

config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());

149183

}

150184151185

@VisibleForTesting

@@ -163,16 +197,18 @@ public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Obj

163197

super(key, policyProvider, childConfig, initialPicker);

164198

}

165199166-

private double getWeight() {

200+

private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {

167201

if (config == null) {

168202

return 0;

169203

}

170204

long now = ticker.nanoTime();

171205

if (now - lastUpdated >= config.weightExpirationPeriodNanos) {

172206

nonEmptySince = infTime;

207+

staleEndpoints.incrementAndGet();

173208

return 0;

174209

} else if (now - nonEmptySince < config.blackoutPeriodNanos

175210

&& config.blackoutPeriodNanos > 0) {

211+

notYetUsableEndpoints.incrementAndGet();

176212

return 0;

177213

} else {

178214

return weight;

@@ -336,10 +372,11 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {

336372

private final float errorUtilizationPenalty;

337373

private final AtomicInteger sequence;

338374

private final int hashCode;

375+

private final LoadBalancer.Helper helper;

339376

private volatile StaticStrideScheduler scheduler;

340377341378

WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,

342-

float errorUtilizationPenalty, AtomicInteger sequence) {

379+

float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {

343380

checkNotNull(children, "children");

344381

Preconditions.checkArgument(!children.isEmpty(), "empty child list");

345382

this.children = children;

@@ -353,6 +390,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {

353390

this.enableOobLoadReport = enableOobLoadReport;

354391

this.errorUtilizationPenalty = errorUtilizationPenalty;

355392

this.sequence = checkNotNull(sequence, "sequence");

393+

this.helper = helper;

356394357395

// For equality we treat children as a set; use hash code as defined by Set

358396

int sum = 0;

@@ -387,11 +425,37 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {

387425388426

private void updateWeight() {

389427

float[] newWeights = new float[children.size()];

428+

AtomicInteger staleEndpoints = new AtomicInteger();

429+

AtomicInteger notYetUsableEndpoints = new AtomicInteger();

390430

for (int i = 0; i < children.size(); i++) {

391-

double newWeight = ((WeightedChildLbState)children.get(i)).getWeight();

431+

double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,

432+

notYetUsableEndpoints);

433+

// TODO: add target and locality labels once available

434+

helper.getMetricRecorder()

435+

.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight, ImmutableList.of(""),

436+

ImmutableList.of(""));

392437

newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;

393438

}

439+

if (staleEndpoints.get() > 0) {

440+

// TODO: add target and locality labels once available

441+

helper.getMetricRecorder()

442+

.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),

443+

ImmutableList.of(""),

444+

ImmutableList.of(""));

445+

}

446+

if (notYetUsableEndpoints.get() > 0) {

447+

// TODO: add target and locality labels once available

448+

helper.getMetricRecorder()

449+

.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),

450+

ImmutableList.of(""), ImmutableList.of(""));

451+

}

452+394453

this.scheduler = new StaticStrideScheduler(newWeights, sequence);

454+

if (this.scheduler.usesRoundRobin()) {

455+

// TODO: add target and locality labels once available

456+

helper.getMetricRecorder()

457+

.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(""), ImmutableList.of(""));

458+

}

395459

}

396460397461

@Override

@@ -454,6 +518,7 @@ public boolean equals(Object o) {

454518

static final class StaticStrideScheduler {

455519

private final short[] scaledWeights;

456520

private final AtomicInteger sequence;

521+

private final boolean usesRoundRobin;

457522

private static final int K_MAX_WEIGHT = 0xFFFF;

458523459524

// Assuming the mean of all known weights is M, StaticStrideScheduler will clamp

@@ -494,8 +559,10 @@ static final class StaticStrideScheduler {

494559

if (numWeightedChannels > 0) {

495560

unscaledMeanWeight = sumWeight / numWeightedChannels;

496561

unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));

562+

usesRoundRobin = false;

497563

} else {

498564

// Fall back to round robin if all values are non-positive

565+

usesRoundRobin = true;

499566

unscaledMeanWeight = 1;

500567

unscaledMaxWeight = 1;

501568

}

@@ -521,7 +588,14 @@ static final class StaticStrideScheduler {

521588

this.sequence = sequence;

522589

}

523590524-

/** Returns the next sequence number and atomically increases sequence with wraparound. */

591+

// Without properly weighted channels, we do plain vanilla round_robin.

592+

boolean usesRoundRobin() {

593+

return usesRoundRobin;

594+

}

595+596+

/**

597+

* Returns the next sequence number and atomically increases sequence with wraparound.

598+

*/

525599

private long nextSequence() {

526600

return Integer.toUnsignedLong(sequence.getAndIncrement());

527601

}