From 12983f93a45a6434ab80a2f2a2ca913680f51048 Mon Sep 17 00:00:00 2001 From: hughsimpson Date: Mon, 9 Oct 2023 13:45:49 +0100 Subject: [PATCH 01/12] fix: Regression on jar size, slf4j (#1299) * pekko dependencies should only be provided * bump sbt, slf4j --- build.sbt | 2 +- instrumentation/kamon-pekko/build.sbt | 16 ++++++++-------- project/Build.scala | 6 +++--- project/build.properties | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index 5d9bbea3b..8a31a09fb 100644 --- a/build.sbt +++ b/build.sbt @@ -60,7 +60,7 @@ lazy val `kamon-core` = (project in file("core/kamon-core")) ), libraryDependencies ++= Seq( "com.typesafe" % "config" % "1.4.1", - "org.slf4j" % "slf4j-api" % "1.7.25", + "org.slf4j" % "slf4j-api" % "1.7.36", "org.hdrhistogram" % "HdrHistogram" % "2.1.9" % "provided,shaded", "org.jctools" % "jctools-core" % "3.3.0" % "provided,shaded", "com.oracle.substratevm" % "svm" % "19.2.1" % "provided" diff --git a/instrumentation/kamon-pekko/build.sbt b/instrumentation/kamon-pekko/build.sbt index 83c083f55..c4b7ac397 100644 --- a/instrumentation/kamon-pekko/build.sbt +++ b/instrumentation/kamon-pekko/build.sbt @@ -6,16 +6,16 @@ inConfig(Compile)(Defaults.compileSettings ++ Seq( val pekkoVersion = "1.0.1" libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq( - kanelaAgent, + kanelaAgent % "provided", scalatest % Test, logbackClassic % Test, - "org.apache.pekko" %% "pekko-actor" % pekkoVersion, - "org.apache.pekko" %% "pekko-testkit" % pekkoVersion, - "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion, - "org.apache.pekko" %% "pekko-remote" % pekkoVersion, - "org.apache.pekko" %% "pekko-cluster" % pekkoVersion, - "org.apache.pekko" %% "pekko-cluster-sharding" % pekkoVersion, - "org.apache.pekko" %% "pekko-protobuf" % pekkoVersion, + "org.apache.pekko" %% "pekko-actor" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-remote" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-cluster" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-cluster-sharding" % pekkoVersion % "provided,test", + "org.apache.pekko" %% "pekko-protobuf" % pekkoVersion % "provided,test", "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test )} diff --git a/project/Build.scala b/project/Build.scala index 82185ce78..bd76ec5fd 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -27,9 +27,9 @@ object BaseProject extends AutoPlugin { lazy val Shaded = config("shaded").hide val kanelaAgent = "io.kamon" % "kanela-agent" % "1.0.17" - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.25" - val slf4jnop = "org.slf4j" % "slf4j-nop" % "1.7.24" - val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3" + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.36" + val slf4jnop = "org.slf4j" % "slf4j-nop" % "1.7.36" + val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.12" val scalatest = "org.scalatest" %% "scalatest" % "3.2.9" val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.10" val okHttp = "com.squareup.okhttp3" % "okhttp" % "4.10.0" diff --git a/project/build.properties b/project/build.properties index 875b706a8..27430827b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.2 +sbt.version=1.9.6 From d2db875a38c0d94915f4dbcefd869760e79a3dd9 Mon Sep 17 00:00:00 2001 From: 
Takayuki Takagi Date: Tue, 10 Oct 2023 16:29:41 +0900 Subject: [PATCH 02/12] Add Datadog specific error tags (#1228) --- .../kamon/datadog/DatadogSpanReporter.scala | 10 ++++++++- .../datadog/DatadogSpanReporterSpec.scala | 21 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala index cdd013850..de07f31e9 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogSpanReporter.scala @@ -25,6 +25,7 @@ import kamon.{ ClassLoading, Kamon } import kamon.datadog.DatadogSpanReporter.Configuration import kamon.module.{ ModuleFactory, SpanReporter } import kamon.tag.{ Lookups, Tag, TagSet } +import kamon.trace.Span.TagKeys import kamon.util.{ EnvironmentTags, Filter } import org.slf4j.LoggerFactory @@ -52,7 +53,14 @@ object KamonDataDogTranslatorDefault extends KamonDataDogTranslator { val start = from.getEpochNano val duration = Duration.between(from, span.to) val marks = span.marks.map { m => m.key -> m.instant.getEpochNano.toString }.toMap - val tags = (span.tags.all() ++ span.metricTags.all() ++ additionalTags.all()).map { t => + val errorTags = if (span.hasError) { + val builder = TagSet.builder() + span.tags.get(Lookups.option(TagKeys.ErrorMessage)).foreach(msg => builder.add("error.msg", msg)) + span.tags.get(Lookups.option(TagKeys.ErrorStacktrace)).foreach(st => builder.add("error.stack", st)) + builder.build() + } else TagSet.Empty + + val tags = (span.tags.all() ++ span.metricTags.all() ++ errorTags.all() ++ additionalTags.all()).map { t => t.key -> Tag.unwrapValue(t).toString } val meta = (marks ++ tags).filterKeys(tagFilter.accept(_)).toMap diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogSpanReporterSpec.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogSpanReporterSpec.scala index 896b68e46..2257fa0ae 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogSpanReporterSpec.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogSpanReporterSpec.scala @@ -90,6 +90,26 @@ trait TestData { "error" -> 1 ) + val spanWithErrorTags = span.copy(tags = TagSet.from(Map( + "error" -> true, + "error.type" -> "RuntimeException", + "error.message" -> "Error message", + "error.stacktrace" -> "Error stacktrace" + )), hasError = true) + + val jsonWithErrorTags = json ++ Json.obj( + "meta" -> Json.obj( + "error" -> "true", + "env" -> "staging", + "error.type" -> "RuntimeException", + "error.message" -> "Error message", + "error.msg" -> "Error message", + "error.stacktrace" -> "Error stacktrace", + "error.stack" -> "Error stacktrace" + ), + "error" -> 1 + ) + val spanWithTags = span.copy(metricTags = TagSet.from( Map( @@ -152,6 +172,7 @@ trait TestData { "span with marks" -> (Seq(spanWithMarks), Json.arr(Json.arr(jsonWithMarks))), "span with meta and marks" -> (Seq(spanWithTagsAndMarks), Json.arr(Json.arr(jsonWithTagsAndMarks))), "span with error" -> (Seq(spanWithError), Json.arr(Json.arr(jsonWithError))), + "span with error tags" -> (Seq(spanWithErrorTags), Json.arr(Json.arr(jsonWithErrorTags))), "multiple spans with same trace" -> (Seq(span, spanWithTags), Json.arr(Json.arr(json, jsonWithTags))) // "multiple spans with two traces" -> (Seq(span, spanWithTags, otherTraceSpan, span), Json.arr(Json.arr(json, jsonWithTags, json), 
Json.arr(otherTraceJson))) From a96a353924612f8214f6c3c9d96461d5176c1108 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Thu, 12 Oct 2023 09:34:56 -0300 Subject: [PATCH 03/12] AtomicGetOrElseUpdateOnTrieMap::atomicGetOrElseUpdate -> TrieMap::getOrElseUpdate (#591) * remove AtomicGetOrElseUpdateOnTrieMap::atomicGetOrElseUpdate method in favor of TrieMap::getOrElseUpdate * fix rebase bugs --------- Co-authored-by: Hugh Simpson --- core/kamon-core/src/main/scala/kamon/Utilities.scala | 2 +- .../src/main/scala/kamon/metric/MetricRegistry.scala | 10 +++++----- core/kamon-core/src/main/scala/kamon/package.scala | 6 +----- .../src/main/scala/kamon/trace/AdaptiveSampler.scala | 6 +++--- .../akka/AkkaClusterShardingMetrics.scala | 4 ++-- .../scala/kamon/instrumentation/akka/AkkaMetrics.scala | 2 +- .../kamon/instrumentation/akka/AkkaRemoteMetrics.scala | 2 +- .../pekko/PekkoClusterShardingMetrics.scala | 4 ++-- .../kamon/instrumentation/pekko/PekkoMetrics.scala | 2 +- .../instrumentation/pekko/PekkoRemoteMetrics.scala | 2 +- 10 files changed, 18 insertions(+), 22 deletions(-) diff --git a/core/kamon-core/src/main/scala/kamon/Utilities.scala b/core/kamon-core/src/main/scala/kamon/Utilities.scala index 2c1efb07f..070be64ea 100644 --- a/core/kamon-core/src/main/scala/kamon/Utilities.scala +++ b/core/kamon-core/src/main/scala/kamon/Utilities.scala @@ -46,7 +46,7 @@ trait Utilities { self: Configuration => * empty. */ def filter(configKey: String): Filter = - _filters.atomicGetOrElseUpdate(configKey, Filter.from(configKey)) + _filters.getOrElseUpdate(configKey, Filter.from(configKey)) /** * Kamon's Clock implementation. diff --git a/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index 23c175222..569066805 100644 --- a/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -50,7 +50,7 @@ class MetricRegistry(config: Config, clock: Clock) { Metric.Counter = { val metric = validateInstrumentType[Metric.Counter] { - _metrics.atomicGetOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval)) } (name, Instrument.Type.Counter) checkDescription(metric.name, metric.description, description) @@ -66,7 +66,7 @@ class MetricRegistry(config: Config, clock: Clock) { Metric.Gauge = { val metric = validateInstrumentType[Metric.Gauge] { - _metrics.atomicGetOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval)) } (name, Instrument.Type.Gauge) checkDescription(metric.name, metric.description, description) @@ -82,7 +82,7 @@ class MetricRegistry(config: Config, clock: Clock) { autoUpdateInterval: Option[Duration]): Metric.Histogram = { val metric = validateInstrumentType[Metric.Histogram] { - _metrics.atomicGetOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.Histogram) checkDescription(metric.name, metric.description, description) @@ -98,7 +98,7 @@ class MetricRegistry(config: Config, clock: Clock) { def timer(name: String, description: Option[String], dynamicRange: Option[DynamicRange], autoUpdateInterval: Option[Duration]): Metric.Timer = 
{ val metric = validateInstrumentType[Metric.Timer] { - _metrics.atomicGetOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds), + _metrics.getOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds), dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.Timer) @@ -115,7 +115,7 @@ class MetricRegistry(config: Config, clock: Clock) { autoUpdateInterval: Option[Duration]): Metric.RangeSampler = { val metric = validateInstrumentType[Metric.RangeSampler] { - _metrics.atomicGetOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval)) + _metrics.getOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval)) } (name, Instrument.Type.RangeSampler) checkDescription(metric.name, metric.description, description) diff --git a/core/kamon-core/src/main/scala/kamon/package.scala b/core/kamon-core/src/main/scala/kamon/package.scala index 7ddc3f5a0..80f099566 100644 --- a/core/kamon-core/src/main/scala/kamon/package.scala +++ b/core/kamon-core/src/main/scala/kamon/package.scala @@ -64,14 +64,10 @@ package object kamon { /** - * Workaround to the non thread-safe [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method. More details on - * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. + * Atomic variant of [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method with cleanup and init functions. */ implicit class AtomicGetOrElseUpdateOnTrieMap[K, V](val trieMap: TrieMap[K, V]) extends AnyVal { - def atomicGetOrElseUpdate(key: K, op: => V): V = - atomicGetOrElseUpdate(key, op, { _: V => () }, { _: V => () }) - def atomicGetOrElseUpdate(key: K, op: => V, cleanup: V => Unit, init: V => Unit): V = trieMap.get(key) match { case Some(v) => v diff --git a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala index 09eeffcce..feb7e35a7 100644 --- a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala +++ b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala @@ -48,13 +48,13 @@ class AdaptiveSampler extends Sampler { override def decide(operation: Sampler.Operation): SamplingDecision = { val operationName = operation.operationName() - val operationSampler = _samplers.get(operationName).getOrElse { + val operationSampler = _samplers.getOrElse(operationName, { // It might happen that the first time we see an operation under high concurrent throughput we will reach this // block more than once, but worse case effect is that we will rebalance the operation samplers more than once. 
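      // Note: since the SI-7943 fix (Scala 2.11.6+), TrieMap#getOrElseUpdate is itself atomic, so the standard
      // method can safely replace the custom atomicGetOrElseUpdate wrapper for this simple create-if-absent case.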
- val sampler = _samplers.atomicGetOrElseUpdate(operationName, buildOperationSampler(operationName)) + val sampler = _samplers.getOrElseUpdate(operationName, buildOperationSampler(operationName)) rebalance() sampler - } + }) val decision = operationSampler.decide() if(decision == SamplingDecision.Sample) diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala index e6de342c4..b3c4a5351 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaClusterShardingMetrics.scala @@ -50,10 +50,10 @@ object AkkaClusterShardingMetrics { private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages) def hostedEntitiesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong()) def processedMessagesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong()) // We should only remove when the ShardRegion actor is terminated. override def remove(): Unit = { diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala index c4e5f404b..bc8c8bfe2 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaMetrics.scala @@ -181,7 +181,7 @@ object AkkaMetrics { ) def forSystem(name: String): ActorSystemInstruments = - _systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) + _systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) { val deadLetters = register(SystemDeadLetters) diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala index 7247b1aad..e0ddb3f73 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaRemoteMetrics.scala @@ -41,5 +41,5 @@ object AkkaRemoteMetrics { } def serializationInstruments(system: String): SerializationInstruments = - _serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system)) + _serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system)) } diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala index 858dc9b9b..804b1e77c 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala +++ 
b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoClusterShardingMetrics.scala @@ -50,10 +50,10 @@ object PekkoClusterShardingMetrics { private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages) def hostedEntitiesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong()) def processedMessagesPerShardCounter(shardID: String): AtomicLong = - _shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong()) + _shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong()) // We should only remove when the ShardRegion actor is terminated. override def remove(): Unit = { diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala index 69ae24fd2..b50a81ad7 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoMetrics.scala @@ -181,7 +181,7 @@ object PekkoMetrics { ) def forSystem(name: String): ActorSystemInstruments = - _systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) + _systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name))) class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) { val deadLetters = register(SystemDeadLetters) diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala index dca48d5a9..9ea27cdca 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/PekkoRemoteMetrics.scala @@ -41,5 +41,5 @@ object PekkoRemoteMetrics { } def serializationInstruments(system: String): SerializationInstruments = - _serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system)) + _serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system)) } From 305b2c54b9d4cec40198e711767518bfd61cf895 Mon Sep 17 00:00:00 2001 From: Nobuya Inooku <32607544+inobu@users.noreply.github.com> Date: Mon, 23 Oct 2023 22:08:44 +0900 Subject: [PATCH 04/12] feat: add pekko-grpc instrumentation (#1307) --- build.sbt | 27 ++++++++ ...koGRPCUnmarshallingContextPropagation.java | 49 ++++++++++++++ .../src/main/resources/reference.conf | 20 ++++++ .../grpc/PekkoGrpcServerInstrumentation.scala | 57 +++++++++++++++++ .../src/test/protobuf/helloworld.proto | 41 ++++++++++++ .../src/test/resources/application.conf | 1 + .../src/test/resources/logback.xml | 12 ++++ .../pekko/grpc/GreeterServiceImpl.scala | 44 +++++++++++++ .../pekko/grpc/PekkoGrpcTracingSpec.scala | 64 +++++++++++++++++++ project/plugins.sbt | 1 + 10 files changed, 316 insertions(+) create mode 100644 instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java create mode 100644 instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf create mode 100644 
instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala create mode 100644 instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto create mode 100644 instrumentation/kamon-pekko-grpc/src/test/resources/application.conf create mode 100644 instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml create mode 100644 instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala create mode 100644 instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala diff --git a/build.sbt b/build.sbt index 8a31a09fb..1a47a4922 100644 --- a/build.sbt +++ b/build.sbt @@ -139,6 +139,7 @@ val instrumentationProjects = Seq[ProjectReference]( `kamon-akka-grpc`, `kamon-pekko`, `kamon-pekko-http`, + `kamon-pekko-grpc`, `kamon-play`, `kamon-okhttp`, `kamon-tapir`, @@ -526,6 +527,31 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http ), )).dependsOn(`kamon-pekko`, `kamon-testkit` % "test") +lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc")) + .enablePlugins(JavaAgent, PekkoGrpcPlugin) + .disablePlugins(AssemblyPlugin) + .settings(instrumentationSettings) + .settings(Seq( + PB.additionalDependencies := Seq.empty, + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + + "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided", + "org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided", + "org.apache.pekko" %% "pekko-discovery"% "1.0.0" % "provided", + + "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.8" % "provided", + "org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0" % "provided", + "io.grpc" % "grpc-stub" % "1.43.2" % "provided", + + + scalatest % "test", + slf4jApi % "test", + logbackClassic % "test", + ) + )).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test") + lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc")) .enablePlugins(JavaAgent, AkkaGrpcPlugin) .disablePlugins(AssemblyPlugin) @@ -998,6 +1024,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo `kamon-finagle`, `kamon-pekko`, `kamon-pekko-http`, + `kamon-pekko-grpc`, `kamon-tapir`, `kamon-alpakka-kafka` ) diff --git a/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java new file mode 100644 index 000000000..5810d1dc0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java @@ -0,0 +1,49 @@ +package kamon.instrumentation.pekko.grpc; + +import org.apache.pekko.http.javadsl.model.HttpEntity; +import kamon.Kamon; +import kamon.context.Context; +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +public class PekkoGRPCUnmarshallingContextPropagation { + + @Advice.OnMethodExit() + public static void onExit( + @Advice.Return(readOnly = false) CompletionStage returnValue, + @Advice.Argument(0) Object firstArgument) { + + if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) { + final Context currentContext = 
Kamon.currentContext(); + + // NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called + // after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In + // the future this might be removed if we instrument CompletionStage directly. + returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext); + } + } + + + public static class ContextPropagatingCompletionStage extends CompletableFuture { + private final CompletableFuture wrapped; + private final Context context; + + public ContextPropagatingCompletionStage(CompletableFuture wrapped, Context context) { + this.wrapped = wrapped; + this.context = context; + } + + @Override + public CompletableFuture thenCompose(Function> fn) { + Function> wrapperFunction = (t) -> { + return Kamon.runWithContext(context, () -> fn.apply(t)); + }; + + return wrapped.thenCompose(wrapperFunction); + } + } + +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf new file mode 100644 index 000000000..ba773a8b0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +# ======================================= # +# Kamon-Pekko-gRPC Reference Configuration # +# ======================================= # + +kanela.modules { + pekko-grpc { + name = "Pekko gRPC Instrumentation" + description = "Context propagation and tracing for Pekko gRPC" + enabled = yes + + instrumentations = [ + "kamon.instrumentation.pekko.grpc.PekkoGrpcServerInstrumentation" + ] + + within = [ + "^org.apache.pekko.grpc.internal..*", + "^org.apache.pekko.grpc.scaladsl.GrpcMarshalling$" + ] + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala new file mode 100644 index 000000000..a7cbbc7c2 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.pekko.grpc + +import kamon.Kamon +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class PekkoGrpcServerInstrumentation extends InstrumentationBuilder { + + /** + * Support for Pekko gRPC servers. + * + * gRPC requests get their spans started by the ServerFlowWrapper in the Pekko HTTP instrumentation like any other + * requests, but they never go through any instrumentation that gives a good operation name to the Span and forces + * taking a sampling decision. 
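+   * Without this step, those Spans would keep the generic operation name assigned by the HTTP server instrumentation
+   * and their sampling decision would remain deferred.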
+ * + * This instrumentation gives a proper name and tags to the span when it matches one of the exposed services, + * otherwise the span remains unchanged. Assumes no actual implementation of `pekko.grpc.internal.TelemetrySpi` is + * configured. + */ + onType("org.apache.pekko.grpc.internal.NoOpTelemetry$") + .advise(method("onRequest"), PekkoGRPCServerRequestHandler) + + + onType("org.apache.pekko.grpc.scaladsl.GrpcMarshalling") + .advise(method("unmarshal"), classOf[PekkoGRPCUnmarshallingContextPropagation]) +} + +object PekkoGRPCServerRequestHandler { + + @Advice.OnMethodEnter() + def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = { + val fullSpanName = serviceName + "/" + method + Kamon.currentSpan() + .name(fullSpanName) + .tagMetrics("component", "pekko.grpc.server") + .tagMetrics("rpc.system", "grpc") + .tagMetrics("rpc.service", serviceName) + .tagMetrics("rpc.method", method) + .takeSamplingDecision() + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto new file mode 100644 index 000000000..bf204a6d0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "kamon.instrumentation.pekko.grpc"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +////////////////////////////////////// The greeting service definition. +service GreeterService { + ////////////////////// + // Sends a greeting // + ////////*****///////// + // HELLO // + ////////*****///////// + rpc SayHello (HelloRequest) returns (HelloReply) {} + + // Comment spanning + // on several lines + rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {} + + /* + * C style comments + */ + rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {} + + /* C style comments + * on several lines + * with non-empty heading/trailing line */ + rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {} +} + +// The request message containing the user's name. 
+message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf new file mode 100644 index 000000000..5d726de4a --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf @@ -0,0 +1 @@ +pekko.http.server.preview.enable-http2 = on \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml new file mode 100644 index 000000000..742815603 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala new file mode 100644 index 000000000..1b59ca4b5 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kamon.instrumentation.pekko.grpc + +import scala.concurrent.Future +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.Source + + +class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService { + import mat.executionContext + + override def sayHello(in: HelloRequest): Future[HelloReply] = { + Future.successful(HelloReply(s"Hello, ${in.name}")) + } + + override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { + in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}")) + } + + override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { + Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString)) + } + + override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + in.map(request => HelloReply(s"Hello, ${request.name}")) + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala new file mode 100644 index 000000000..bcdcee56b --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kamon.instrumentation.pekko.grpc + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.grpc.GrpcClientSettings +import org.apache.pekko.http.scaladsl.Http +import kamon.tag.Lookups.plain +import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter} +import org.scalatest.OptionValues +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.concurrent.duration._ + +class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually + with TestSpanReporter with OptionValues { + + implicit val system = ActorSystem("pekko-grpc-instrumentation") + implicit val ec = system.dispatcher + + val greeterService = GreeterServiceHandler(new GreeterServiceImpl()) + val serverBinding = Http() + .newServerAt("127.0.0.1", 8598) + .bind(greeterService) + + + val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false)) + + "the Pekko gRPC instrumentation" should { + "create spans for the server-side" in { + client.sayHello(HelloRequest("kamon")) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "helloworld.GreeterService/SayHello" + span.metricTags.get(plain("component")) shouldBe "pekko.grpc.server" + span.metricTags.get(plain("rpc.system")) shouldBe "grpc" + span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService" + span.metricTags.get(plain("rpc.method")) shouldBe "SayHello" + } + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + enableFastSpanFlushing() + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 4f0653992..437e1df70 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -14,3 +14,4 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.5.0") addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.3") +addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0") From 99fb2df97866e85c2019886f532bca994841649a Mon Sep 17 00:00:00 2001 From: danischroeter Date: Mon, 23 Oct 2023 15:32:08 +0200 Subject: [PATCH 05/12] upgrades kanela agent to 1.0.18 which includes support for jdk21 (#1300) * upgrades kanela agent from 1.0.17 to 1.0.18 * make shaky test less shaky --- .../CassandraClientTracingInstrumentationSpec.scala | 2 +- project/Build.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala b/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala index 65e0c5456..247ef207e 100644 --- a/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala +++ b/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala @@ -105,7 +105,7 @@ class CassandraClientTracingInstrumentationSpec session.execute(query).iterator().asScala.foreach(_ => ()) - eventually(timeout(10 seconds)) { + eventually(timeout(20 seconds)) { val spans = testSpanReporter().spans() val clientSpan = spans.find(span => span.operationName == QueryOperations.QueryOperationName) val executionSpans = spans.filter(span => span.operationName == 
QueryOperations.ExecutionOperationName) diff --git a/project/Build.scala b/project/Build.scala index bd76ec5fd..f28406208 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -26,7 +26,7 @@ object BaseProject extends AutoPlugin { /** Marker configuration for dependencies that will be shaded into their module's jar. */ lazy val Shaded = config("shaded").hide - val kanelaAgent = "io.kamon" % "kanela-agent" % "1.0.17" + val kanelaAgent = "io.kamon" % "kanela-agent" % "1.0.18" val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.36" val slf4jnop = "org.slf4j" % "slf4j-nop" % "1.7.36" val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.12" From 8fd9f244e878c0627aefd709124edc22ae7e320d Mon Sep 17 00:00:00 2001 From: Vivek Mahajan Date: Thu, 26 Oct 2023 11:08:45 +0200 Subject: [PATCH 06/12] Properly parse unsigned integer trace id (#1305) * Fix issue * import Long * Add tests * fix test name * checking upper bound * fix tests --------- Co-authored-by: Vivek Mahajan Co-authored-by: hughsimpson --- .../trace/DataDogSpanPropagationSpec.scala | 22 +++++++++++++++++++ .../scala/kamon/trace/SpanPropagation.scala | 3 ++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/core/kamon-core-tests/src/test/scala/kamon/trace/DataDogSpanPropagationSpec.scala b/core/kamon-core-tests/src/test/scala/kamon/trace/DataDogSpanPropagationSpec.scala index 8a22c1fd5..3cfb1682f 100644 --- a/core/kamon-core-tests/src/test/scala/kamon/trace/DataDogSpanPropagationSpec.scala +++ b/core/kamon-core-tests/src/test/scala/kamon/trace/DataDogSpanPropagationSpec.scala @@ -118,6 +118,28 @@ class DataDogSpanPropagationSpec extends AnyWordSpec with Matchers with OptionVa } } + + "SpanPropagation.DataDog.decodeUnsignedLongToHex" should { + "decode unsigned long to expected hex value " in { + val expectedHex1 = "0"; + val actualHex1 = SpanPropagation.DataDog.decodeUnsignedLongToHex("0"); + expectedHex1 shouldBe actualHex1; + + val expectedHex2 = "ff"; + val actualHex2 = SpanPropagation.DataDog.decodeUnsignedLongToHex("255"); + expectedHex2 shouldBe actualHex2; + + val expectedHex3 = "c5863f7d672b65bf"; + val actualHex3 = SpanPropagation.DataDog.decodeUnsignedLongToHex("14233133480185390527"); + expectedHex3 shouldBe actualHex3; + + val expectedHex4 = "ffffffffffffffff"; + val actualHex4 = SpanPropagation.DataDog.decodeUnsignedLongToHex("18446744073709551615"); + expectedHex4 shouldBe actualHex4; + + } + } + def unsignedLongString(id: String): String = BigInt(id, 16).toString def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { diff --git a/core/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/core/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index 0133bc199..77034f739 100644 --- a/core/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/core/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -24,6 +24,7 @@ import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} import kamon.context.generated.binary.span.{Span => ColferSpan} import kamon.context.{Context, _} import kamon.trace.Trace.SamplingDecision +import java.lang.{Long => JLong} import scala.util.Try @@ -450,7 +451,7 @@ object W3CTraceContext { * https://docs.datadoghq.com/tracing/guide/send_traces_to_agent_by_api/ */ def decodeUnsignedLongToHex(id: String): String = - urlDecode(id).toLong.toHexString + JLong.parseUnsignedLong(urlDecode(id), 10).toHexString } class DataDog extends Propagation.EntryReader[HeaderReader] with 
Propagation.EntryWriter[HeaderWriter] {

From a8bef44040c20047de700bc09f608bbdfdea033b Mon Sep 17 00:00:00 2001
From: André Cardoso
Date: Fri, 27 Oct 2023 05:28:33 +0100
Subject: [PATCH 07/12] Use ConfigUtil.splitPath to build environment tag keys
 from a config path (#1303)

* = kamon-core: add tests that fail using tags with special chars

* = kamon-core: Use ConfigUtil.splitPath to build a tag key from a config path

* = kamon-core: Improve spec examples and formatting

---------

Co-authored-by: hughsimpson
---
 .../kamon/util/EnvironmentTagsSpec.scala      | 31 ++++++++++++++++++-
 .../main/scala/kamon/status/Environment.scala |  4 +--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/core/kamon-core-tests/src/test/scala/kamon/util/EnvironmentTagsSpec.scala b/core/kamon-core-tests/src/test/scala/kamon/util/EnvironmentTagsSpec.scala
index c0dac90cc..c2c405f87 100644
--- a/core/kamon-core-tests/src/test/scala/kamon/util/EnvironmentTagsSpec.scala
+++ b/core/kamon-core-tests/src/test/scala/kamon/util/EnvironmentTagsSpec.scala
@@ -39,8 +39,15 @@ class EnvironmentTagsSpec extends AnyWordSpec with Matchers {
 |  some {
 |    tag {
 |      inside = example
+|      "@inside" = value
 |    }
 |  }
+|
+|  "defined-using-quotes" = value
+|
+|  "\"tag-with-quotes\"" = value
+|
+|  "@tag-with-special-chars" = value
 |  }
 |}
 """.stripMargin
@@ -80,6 +87,19 @@ class EnvironmentTagsSpec extends AnyWordSpec with Matchers {
       tags("env") shouldBe "staging"
       tags("region") shouldBe "asia-1"
+      tags.toMap shouldBe Map(
+        "@tag-with-special-chars" -> "value",
+        "env" -> "staging",
+        "host" -> "my-hostname",
+        "instance" -> "my-instance-name",
+        "k8s.namespace.name" -> "production",
+        "region" -> "asia-1",
+        "service" -> "environment-spec",
+        "some.tag.@inside" -> "value",
+        "some.tag.inside" -> "example",
+        "defined-using-quotes" -> "value",
+        "\"tag-with-quotes\"" -> "value"
+      )
     }

     "remove excluded tags" in {
@@ -105,7 +125,16 @@
 |include-service = no
 |include-host = no
 |include-instance = no
-|exclude = [ "region", "env", "k8s.namespace.name", "some.tag.inside" ]
+|exclude = [
+|  "region",
+|  "env",
+|  "k8s.namespace.name",
+|  "some.tag.inside",
+|  "some.tag.@inside",
+|  "defined-using-quotes",
+|  "@tag-with-special-chars",
+|  "\"tag-with-quotes\""
+|]
 """.stripMargin)

       val tags = EnvironmentTags.from(testEnv, config)
diff --git a/core/kamon-core/src/main/scala/kamon/status/Environment.scala b/core/kamon-core/src/main/scala/kamon/status/Environment.scala
index 48e616baa..4292bcd8a 100644
--- a/core/kamon-core/src/main/scala/kamon/status/Environment.scala
+++ b/core/kamon-core/src/main/scala/kamon/status/Environment.scala
@@ -19,7 +19,7 @@ package status

 import java.net.InetAddress
 import java.util.concurrent.ThreadLocalRandom
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigUtil}
 import kamon.tag.TagSet
 import kamon.util.HexCodec
 import org.slf4j.LoggerFactory
@@ -83,7 +83,7 @@ object Environment {
       tagsConfig.entrySet()
         .iterator()
         .asScala
-        .map { e => e.getKey -> e.getValue.unwrapped().toString }
+        .map { e => ConfigUtil.splitPath(e.getKey).asScala.mkString(".") -> e.getValue.unwrapped().toString }
         .toMap
     )
   }

From 89eaf472d44af168cc94116af2fca3e39280f2ec Mon Sep 17 00:00:00 2001
From: tjarko grossmann
Date: Mon, 4 Sep 2023 14:47:58 +0200
Subject: [PATCH 08/12] + kamon-akka: add scala-3.2.0 release

---
 build.sbt                            |  1 +
 instrumentation/kamon-akka/build.sbt | 53 ++++++++++++++++++++--------
 2 files changed, 40 
insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index 1a47a4922..33e38787c 100644 --- a/build.sbt +++ b/build.sbt @@ -456,6 +456,7 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka")) .enablePlugins(JavaAgent) .disablePlugins(AssemblyPlugin) .settings(instrumentationSettings: _*) + .settings(crossScalaVersions += `scala_3_version`) .dependsOn( `kamon-scala-future` % "compile,common,akka-2.5,akka-2.6", `kamon-testkit` % "test,test-common,test-akka-2.5,test-akka-2.6" diff --git a/instrumentation/kamon-akka/build.sbt b/instrumentation/kamon-akka/build.sbt index dee2bb4a6..25a586a88 100644 --- a/instrumentation/kamon-akka/build.sbt +++ b/instrumentation/kamon-akka/build.sbt @@ -31,7 +31,7 @@ configs( // The Common configuration should always depend on the latest version of Akka. All code in the Common configuration // should be source compatible with all Akka versions. inConfig(Common)(Defaults.compileSettings ++ Seq( - crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`) + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version) )) libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq( @@ -50,7 +50,7 @@ libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else inConfig(`Compile-Akka-2.6`)(Defaults.compileSettings ++ Seq( - crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`), + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), sources := joinSources(Common, `Compile-Akka-2.6`).value )) @@ -73,7 +73,7 @@ inConfig(`Compile-Akka-2.5`)(Defaults.compileSettings ++ Seq( sources := joinSources(Common, `Compile-Akka-2.5`).value )) -libraryDependencies ++= Seq( +libraryDependencies ++= {if (scalaVersion.value startsWith "3") Seq.empty else Seq( kanelaAgent % `Compile-Akka-2.5`, scalatest % `Test-Akka-2.5`, logbackClassic % `Test-Akka-2.5`, @@ -85,21 +85,28 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % `Akka-2.5-version` % `Compile-Akka-2.5`, "com.typesafe.akka" %% "akka-protobuf" % `Akka-2.5-version` % `Compile-Akka-2.5`, "com.typesafe.akka" %% "akka-testkit" % `Akka-2.5-version` % `Test-Akka-2.5` -) +)} // Ensure that the packaged artifact contains the instrumentation for all Akka versions. Compile / packageBin / mappings := Def.taskDyn { - if(scalaBinaryVersion.value == "2.11") + if(scalaBinaryVersion.value == "2.11") { Def.task { joinProducts((`Compile-Akka-2.5` / products).value) ++ joinProducts((Common / unmanagedResourceDirectories).value) } - else + } else if (scalaVersion.value startsWith "3") { + Def.task { + joinProducts((`Compile-Akka-2.6` / products).value) ++ + joinProducts((Common / unmanagedResourceDirectories).value) + } + } else { Def.task { joinProducts( (`Compile-Akka-2.5` / products).value ++ (`Compile-Akka-2.6` / products).value - ) ++ joinProducts((Common / unmanagedResourceDirectories).value)} + ) ++ joinProducts((Common / unmanagedResourceDirectories).value) + } + } }.value // Ensure that the packaged sources contains the instrumentation for all Akka versions. 
@@ -108,26 +115,38 @@ Compile / packageSrc / mappings := Def.taskDyn { Def.task { (`Compile-Akka-2.5` / packageSrc / mappings).value ++ (Common / packageSrc / mappings).value + } + } else if (scalaVersion.value startsWith "3") { + Def.task { + (`Compile-Akka-2.6` / packageSrc / mappings).value ++ + (Common / packageSrc / mappings).value } - } else + } else { Def.task { (`Compile-Akka-2.5` / packageSrc / mappings).value ++ (`Compile-Akka-2.6` / packageSrc / mappings).value ++ (Common / packageSrc / mappings).value } + } }.value // Compile will return the compile analysis for the Common configuration but will run on all Akka configurations. Compile / compile := Def.taskDyn { - if(scalaBinaryVersion.value == "2.11") + if(scalaBinaryVersion.value == "2.11") { Def.task { (`Compile-Akka-2.5` / compile).value } - else + } else if (scalaVersion.value startsWith "3"){ + + Def.task { + (`Compile-Akka-2.6` / compile).value + } + } else { Def.task { (`Compile-Akka-2.5` / compile).value (`Compile-Akka-2.6` / compile).value } + } }.value exportJars := true @@ -145,7 +164,7 @@ lazy val baseTestSettings = Seq( ) inConfig(TestCommon)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq( - crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`) + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version) )) inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq( @@ -155,20 +174,26 @@ inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ ba )) inConfig(`Test-Akka-2.6`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq( - crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`), + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), sources := joinSources(TestCommon, `Test-Akka-2.6`).value, unmanagedResourceDirectories ++= (Common / unmanagedResourceDirectories).value, unmanagedResourceDirectories ++= (TestCommon / unmanagedResourceDirectories).value )) Test / test := Def.taskDyn { - if(scalaBinaryVersion.value == "2.11") + if(scalaBinaryVersion.value == "2.11") { Def.task { (`Test-Akka-2.5` / test).value } - else + } else if (scalaVersion.value startsWith "3") { + Def.task { + (`Test-Akka-2.6` / test).value + } + } + else { Def.task { (`Test-Akka-2.5` / test).value (`Test-Akka-2.6` / test).value } + } }.value \ No newline at end of file From c8b714e70eee71c32a6dc92ae253ba1d47f592d1 Mon Sep 17 00:00:00 2001 From: tjarko grossmann Date: Mon, 4 Sep 2023 14:48:36 +0200 Subject: [PATCH 09/12] + kamon-pekko: add scala-3.2.0 release --- build.sbt | 2 +- instrumentation/kamon-pekko/build.sbt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 33e38787c..81919b660 100644 --- a/build.sbt +++ b/build.sbt @@ -510,7 +510,7 @@ lazy val pekkoHttpVersion = "1.0.0" lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http")) .enablePlugins(JavaAgent) .disablePlugins(AssemblyPlugin) - .settings(instrumentationSettings) + .settings(instrumentationSettings :+ (crossScalaVersions += `scala_3_version`)) .settings(Seq( javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test", crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`), diff --git a/instrumentation/kamon-pekko/build.sbt b/instrumentation/kamon-pekko/build.sbt index c4b7ac397..5596992a1 100644 --- a/instrumentation/kamon-pekko/build.sbt +++ 
b/instrumentation/kamon-pekko/build.sbt
@@ -1,7 +1,7 @@
 // The Common configuration should always depend on the latest version of Pekko. All code in the Common configuration
 // should be source compatible with all Pekko versions.
 inConfig(Compile)(Defaults.compileSettings ++ Seq(
-  crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
+  crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version)
 ))

 val pekkoVersion = "1.0.1"
@@ -33,5 +33,5 @@ lazy val baseTestSettings = Seq(
 )

 inConfig(Test)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
-  crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
+  crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
 ))

From 6902a851ec25866227432aa604c6293c89cfd432 Mon Sep 17 00:00:00 2001
From: tjarko grossmann
Date: Mon, 4 Sep 2023 14:52:41 +0200
Subject: [PATCH 10/12] = kamon-akka: update akka version

---
 instrumentation/kamon-akka/build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/instrumentation/kamon-akka/build.sbt b/instrumentation/kamon-akka/build.sbt
index 25a586a88..dadb10f94 100644
--- a/instrumentation/kamon-akka/build.sbt
+++ b/instrumentation/kamon-akka/build.sbt
@@ -3,7 +3,7 @@ import Def.Initialize

 val `Akka-2.4-version` = "2.4.20"
 val `Akka-2.5-version` = "2.5.32"
-val `Akka-2.6-version` = "2.6.20"
+val `Akka-2.6-version` = "2.6.21"

 /**
   * Compile Configurations

From e18c8715d1e4e118da09ae5ba80f090f9133f57b Mon Sep 17 00:00:00 2001
From: tjarko grossmann
Date: Wed, 4 Oct 2023 13:54:14 +0200
Subject: [PATCH 11/12] ! pekko: re-add scala-3 release

upgrading Scala to 3.3.0 because pekko_3 TASTy files are written by Scala 3.3.0

---
 build.sbt                                     |   6 +-
 .../http/PekkoHttpServerInstrumentation.scala | 323 ++++++++++++++++++
 project/Build.scala                           |   2 +-
 3 files changed, 327 insertions(+), 4 deletions(-)
 create mode 100644 instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala

diff --git a/build.sbt b/build.sbt
index 81919b660..6c128f3d4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -498,7 +498,7 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
   .disablePlugins(AssemblyPlugin)
   .settings(instrumentationSettings: _*)
   .settings(Seq(
-    crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
+    crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
   ))
   .dependsOn(
     `kamon-scala-future` % "compile",
@@ -510,10 +510,10 @@ lazy val pekkoHttpVersion = "1.0.0"
 lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http"))
   .enablePlugins(JavaAgent)
   .disablePlugins(AssemblyPlugin)
-  .settings(instrumentationSettings :+ (crossScalaVersions += `scala_3_version`))
+  .settings(instrumentationSettings)
   .settings(Seq(
     javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
-    crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
+    crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
     libraryDependencies ++= Seq(
       kanelaAgent % "provided",
       "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
diff --git a/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala b/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala
new file mode 100644
index 000000000..59b02528f
--- 
/dev/null +++ b/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala @@ -0,0 +1,323 @@ +package kamon.instrumentation.pekko.http + +import java.util.concurrent.Callable +import org.apache.pekko.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} +import org.apache.pekko.http.scaladsl.model.StatusCodes.Redirection +import org.apache.pekko.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri} +import org.apache.pekko.http.scaladsl.server.PathMatcher.{Matched, Unmatched} +import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} +import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject +import org.apache.pekko.http.scaladsl.server._ +import org.apache.pekko.http.scaladsl.server.util.Tupler +import org.apache.pekko.http.scaladsl.util.FastFuture +import kamon.Kamon +import kamon.instrumentation.pekko.http.HasMatchingContext.PathMatchingContext +import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext} +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.api.instrumentation.mixin.Initializer +import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._ + +import scala.concurrent.{Batchable, ExecutionContext, Future, Promise} +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} +import java.util.regex.Pattern +import org.apache.pekko.NotUsed +import org.apache.pekko.http.scaladsl.server.RouteResult.Rejected +import org.apache.pekko.stream.scaladsl.Flow +import kamon.context.Context +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic + +import scala.collection.immutable + + +class PekkoHttpServerInstrumentation extends InstrumentationBuilder { + /** + * When instrumenting bindAndHandle what we do is wrap the Flow[HttpRequest, HttpResponse, NotUsed] provided by + * the user and add all the processing there. This is the part of the instrumentation that performs Context + * propagation, tracing and gather metrics using the HttpServerInstrumentation packed in common. + * + * One important point about the HTTP Server instrumentation is that because it is almost impossible to have a proper + * operation name before the request processing hits the routing tree, we are delaying the sampling decision to the + * point at which we have some operation name. + */ + onType("org.apache.pekko.http.scaladsl.HttpExt") + .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) + + /** + * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow + * creation happen at different times we are wrapping the handler with the interface/port data and reading that + * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + * + */ + onType("org.apache.pekko.http.impl.engine.http2.Http2Ext") + .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + + onType("org.apache.pekko.http.impl.engine.http2.Http2Blueprint$") + .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + + /** + * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free + * of variables) and take a Sampling Decision in case none has been taken so far. 
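+   * Each successful path match is recorded through the HasMatchingContext mixin, which is what later allows deriving
+   * an operation name that is free of path variables.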
+ */ + onType("org.apache.pekko.http.scaladsl.server.RequestContextImpl") + .mixin(classOf[HasMatchingContext.Mixin]) + .intercept(method("copy"), RequestContextCopyInterceptor) + + onType("org.apache.pekko.http.scaladsl.server.directives.PathDirectives") + .intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor]) + + onType("org.apache.pekko.http.scaladsl.server.directives.FutureDirectives") + .intercept(method("onComplete"), classOf[ResolveOperationNameOnRouteInterceptor]) + + onTypes("org.apache.pekko.http.scaladsl.server.directives.OnSuccessMagnet$", "org.apache.pekko.http.scaladsl.server.directives.CompleteOrRecoverWithMagnet$") + .intercept(method("apply"), classOf[ResolveOperationNameOnRouteInterceptor]) + + onType("org.apache.pekko.http.scaladsl.server.directives.RouteDirectives") + .intercept(method("complete"), classOf[ResolveOperationNameOnRouteInterceptor]) + .intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor]) + .intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor]) + + + /** + * Support for HTTP/1 and HTTP/2 at the same time. + * + */ + + onType("org.apache.pekko.stream.scaladsl.FlowOps") + .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) +} + +trait HasMatchingContext { + def defaultOperationName: String + def matchingContext: Seq[PathMatchingContext] + def setMatchingContext(ctx: Seq[PathMatchingContext]): Unit + def setDefaultOperationName(defaultOperationName: String): Unit + def prependMatchingContext(matched: PathMatchingContext): Unit + def popOneMatchingContext(): Unit +} + +object HasMatchingContext { + + case class PathMatchingContext ( + fullPath: String, + matched: Matched[_] + ) + + class Mixin(var matchingContext: Seq[PathMatchingContext], var defaultOperationName: String) extends HasMatchingContext { + + override def setMatchingContext(matchingContext: Seq[PathMatchingContext]): Unit = + this.matchingContext = matchingContext + + override def setDefaultOperationName(defaultOperationName: String): Unit = + this.defaultOperationName = defaultOperationName + + override def prependMatchingContext(matched: PathMatchingContext): Unit = + matchingContext = matched +: matchingContext + + override def popOneMatchingContext(): Unit = + matchingContext = matchingContext.tail + + @Initializer + def initialize(): Unit = + matchingContext = Seq.empty + } +} + +class ResolveOperationNameOnRouteInterceptor +object ResolveOperationNameOnRouteInterceptor { + import org.apache.pekko.http.scaladsl.util.FastFuture._ + + // We are replacing some of the basic directives here to ensure that we will resolve both the Sampling Decision and + // the operation name before the request gets to the actual handling code (presumably inside of a "complete" + // directive. 
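+  //
+  // Sketch of the intended effect (illustrative only): a route written as
+  //
+  //   complete(StatusCodes.OK)
+  //
+  // resolves to the complete(...) below, which calls resolveOperationName(ctx) on the current
+  // RequestContext before handing off to the regular StandardRoute behaviour.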
+ + def complete(m: => ToResponseMarshallable): StandardRoute = + StandardRoute(resolveOperationName(_).complete(m)) + + def complete[T](status: StatusCode, v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = + StandardRoute(resolveOperationName(_).complete((status, v))) + + def complete[T](status: StatusCode, headers: immutable.Seq[HttpHeader], v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = + complete((status, headers, v)) + + def redirect(uri: Uri, redirectionType: Redirection): StandardRoute = + StandardRoute(resolveOperationName(_).redirect(uri, redirectionType)) + + def failWith(error: Throwable): StandardRoute = { + Kamon.currentSpan().fail(error) + StandardRoute(resolveOperationName(_).fail(error)) + } + + def onComplete[T](future: => Future[T]): Directive1[Try[T]] = + Directive { inner => ctx => + import ctx.executionContext + resolveOperationName(ctx) + future.fast.transformWith(t => inner(Tuple1(t))(ctx)) + } + + def apply[T](future: => Future[T])(implicit tupler: Tupler[T]): OnSuccessMagnet { type Out = tupler.Out } = + new OnSuccessMagnet { + type Out = tupler.Out + val directive = Directive[tupler.Out] { inner => ctx => + import ctx.executionContext + resolveOperationName(ctx) + future.fast.flatMap(t => inner(tupler(t))(ctx)) + }(tupler.OutIsTuple) + } + + def apply[T](future: => Future[T])(implicit m: ToResponseMarshaller[T]): CompleteOrRecoverWithMagnet = + new CompleteOrRecoverWithMagnet { + val directive = Directive[Tuple1[Throwable]] { inner => ctx => + import ctx.executionContext + resolveOperationName(ctx) + future.fast.transformWith { + case Success(res) => ctx.complete(res) + case Failure(error) => inner(Tuple1(error))(ctx) + } + } + } + + private def resolveOperationName(requestContext: RequestContext): RequestContext = { + + // We will only change the operation name if the last edit made to it was an automatic one. At this point, the only + // way in which the operation name might have changed is if the user changed it with the operationName directive or + // by accessing the Span and changing it directly there, so we wouldn't want to overwrite that. 
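+    //
+    // For example (illustrative): once a user calls Kamon.currentSpan().name("my-operation"), the span
+    // name no longer matches lastEdit.operationName, so allowAutomaticChanges is flipped to false below
+    // and later calls only take a sampling decision instead of renaming the span.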
+ + Kamon.currentContext().get(LastAutomaticOperationNameEdit.Key).foreach(lastEdit => { + val currentSpan = Kamon.currentSpan() + + if(lastEdit.allowAutomaticChanges) { + if(currentSpan.operationName() == lastEdit.operationName) { + val allMatches = requestContext.asInstanceOf[HasMatchingContext].matchingContext.reverse.map(singleMatch) + val operationName = allMatches.mkString("") + + if (operationName.nonEmpty) { + currentSpan + .name(operationName) + .takeSamplingDecision() + + lastEdit.operationName = operationName + } + } else { + lastEdit.allowAutomaticChanges = false + } + } else { + currentSpan.takeSamplingDecision() + } + }) + + requestContext + } + + private def singleMatch(matching: PathMatchingContext): String = { + val rest = matching.matched.pathRest.toString() + val consumedCount = matching.fullPath.length - rest.length + val consumedSegment = matching.fullPath.substring(0, consumedCount) + + matching.matched.extractions match { + case () => //string segment matched + consumedSegment + case tuple: Product => + val values = tuple.productIterator.toList map { + case Some(x) => List(x.toString) + case None => Nil + case long: Long => List(long.toString, long.toHexString) + case int: Int => List(int.toString, int.toHexString) + case a: Any => List(a.toString) + } + values.flatten.fold(consumedSegment) { (full, value) => + val r = "(?i)(^|/)" + Pattern.quote(value) + "($|/)" + full.replaceFirst(r, "$1{}$2") + } + } + } +} + +/** + * Tracks the last operation name that was automatically assigned to an operation via instrumentation. The + * instrumentation might assign a name to the operations via settings on the HTTP Server instrumentation instance or + * via the Path directives instrumentation, but might never reassign a name if the user somehow assigned their own name + * to the operation. Users chan change operation names by: + * - Using operation mappings via configuration of the HTTP Server. + * - Providing a custom HTTP Operation Name Generator for the server. + * - Using the "operationName" directive. + * - Directly accessing the Span for the current operation and changing the name on it. 
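+ *
+ * A minimal sketch of the last option (hypothetical user code, for illustration only):
+ *
+ * {{{
+ *   Kamon.currentSpan().name("users.get") // manual rename; automatic renaming backs off afterwards
+ * }}}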
+ * + */ +class LastAutomaticOperationNameEdit( + @volatile var operationName: String, + @volatile var allowAutomaticChanges: Boolean +) + +object LastAutomaticOperationNameEdit { + val Key = Context.key[Option[LastAutomaticOperationNameEdit]]("laone", None) + + def apply(operationName: String, allowAutomaticChanges: Boolean): LastAutomaticOperationNameEdit = + new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges) +} + +object RequestContextCopyInterceptor { + + @RuntimeType + def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { + val copiedRequestContext = copyCall.call() + copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext) + copiedRequestContext + } +} + +class PathDirectivesRawPathPrefixInterceptor +object PathDirectivesRawPathPrefixInterceptor { + import BasicDirectives._ + + def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = { + implicit val LIsTuple = matcher.ev + + extract { ctx => + val fullPath = ctx.unmatchedPath.toString() + val matching = matcher(ctx.unmatchedPath) + + matching match { + case m: Matched[_] => + ctx.asInstanceOf[HasMatchingContext] + .prependMatchingContext(PathMatchingContext(fullPath, m)) + case _ => + } + + (ctx, matching) + } flatMap { + case (ctx, Matched(rest, values)) => + tprovide(values)(LIsTuple) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => + + if(routeResult.isInstanceOf[Rejected]) + ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext() + + routeResult + } + + case (_, Unmatched) => reject + } + } +} + +object Http2BlueprintInterceptor { + + case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) + extends (HttpRequest => Future[HttpResponse]) { + + override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) + } + + @RuntimeType + def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { + + handler match { + case HandlerWithEndpoint(interface, port, _) => + ServerFlowWrapper(zuper.call(), interface, port) + + case _ => + zuper.call() + } + } +} diff --git a/project/Build.scala b/project/Build.scala index f28406208..72f6d9047 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -59,7 +59,7 @@ object BaseProject extends AutoPlugin { val `scala_2.11_version` = "2.11.12" val `scala_2.12_version` = "2.12.15" val `scala_2.13_version` = "2.13.8" - val scala_3_version = "3.2.0" + val scala_3_version = "3.3.0" // This installs the GPG signing key from the setupGpg() From ffb77bebeeec55830c4c0ac5a1b9064a346acc64 Mon Sep 17 00:00:00 2001 From: tjarko grossmann Date: Wed, 4 Oct 2023 15:05:25 +0200 Subject: [PATCH 12/12] = kamon-pekko: Fix compile-errors in tests --- .../akka/ActorCellInstrumentationSpec.scala | 19 +- .../akka/AskPatternInstrumentationSpec.scala | 7 +- .../akka/MessageTracingSpec.scala | 4 +- .../SystemMessageInstrumentationSpec.scala | 5 +- .../http/PekkoHttpServerInstrumentation.scala | 341 ------------------ .../http/PekkoHttpServerInstrumentation.scala | 323 ----------------- .../http/PekkoHttpServerInstrumentation.scala | 34 +- .../http/PekkoHttpClientTracingSpec.scala | 11 +- .../http/PekkoHttpServerMetricsSpec.scala | 12 +- .../http/PekkoHttpServerTracingSpec.scala | 6 +- 
.../pekko/http/ServerFlowWrapperSpec.scala | 10 +- .../scala/kamon/testkit/TestWebServer.scala | 9 +- .../pekko/ActorCellInstrumentationSpec.scala | 5 +- .../pekko/AskPatternInstrumentationSpec.scala | 16 +- .../pekko/MessageTracingSpec.scala | 34 +- .../SystemMessageInstrumentationSpec.scala | 21 +- 16 files changed, 99 insertions(+), 758 deletions(-) delete mode 100644 instrumentation/kamon-pekko-http/src/main/scala-2.12/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala delete mode 100644 instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala rename instrumentation/kamon-pekko-http/src/main/{scala-2.13 => scala}/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala (96%) diff --git a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala index 44ac75834..aefa73139 100644 --- a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala @@ -21,20 +21,21 @@ import akka.routing._ import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout import kamon.Kamon -import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection} import kamon.tag.Lookups._ +import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection} import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import scala.collection.mutable.ListBuffer +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrumentationSpec")) with AnyWordSpecLike with BeforeAndAfterAll with ImplicitSender with Eventually with MetricInspection.Syntax with Matchers with InitAndStopKamonAfterAll { - implicit lazy val executionContext = system.dispatcher + implicit lazy val executionContext: ExecutionContext = system.dispatcher import ContextTesting._ "the message passing instrumentation" should { @@ -59,7 +60,7 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum } "propagate the current context when using the ask pattern" in new EchoActorFixture { - implicit val timeout = Timeout(1 seconds) + implicit val timeout: Timeout = Timeout(1 seconds) Kamon.runWithContext(testContext("propagate-with-ask")) { // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. (contextEchoActor ? 
"test") pipeTo (testActor) @@ -122,11 +123,11 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum } trait EchoActorFixture { - val contextEchoActor = system.actorOf(Props[ContextStringEcho]) + val contextEchoActor: ActorRef = system.actorOf(Props[ContextStringEcho]) } trait EchoSimpleRouterFixture { - val router = { + val router: Router = { val routees = Vector.fill(5) { val r = system.actorOf(Props[ContextStringEcho]) ActorRefRoutee(r) @@ -136,22 +137,22 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum } trait EchoPoolRouterFixture { - val pool = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[ContextStringEcho]), "pool-router") + val pool: ActorRef = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[ContextStringEcho]), "pool-router") } trait EchoGroupRouterFixture { - val routees = Vector.fill(5) { + val routees: Vector[ActorRef] = Vector.fill(5) { system.actorOf(Props[ContextStringEcho]) } - val group = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), "group-router") + val group: ActorRef = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), "group-router") } } class ContextStringEcho extends Actor { import ContextTesting._ - def receive = { + def receive: Receive = { case _: String => sender ! Kamon.currentContext().getTag(plain(TestKey)) } diff --git a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala index b355b9f31..8f28f5e0d 100644 --- a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -28,13 +28,14 @@ import kamon.instrumentation.akka.ContextTesting._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstrumentationSpec")) with AnyWordSpecLike with InitAndStopKamonAfterAll with ImplicitSender { - implicit lazy val ec = system.dispatcher - implicit val askTimeout = Timeout(10 millis) + implicit lazy val ec: ExecutionContextExecutor = system.dispatcher + implicit val askTimeout: Timeout = Timeout(10 millis) // TODO: Make this work with ActorSelections @@ -93,7 +94,7 @@ class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstr } class NoReply extends Actor { - def receive = { + def receive: Receive = { case _ => } } diff --git a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/MessageTracingSpec.scala b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/MessageTracingSpec.scala index 1e2aeae41..3790d9afb 100644 --- a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/MessageTracingSpec.scala +++ b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/MessageTracingSpec.scala @@ -178,7 +178,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any } "not track Akka Streams actors" in { - implicit val timeout = Timeout(10 seconds) + implicit val timeout: Timeout = Timeout(10 seconds) val actorWithMaterializer = 
system.actorOf(Props[ActorWithMaterializer]) val finishedStream = Kamon.runWithSpan(Kamon.serverSpanBuilder("wrapper", "test").start()) { @@ -222,7 +222,7 @@ class TracingTestActor extends Actor { } class ActorWithMaterializer extends Actor { - implicit val mat = ActorMaterializer() + implicit val mat: Materializer = Materializer(context) override def receive: Receive = { case "stream" => diff --git a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/SystemMessageInstrumentationSpec.scala b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/SystemMessageInstrumentationSpec.scala index a9b56f76e..5c57c74f6 100644 --- a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/SystemMessageInstrumentationSpec.scala +++ b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/SystemMessageInstrumentationSpec.scala @@ -25,13 +25,14 @@ import kamon.instrumentation.akka.ContextTesting._ import kamon.tag.Lookups._ import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.{AnyWordSpec, AnyWordSpecLike} +import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContext import scala.util.control.NonFatal class SystemMessageInstrumentationSpec extends TestKit(ActorSystem("ActorSystemMessageInstrumentationSpec")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender { - implicit lazy val executionContext = system.dispatcher + implicit lazy val executionContext: ExecutionContext = system.dispatcher "the system message passing instrumentation" should { "capture and propagate the current context while processing the Create message in top level actors" in { diff --git a/instrumentation/kamon-pekko-http/src/main/scala-2.12/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala b/instrumentation/kamon-pekko-http/src/main/scala-2.12/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala deleted file mode 100644 index eb82331c2..000000000 --- a/instrumentation/kamon-pekko-http/src/main/scala-2.12/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kamon.instrumentation.pekko.http - -import java.util.concurrent.Callable -import org.apache.pekko.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} -import org.apache.pekko.http.scaladsl.model.StatusCodes.Redirection -import org.apache.pekko.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri} -import org.apache.pekko.http.scaladsl.server.PathMatcher.{Matched, Unmatched} -import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} -import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject -import org.apache.pekko.http.scaladsl.server._ -import org.apache.pekko.http.scaladsl.server.util.Tupler -import org.apache.pekko.http.scaladsl.util.FastFuture -import kamon.Kamon -import kamon.instrumentation.pekko.http.HasMatchingContext.PathMatchingContext -import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext} -import kanela.agent.api.instrumentation.InstrumentationBuilder -import kanela.agent.api.instrumentation.mixin.Initializer -import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._ - -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.control.NonFatal -import scala.util.{Failure, Success, Try} -import java.util.regex.Pattern -import org.apache.pekko.NotUsed -import org.apache.pekko.http.scaladsl.server.RouteResult.Rejected -import org.apache.pekko.stream.scaladsl.Flow -import kamon.context.Context -import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic - -import scala.collection.immutable - - -class PekkoHttpServerInstrumentation extends InstrumentationBuilder { - - /** - * When instrumenting bindAndHandle what we do is wrap the Flow[HttpRequest, HttpResponse, NotUsed] provided by - * the user and add all the processing there. This is the part of the instrumentation that performs Context - * propagation, tracing and gather metrics using the HttpServerInstrumentation packed in common. - * - * One important point about the HTTP Server instrumentation is that because it is almost impossible to have a proper - * operation name before the request processing hits the routing tree, we are delaying the sampling decision to the - * point at which we have some operation name. - */ - - onType("org.apache.pekko.http.scaladsl.HttpExt") - .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) - - /** - * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow - * creation happen at different times we are wrapping the handler with the interface/port data and reading that - * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. - */ - - onType("org.apache.pekko.http.impl.engine.http2.Http2Ext") - .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) - - onType("org.apache.pekko.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) - - /** - * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free - * of variables) and take a Sampling Decision in case none has been taken so far. 
- */ - onType("org.apache.pekko.http.scaladsl.server.RequestContextImpl") - .mixin(classOf[HasMatchingContext.Mixin]) - .intercept(method("copy"), RequestContextCopyInterceptor) - - onType("org.apache.pekko.http.scaladsl.server.directives.PathDirectives") - .intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor]) - - onType("org.apache.pekko.http.scaladsl.server.directives.FutureDirectives") - .intercept(method("onComplete"), classOf[ResolveOperationNameOnRouteInterceptor]) - - onTypes("org.apache.pekko.http.scaladsl.server.directives.OnSuccessMagnet$", "org.apache.pekko.http.scaladsl.server.directives.CompleteOrRecoverWithMagnet$") - .intercept(method("apply"), classOf[ResolveOperationNameOnRouteInterceptor]) - - onType("org.apache.pekko.http.scaladsl.server.directives.RouteDirectives") - .intercept(method("complete"), classOf[ResolveOperationNameOnRouteInterceptor]) - .intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor]) - .intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor]) - - /** - * Support for HTTP/1 and HTTP/2 at the same time. - * - */ - - onType("org.apache.pekko.stream.scaladsl.FlowOps") - .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) -} - -trait HasMatchingContext { - def defaultOperationName: String - def matchingContext: Seq[PathMatchingContext] - def setMatchingContext(ctx: Seq[PathMatchingContext]): Unit - def setDefaultOperationName(defaultOperationName: String): Unit - def prependMatchingContext(matched: PathMatchingContext): Unit - def popOneMatchingContext(): Unit -} - -object HasMatchingContext { - - case class PathMatchingContext ( - fullPath: String, - matched: Matched[_] - ) - - class Mixin(var matchingContext: Seq[PathMatchingContext], var defaultOperationName: String) extends HasMatchingContext { - - override def setMatchingContext(matchingContext: Seq[PathMatchingContext]): Unit = - this.matchingContext = matchingContext - - override def setDefaultOperationName(defaultOperationName: String): Unit = - this.defaultOperationName = defaultOperationName - - override def prependMatchingContext(matched: PathMatchingContext): Unit = - matchingContext = matched +: matchingContext - - override def popOneMatchingContext(): Unit = - matchingContext = matchingContext.tail - - @Initializer - def initialize(): Unit = - matchingContext = Seq.empty - } -} - -class ResolveOperationNameOnRouteInterceptor -object ResolveOperationNameOnRouteInterceptor { - import org.apache.pekko.http.scaladsl.util.FastFuture._ - - // We are replacing some of the basic directives here to ensure that we will resolve both the Sampling Decision and - // the operation name before the request gets to the actual handling code (presumably inside of a "complete" - // directive. 
- - def complete(m: => ToResponseMarshallable): StandardRoute = - StandardRoute(resolveOperationName(_).complete(m)) - - def complete[T](status: StatusCode, v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = - StandardRoute(resolveOperationName(_).complete((status, v))) - - def complete[T](status: StatusCode, headers: immutable.Seq[HttpHeader], v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = - complete((status, headers, v)) - - def redirect(uri: Uri, redirectionType: Redirection): StandardRoute = - StandardRoute(resolveOperationName(_).redirect(uri, redirectionType)) - - def failWith(error: Throwable): StandardRoute = { - Kamon.currentSpan().fail(error) - StandardRoute(resolveOperationName(_).fail(error)) - } - - def onComplete[T](future: => Future[T]): Directive1[Try[T]] = - Directive { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.transformWith(t => inner(Tuple1(t))(ctx)) - } - - def apply[T](future: => Future[T])(implicit tupler: Tupler[T]): OnSuccessMagnet { type Out = tupler.Out } = - new OnSuccessMagnet { - type Out = tupler.Out - val directive = Directive[tupler.Out] { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.flatMap(t => inner(tupler(t))(ctx)) - }(tupler.OutIsTuple) - } - - def apply[T](future: => Future[T])(implicit m: ToResponseMarshaller[T]): CompleteOrRecoverWithMagnet = - new CompleteOrRecoverWithMagnet { - val directive = Directive[Tuple1[Throwable]] { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.transformWith { - case Success(res) => ctx.complete(res) - case Failure(error) => inner(Tuple1(error))(ctx) - } - } - } - - private def resolveOperationName(requestContext: RequestContext): RequestContext = { - - // We will only change the operation name if the last edit made to it was an automatic one. At this point, the only - // way in which the operation name might have changed is if the user changed it with the operationName directive or - // by accessing the Span and changing it directly there, so we wouldn't want to overwrite that. 
- - Kamon.currentContext().get(LastAutomaticOperationNameEdit.Key).foreach(lastEdit => { - val currentSpan = Kamon.currentSpan() - - if(lastEdit.allowAutomaticChanges) { - if(currentSpan.operationName() == lastEdit.operationName) { - val allMatches = requestContext.asInstanceOf[HasMatchingContext].matchingContext.reverse.map(singleMatch) - val operationName = allMatches.mkString("") - - if(operationName.nonEmpty) { - currentSpan - .name(operationName) - .takeSamplingDecision() - - lastEdit.operationName = operationName - } - } else { - lastEdit.allowAutomaticChanges = false - } - } else { - currentSpan.takeSamplingDecision() - } - }) - - requestContext - } - - private def singleMatch(matching: PathMatchingContext): String = { - val rest = matching.matched.pathRest.toString() - val consumedCount = matching.fullPath.length - rest.length - val consumedSegment = matching.fullPath.substring(0, consumedCount) - - matching.matched.extractions match { - case () => //string segment matched - consumedSegment - case tuple: Product => - val values = tuple.productIterator.toList map { - case Some(x) => List(x.toString) - case None => Nil - case long: Long => List(long.toString, long.toHexString) - case int: Int => List(int.toString, int.toHexString) - case a: Any => List(a.toString) - } - values.flatten.fold(consumedSegment) { (full, value) => - val r = "(?i)(^|/)" + Pattern.quote(value) + "($|/)" - full.replaceFirst(r, "$1{}$2") - } - } - } -} - -/** - * Tracks the last operation name that was automatically assigned to an operation via instrumentation. The - * instrumentation might assign a name to the operations via settings on the HTTP Server instrumentation instance or - * via the Path directives instrumentation, but might never reassign a name if the user somehow assigned their own name - * to the operation. Users chan change operation names by: - * - Using operation mappings via configuration of the HTTP Server. - * - Providing a custom HTTP Operation Name Generator for the server. - * - Using the "operationName" directive. - * - Directly accessing the Span for the current operation and changing the name on it. 
- * - */ -class LastAutomaticOperationNameEdit( - @volatile var operationName: String, - @volatile var allowAutomaticChanges: Boolean -) - -object LastAutomaticOperationNameEdit { - val Key = Context.key[Option[LastAutomaticOperationNameEdit]]("laone", None) - - def apply(operationName: String, allowAutomaticChanges: Boolean): LastAutomaticOperationNameEdit = - new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges) -} - -object RequestContextCopyInterceptor { - - @RuntimeType - def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { - val copiedRequestContext = copyCall.call() - copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext) - copiedRequestContext - } -} - -class PathDirectivesRawPathPrefixInterceptor -object PathDirectivesRawPathPrefixInterceptor { - import BasicDirectives._ - - def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = { - implicit val LIsTuple = matcher.ev - - extract { ctx => - val fullPath = ctx.unmatchedPath.toString() - val matching = matcher(ctx.unmatchedPath) - - matching match { - case m: Matched[_] => - ctx.asInstanceOf[HasMatchingContext] - .prependMatchingContext(PathMatchingContext(fullPath, m)) - case _ => - } - - (ctx, matching) - } flatMap { - case (ctx, Matched(rest, values)) => - tprovide(values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => - - if(routeResult.isInstanceOf[Rejected]) - ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext() - - routeResult - } - - case (_, Unmatched) => reject - } - } -} - - -object Http2BlueprintInterceptor { - - case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) - extends (HttpRequest => Future[HttpResponse]) { - - override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) - } - - @RuntimeType - def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], - @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { - - handler match { - case HandlerWithEndpoint(interface, port, _) => - ServerFlowWrapper(zuper.call(), interface, port) - - case _ => - zuper.call() - } - } -} diff --git a/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala b/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala deleted file mode 100644 index 59b02528f..000000000 --- a/instrumentation/kamon-pekko-http/src/main/scala-3/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala +++ /dev/null @@ -1,323 +0,0 @@ -package kamon.instrumentation.pekko.http - -import java.util.concurrent.Callable -import org.apache.pekko.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} -import org.apache.pekko.http.scaladsl.model.StatusCodes.Redirection -import org.apache.pekko.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri} -import org.apache.pekko.http.scaladsl.server.PathMatcher.{Matched, Unmatched} -import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} -import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject -import org.apache.pekko.http.scaladsl.server._ -import 
org.apache.pekko.http.scaladsl.server.util.Tupler -import org.apache.pekko.http.scaladsl.util.FastFuture -import kamon.Kamon -import kamon.instrumentation.pekko.http.HasMatchingContext.PathMatchingContext -import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext} -import kanela.agent.api.instrumentation.InstrumentationBuilder -import kanela.agent.api.instrumentation.mixin.Initializer -import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._ - -import scala.concurrent.{Batchable, ExecutionContext, Future, Promise} -import scala.util.control.NonFatal -import scala.util.{Failure, Success, Try} -import java.util.regex.Pattern -import org.apache.pekko.NotUsed -import org.apache.pekko.http.scaladsl.server.RouteResult.Rejected -import org.apache.pekko.stream.scaladsl.Flow -import kamon.context.Context -import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic - -import scala.collection.immutable - - -class PekkoHttpServerInstrumentation extends InstrumentationBuilder { - /** - * When instrumenting bindAndHandle what we do is wrap the Flow[HttpRequest, HttpResponse, NotUsed] provided by - * the user and add all the processing there. This is the part of the instrumentation that performs Context - * propagation, tracing and gather metrics using the HttpServerInstrumentation packed in common. - * - * One important point about the HTTP Server instrumentation is that because it is almost impossible to have a proper - * operation name before the request processing hits the routing tree, we are delaying the sampling decision to the - * point at which we have some operation name. - */ - onType("org.apache.pekko.http.scaladsl.HttpExt") - .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) - - /** - * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow - * creation happen at different times we are wrapping the handler with the interface/port data and reading that - * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. - * - */ - onType("org.apache.pekko.http.impl.engine.http2.Http2Ext") - .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) - - onType("org.apache.pekko.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) - - /** - * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free - * of variables) and take a Sampling Decision in case none has been taken so far. 
- */ - onType("org.apache.pekko.http.scaladsl.server.RequestContextImpl") - .mixin(classOf[HasMatchingContext.Mixin]) - .intercept(method("copy"), RequestContextCopyInterceptor) - - onType("org.apache.pekko.http.scaladsl.server.directives.PathDirectives") - .intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor]) - - onType("org.apache.pekko.http.scaladsl.server.directives.FutureDirectives") - .intercept(method("onComplete"), classOf[ResolveOperationNameOnRouteInterceptor]) - - onTypes("org.apache.pekko.http.scaladsl.server.directives.OnSuccessMagnet$", "org.apache.pekko.http.scaladsl.server.directives.CompleteOrRecoverWithMagnet$") - .intercept(method("apply"), classOf[ResolveOperationNameOnRouteInterceptor]) - - onType("org.apache.pekko.http.scaladsl.server.directives.RouteDirectives") - .intercept(method("complete"), classOf[ResolveOperationNameOnRouteInterceptor]) - .intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor]) - .intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor]) - - - /** - * Support for HTTP/1 and HTTP/2 at the same time. - * - */ - - onType("org.apache.pekko.stream.scaladsl.FlowOps") - .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) -} - -trait HasMatchingContext { - def defaultOperationName: String - def matchingContext: Seq[PathMatchingContext] - def setMatchingContext(ctx: Seq[PathMatchingContext]): Unit - def setDefaultOperationName(defaultOperationName: String): Unit - def prependMatchingContext(matched: PathMatchingContext): Unit - def popOneMatchingContext(): Unit -} - -object HasMatchingContext { - - case class PathMatchingContext ( - fullPath: String, - matched: Matched[_] - ) - - class Mixin(var matchingContext: Seq[PathMatchingContext], var defaultOperationName: String) extends HasMatchingContext { - - override def setMatchingContext(matchingContext: Seq[PathMatchingContext]): Unit = - this.matchingContext = matchingContext - - override def setDefaultOperationName(defaultOperationName: String): Unit = - this.defaultOperationName = defaultOperationName - - override def prependMatchingContext(matched: PathMatchingContext): Unit = - matchingContext = matched +: matchingContext - - override def popOneMatchingContext(): Unit = - matchingContext = matchingContext.tail - - @Initializer - def initialize(): Unit = - matchingContext = Seq.empty - } -} - -class ResolveOperationNameOnRouteInterceptor -object ResolveOperationNameOnRouteInterceptor { - import org.apache.pekko.http.scaladsl.util.FastFuture._ - - // We are replacing some of the basic directives here to ensure that we will resolve both the Sampling Decision and - // the operation name before the request gets to the actual handling code (presumably inside of a "complete" - // directive. 
- - def complete(m: => ToResponseMarshallable): StandardRoute = - StandardRoute(resolveOperationName(_).complete(m)) - - def complete[T](status: StatusCode, v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = - StandardRoute(resolveOperationName(_).complete((status, v))) - - def complete[T](status: StatusCode, headers: immutable.Seq[HttpHeader], v: => T)(implicit m: ToEntityMarshaller[T]): StandardRoute = - complete((status, headers, v)) - - def redirect(uri: Uri, redirectionType: Redirection): StandardRoute = - StandardRoute(resolveOperationName(_).redirect(uri, redirectionType)) - - def failWith(error: Throwable): StandardRoute = { - Kamon.currentSpan().fail(error) - StandardRoute(resolveOperationName(_).fail(error)) - } - - def onComplete[T](future: => Future[T]): Directive1[Try[T]] = - Directive { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.transformWith(t => inner(Tuple1(t))(ctx)) - } - - def apply[T](future: => Future[T])(implicit tupler: Tupler[T]): OnSuccessMagnet { type Out = tupler.Out } = - new OnSuccessMagnet { - type Out = tupler.Out - val directive = Directive[tupler.Out] { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.flatMap(t => inner(tupler(t))(ctx)) - }(tupler.OutIsTuple) - } - - def apply[T](future: => Future[T])(implicit m: ToResponseMarshaller[T]): CompleteOrRecoverWithMagnet = - new CompleteOrRecoverWithMagnet { - val directive = Directive[Tuple1[Throwable]] { inner => ctx => - import ctx.executionContext - resolveOperationName(ctx) - future.fast.transformWith { - case Success(res) => ctx.complete(res) - case Failure(error) => inner(Tuple1(error))(ctx) - } - } - } - - private def resolveOperationName(requestContext: RequestContext): RequestContext = { - - // We will only change the operation name if the last edit made to it was an automatic one. At this point, the only - // way in which the operation name might have changed is if the user changed it with the operationName directive or - // by accessing the Span and changing it directly there, so we wouldn't want to overwrite that. 
- - Kamon.currentContext().get(LastAutomaticOperationNameEdit.Key).foreach(lastEdit => { - val currentSpan = Kamon.currentSpan() - - if(lastEdit.allowAutomaticChanges) { - if(currentSpan.operationName() == lastEdit.operationName) { - val allMatches = requestContext.asInstanceOf[HasMatchingContext].matchingContext.reverse.map(singleMatch) - val operationName = allMatches.mkString("") - - if (operationName.nonEmpty) { - currentSpan - .name(operationName) - .takeSamplingDecision() - - lastEdit.operationName = operationName - } - } else { - lastEdit.allowAutomaticChanges = false - } - } else { - currentSpan.takeSamplingDecision() - } - }) - - requestContext - } - - private def singleMatch(matching: PathMatchingContext): String = { - val rest = matching.matched.pathRest.toString() - val consumedCount = matching.fullPath.length - rest.length - val consumedSegment = matching.fullPath.substring(0, consumedCount) - - matching.matched.extractions match { - case () => //string segment matched - consumedSegment - case tuple: Product => - val values = tuple.productIterator.toList map { - case Some(x) => List(x.toString) - case None => Nil - case long: Long => List(long.toString, long.toHexString) - case int: Int => List(int.toString, int.toHexString) - case a: Any => List(a.toString) - } - values.flatten.fold(consumedSegment) { (full, value) => - val r = "(?i)(^|/)" + Pattern.quote(value) + "($|/)" - full.replaceFirst(r, "$1{}$2") - } - } - } -} - -/** - * Tracks the last operation name that was automatically assigned to an operation via instrumentation. The - * instrumentation might assign a name to the operations via settings on the HTTP Server instrumentation instance or - * via the Path directives instrumentation, but might never reassign a name if the user somehow assigned their own name - * to the operation. Users chan change operation names by: - * - Using operation mappings via configuration of the HTTP Server. - * - Providing a custom HTTP Operation Name Generator for the server. - * - Using the "operationName" directive. - * - Directly accessing the Span for the current operation and changing the name on it. 
- * - */ -class LastAutomaticOperationNameEdit( - @volatile var operationName: String, - @volatile var allowAutomaticChanges: Boolean -) - -object LastAutomaticOperationNameEdit { - val Key = Context.key[Option[LastAutomaticOperationNameEdit]]("laone", None) - - def apply(operationName: String, allowAutomaticChanges: Boolean): LastAutomaticOperationNameEdit = - new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges) -} - -object RequestContextCopyInterceptor { - - @RuntimeType - def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { - val copiedRequestContext = copyCall.call() - copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext) - copiedRequestContext - } -} - -class PathDirectivesRawPathPrefixInterceptor -object PathDirectivesRawPathPrefixInterceptor { - import BasicDirectives._ - - def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = { - implicit val LIsTuple = matcher.ev - - extract { ctx => - val fullPath = ctx.unmatchedPath.toString() - val matching = matcher(ctx.unmatchedPath) - - matching match { - case m: Matched[_] => - ctx.asInstanceOf[HasMatchingContext] - .prependMatchingContext(PathMatchingContext(fullPath, m)) - case _ => - } - - (ctx, matching) - } flatMap { - case (ctx, Matched(rest, values)) => - tprovide(values)(LIsTuple) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => - - if(routeResult.isInstanceOf[Rejected]) - ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext() - - routeResult - } - - case (_, Unmatched) => reject - } - } -} - -object Http2BlueprintInterceptor { - - case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) - extends (HttpRequest => Future[HttpResponse]) { - - override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) - } - - @RuntimeType - def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], - @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { - - handler match { - case HandlerWithEndpoint(interface, port, _) => - ServerFlowWrapper(zuper.call(), interface, port) - - case _ => - zuper.call() - } - } -} diff --git a/instrumentation/kamon-pekko-http/src/main/scala-2.13/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala b/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala similarity index 96% rename from instrumentation/kamon-pekko-http/src/main/scala-2.13/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala rename to instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala index 778c32911..3a8c1dd67 100644 --- a/instrumentation/kamon-pekko-http/src/main/scala-2.13/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala +++ b/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala @@ -1,33 +1,29 @@ package kamon.instrumentation.pekko.http -import java.util.concurrent.Callable -import org.apache.pekko.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} -import org.apache.pekko.http.scaladsl.model.StatusCodes.Redirection -import org.apache.pekko.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, 
StatusCode, Uri} -import org.apache.pekko.http.scaladsl.server.PathMatcher.{Matched, Unmatched} -import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} -import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject -import org.apache.pekko.http.scaladsl.server._ -import org.apache.pekko.http.scaladsl.server.util.Tupler -import org.apache.pekko.http.scaladsl.util.FastFuture import kamon.Kamon +import kamon.context.Context import kamon.instrumentation.pekko.http.HasMatchingContext.PathMatchingContext -import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext} import kanela.agent.api.instrumentation.InstrumentationBuilder import kanela.agent.api.instrumentation.mixin.Initializer import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._ - -import scala.concurrent.{Batchable, ExecutionContext, Future, Promise} -import scala.util.control.NonFatal -import scala.util.{Failure, Success, Try} -import java.util.regex.Pattern +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic import org.apache.pekko.NotUsed +import org.apache.pekko.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} +import org.apache.pekko.http.scaladsl.model.StatusCodes.Redirection +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import org.apache.pekko.http.scaladsl.server.RouteResult.Rejected +import org.apache.pekko.http.scaladsl.server._ +import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject +import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} +import org.apache.pekko.http.scaladsl.server.util.Tupler import org.apache.pekko.stream.scaladsl.Flow -import kamon.context.Context -import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic +import java.util.concurrent.Callable +import java.util.regex.Pattern import scala.collection.immutable +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} class PekkoHttpServerInstrumentation extends InstrumentationBuilder { @@ -287,7 +283,7 @@ object PathDirectivesRawPathPrefixInterceptor { (ctx, matching) } flatMap { case (ctx, Matched(rest, values)) => - tprovide(values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => + tprovide[T](values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => if(routeResult.isInstanceOf[Rejected]) ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext() diff --git a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpClientTracingSpec.scala b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpClientTracingSpec.scala index 2f5822320..7ff1f0c44 100644 --- a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpClientTracingSpec.scala +++ b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpClientTracingSpec.scala @@ -23,7 +23,7 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.http.scaladsl.Http import org.apache.pekko.http.scaladsl.model.HttpRequest import org.apache.pekko.http.scaladsl.model.headers.RawHeader -import org.apache.pekko.stream.ActorMaterializer +import org.apache.pekko.stream.{ActorMaterializer, Materializer} import org.json4s._ import org.json4s.native.JsonMethods._ import org.scalatest.OptionValues @@ 
-31,6 +31,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ class PekkoHttpClientTracingSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with MetricInspection.Syntax @@ -38,14 +39,14 @@ class PekkoHttpClientTracingSpec extends AnyWordSpecLike with Matchers with Init import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-client-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: Materializer = Materializer(system) val timeoutTest: FiniteDuration = 5 second val interface = "127.0.0.1" val port = 8080 - val webServer = startServer(interface, port) + val webServer: WebServer = startServer(interface, port) "the Pekko HTTP client instrumentation" should { "create a client Span when using the request level API - Http().singleRequest(...)" in { diff --git a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerMetricsSpec.scala b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerMetricsSpec.scala index 9dbfb4b6a..3e655015b 100644 --- a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerMetricsSpec.scala +++ b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerMetricsSpec.scala @@ -22,14 +22,14 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.http.scaladsl.Http import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse} import org.apache.pekko.http.scaladsl.settings.ClientConnectionSettings -import org.apache.pekko.stream.ActorMaterializer +import org.apache.pekko.stream.{ActorMaterializer, Materializer} import org.apache.pekko.stream.scaladsl.{Sink, Source} import org.scalatest.OptionValues import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import scala.concurrent.Future +import scala.concurrent.{ExecutionContextExecutor, Future} import scala.concurrent.duration._ class PekkoHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with InstrumentInspection.Syntax @@ -37,14 +37,14 @@ class PekkoHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with Init import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-server-metrics-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-server-metrics-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: Materializer = Materializer(system) val port = 8083 val interface = "127.0.0.1" val timeoutTest: FiniteDuration = 5 second - val webServer = startServer(interface, port) + val webServer: WebServer = startServer(interface, port) "the Pekko HTTP server instrumentation" should { "track the number of open connections and active requests on the Server side" in { diff --git 
a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerTracingSpec.scala b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerTracingSpec.scala index ae80ee77a..5da805ac1 100644 --- a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerTracingSpec.scala +++ b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/PekkoHttpServerTracingSpec.scala @@ -17,7 +17,6 @@ package kamon.pekko.http import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.ActorMaterializer import kamon.tag.Lookups.{plain, plainBoolean, plainLong} import kamon.testkit._ import kamon.trace.Span.Mark @@ -31,6 +30,7 @@ import java.util.UUID import javax.net.ssl.{HostnameVerifier, SSLSession} import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext import scala.util.control.NonFatal class PekkoHttpServerTracingSpec extends AnyWordSpecLike with Matchers with ScalaFutures with Inside with InitAndStopKamonAfterAll @@ -38,8 +38,8 @@ class PekkoHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scal import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-server-instrumentation-spec") - implicit private val executor = system.dispatcher + implicit private val system: ActorSystem = ActorSystem("http-server-instrumentation-spec") + implicit private val executor: ExecutionContext = system.dispatcher val (sslSocketFactory, trustManager) = clientSSL() val okHttp = new OkHttpClient.Builder() diff --git a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/ServerFlowWrapperSpec.scala b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/ServerFlowWrapperSpec.scala index 9246dbbaa..06def45ff 100644 --- a/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/ServerFlowWrapperSpec.scala +++ b/instrumentation/kamon-pekko-http/src/test/scala/kamon/pekko/http/ServerFlowWrapperSpec.scala @@ -4,18 +4,20 @@ import kamon.instrumentation.pekko.http.ServerFlowWrapper import kamon.testkit.InitAndStopKamonAfterAll import org.apache.pekko.actor.ActorSystem import org.apache.pekko.http.scaladsl.model._ -import org.apache.pekko.stream.ActorMaterializer +import org.apache.pekko.stream.{ActorMaterializer, Materializer} import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} import org.apache.pekko.util.ByteString import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContextExecutor + class ServerFlowWrapperSpec extends AnyWordSpecLike with Matchers with ScalaFutures with InitAndStopKamonAfterAll { - implicit private val system = ActorSystem("http-client-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: Materializer = Materializer(system) private val okReturningFlow = Flow[HttpRequest].map { _ => HttpResponse(status = StatusCodes.OK, entity = HttpEntity("OK")) diff --git a/instrumentation/kamon-pekko-http/src/test/scala/kamon/testkit/TestWebServer.scala b/instrumentation/kamon-pekko-http/src/test/scala/kamon/testkit/TestWebServer.scala index 490e6bd42..e4d3f892d 100644 --- 
+++ b/instrumentation/kamon-pekko-http/src/test/scala/kamon/testkit/TestWebServer.scala
@@ -33,14 +33,15 @@ import org.apache.pekko.util.ByteString
 import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory, X509TrustManager}
 import kamon.Kamon
 import kamon.instrumentation.pekko.http.TracingDirectives
-import org.json4s.{DefaultFormats, native}
+import org.json4s.{DefaultFormats, native, Serialization}
 import kamon.tag.Lookups.plain
 import kamon.trace.Trace
+
 import scala.concurrent.{ExecutionContext, Future}
 
 trait TestWebServer extends TracingDirectives {
-  implicit val serialization = native.Serialization
-  implicit val formats = DefaultFormats
+  implicit val serialization: Serialization = native.Serialization
+  implicit val formats: DefaultFormats = DefaultFormats
   import Json4sSupport._
 
   def startServer(interface: String, port: Int, https: Boolean = false)(implicit system: ActorSystem): WebServer = {
@@ -184,7 +185,7 @@ trait TestWebServer extends TracingDirectives {
     new WebServer(interface, port, "http", Http().newServerAt(interface, port).bindFlow(routes))
   }
 
-  def httpContext() = {
+  def httpContext(): HttpsConnectionContext = {
     val password = "kamon".toCharArray
     val ks = KeyStore.getInstance("PKCS12")
     ks.load(getClass.getClassLoader.getResourceAsStream("https/server.p12"), password)
diff --git a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/ActorCellInstrumentationSpec.scala b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/ActorCellInstrumentationSpec.scala
index c85ed0d29..abe57b89c 100644
--- a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/ActorCellInstrumentationSpec.scala
+++ b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/ActorCellInstrumentationSpec.scala
@@ -29,12 +29,13 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
 import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 
 class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrumentationSpec"))
   with AnyWordSpecLike with BeforeAndAfterAll with ImplicitSender with Eventually with MetricInspection.Syntax
   with Matchers with InitAndStopKamonAfterAll {
-  implicit lazy val executionContext = system.dispatcher
+  implicit lazy val executionContext: ExecutionContext = system.dispatcher
   import ContextTesting._
 
   "the message passing instrumentation" should {
@@ -59,7 +60,7 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum
     }
 
     "propagate the current context when using the ask pattern" in new EchoActorFixture {
-      implicit val timeout = Timeout(1 seconds)
+      implicit val timeout: Timeout = Timeout(1 seconds)
       Kamon.runWithContext(testContext("propagate-with-ask")) {
         // The pipe pattern uses Futures internally, so the FutureTracing test should cover the underpinnings of it.
         (contextEchoActor ? "test") pipeTo (testActor)
"test") pipeTo (testActor) diff --git a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/AskPatternInstrumentationSpec.scala b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/AskPatternInstrumentationSpec.scala index ab4fc7fec..a5f51a267 100644 --- a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/AskPatternInstrumentationSpec.scala +++ b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/AskPatternInstrumentationSpec.scala @@ -17,24 +17,24 @@ package kamon.instrumentation.pekko +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.instrumentation.pekko.ContextTesting._ +import kamon.testkit.InitAndStopKamonAfterAll import org.apache.pekko.actor._ import org.apache.pekko.pattern.ask import org.apache.pekko.testkit.{EventFilter, ImplicitSender, TestKit} import org.apache.pekko.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.testkit.InitAndStopKamonAfterAll -import kamon.instrumentation.pekko.ContextTesting._ -import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstrumentationSpec")) with AnyWordSpecLike with InitAndStopKamonAfterAll with ImplicitSender { - implicit lazy val ec = system.dispatcher - implicit val askTimeout = Timeout(10 millis) + implicit lazy val ec: ExecutionContext = system.dispatcher + implicit val askTimeout: Timeout = Timeout(10 millis) // TODO: Make this work with ActorSelections @@ -93,7 +93,7 @@ class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstr } class NoReply extends Actor { - def receive = { + def receive: Receive = { case _ => } } diff --git a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/MessageTracingSpec.scala b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/MessageTracingSpec.scala index c11cd5877..e5071e676 100644 --- a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/MessageTracingSpec.scala +++ b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/MessageTracingSpec.scala @@ -39,10 +39,10 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any expectMsg("pong") eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ spanTags("component") shouldBe "pekko.actor" - span.operationName shouldBe("tell(String)") + span.operationName shouldBe "tell(String)" spanTags("pekko.actor.path") shouldNot include ("filteredout") spanTags("pekko.actor.path") should be ("MessageTracing/user/traced-probe-1") } @@ -54,9 +54,9 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any expectMsg("pong") eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ - span.operationName shouldBe("tell(String)") + span.operationName shouldBe "tell(String)" spanTags("component") shouldBe "pekko.actor" spanTags("pekko.system") shouldBe "MessageTracing" spanTags("pekko.actor.path") shouldBe "MessageTracing/user/traced" @@ -68,9 +68,9 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any Await.ready(pong, 10 seconds) eventually(timeout(2 seconds)) { 
- val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ - span.operationName shouldBe("ask(String)") + span.operationName shouldBe "ask(String)" spanTags("component") shouldBe "pekko.actor" spanTags("pekko.system") shouldBe "MessageTracing" spanTags("pekko.actor.path") shouldBe "MessageTracing/user/traced" @@ -88,7 +88,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any // Span for the first actor message val firstSpanID = eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ spanTags("component") shouldBe "pekko.actor" @@ -101,7 +101,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any // Span for the second actor message eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ span.parentId shouldBe firstSpanID span.operationName should include("tell(String)") @@ -123,9 +123,9 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any // Span for the first actor message val firstSpanID = eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ - span.operationName shouldBe("tell(Tuple2)") + span.operationName shouldBe "tell(Tuple2)" spanTags("component") shouldBe "pekko.actor" spanTags("pekko.system") shouldBe "MessageTracing" spanTags("pekko.actor.path") shouldBe "MessageTracing/user/traced-chain-first" @@ -137,10 +137,10 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any // Span for the second actor message eventually(timeout(2 seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value val spanTags = stringTag(span) _ span.parentId shouldBe firstSpanID - span.operationName shouldBe("tell(String)") + span.operationName shouldBe "tell(String)" spanTags("component") shouldBe "pekko.actor" spanTags("pekko.system") shouldBe "MessageTracing" spanTags("pekko.actor.path") shouldBe "MessageTracing/user/traced-chain-last" @@ -157,7 +157,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any expectMsg("pong") eventually(timeout(2 seconds)) { - val spanTags = stringTag(testSpanReporter.nextSpan().value) _ + val spanTags = stringTag(testSpanReporter().nextSpan().value) _ spanTags("component") shouldBe "pekko.actor" spanTags("pekko.actor.path") shouldNot include ("nontraced-pool-router") spanTags("pekko.actor.path") should be ("MessageTracing/user/traced-routee-one") @@ -171,17 +171,17 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any expectMsg("pong") eventually(timeout(2 seconds)) { - val spanTags = stringTag(testSpanReporter.nextSpan().value) _ + val spanTags = stringTag(testSpanReporter().nextSpan().value) _ spanTags("component") shouldBe "pekko.actor" spanTags("pekko.actor.path") should be ("MessageTracing/user/traced-pool-router") } } "not track Pekko Streams actors" in { - implicit val timeout = Timeout(10 seconds) + implicit val timeout: Timeout = Timeout(10 seconds) val actorWithMaterializer = system.actorOf(Props[ActorWithMaterializer]) - val finishedStream = Kamon.runWithSpan(Kamon.serverSpanBuilder("wrapper", "test").start()) { + val _ = 
Kamon.runWithSpan(Kamon.serverSpanBuilder("wrapper", "test").start()) { actorWithMaterializer.ask("stream").mapTo[String] } @@ -222,7 +222,7 @@ class TracingTestActor extends Actor { } class ActorWithMaterializer extends Actor { - implicit val mat = ActorMaterializer() + implicit val mat: Materializer = Materializer(context) override def receive: Receive = { case "stream" => diff --git a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/SystemMessageInstrumentationSpec.scala b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/SystemMessageInstrumentationSpec.scala index 3aeb7c84d..063104857 100644 --- a/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/SystemMessageInstrumentationSpec.scala +++ b/instrumentation/kamon-pekko/src/test/scala/kamon/instrumentation/pekko/SystemMessageInstrumentationSpec.scala @@ -17,28 +17,29 @@ package kamon.instrumentation.pekko -import org.apache.pekko.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop} -import org.apache.pekko.actor._ -import org.apache.pekko.testkit.{ImplicitSender, TestKit} import kamon.Kamon import kamon.instrumentation.pekko.ContextTesting._ import kamon.tag.Lookups._ +import org.apache.pekko.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop} +import org.apache.pekko.actor._ +import org.apache.pekko.testkit.{ImplicitSender, TestKit} import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.{AnyWordSpec, AnyWordSpecLike} +import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContextExecutor import scala.util.control.NonFatal class SystemMessageInstrumentationSpec extends TestKit(ActorSystem("ActorSystemMessageInstrumentationSpec")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender { - implicit lazy val executionContext = system.dispatcher + implicit lazy val executionContext: ExecutionContextExecutor = system.dispatcher "the system message passing instrumentation" should { "capture and propagate the current context while processing the Create message in top level actors" in { Kamon.runWithContext(testContext("creating-top-level-actor")) { system.actorOf(Props(new Actor { testActor ! propagatedContextKey() - def receive: Actor.Receive = { case any => } + def receive: Actor.Receive = { case _ => } })) } @@ -123,19 +124,19 @@ class SystemMessageInstrumentationSpec extends TestKit(ActorSystem("ActorSystemM sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { class GrandParent extends Actor { - val child = context.actorOf(Props(new Parent)) + val child: ActorRef = context.actorOf(Props(new Parent)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { case NonFatal(_) => testActor ! propagatedContextKey(); Stop } - def receive = { + def receive: Receive = { case any => child forward any } } class Parent extends Actor { - val child = context.actorOf(Props(new Child)) + val child: ActorRef = context.actorOf(Props(new Child)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { case NonFatal(_) => testActor ! propagatedContextKey(); directive @@ -152,7 +153,7 @@ class SystemMessageInstrumentationSpec extends TestKit(ActorSystem("ActorSystemM } class Child extends Actor { - def receive = { + def receive: Receive = { case "fail" => throw new ArithmeticException("Division by zero.") case "context" => sender ! propagatedContextKey() }
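The mechanical theme running through the test changes above: every `implicit val` gains an explicit type annotation (Scala 3 expects explicit types on implicit definitions, and Scala 2.13's -Xsource:3 warns when they are missing), and the deprecated `ActorMaterializer()` is replaced by the `Materializer(system)` factory, whose lifecycle follows the actor system. A minimal, standalone sketch of the target pattern, not part of the patch itself (object and system names are illustrative; assumes pekko-actor and pekko-stream on the classpath):

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.Materializer
    import scala.concurrent.ExecutionContextExecutor

    object TypedImplicitsSketch extends App {
      // Explicit types keep implicit resolution stable if the right-hand
      // side evolves, and match what Scala 3 expects of implicit definitions.
      implicit val system: ActorSystem = ActorSystem("sketch")
      implicit val executor: ExecutionContextExecutor = system.dispatcher
      // Materializer(system) replaces the deprecated ActorMaterializer();
      // it is tied to the actor system, so no manual shutdown is needed.
      implicit val materializer: Materializer = Materializer(system)
      system.terminate()
    }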