From 403cab857ef9f8da6dc554bf596aa7a582f8bcd3 Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Tue, 22 Aug 2023 20:14:43 +0100 Subject: [PATCH] seems to work --- .../scala/kamon/otel/MetricsConverter.scala | 72 +++++++++---------- .../OpenTelemetryMetricReporterSpec.scala | 51 ++++++++++++- 2 files changed, 85 insertions(+), 38 deletions(-) diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala index 3540f6758..b6c56dec9 100644 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala @@ -118,7 +118,8 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro def hasNext: Boolean = it.hasNext || !showedLast def next(): T = if (it.hasNext) it.next() else if (!showedLast) { - showedLast = true; last + showedLast = true + last } else throw new RuntimeException("Next on empty Iterator") } @@ -127,26 +128,27 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket { def value: Long = Long.MaxValue + def frequency: Long = 0 }) var fromLowerBound = valuesIterator.next() var fromUpperBound = valuesIterator.next() var toLowerBound = lowerBoundaryIterator.next() var toUpperBound = lowerBoundaryIterator.next() - var zeroCount = 0d - var countInBucket = 0d + var zeroCount: JLong = 0L + var countInBucket = 0L - val negativeCounts = ArrayBuffer.newBuilder[JDouble] - val positiveCounts = ArrayBuffer.newBuilder[JDouble] + val negativeCounts = ArrayBuffer.newBuilder[JLong] + val positiveCounts = ArrayBuffer.newBuilder[JLong] - def iterFrom: Double = { - val d = fromLowerBound.frequency.toDouble + def iterFrom: JLong = { + val d = fromLowerBound.frequency fromLowerBound = fromUpperBound fromUpperBound = valuesIterator.next() d } - def iterTo: Double = { + def iterTo: JLong = { toLowerBound = toUpperBound toUpperBound = lowerBoundaryIterator.next() val res = countInBucket @@ -172,7 +174,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro countInBucket += iterFrom positiveCounts += iterTo } else if (fromUpperBound.value > toUpperBound) { - val firstBonus: JDouble = countInBucket + val firstBonus: JLong = countInBucket var negBuckets = 0 var zeroBuckets = 0 var posBuckets = 0 @@ -183,14 +185,16 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro toLowerBound = toUpperBound toUpperBound = lowerBoundaryIterator.next() } - val totalBuckets = negBuckets + zeroBuckets + posBuckets - val avg = JDouble valueOf iterFrom / totalBuckets - negativeCounts ++= (if (negBuckets > 0) JDouble.valueOf(firstBonus + avg) +: Array.fill(negBuckets - 1)(avg) else Nil) - zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JDouble.valueOf(firstBonus + avg) else if (zeroBuckets == 1) avg else JDouble.valueOf(0)) + val total = iterFrom + // Not sure about this... everything's going into the first bucket, even though we might be spanning multiple target buckets. + // Might be better to do something like push the avg.floor into each bucket, interpolating the remainder. + // OTOH it may not really come up much in practice, since the internal histos are likely to have similar or finer granularity + negativeCounts ++= (if (negBuckets > 0) JLong.valueOf(firstBonus + total) +: Array.fill(negBuckets - 1)(JLong.valueOf(0)) else Nil) + zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JLong.valueOf(firstBonus + total) else JLong.valueOf(0)) positiveCounts ++= ( if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0) - JDouble.valueOf(firstBonus + avg) +: Array.fill(posBuckets - 1)(avg) - else Array.fill(posBuckets)(avg)) + JLong.valueOf(firstBonus + total) +: Array.fill(posBuckets - 1)(JLong.valueOf(0)) + else Array.fill(posBuckets)(JLong.valueOf(0))) } else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match { case 1 => zeroCount += iterFrom case _ => countInBucket += iterFrom @@ -216,19 +220,19 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro if (!usedLastValue) countInBucket += fromLowerBound.frequency positiveCounts += countInBucket - val negBucket = new ExponentialHistogramBuckets { + val negBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets { val getOffset: Int = -maxBucketCount - private val doubles: ArrayBuffer[JLong] = negativeCounts.result().map(JLong valueOf _.toLong) // TODO: toLong here loses things - val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava) - val getTotalCount: Long = doubles.foldLeft(0L)(_ + _) + private val longs: ArrayBuffer[JLong] = negativeCounts.result() + val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava) + val getTotalCount: Long = longs.foldLeft(0L)(_ + _) } - val posBucket = new ExponentialHistogramBuckets { + val posBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets { val getOffset: Int = 1 - private val doubles: ArrayBuffer[JLong] = positiveCounts.result().map(JLong valueOf _.toLong) // TODO: we should normalise at avg - val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava) - val getTotalCount: Long = doubles.foldLeft(0L)(_ + _) + private val longs: ArrayBuffer[JLong] = positiveCounts.result() + val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava) + val getTotalCount: Long = longs.foldLeft(0L)(_ + _) } - (negBucket, zeroCount.longValue(), posBucket) // TODO: instead of having these toLongs + (negBucket, zeroCount, posBucket) } private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] = @@ -236,18 +240,14 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro case Nil => None case nonEmpty => val mapped = nonEmpty.flatMap { s => - s.value match { - case zigZag: Distribution.ZigZagCounts => - def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v) - val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble)) - val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s) - Some(ExponentialHistogramPointData.create( - scale, zigZag.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]() - )) - case _ => - logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted") - None - } + def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v) + + // Could also calculate an 'offset' here, but defaulting to offset = 1 for simplicity + val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble)) + val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s) + Some(ExponentialHistogramPointData.create( + scale, s.value.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]() + )) } if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava)) else None diff --git a/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala index f41ffabb9..bb5a369ac 100644 --- a/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala +++ b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala @@ -16,8 +16,10 @@ package kamon.otel +import com.typesafe.config.{Config, ConfigValue, ConfigValueFactory} import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData import kamon.Kamon import kamon.Kamon.config import kamon.metric._ @@ -37,10 +39,10 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec with Matchers with Reconfigure { reconfigure => - private def openTelemetryMetricsReporter(): (OpenTelemetryMetricsReporter, MockMetricsService) = { + private def openTelemetryMetricsReporter(newConfig: Config = config): (OpenTelemetryMetricsReporter, MockMetricsService) = { val metricsService = new MockMetricsService() val reporter = new OpenTelemetryMetricsReporter(_ => metricsService)(ExecutionContext.global) - reporter.reconfigure(config) + reporter.reconfigure(newConfig) (reporter, metricsService) } @@ -148,6 +150,51 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec points.head.getBoundaries.asScala shouldEqual Seq[JDouble](1d, 2d, 3d, 4d, 10d) points.head.getCounts.asScala shouldEqual Seq[JDouble](2d, 2d, 3d, 0d, 1d, 1d) } + "send exponential histogram metrics" in { + val newConfig = config.withValue("kamon.otel.metrics.histogram-format", ConfigValueFactory.fromAnyRef("base2_exponential_bucket_histogram")) + val (reporter, mockService) = openTelemetryMetricsReporter(newConfig) + val now = Instant.now() + reporter.reportPeriodSnapshot( + PeriodSnapshot.apply( + now.minusMillis(1000), + now, + Nil, + Nil, + MetricSnapshot.ofDistributions( + "test.histogram", + "test", + Metric.Settings.ForDistributionInstrument(MeasurementUnit.none, java.time.Duration.ZERO, DynamicRange.Default), + Instrument.Snapshot( + TagSet.from(Map("tag1" -> "value1")), + buildHistogramDist(Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L)) + ) :: Nil) :: Nil, + Nil, + Nil + ) + ) + // basic sanity + mockService.exportMetricsServiceRequest should not be empty + mockService.exportMetricsServiceRequest.get should have size 1 + val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq + exportedMetrics should have size 1 + val metricData = exportedMetrics.head + + + // check value + metricData.getName should equal("test.histogram") + metricData.getDescription should equal("test") + val sumData = ExponentialHistogramData.fromMetricData(metricData) + val points = sumData.getPoints.asScala.toSeq + points should have size 1 + points.head.getAttributes should have size 1 + points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1") + points.head.getScale shouldEqual 5 + points.head.getNegativeBuckets.getTotalCount shouldEqual 0L + points.head.getZeroCount shouldEqual 2L + points.head.getPositiveBuckets.getTotalCount shouldEqual 7L + points.head.getSum shouldEqual 35L + points.head.getCount shouldEqual 9L + } "calculate sensible scales for values" in { def randomDouble = Random.nextInt(10) match {