
Commit

wip
hughsimpson committed Aug 22, 2023
1 parent 0d1f75d commit ad2044d
Showing 4 changed files with 124 additions and 84 deletions.
68 changes: 26 additions & 42 deletions reporters/kamon-opentelemetry/src/main/resources/reference.conf
@@ -70,54 +70,22 @@ kamon.otel {
}

explicit-histo-boundaries {
default-boundaries = [
10,
30,
100,
300,
1000,
3000,
10000,
30000,
100000
# Same as defaults from https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation
default-buckets = [
0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000
]

time-boundaries = [
0.005,
0.01,
0.025,
0.05,
0.075,
0.1,
0.25,
0.5,
0.75,
1,
2.5,
5,
7.5,
10
# The following are the same as the Prometheus reporter's default values
time-buckets = [
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10
]

information-boundaries = [
512,
1024,
2048,
4096,
16384,
65536,
524288,
1048576
information-buckets = [
512, 1024, 2048, 4096, 16384, 65536, 524288, 1048576
]

percentage-boundaries = [
20,
40,
60,
70,
80,
90,
95
percentage-buckets = [
20, 40, 60, 70, 80, 90, 95
]

# Per-metric overrides are possible by specifying the metric name and the histogram buckets here
@@ -126,6 +94,22 @@ kamon.otel {
// "akka.actor.processing-time" = [0.1, 1.0, 10.0]
}
}

exponential-histo-boundaries {
default-bucket-count = 160

time-bucket-count = 160

information-bucket-count = 160

percentage-bucket-count = 100

# Per-metric overrides are possible by specifying the metric name and the bucket count here
custom {
// example:
// "akka.actor.processing-time" = 3
}
}
}
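
For reference, the explicit and exponential overrides can be combined in an application.conf. A minimal sketch, reusing the example metric name from the custom blocks above:

kamon.otel {
  explicit-histo-boundaries.custom {
    "akka.actor.processing-time" = [0.1, 1.0, 10.0]
  }
  exponential-histo-boundaries.custom {
    "akka.actor.processing-time" = 3
  }
}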

# Arbitrary key-value pairs that further identify the environment where this service instance is running.
@@ -23,15 +23,17 @@ import io.opentelemetry.sdk.resources.Resource
import kamon.metric.Instrument.Snapshot
import kamon.metric.{Distribution, MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.otel.HistogramFormat.{Explicit, Exponential, HistogramFormat}
import kamon.otel.MetricsConverter.{ExplBucketFn, ExpoBucketFn}
import org.slf4j.LoggerFactory

import java.lang.{Double => JDouble, Long => JLong}
import java.time.Instant
import java.util.{Collection => JCollection, ArrayList => JArrayList}
import java.util.{ArrayList => JArrayList, Collection => JCollection}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant, bucketConfig: (String, MeasurementUnit) => Seq[JDouble]) {
class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant,
explBucketConfig: ExplBucketFn, expoBucketConfig: ExpoBucketFn) {
private val maxDouble: JDouble = JDouble.valueOf(JDouble.MAX_VALUE)
private val logger = LoggerFactory.getLogger(getClass)
private val fromNs = from.toEpochMilli * 1000000
@@ -55,7 +57,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
toString(gauge.settings.unit),
toGaugeData(gauge.instruments))

private def toExplicitHistogramDatum(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]): HistogramPointData = {
private def getExplBucketCounts(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]) = {
val counts = ArrayBuffer.newBuilder[JLong]
val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator
var nextBoundary = boundaryIterator.next()
@@ -74,6 +76,11 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
inBucketCount = 0L
}
counts += inBucketCount
counts
}
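
The walk above folds Kamon's sorted distribution buckets into the configured explicit boundaries: frequencies accumulate until a bucket value crosses the next boundary, at which point the running count is flushed and any remaining boundaries are padded with zeros. A standalone sketch of the same technique with plain Scala types (illustrative values; not part of this commit):

import scala.collection.mutable.ArrayBuffer

object ExplicitBucketWalkSketch {
  // points: (value, frequency) pairs sorted by value, as in a Kamon Distribution.
  // Returns one count per boundary plus a final overflow bucket.
  def bucketCounts(points: Seq[(Double, Long)], boundaries: Seq[Double]): Seq[Long] = {
    val counts = ArrayBuffer.empty[Long]
    val boundaryIterator = (boundaries :+ Double.MaxValue).iterator
    var nextBoundary = boundaryIterator.next()
    var inBucketCount = 0L
    for ((value, frequency) <- points) {
      while (value > nextBoundary) { // crossed one or more boundaries: flush
        counts += inBucketCount
        inBucketCount = 0L
        nextBoundary = boundaryIterator.next()
      }
      inBucketCount += frequency
    }
    counts += inBucketCount
    while (boundaryIterator.hasNext) { // pad trailing empty buckets
      boundaryIterator.next()
      counts += 0L
    }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // boundaries 10/30/100 give counts for (-inf,10], (10,30], (30,100], (100,inf)
    println(bucketCounts(Seq(5.0 -> 2L, 25.0 -> 1L, 250.0 -> 4L), Seq(10.0, 30.0, 100.0)))
    // prints the four counts 2, 1, 0, 4
  }
}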

private def toExplicitHistogramDatum(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]): HistogramPointData = {
val counts = getExplBucketCounts(bucketConfiguration)(s)
ImmutableHistogramPointData.create(
fromNs,
toNs,
@@ -93,7 +100,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
}

def convertExplicitHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = {
val bucketConfiguration = bucketConfig(histogram.name, histogram.settings.unit)
val bucketConfiguration = explBucketConfig(histogram.name, histogram.settings.unit)
toExplicitHistogramData(bucketConfiguration, histogram.instruments).map(d =>
ImmutableMetricData.createDoubleHistogram(
resource,
@@ -104,6 +111,30 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
d))
}

private def getExpoBucketCounts(maxBucketCount: Int)(s: Snapshot[Distribution]) = {
val min = s.value.min
val max = s.value.max
val counts = ArrayBuffer.newBuilder[JLong]
// val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator
// var nextBoundary = boundaryIterator.next()
// var inBucketCount = 0L
// for (el <- s.value.bucketsIterator) {
// while (el.value > nextBoundary) {
// nextBoundary = boundaryIterator.next()
// counts += inBucketCount
// inBucketCount = 0L
// }
// inBucketCount += el.frequency
// }
// while (boundaryIterator.hasNext) {
// counts += inBucketCount
// boundaryIterator.next()
// inBucketCount = 0L
// }
// counts += inBucketCount
counts
}
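
The stubbed body above still needs the value-to-bucket mapping for OpenTelemetry's base-2 exponential histograms, which is what would tie the *-bucket-count settings to an actual scale choice. Per the OTel specification, at scale s a positive value v lands in bucket index ceil(log2(v) * 2^s) - 1, and a reporter would pick the largest scale at which the observed (min, max) range fits into the configured bucket count. A minimal sketch of that math, not part of this commit (the helper names are hypothetical):

object ExponentialBucketSketch {
  private val Log2 = math.log(2.0)

  // Bucket index of a positive value at the given scale; buckets are
  // (base^i, base^(i+1)] with base = 2^(2^-scale).
  def bucketIndex(value: Double, scale: Int): Long =
    math.ceil(math.log(value) / Log2 * math.pow(2.0, scale)).toLong - 1

  // Largest scale (OTLP allows -10 to 20) whose index span for min..max
  // fits into maxBucketCount buckets.
  def fittingScale(min: Double, max: Double, maxBucketCount: Int): Int = {
    var scale = 20
    while (scale > -10 &&
      bucketIndex(max, scale) - bucketIndex(min, scale) + 1 > maxBucketCount)
      scale -= 1
    scale
  }

  def main(args: Array[String]): Unit = {
    println(bucketIndex(4.0, 0))            // 1, since 4 falls in (2, 4]
    println(fittingScale(0.5, 5000.0, 160)) // 3 for this value range
  }
}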

private def toExponentialHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
@@ -113,9 +144,9 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
case zigZag: Distribution.ZigZagCounts =>
logger.error("Unable to construct exponential histogram data - Unimplemented")
None
// Some(ExponentialHistogramPointData.create(
// ???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
// ))
Some(ExponentialHistogramPointData.create(
???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
))
case _ =>
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
None
@@ -161,8 +192,11 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
* Converts Kamon metrics to OpenTelemetry [[MetricData]]s
*/
private[otel] object MetricsConverter {
def convert(resource: Resource, kamonVersion: String, histogramFormat: HistogramFormat, bucketConfig: (String, MeasurementUnit) => Seq[JDouble])(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to, bucketConfig)
type ExplBucketFn = (String, MeasurementUnit) => Seq[JDouble]
type ExpoBucketFn = (String, MeasurementUnit) => Int
def convert(resource: Resource, kamonVersion: String, histogramFormat: HistogramFormat,
explicitBucketConfig: ExplBucketFn, exponentialBucketConfig: ExpoBucketFn)(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to, explicitBucketConfig, exponentialBucketConfig)
val gauges = metrics.gauges.filter(_.instruments.nonEmpty).map(converter.convertGauge)
val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).filter(_.instruments.nonEmpty)
.flatMap(converter.convertHistogram(histogramFormat))
@@ -34,14 +34,54 @@ import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

case class BucketConfig(
defaultBuckets: Seq[JDouble],
timeBuckets: Seq[JDouble],
informationBuckets: Seq[JDouble],
percentageBuckets: Seq[JDouble],
customBuckets: Map[String, Seq[JDouble]]
case class BucketConfig[T](
defaultBuckets: T,
timeBuckets: T,
informationBuckets: T,
percentageBuckets: T,
customBuckets: Map[String, T]
)

object Buckets {

private def readCustomBuckets(customBuckets: Config): Map[String, Seq[java.lang.Double]] =
customBuckets
.topLevelKeys
.map(k => (k, customBuckets.getDoubleList(ConfigUtil.quoteString(k)).asScala.toSeq))
.toMap

private def readCustomBucketsExpo(customBuckets: Config): Map[String, Int] =
customBuckets
.topLevelKeys
.map(k => (k, customBuckets.getInt(ConfigUtil.quoteString(k))))
.toMap

def parseBucketConfig(newConfig: Config): BucketConfig[Seq[JDouble]] = BucketConfig[Seq[JDouble]](
newConfig.getDoubleList("default-buckets").asScala.toSeq,
newConfig.getDoubleList("time-buckets").asScala.toSeq,
informationBuckets = newConfig.getDoubleList("information-buckets").asScala.toSeq,
percentageBuckets = newConfig.getDoubleList("percentage-buckets").asScala.toSeq,
readCustomBuckets(newConfig.getConfig("custom")))

def resolveBucketConfiguration[T](bucketConfig: BucketConfig[T])(metricName: String, unit: MeasurementUnit): T =
bucketConfig.customBuckets.getOrElse(
metricName,
unit.dimension match {
case Dimension.Time => bucketConfig.timeBuckets
case Dimension.Information => bucketConfig.informationBuckets
case Dimension.Percentage => bucketConfig.percentageBuckets
case _ => bucketConfig.defaultBuckets
}
)

def parseExpoBucketConfig(newConfig: Config): BucketConfig[Int] = BucketConfig[Int](
newConfig.getInt("default-bucket-count"),
newConfig.getInt("time-bucket-count"),
informationBuckets = newConfig.getInt("information-bucket-count"),
percentageBuckets = newConfig.getInt("percentage-bucket-count"),
readCustomBucketsExpo(newConfig.getConfig("custom")))

}
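
Parsing once and then partially applying resolveBucketConfiguration is what produces the (String, MeasurementUnit) => T functions that MetricsConverter.convert expects. A hypothetical wiring sketch, assuming the imports at the top of this file and a parsed Config in scope (the metric name is illustrative):

val explicitConfig: BucketConfig[Seq[JDouble]] =
  Buckets.parseBucketConfig(config.getConfig("kamon.otel.explicit-histo-boundaries"))
val resolveExplicit: (String, MeasurementUnit) => Seq[JDouble] =
  Buckets.resolveBucketConfiguration(explicitConfig) _

// A timer falls back to the time buckets unless a custom override matches:
resolveExplicit("some.timer", MeasurementUnit.time.seconds)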
object OpenTelemetryMetricsReporter {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryMetricsReporter])
private val kamonSettings: Status.Settings = Kamon.status().settings()
@@ -93,32 +133,14 @@ class OpenTelemetryMetricsReporter(metricsServiceFactory: OpenTelemetryConfigura
Explicit
}

def readCustomBuckets(customBuckets: Config): Map[String, Seq[java.lang.Double]] =
customBuckets
.topLevelKeys
.map(k => (k, customBuckets.getDoubleList(ConfigUtil.quoteString(k)).asScala.toSeq))
.toMap

val bucketConfig = BucketConfig(
newConfig.getDoubleList("kamon.otel.explicit-histo-boundaries.default-boundaries").asScala.toSeq,
newConfig.getDoubleList("kamon.otel.explicit-histo-boundaries.time-boundaries").asScala.toSeq,
informationBuckets = newConfig.getDoubleList("kamon.otel.explicit-histo-boundaries.information-boundaries").asScala.toSeq,
percentageBuckets = newConfig.getDoubleList("kamon.otel.explicit-histo-boundaries.percentage-boundaries").asScala.toSeq,
readCustomBuckets(newConfig.getConfig("kamon.otel.explicit-histo-boundaries.custom")))


def resolveBucketConfiguration(metricName: String, unit: MeasurementUnit): Seq[JDouble] =
bucketConfig.customBuckets.getOrElse(
metricName,
unit.dimension match {
case Dimension.Time => bucketConfig.timeBuckets
case Dimension.Information => bucketConfig.informationBuckets
case Dimension.Percentage => bucketConfig.percentageBuckets
case _ => bucketConfig.defaultBuckets
}
)

this.metricsConverterFunc = MetricsConverter.convert(resource, kamonSettings.version, histogramFormat, resolveBucketConfiguration)
val explicitBucketConfig = Buckets.parseBucketConfig(newConfig.getConfig("kamon.otel.explicit-histo-boundaries"))
val exponentialBucketConfig = Buckets.parseExpoBucketConfig(newConfig.getConfig("kamon.otel.exponential-histo-boundaries"))

val resolveExplicitBucketConfiguration = Buckets.resolveBucketConfiguration(explicitBucketConfig) _

val resolveExponentialBucketConfiguration = Buckets.resolveBucketConfiguration(exponentialBucketConfig) _

this.metricsConverterFunc = MetricsConverter.convert(resource, kamonSettings.version, histogramFormat, resolveExplicitBucketConfiguration, resolveExponentialBucketConfiguration)

this.metricsService = Option(metricsServiceFactory.apply(config))
}
@@ -16,7 +16,7 @@ kamon {
otel {
attributes = "att1=v1, att2 = v2 ,att3=+a%3Db%2Cc%3Dd+"
explicit-histo-boundaries {
default-boundaries = [
default-buckets = [
1,
2,
3,
