From df11439c31bd744ca5faf565718ea3e51004afc6 Mon Sep 17 00:00:00 2001 From: "nastassia.dailidava" Date: Fri, 27 Sep 2024 18:00:00 +0200 Subject: [PATCH] allegro-internal/flex-roadmap#819 Migrated metrics to prometheus --- .../servicemesh/envoycontrol/ControlPlane.kt | 18 ++- .../MetricsDiscoveryServerCallbacks.kt | 16 ++- .../snapshot/EnvoySnapshotFactory.kt | 15 +- .../envoycontrol/snapshot/SnapshotUpdater.kt | 61 +++++--- .../synchronization/RemoteServices.kt | 24 +++- .../metrics/ThreadPoolMetricTest.kt | 24 +++- .../snapshot/SnapshotUpdaterTest.kt | 4 +- .../infrastructure/ControlPlaneConfig.kt | 17 ++- .../RestTemplateControlPlaneClient.kt | 10 +- .../consul/services/ConsulServiceChanges.kt | 4 +- .../MetricsDiscoveryServerCallbacksTest.kt | 130 +++++++++++------- 11 files changed, 219 insertions(+), 104 deletions(-) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index d03592962..e4429b858 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback import io.grpc.Server import io.grpc.netty.NettyServerBuilder import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioServerSocketChannel @@ -221,10 +222,12 @@ class ControlPlane private constructor( nioEventLoopExecutor ) ) - .bossEventLoopGroup(NioEventLoopGroup( - properties.server.nioBossEventLoopThreadCount, - nioBossEventLoopExecutor - )) + .bossEventLoopGroup( + NioEventLoopGroup( + properties.server.nioBossEventLoopThreadCount, + 
nioBossEventLoopExecutor + ) + ) .channelType(NioServerSocketChannel::class.java) .executor(grpcServerExecutor) .keepAliveTime(properties.server.netty.keepAliveTime.toMillis(), TimeUnit.MILLISECONDS) @@ -410,7 +413,12 @@ class ControlPlane private constructor( } private fun meterExecutor(executor: ExecutorService, executorServiceName: String) { - ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet()) + ExecutorServiceMetrics( + executor, + executorServiceName, + "envoy-control", + Tags.of("executor", executorServiceName) + ) .bindTo(meterRegistry) } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index c0f65410b..8cb6ff604 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -1,10 +1,11 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks import io.envoyproxy.controlplane.cache.Resources import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import java.util.concurrent.atomic.AtomicInteger class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks { @@ -34,9 +35,9 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) .map { type -> type to AtomicInteger(0) }
.toMap() - meterRegistry.gauge("grpc.all-connections", connections) + meterRegistry.gauge("grpc.connections", Tags.of("connection-type", "all"), connections) connectionsByType.forEach { (type, typeConnections) -> - meterRegistry.gauge("grpc.connections.${type.name.toLowerCase()}", typeConnections) + meterRegistry.gauge("grpc.connections", Tags.of("connection-type", type.name.lowercase()), typeConnections) } } @@ -51,7 +53,10 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) } override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) { - meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}") + meterRegistry.counter( + "grpc.requests.count", + Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "total") + ) .increment() } @@ -59,7 +64,10 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) streamId: Long, request: V3DeltaDiscoveryRequest ) { - meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}.delta") + meterRegistry.counter( + "grpc.requests.count", + Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "delta") + ) .increment() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt index c56165dad..4f1e8900c 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt @@ -7,6 +7,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret 
import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode @@ -67,7 +68,12 @@ class EnvoySnapshotFactory( endpoints = endpoints, properties = properties.outgoingPermissions ) - sample.stop(meterRegistry.timer("snapshot-factory.new-snapshot.time")) + sample.stop( + meterRegistry.timer( + "snapshot-factory.seconds", + Tags.of("operation", "new-snapshot", "type", "global") + ) + ) return snapshot } @@ -155,7 +161,12 @@ class EnvoySnapshotFactory( val groupSample = Timer.start(meterRegistry) val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot) - groupSample.stop(meterRegistry.timer("snapshot-factory.get-snapshot-for-group.time")) + groupSample.stop( + meterRegistry.timer( + "snapshot-factory.seconds", + Tags.of("operation", "new-snapshot", "type", "group") + ) + ) return newSnapshotForGroup } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 3baea120d..08020b56e 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot import io.envoyproxy.controlplane.cache.SnapshotCache import io.envoyproxy.controlplane.cache.v3.Snapshot import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS @@ -50,9 
+51,12 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater.count.total", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name("snapshot-updater-merged").metrics() + .name("snapshot-updater.count.total") + .tag("status", "merged") + .tag("type", "global") + .metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -87,14 +91,20 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot-updater-groups-published", meterRegistry) + .measureBuffer("snapshot-updater.count.total", meterRegistry) .checkpoint("snapshot-updater-groups-published") - .name("snapshot-updater-groups-published").metrics() + .name("snapshot-updater.count.total") + .tag("type", "groups") + .tag("status", "published").metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } .onErrorResume { e -> - meterRegistry.counter("snapshot-updater.groups.updates.errors").increment() + meterRegistry.counter( + "snapshot-updater.errors.total", + Tags.of("type", "groups") + ) + .increment() logger.error("Unable to process new group", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) } @@ -102,13 +112,19 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name("snapshot-updater-services-sampled").metrics() - .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) + .name("snapshot-updater.count.total") + .tag("type", "services") + .tag("status", "sampled") + .metrics() + .onBackpressureLatestMeasured("snapshot-updater.count.total",
meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot-updater-services-published", meterRegistry) + .measureBuffer("snapshot-updater.count.total", meterRegistry) // todo .checkpoint("snapshot-updater-services-published") - .name("snapshot-updater-services-published").metrics() + .name("snapshot-updater.count.total") + .tag("type", "services") + .tag("status", "published") + .metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null @@ -135,14 +151,19 @@ class SnapshotUpdater( } .filter { it != emptyUpdateResult } .onErrorResume { e -> - meterRegistry.counter("snapshot-updater.services.updates.errors").increment() + meterRegistry.counter( + "snapshot-updater.errors.total", + Tags.of("type", "services") + ).increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) } } private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) { - meterRegistry.timer("snapshot-updater.set-snapshot.$serviceName.time") + meterRegistry.timer( + "simple-cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot") + ) } else { noopTimer } @@ -154,12 +175,15 @@ class SnapshotUpdater( cache.setSnapshot(group, groupSnapshot) } } catch (e: Throwable) { - meterRegistry.counter("snapshot-updater.services.${group.serviceName}.updates.errors").increment() + meterRegistry.counter( + "snapshot-updater.errors.total", Tags.of("service", group.serviceName) + ).increment() logger.error("Unable to create snapshot for group ${group.serviceName}", e) } } - private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot-updater.update-snapshot-for-groups.time") + private val updateSnapshotForGroupsTimer = + meterRegistry.timer("snapshot-updater.duration.seconds", Tags.of("type", 
"groups")) private fun updateSnapshotForGroups( groups: Collection, @@ -174,10 +198,13 @@ class SnapshotUpdater( } else if (result.xdsSnapshot != null && group.communicationMode == XDS) { updateSnapshotForGroup(group, result.xdsSnapshot) } else { - meterRegistry.counter("snapshot-updater.communication-mode.errors").increment() - logger.error("Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + - "Handling Envoy with not supported communication mode should have been rejected before." + - " Please report this to EC developers.") + meterRegistry.counter("snapshot-updater.errors.total", Tags.of("type", "communication-mode")) + .increment() + logger.error( + "Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + + "Handling Envoy with not supported communication mode should have been rejected before." + + " Please report this to EC developers." + ) } } return results.then(Mono.fromCallable { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 3782c7952..34aa618eb 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.Locality @@ -29,9 +30,10 @@ class RemoteServices( fun getChanges(interval: Long): Flux { val aclFlux: Flux = Flux.create({ sink -> scheduler.scheduleWithFixedDelay({ - 
meterRegistry.timer("sync-dc.get-multi-cluster-states.time").recordCallable { - getChanges(sink::next, interval) - } + meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-multi-cluster-state")) + .recordCallable { + getChanges(sink::next, interval) + } }, 0, interval, TimeUnit.SECONDS) }, FluxSink.OverflowStrategy.LATEST) return aclFlux.doOnCancel { @@ -59,7 +61,10 @@ class RemoteServices( .thenApply { servicesStateFromCluster(cluster, it) } .orTimeout(interval, TimeUnit.SECONDS) .exceptionally { - meterRegistry.counter("cross-dc-synchronization.$cluster.state-fetcher.errors").increment() + meterRegistry.counter( + "cross-dc-synchronization.errors", + Tags.of("cluster", cluster, "operation", "get-cluster-state") + ).increment() logger.warn("Error synchronizing instances ${it.message}", it) clusterStateCache[cluster] } @@ -70,7 +75,10 @@ class RemoteServices( val instances = controlPlaneInstanceFetcher.instances(cluster) cluster to instances } catch (e: Exception) { - meterRegistry.counter("cross-dc-synchronization.$cluster.instance-fetcher.errors").increment() + meterRegistry.counter( + "cross-dc-synchronization.errors", + Tags.of("cluster", cluster, "operation", "get-instances") + ).increment() logger.warn("Failed fetching instances from $cluster", e) cluster to emptyList() } @@ -80,7 +88,11 @@ class RemoteServices( cluster: String, state: ServicesState ): ClusterState { - meterRegistry.counter("cross-dc-service-update-$cluster").increment() + meterRegistry.counter( + "cross-dc-synchronization", + Tags.of("operation", "service-update", "cluster", cluster) + ) + .increment() val clusterState = ClusterState( state.removeServicesWithoutInstances(), Locality.REMOTE, diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt index 2a91824e1..27ef8f273 100644 ---
a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt @@ -1,5 +1,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.metrics +import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -25,13 +26,26 @@ class ThreadPoolMetricTest { controlPlane.start() // then - val allMeterNames = meterRegistry.meters.map { it.id.name } - val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap { - listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size") + val metricNames = listOf("executor.completed", "executor.active", "executor.queued", "executor.pool.size") + .map { "envoy-control.$it" } + + val metricMap = listOf( + "grpc-server-worker", + "grpc-worker-event-loop", + "snapshot-update", + "group-snapshot" + ).associateWith { metricNames } + + assertThat(metricMap.entries).allSatisfy { + assertThat(it.value.all { metricName -> + meterRegistry.meters.any { meter -> + meter.id.name == metricName && meter.id.tags.contains( + Tag.of("executor", it.key) + ) + } + }).isTrue() } - assertThat(allMeterNames).containsAll(requiredMeterNames) - // and controlPlane.close() } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 0cba50150..37cc0ac7d 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -16,6 +16,7 @@ 
import io.envoyproxy.envoy.config.listener.v3.Listener import io.envoyproxy.envoy.config.route.v3.RetryPolicy import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -468,7 +469,8 @@ class SnapshotUpdaterTest { val snapshot = cache.getSnapshot(servicesGroup) assertThat(snapshot).isEqualTo(null) assertThat( - simpleMeterRegistry.find("snapshot-updater.services.example-service.updates.errors") + simpleMeterRegistry.find("snapshot-updater.errors.total") + .tags(Tags.of("service", "example-service")) .counter()?.count() ).isEqualTo(1.0) } diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index 6845fcb5a..a62835a5a 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure import com.ecwid.consul.v1.ConsulClient import com.fasterxml.jackson.databind.ObjectMapper import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.ConfigurationProperties @@ -171,15 +172,17 @@ class ControlPlaneConfig { fun localDatacenter(properties: ConsulProperties) = ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: 
"local" - fun controlPlaneMetrics(meterRegistry: MeterRegistry) = - DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { - meterRegistry.gauge("services.added", it.servicesAdded) - meterRegistry.gauge("services.removed", it.servicesRemoved) - meterRegistry.gauge("services.instanceChanged", it.instanceChanges) - meterRegistry.gauge("services.snapshotChanged", it.snapshotChanges) - meterRegistry.gauge("cache.groupsCount", it.cacheGroupsCount) + fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics { + val metricName = "services" + return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { + meterRegistry.gauge(metricName, Tags.of("status", "added"), it.servicesAdded) + meterRegistry.gauge(metricName, Tags.of("status", "removed"), it.servicesRemoved) + meterRegistry.gauge(metricName, Tags.of("status", "instanceChanged"), it.instanceChanges) + meterRegistry.gauge(metricName, Tags.of("status", "snapshotChanged"), it.snapshotChanges) + meterRegistry.gauge("cache.groups.total", it.cacheGroupsCount) it.meterRegistry.more().counter("services.watch.errors", listOf(), it.errorWatchingServices) } + } @Bean fun protobufJsonFormatHttpMessageConverter(): ProtobufJsonFormatHttpMessageConverter { diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt index 557795f7b..881127cbf 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RestTemplateControlPlaneClient.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags 
import org.springframework.web.client.RestTemplate import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import java.net.URI @@ -32,14 +33,17 @@ class RestTemplateControlPlaneClient( } private fun <T> timed(function: () -> T): T { - return meterRegistry.timer("sync-dc.get-state.time").record(function) + return meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-state")) + .record(function) } private fun success() { - meterRegistry.counter("sync-dc.get-state.success").increment() + meterRegistry.counter("cross-dc-synchronization", Tags.of("operation", "get-state", "status", "success")) + .increment() } private fun failure() { - meterRegistry.counter("sync-dc.get-state.failure").increment() + meterRegistry.counter("cross-dc-synchronization", Tags.of("operation", "get-state", "status", "failure")) + .increment() } } diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 118ef417e..86b7b36da 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -226,10 +226,10 @@ class ConsulServiceChanges( if (ready) { val stopTimer = System.currentTimeMillis() readinessStateHandler.ready() - metrics.meterRegistry.timer("envoy-control.warmup.time") + metrics.meterRegistry.timer("envoy-control.warmup.seconds") .record( stopTimer - startTimer, TimeUnit.MILLISECONDS ) } } } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt
b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index 912b5c89b..f743cb81e 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -1,9 +1,9 @@ package pl.allegro.tech.servicemesh.envoycontrol -import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted import pl.allegro.tech.servicemesh.envoycontrol.config.Ads import pl.allegro.tech.servicemesh.envoycontrol.config.DeltaAds @@ -21,6 +21,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN import java.util.function.Consumer +import java.util.function.Predicate class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -61,20 +62,23 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isGreaterThanZero(), - EDS.name.toLowerCase() to isGreaterThanZero(), - LDS.name.toLowerCase() to isGreaterThanZero(), - RDS.name.toLowerCase() to isGreaterThanZero(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to isNull(), - "${CDS.name.toLowerCase()}.delta" to isNull(), -
"${EDS.name.toLowerCase()}.delta" to isNull(), - "${LDS.name.toLowerCase()}.delta" to isNull(), - "${RDS.name.toLowerCase()}.delta" to isNull(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } @@ -117,20 +122,23 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isGreaterThanZero(), - EDS.name.toLowerCase() to isGreaterThanZero(), - LDS.name.toLowerCase() to isGreaterThanZero(), - RDS.name.toLowerCase() to isGreaterThanZero(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to isNull(), - "${CDS.name.toLowerCase()}.delta" to isNull(), - "${EDS.name.toLowerCase()}.delta" to isNull(), - "${LDS.name.toLowerCase()}.delta" to isNull(), - "${RDS.name.toLowerCase()}.delta" to isNull(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to 
isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } @@ -173,24 +181,30 @@ class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbac ) override fun expectedGrpcRequestsCounterValues() = mapOf( - CDS.name.toLowerCase() to isNull(), - EDS.name.toLowerCase() to isNull(), - LDS.name.toLowerCase() to isNull(), - RDS.name.toLowerCase() to isNull(), - SDS.name.toLowerCase() to isNull(), - ADS.name.toLowerCase() to isNull(), - UNKNOWN.name.toLowerCase() to isNull(), - "${CDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${EDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${LDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${RDS.name.toLowerCase()}.delta" to isGreaterThanZero(), - "${SDS.name.toLowerCase()}.delta" to isNull(), - "${ADS.name.toLowerCase()}.delta" to isNull(), - "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + CDS.name.lowercase() to isNull(), + EDS.name.lowercase() to isNull(), + LDS.name.lowercase() to isNull(), + RDS.name.lowercase() to isNull(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), + ) + + override fun expectedGrpcRequestsDeltaCounterValues() = mapOf( + CDS.name.lowercase() to isGreaterThanZero(), + EDS.name.lowercase() to isGreaterThanZero(), + LDS.name.lowercase() to isGreaterThanZero(), + RDS.name.lowercase() to isGreaterThanZero(), + SDS.name.lowercase() to isNull(), + ADS.name.lowercase() to isNull(), + UNKNOWN.name.lowercase() to isNull(), ) } interface MetricsDiscoveryServerCallbacksTest { + companion object { + private val logger by logger() + } fun consul(): ConsulExtension @@ -204,7 +218,7 @@ 
interface MetricsDiscoveryServerCallbacksTest { fun expectedGrpcRequestsCounterValues(): Map<String, (Int?) -> Boolean> - fun MeterRegistry.counterValue(name: String) = this.find(name).counter()?.count()?.toInt() + fun expectedGrpcRequestsDeltaCounterValues(): Map<String, (Int?) -> Boolean> fun isGreaterThanZero() = { x: Int? -> x!! > 0 } @@ -219,9 +233,15 @@ interface MetricsDiscoveryServerCallbacksTest { // expect untilAsserted { expectedGrpcConnectionsGaugeValues().forEach { (type, value) -> - val metric = "grpc.connections.${type.name.toLowerCase()}" - assertThat(meterRegistry.find(metric).gauge()).isNotNull - assertThat(meterRegistry.get(metric).gauge().value().toInt()).isEqualTo(value) + val metric = "grpc.connections" + assertThat( + meterRegistry.find(metric) + .tags(Tags.of("connection-type", type.name.lowercase())).gauge() + ).isNotNull + assertThat( + meterRegistry.get(metric) + .tags(Tags.of("connection-type", type.name.lowercase())).gauge().value().toInt() + ).isEqualTo(value) } } } @@ -229,16 +249,25 @@ interface MetricsDiscoveryServerCallbacksTest { @Test fun `should measure gRPC requests`() { // given - val meterRegistry = envoyControl().app.meterRegistry() consul().server.operations.registerService(service(), name = "echo") // expect untilAsserted { - expectedGrpcRequestsCounterValues().forEach { (type, condition) -> - val counterValue = meterRegistry.counterValue("grpc.requests.$type") - println("$type $counterValue") - assertThat(counterValue).satisfies(Consumer { condition(it) }) + expectedGrpcRequestsCounterValues().forEach { + assertCondition(it.key, it.value, "total") + } + expectedGrpcRequestsDeltaCounterValues().forEach { + assertCondition(it.key, it.value, "delta") } } } + + private fun assertCondition(type: String, condition: Predicate<Int?>, metricType: String) { + val counterValue = + envoyControl().app.meterRegistry().find("grpc.requests.count") + .tags(Tags.of("type", type, "metric-type", metricType)) + .counter()?.count()?.toInt() + logger.info("$type $counterValue") + assertThat(counterValue).satisfies(Consumer { condition.test(it) }) + } }