Fix explicit bucket histogram aggregation by ocelotl · Pull Request #3429 · open-telemetry/opentelemetry-python

Expand Up @@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, attributes: Attributes, instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, boundaries: Sequence[float] = ( 0.0, Expand All @@ -398,33 +399,43 @@ def __init__( record_min_max: bool = True, ): super().__init__(attributes)
self._boundaries = tuple(boundaries) self._bucket_counts = self._get_empty_bucket_counts() self._record_min_max = record_min_max self._min = inf self._max = -inf self._sum = 0 self._record_min_max = record_min_max
self._start_time_unix_nano = start_time_unix_nano # It is assumed that the "natural" aggregation temporality for a # Histogram instrument is DELTA, like the "natural" aggregation # temporality for a Counter is DELTA and the "natural" aggregation # temporality for an ObservableCounter is CUMULATIVE. self._instrument_aggregation_temporality = AggregationTemporality.DELTA self._instrument_aggregation_temporality = ( instrument_aggregation_temporality )
self._current_value = None
self._previous_collection_start_nano = self._start_time_unix_nano self._previous_cumulative_value = self._get_empty_bucket_counts() self._previous_min = inf self._previous_max = -inf self._previous_sum = 0
def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1)
def aggregate(self, measurement: Measurement) -> None: with self._lock: if self._current_value is None: self._current_value = self._get_empty_bucket_counts()
value = measurement.value value = measurement.value
if self._record_min_max: self._min = min(self._min, value) self._max = max(self._max, value) self._sum += value
self._sum += value if self._record_min_max: self._min = min(self._min, value) self._max = max(self._max, value)
self._bucket_counts[bisect_left(self._boundaries, value)] += 1 self._current_value[bisect_left(self._boundaries, value)] += 1
def collect( self, Expand All @@ -434,84 +445,78 @@ def collect( """ Atomically return a point for the current value of the metric. """ with self._lock: if not any(self._bucket_counts): return None
bucket_counts = self._bucket_counts start_time_unix_nano = self._start_time_unix_nano with self._lock: current_value = self._current_value sum_ = self._sum max_ = self._max min_ = self._min max_ = self._max
self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = collection_start_nano self._current_value = None self._sum = 0 self._min = inf self._max = -inf
current_point = HistogramDataPoint( attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, time_unix_nano=collection_start_nano, count=sum(bucket_counts), sum=sum_, bucket_counts=tuple(bucket_counts), explicit_bounds=self._boundaries, min=min_, max=max_, ) if ( self._instrument_aggregation_temporality is AggregationTemporality.DELTA ): # This happens when the corresponding instrument for this # aggregation is synchronous. if ( collection_aggregation_temporality is AggregationTemporality.DELTA ):
if self._previous_point is None or ( self._instrument_aggregation_temporality is collection_aggregation_temporality ): self._previous_point = current_point return current_point if current_value is None: return None
max_ = current_point.max min_ = current_point.min previous_collection_start_nano = ( self._previous_collection_start_nano ) self._previous_collection_start_nano = ( collection_start_nano )
if ( collection_aggregation_temporality is AggregationTemporality.CUMULATIVE ): start_time_unix_nano = self._previous_point.start_time_unix_nano sum_ = current_point.sum + self._previous_point.sum # Only update min/max on delta -> cumulative max_ = max(current_point.max, self._previous_point.max) min_ = min(current_point.min, self._previous_point.min) bucket_counts = [ curr_count + prev_count for curr_count, prev_count in zip( current_point.bucket_counts, self._previous_point.bucket_counts, ) ] else: start_time_unix_nano = self._previous_point.time_unix_nano sum_ = current_point.sum - self._previous_point.sum bucket_counts = [ curr_count - prev_count for curr_count, prev_count in zip( current_point.bucket_counts, self._previous_point.bucket_counts, return HistogramDataPoint( attributes=self._attributes, start_time_unix_nano=previous_collection_start_nano, time_unix_nano=collection_start_nano, count=sum(current_value), sum=sum_, bucket_counts=tuple(current_value), explicit_bounds=self._boundaries, min=min_, max=max_, )
if current_value is None: current_value = self._get_empty_bucket_counts()
self._previous_cumulative_value = [ current_value_element + previous_cumulative_value_element for ( current_value_element, previous_cumulative_value_element, ) in zip(current_value, self._previous_cumulative_value) ] self._previous_min = min(min_, self._previous_min) self._previous_max = max(max_, self._previous_max) self._previous_sum = sum_ + self._previous_sum
return HistogramDataPoint( attributes=self._attributes, start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, count=sum(self._previous_cumulative_value), sum=self._previous_sum, bucket_counts=tuple(self._previous_cumulative_value), explicit_bounds=self._boundaries, min=self._previous_min, max=self._previous_max, ) ]
current_point = HistogramDataPoint( attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, time_unix_nano=current_point.time_unix_nano, count=sum(bucket_counts), sum=sum_, bucket_counts=tuple(bucket_counts), explicit_bounds=current_point.explicit_bounds, min=min_, max=max_, ) self._previous_point = current_point return current_point return None

# pylint: disable=protected-access Expand Down Expand Up @@ -1100,7 +1105,11 @@ def _create_aggregation(
if isinstance(instrument, Histogram): return _ExplicitBucketHistogramAggregation( attributes, start_time_unix_nano attributes, instrument_aggregation_temporality=( AggregationTemporality.DELTA ), start_time_unix_nano=start_time_unix_nano, )
if isinstance(instrument, ObservableGauge): Expand Down Expand Up @@ -1179,8 +1188,18 @@ def _create_aggregation( attributes: Attributes, start_time_unix_nano: int, ) -> _Aggregation:
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED if isinstance(instrument, Synchronous): instrument_aggregation_temporality = AggregationTemporality.DELTA elif isinstance(instrument, Asynchronous): instrument_aggregation_temporality = ( AggregationTemporality.CUMULATIVE )
return _ExplicitBucketHistogramAggregation( attributes, instrument_aggregation_temporality, start_time_unix_nano, self._boundaries, self._record_min_max, Expand All @@ -1200,16 +1219,18 @@ def _create_aggregation( start_time_unix_nano: int, ) -> _Aggregation:
temporality = AggregationTemporality.UNSPECIFIED instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED if isinstance(instrument, Synchronous): temporality = AggregationTemporality.DELTA instrument_aggregation_temporality = AggregationTemporality.DELTA elif isinstance(instrument, Asynchronous): temporality = AggregationTemporality.CUMULATIVE instrument_aggregation_temporality = ( AggregationTemporality.CUMULATIVE )
return _SumAggregation( attributes, isinstance(instrument, (Counter, ObservableCounter)), temporality, instrument_aggregation_temporality, start_time_unix_nano, )
Expand Down