core,xds: Metrics recording in WRR LB (#11129) · grpc/grpc-java@06df25b
@@ -24,13 +24,17 @@
2424import com.google.common.base.MoreObjects;
2525import com.google.common.base.Preconditions;
2626import com.google.common.collect.ImmutableList;
27+import com.google.common.collect.Lists;
2728import io.grpc.ConnectivityState;
2829import io.grpc.ConnectivityStateInfo;
2930import io.grpc.Deadline.Ticker;
31+import io.grpc.DoubleHistogramMetricInstrument;
3032import io.grpc.EquivalentAddressGroup;
3133import io.grpc.ExperimentalApi;
3234import io.grpc.LoadBalancer;
3335import io.grpc.LoadBalancerProvider;
36+import io.grpc.LongCounterMetricInstrument;
37+import io.grpc.MetricInstrumentRegistry;
3438import io.grpc.NameResolver;
3539import io.grpc.Status;
3640import io.grpc.SynchronizationContext;
@@ -57,12 +61,17 @@
5761import java.util.logging.Logger;
58625963/**
60- * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over
61- * the {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
64+ * A {@link LoadBalancer} that provides weighted-round-robin load-balancing over the
65+ * {@link EquivalentAddressGroup}s from the {@link NameResolver}. The subchannel weights are
6266 * determined by backend metrics using ORCA.
6367 */
6468@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
6569final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
70+71+private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER;
72+private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER;
73+private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER;
74+private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM;
6675private static final Logger log = Logger.getLogger(
6776WeightedRoundRobinLoadBalancer.class.getName());
6877private WeightedRoundRobinLoadBalancerConfig config;
@@ -74,6 +83,31 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
7483private final long infTime;
7584private final Ticker ticker;
768586+// The metric instruments are only registered once and shared by all instances of this LB.
87+static {
88+MetricInstrumentRegistry metricInstrumentRegistry
89+ = MetricInstrumentRegistry.getDefaultRegistry();
90+RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
91+"Number of scheduler updates in which there were not enough endpoints with valid "
92+ + "weight, which caused the WRR policy to fall back to RR behavior", "update",
93+Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), true);
94+ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
95+"grpc.lb.wrr.endpoint_weight_not_yet_usable",
96+"Number of endpoints from each scheduler update that don't yet have usable weight "
97+ + "information", "endpoint", Lists.newArrayList("grpc.target"),
98+Lists.newArrayList("grpc.lb.locality"), true);
99+ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
100+"grpc.lb.wrr.endpoint_weight_stale",
101+"Number of endpoints from each scheduler update whose latest weight is older than the "
102+ + "expiration period", "endpoint", Lists.newArrayList("grpc.target"),
103+Lists.newArrayList("grpc.lb.locality"), true);
104+ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
105+"grpc.lb.wrr.endpoint_weights", "The histogram buckets will be endpoint weight ranges.",
106+"weight", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
107+Lists.newArrayList("grpc.lb.locality"),
108+true);
109+ }
110+77111public WeightedRoundRobinLoadBalancer(Helper helper, Ticker ticker) {
78112this(new WrrHelper(OrcaOobUtil.newOrcaReportingHelper(helper)), ticker, new Random());
79113 }
@@ -145,7 +179,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
145179@Override
146180public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
147181return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
148-config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
182+config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());
149183 }
150184151185@VisibleForTesting
@@ -163,16 +197,18 @@ public WeightedChildLbState(Object key, LoadBalancerProvider policyProvider, Obj
163197super(key, policyProvider, childConfig, initialPicker);
164198 }
165199166-private double getWeight() {
200+private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsableEndpoints) {
167201if (config == null) {
168202return 0;
169203 }
170204long now = ticker.nanoTime();
171205if (now - lastUpdated >= config.weightExpirationPeriodNanos) {
172206nonEmptySince = infTime;
207+staleEndpoints.incrementAndGet();
173208return 0;
174209 } else if (now - nonEmptySince < config.blackoutPeriodNanos
175210&& config.blackoutPeriodNanos > 0) {
211+notYetUsableEndpoints.incrementAndGet();
176212return 0;
177213 } else {
178214return weight;
@@ -336,10 +372,11 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
336372private final float errorUtilizationPenalty;
337373private final AtomicInteger sequence;
338374private final int hashCode;
375+private final LoadBalancer.Helper helper;
339376private volatile StaticStrideScheduler scheduler;
340377341378WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
342-float errorUtilizationPenalty, AtomicInteger sequence) {
379+float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {
343380checkNotNull(children, "children");
344381Preconditions.checkArgument(!children.isEmpty(), "empty child list");
345382this.children = children;
@@ -353,6 +390,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
353390this.enableOobLoadReport = enableOobLoadReport;
354391this.errorUtilizationPenalty = errorUtilizationPenalty;
355392this.sequence = checkNotNull(sequence, "sequence");
393+this.helper = helper;
356394357395// For equality we treat children as a set; use hash code as defined by Set
358396int sum = 0;
@@ -387,11 +425,37 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
387425388426private void updateWeight() {
389427float[] newWeights = new float[children.size()];
428+AtomicInteger staleEndpoints = new AtomicInteger();
429+AtomicInteger notYetUsableEndpoints = new AtomicInteger();
390430for (int i = 0; i < children.size(); i++) {
391-double newWeight = ((WeightedChildLbState)children.get(i)).getWeight();
431+double newWeight = ((WeightedChildLbState) children.get(i)).getWeight(staleEndpoints,
432+notYetUsableEndpoints);
433+// TODO: add target and locality labels once available
434+helper.getMetricRecorder()
435+ .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight, ImmutableList.of(""),
436+ImmutableList.of(""));
392437newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
393438 }
439+if (staleEndpoints.get() > 0) {
440+// TODO: add target and locality labels once available
441+helper.getMetricRecorder()
442+ .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
443+ImmutableList.of(""),
444+ImmutableList.of(""));
445+ }
446+if (notYetUsableEndpoints.get() > 0) {
447+// TODO: add target and locality labels once available
448+helper.getMetricRecorder()
449+ .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
450+ImmutableList.of(""), ImmutableList.of(""));
451+ }
452+394453this.scheduler = new StaticStrideScheduler(newWeights, sequence);
454+if (this.scheduler.usesRoundRobin()) {
455+// TODO: add target and locality labels once available
456+helper.getMetricRecorder()
457+ .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(""), ImmutableList.of(""));
458+ }
395459 }
396460397461@Override
@@ -454,6 +518,7 @@ public boolean equals(Object o) {
454518static final class StaticStrideScheduler {
455519private final short[] scaledWeights;
456520private final AtomicInteger sequence;
521+private final boolean usesRoundRobin;
457522private static final int K_MAX_WEIGHT = 0xFFFF;
458523459524// Assuming the mean of all known weights is M, StaticStrideScheduler will clamp
@@ -494,8 +559,10 @@ static final class StaticStrideScheduler {
494559if (numWeightedChannels > 0) {
495560unscaledMeanWeight = sumWeight / numWeightedChannels;
496561unscaledMaxWeight = Math.min(unscaledMaxWeight, (float) (K_MAX_RATIO * unscaledMeanWeight));
562+usesRoundRobin = false;
497563 } else {
498564// Fall back to round robin if all values are non-positives
565+usesRoundRobin = true;
499566unscaledMeanWeight = 1;
500567unscaledMaxWeight = 1;
501568 }
@@ -521,7 +588,14 @@ static final class StaticStrideScheduler {
521588this.sequence = sequence;
522589 }
523590524-/** Returns the next sequence number and atomically increases sequence with wraparound. */
591+// Without properly weighted channels, we do plain vanilla round_robin.
592+boolean usesRoundRobin() {
593+return usesRoundRobin;
594+ }
595+596+/**
597+ * Returns the next sequence number and atomically increases sequence with wraparound.
598+ */
525599private long nextSequence() {
526600return Integer.toUnsignedLong(sequence.getAndIncrement());
527601 }