allegro-internal/flex-roadmap#819 Migrated metrics to prometheus #433

Closed · wants to merge 1 commit
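The change throughout this PR follows one pattern: dimensions that were previously encoded in the meter name (service, cluster, stream type, executor) move into Micrometer tags, which a Prometheus registry exposes as labels. A minimal before/after sketch of that pattern (meter and tag names mirror the diff, but the snippet itself is illustrative, not code from this repository):

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry

fun main() {
    val registry = SimpleMeterRegistry()

    // Before: the dimension (request type) is baked into the meter name.
    registry.counter("grpc.requests.cds").increment()

    // After: one meter name; the dimension travels as a tag / Prometheus label.
    registry.counter(
        "grpc.requests.count",
        Tags.of("type", "cds", "metric-type", "total")
    ).increment()

    registry.meters.forEach { println("${it.id.name} ${it.id.tags}") }
}
```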
@@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
@@ -221,10 +222,12 @@ class ControlPlane private constructor(
nioEventLoopExecutor
)
)
.bossEventLoopGroup(NioEventLoopGroup(
properties.server.nioBossEventLoopThreadCount,
nioBossEventLoopExecutor
))
.bossEventLoopGroup(
NioEventLoopGroup(
properties.server.nioBossEventLoopThreadCount,
nioBossEventLoopExecutor
)
)
.channelType(NioServerSocketChannel::class.java)
.executor(grpcServerExecutor)
.keepAliveTime(properties.server.netty.keepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
@@ -410,7 +413,12 @@ class ControlPlane private constructor(
}

private fun meterExecutor(executor: ExecutorService, executorServiceName: String) {
ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet())
ExecutorServiceMetrics(
executor,
executorServiceName,
"envoy-control",
Tags.of("executor", executorServiceName)
)
.bindTo(meterRegistry)
}
}
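The executor meters above switch from using the executor name as a prefix to a shared "envoy-control" prefix plus an "executor" tag, via Micrometer's four-argument ExecutorServiceMetrics constructor (executor, executor name, metric prefix, extra tags) already used in the diff. A sketch of what that binding publishes, with the executor name as an example value:

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import java.util.concurrent.Executors

fun main() {
    val registry = SimpleMeterRegistry()
    val executor = Executors.newFixedThreadPool(2)

    // Publishes meters such as envoy-control.executor.completed, .active,
    // .queued and .pool.size, each tagged with executor=grpc-server-worker
    // (the names asserted in ThreadPoolMetricTest below).
    ExecutorServiceMetrics(
        executor,
        "grpc-server-worker",
        "envoy-control",
        Tags.of("executor", "grpc-server-worker")
    ).bindTo(registry)

    registry.meters.forEach { println("${it.id.name} ${it.id.tags}") }
    executor.shutdown()
}
```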
@@ -1,10 +1,12 @@
package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks

import com.google.common.net.InetAddresses.increment
import io.envoyproxy.controlplane.cache.Resources
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import java.util.concurrent.atomic.AtomicInteger

class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks {
@@ -34,9 +36,9 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
.map { type -> type to AtomicInteger(0) }
.toMap()

meterRegistry.gauge("grpc.all-connections", connections)
meterRegistry.gauge("grpc.connections", Tags.of("connection-type", "all"), connections)
connectionsByType.forEach { (type, typeConnections) ->
meterRegistry.gauge("grpc.connections.${type.name.toLowerCase()}", typeConnections)
meterRegistry.gauge("grpc.connections", Tags.of("connection-type", type.name.lowercase()), typeConnections)
}
}

@@ -51,15 +53,21 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
}

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}")
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "total")
)
.increment()
}

override fun onV3StreamDeltaRequest(
streamId: Long,
request: V3DeltaDiscoveryRequest
) {
meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}.delta")
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "delta")
)
.increment()
}

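The callbacks above now publish a single gauge name ("grpc.connections") and a single counter name ("grpc.requests.count"), with the connection or stream type carried as a tag rather than appended to the name. A minimal sketch of the tagged-gauge registration, using an AtomicInteger backing value as the class does ("ads" is just an example tag value):

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    val registry = SimpleMeterRegistry()

    // One metric name; the connection type is a tag value, not part of the name.
    // Keep strong references to the AtomicIntegers: gauges only hold weak ones.
    val all = registry.gauge("grpc.connections", Tags.of("connection-type", "all"), AtomicInteger(0))!!
    val ads = registry.gauge("grpc.connections", Tags.of("connection-type", "ads"), AtomicInteger(0))!!

    all.incrementAndGet()
    ads.incrementAndGet()

    registry.find("grpc.connections").gauges().forEach {
        println("${it.id.tags} -> ${it.value()}")
    }
}
```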
@@ -7,6 +7,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode
@@ -67,7 +68,12 @@ class EnvoySnapshotFactory(
endpoints = endpoints,
properties = properties.outgoingPermissions
)
sample.stop(meterRegistry.timer("snapshot-factory.new-snapshot.time"))
sample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
Tags.of("operation", "new-snapshot", "type", "global")
)
)

return snapshot
}
@@ -155,7 +161,12 @@ class EnvoySnapshotFactory(
val groupSample = Timer.start(meterRegistry)

val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot)
groupSample.stop(meterRegistry.timer("snapshot-factory.get-snapshot-for-group.time"))
groupSample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
Tags.of("operation", "new-snapshot", "type", "group")
)
)
return newSnapshotForGroup
}

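Both snapshot-factory timers above collapse into a single "snapshot-factory.seconds" timer distinguished by "operation" and "type" tags. A short sketch of the Timer.start / sample.stop pattern the factory uses (the timed work is simulated here):

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import java.util.concurrent.TimeUnit

fun main() {
    val registry = SimpleMeterRegistry()

    val sample = Timer.start(registry)
    Thread.sleep(10) // stand-in for building the snapshot
    sample.stop(
        registry.timer(
            "snapshot-factory.seconds",
            Tags.of("operation", "new-snapshot", "type", "global")
        )
    )

    val timer = registry.find("snapshot-factory.seconds").tags("type", "global").timer()
    println("count=${timer?.count()} totalMs=${timer?.totalTime(TimeUnit.MILLISECONDS)}")
}
```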
@@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot
import io.envoyproxy.controlplane.cache.SnapshotCache
import io.envoyproxy.controlplane.cache.v3.Snapshot
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
@@ -50,9 +51,12 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.measureBuffer("snapshot.updater.count.total", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot-updater-merged").metrics()
.name("snapshot.updater.count.total")
.tag("status", "merged")
.tag("type", "global")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
@@ -87,28 +91,40 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.measureBuffer("snapshot-updater.count.total", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater-groups-published").metrics()
.name("snapshot-updater.count.total")
.tag("type", "groups")
.tag("status", "published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.onErrorResume { e ->
meterRegistry.counter("snapshot-updater.groups.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total",
Tags.of("type", "groups")
)
.increment()
logger.error("Unable to process new group", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
}
}

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot-updater-services-sampled").metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
.name("snapshot-updater.count.total")
.tag("type", "services")
.tag("status", "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot-updater.count.total", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.measureBuffer("snapshot-updater.count.total", meterRegistry) // todo
.checkpoint("snapshot-updater-services-published")
.name("snapshot-updater-services-published").metrics()
.name("snapshot-updater.count.total")
.tag("type", "services")
.tag("status", "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
@@ -135,14 +151,19 @@ class SnapshotUpdater(
}
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter("snapshot-updater.services.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total",
Tags.of("type", "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
}
}

private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) {
meterRegistry.timer("snapshot-updater.set-snapshot.$serviceName.time")
meterRegistry.timer(
"simple-cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot")
)
} else {
noopTimer
}
@@ -154,12 +175,15 @@ class SnapshotUpdater(
cache.setSnapshot(group, groupSnapshot)
}
} catch (e: Throwable) {
meterRegistry.counter("snapshot-updater.services.${group.serviceName}.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total", Tags.of("service", group.serviceName)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot-updater.update-snapshot-for-groups.time")
private val updateSnapshotForGroupsTimer =
meterRegistry.timer("snapshot-updater.duration.seconds", Tags.of("type", "groups"))

private fun updateSnapshotForGroups(
groups: Collection<Group>,
@@ -174,10 +198,13 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter("snapshot-updater.communication-mode.errors").increment()
logger.error("Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
" Please report this to EC developers.")
meterRegistry.counter("snapshot-updater.errors.total", Tags.of("type", "communication-mode"))
.increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
" Please report this to EC developers."
)
}
}
return results.then(Mono.fromCallable {
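In the reactive pipeline above, the same renaming applies to Reactor's built-in instrumentation: .name(...) sets the base meter name and .tag(...) adds the dimensions before .metrics() registers them, and the ad-hoc error counters merge into "snapshot-updater.errors.total" split by a "type" or "service" tag. A sketch of the Reactor side, assuming a Reactor version (as in this PR) where Flux.metrics() publishes to Micrometer's global registry:

```kotlin
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import reactor.core.publisher.Flux

fun main() {
    // Flux.metrics() reports to Micrometer's global registry, so attach one there.
    val registry = SimpleMeterRegistry()
    Metrics.addRegistry(registry)

    Flux.just(1, 2, 3)
        .name("snapshot-updater.count.total")
        .tag("type", "services")
        .tag("status", "sampled")
        .metrics()
        .blockLast()

    // Exact meter names derived from the supplied name vary by Reactor version;
    // the tags above are attached to each of them.
    registry.meters.forEach { println("${it.id.name} ${it.id.tags}") }
}
```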
@@ -1,6 +1,7 @@
package pl.allegro.tech.servicemesh.envoycontrol.synchronization

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
@@ -29,9 +30,10 @@ class RemoteServices(
fun getChanges(interval: Long): Flux<MultiClusterState> {
val aclFlux: Flux<MultiClusterState> = Flux.create({ sink ->
scheduler.scheduleWithFixedDelay({
meterRegistry.timer("sync-dc.get-multi-cluster-states.time").recordCallable {
getChanges(sink::next, interval)
}
meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-multi-cluster-state"))
.recordCallable {
getChanges(sink::next, interval)
}
}, 0, interval, TimeUnit.SECONDS)
}, FluxSink.OverflowStrategy.LATEST)
return aclFlux.doOnCancel {
@@ -59,7 +61,10 @@ class RemoteServices(
.thenApply { servicesStateFromCluster(cluster, it) }
.orTimeout(interval, TimeUnit.SECONDS)
.exceptionally {
meterRegistry.counter("cross-dc-synchronization.$cluster.state-fetcher.errors").increment()
meterRegistry.counter(
"cross-dc-synchronization.errors",
Tags.of("cluster", cluster, "operation", "get-cluster-state")
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
clusterStateCache[cluster]
}
@@ -70,7 +75,10 @@ class RemoteServices(
val instances = controlPlaneInstanceFetcher.instances(cluster)
cluster to instances
} catch (e: Exception) {
meterRegistry.counter("cross-dc-synchronization.$cluster.instance-fetcher.errors").increment()
meterRegistry.counter(
"cross-dc-synchronization.errors",
Tags.of("cluster", cluster, "operation", "get-cluster-state")
).increment()
logger.warn("Failed fetching instances from $cluster", e)
cluster to emptyList()
}
@@ -80,7 +88,11 @@ class RemoteServices(
cluster: String,
state: ServicesState
): ClusterState {
meterRegistry.counter("cross-dc-service-update-$cluster").increment()
meterRegistry.counter(
"cross-dc-synchronization",
Tags.of("operation", "service-update", "cluster", cluster)
)
.increment()
val clusterState = ClusterState(
state.removeServicesWithoutInstances(),
Locality.REMOTE,
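The cross-dc meters above drop the cluster from the meter name in favour of "cluster" and "operation" tags, which is what makes them usable as Prometheus label dimensions. A sketch of how such a counter renders on a Prometheus scrape, assuming micrometer-registry-prometheus is on the classpath; the cluster value and the commented scrape line are illustrative of Micrometer's usual name mapping, not output captured from this repo:

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry

fun main() {
    val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

    registry.counter(
        "cross-dc-synchronization.errors",
        Tags.of("cluster", "dc1", "operation", "get-cluster-state")
    ).increment()

    // Micrometer maps dots and dashes to underscores and suffixes counters with
    // _total, so the scrape output contains something like:
    //   cross_dc_synchronization_errors_total{cluster="dc1",operation="get-cluster-state"} 1.0
    println(registry.scrape())
}
```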
@@ -1,5 +1,6 @@
package pl.allegro.tech.servicemesh.envoycontrol.metrics

import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
@@ -25,13 +26,26 @@ class ThreadPoolMetricTest {
controlPlane.start()

// then
val allMeterNames = meterRegistry.meters.map { it.id.name }
val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap {
listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size")
val metricNames = listOf("executor.completed", "executor.active", "executor.queued", "executor.pool.size")
.map { "envoy-control.$it" }

val metricMap = listOf(
"grpc-server-worker",
"grpc-worker-event-loop",
"snapshot-update",
"group-snapshot"
).associateWith { metricNames }

assertThat(metricMap.entries).allSatisfy {
assertThat(it.value.all { metricName ->
meterRegistry.meters.any { meter ->
meter.id.name == metricName && meter.id.tags.contains(
Tag.of("executor", it.key)
)
}
}).isTrue()
}

assertThat(allMeterNames).containsAll(requiredMeterNames)

// and
controlPlane.close()
}
@@ -16,6 +16,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener
import io.envoyproxy.envoy.config.route.v3.RetryPolicy
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
@@ -468,7 +469,8 @@ class SnapshotUpdaterTest {
val snapshot = cache.getSnapshot(servicesGroup)
assertThat(snapshot).isEqualTo(null)
assertThat(
simpleMeterRegistry.find("snapshot-updater.services.example-service.updates.errors")
simpleMeterRegistry.find("snapshot-updater.errors.total")
.tags(Tags.of("service", "example-service"))
.counter()?.count()
).isEqualTo(1.0)
}
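The test lookup changes in the same way: instead of finding a meter whose name embeds the service, it searches by the shared name plus tags. A minimal sketch of that lookup style outside the test harness:

```kotlin
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry

fun main() {
    val registry = SimpleMeterRegistry()
    registry.counter("snapshot-updater.errors.total", Tags.of("service", "example-service")).increment()

    // find(name).tags(...) narrows the search to meters carrying those tags;
    // counter() returns null when nothing matches.
    val count = registry.find("snapshot-updater.errors.total")
        .tags(Tags.of("service", "example-service"))
        .counter()
        ?.count()

    println(count) // 1.0
}
```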