Skip to content

Commit

Permalink
attempt to implement
Browse files Browse the repository at this point in the history
  • Loading branch information
hughsimpson committed Aug 22, 2023
1 parent cf5c6a0 commit 1abb8b3
Showing 1 changed file with 122 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,51 +112,137 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
d))
}

private def getExpoBucketCounts(maxBucketCount: Int)(s: Snapshot[Distribution]) = {
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
class ItWithLast[T](it: Iterator[T], last: T) extends Iterator[T] {
private var showedLast: Boolean = false

val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
def hasNext: Boolean = it.hasNext || !showedLast

def next(): T = if (it.hasNext) it.next() else if (!showedLast) {
showedLast = true; last
} else throw new RuntimeException("Next on empty Iterator")
}

private def getExpoBucketCounts(scale: Int, maxBucketCount: Int)(s: Snapshot[Distribution]) = {
val base = Math.pow(2, Math.pow(2, -scale))
// lower boundary of index 0 is always 0 (inclusive) https://opentelemetry.io/blog/2023/exponential-histograms/#bucket-calculation
val lowerBoundaryIterator: Iterator[Double] = (-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)).iterator
val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator
val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket {
def value: Long = Long.MaxValue
def frequency: Long = 0
})
var fromLowerBound = valuesIterator.next()
var fromUpperBound = valuesIterator.next()
var toLowerBound = lowerBoundaryIterator.next()
var toUpperBound = lowerBoundaryIterator.next()
var zeroCount = 0d
var countInBucket = 0d

val counts = ArrayBuffer.newBuilder[JLong]
val foo = new ExponentialHistogramBuckets {
def getOffset: Int = ???
def getBucketCounts: util.List[JLong] = ???
def getTotalCount: Long = ???
val negativeCounts = ArrayBuffer.newBuilder[JDouble]
val positiveCounts = ArrayBuffer.newBuilder[JDouble]

def iterFrom: Double = {
val d = fromLowerBound.frequency.toDouble
fromLowerBound = fromUpperBound
fromUpperBound = valuesIterator.next()
d
}
// val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator
var nextBoundary = lowerBoundaryIterator.next()
var inBucketCount = 0L
for (el <- s.value.bucketsIterator) {
while (el.value > nextBoundary) {
nextBoundary = boundaryIterator.next()
counts += inBucketCount
inBucketCount = 0L

def iterTo: Double = {
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
val res = countInBucket
countInBucket = 0
res
}
// normal case
while (lowerBoundaryIterator.hasNext && valuesIterator.hasNext) {
if (fromUpperBound.value <= toLowerBound) {
countInBucket += iterFrom // Or drop?
} else if (fromLowerBound.value >= toUpperBound) toLowerBound match {
case 1 => zeroCount += iterTo
case b if b < 1 => negativeCounts += iterTo
case b if b > 1 => positiveCounts += iterTo
} else if (fromUpperBound.value == toUpperBound) toLowerBound match {
case 1 =>
zeroCount += iterFrom
iterTo
case b if b < 1 =>
countInBucket += iterFrom
negativeCounts += iterTo
case b if b > 1 =>
countInBucket += iterFrom
positiveCounts += iterTo
} else if (fromUpperBound.value > toUpperBound) {
val firstBonus: JDouble = countInBucket
var negBuckets = 0
var zeroBuckets = 0
var posBuckets = 0
while (fromUpperBound.value > toUpperBound && lowerBoundaryIterator.hasNext) {
if (toLowerBound < 1) negBuckets += 1
else if (toLowerBound == 1) zeroBuckets += 1
else if (toLowerBound >= 1) posBuckets += 1
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
}
val totalBuckets = negBuckets + zeroBuckets + posBuckets
val avg = JDouble valueOf iterFrom / totalBuckets
negativeCounts ++= (if (negBuckets > 0) JDouble.valueOf(firstBonus + avg) +: Array.fill(negBuckets - 1)(avg) else Nil)
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JDouble.valueOf(firstBonus + avg) else if (zeroBuckets == 1) avg else JDouble.valueOf(0))
positiveCounts ++= (
if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0)
JDouble.valueOf(firstBonus + avg) +: Array.fill(posBuckets - 1)(avg)
else Array.fill(posBuckets)(avg))
} else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match {
case 1 => zeroCount += iterFrom
case _ => countInBucket += iterFrom
}
inBucketCount += el.frequency
}
while (boundaryIterator.hasNext) {
counts += inBucketCount
boundaryIterator.next()
inBucketCount = 0L
var usedLastValue = false
// more buckets left to fill but only one unused value, sitting in fromLowerBound.
while (lowerBoundaryIterator.hasNext) {
if (fromLowerBound.value > toLowerBound && fromLowerBound.value < toUpperBound) {
usedLastValue = true
countInBucket += fromLowerBound.frequency
}
toLowerBound match {
case 1 => zeroCount += iterTo
case b if b < 1 => negativeCounts += iterTo
case b if b > 1 => positiveCounts += iterTo
}
}
counts += inBucketCount
counts
// more values left, but only one unfilled bucket, sitting in toLowerBound
while (valuesIterator.hasNext) {
countInBucket += iterFrom
}
if (!usedLastValue) countInBucket += fromLowerBound.frequency
positiveCounts += countInBucket

val negBucket = new ExponentialHistogramBuckets {
val getOffset: Int = -maxBucketCount
private val doubles: ArrayBuffer[JLong] = negativeCounts.result().map(JLong valueOf _.toLong) // TODO: toLong here loses things
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
}
val posBucket = new ExponentialHistogramBuckets {
val getOffset: Int = 1
private val doubles: ArrayBuffer[JLong] = positiveCounts.result().map(JLong valueOf _.toLong) // TODO: we should normalise at avg
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
}
(negBucket, zeroCount.longValue(), posBucket) // TODO: instead of having these toLongs
}

private def toExponentialHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty =>
val mapped = nonEmpty.flatMap { s =>
s.value match {
case zigZag: Distribution.ZigZagCounts =>
logger.error("Unable to construct exponential histogram data - Unimplemented")
None
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
Some(ExponentialHistogramPointData.create(
???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
scale, zigZag.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
))
case _ =>
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
Expand All @@ -167,15 +253,17 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
else None
}

def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toExponentialHistogramData(histogram.instruments).map(d =>
def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = {
val maxBucketCount = expoBucketConfig(histogram.name, histogram.settings.unit)
toExponentialHistogramData(maxBucketCount, histogram.instruments).map(d =>
ImmutableMetricData.createExponentialHistogram(
resource,
instrumentationScopeInfo(histogram),
histogram.name,
histogram.description,
toString(histogram.settings.unit),
d))
}

def convertHistogram(histogramFormat: HistogramFormat)(histogram: MetricSnapshot.Distributions): Option[MetricData] = histogramFormat match {
case Explicit => convertExplicitHistogram(histogram)
Expand Down Expand Up @@ -220,8 +308,9 @@ private[otel] object MetricsConverter {
}

private val bases = (maxScale to minScale by -1).map(scale => (scale, Math.pow(2, Math.pow(2, -scale)))).toArray

def maxScale(maxBucketCount: Int)(v: JDouble): Int = {
if (v >= 1) bases.collectFirst{ case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale}.getOrElse(minScale)
else bases.collectFirst{ case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale}.getOrElse(minScale)
if (v >= 1) bases.collectFirst { case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale }.getOrElse(minScale)
else bases.collectFirst { case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale }.getOrElse(minScale)
}
}

0 comments on commit 1abb8b3

Please sign in to comment.