diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java index a4a5aafe08..44a6e4cb05 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java @@ -12,6 +12,7 @@ public class SubscriptionMetrics { private MetricDecimalValue codes2xx; private MetricDecimalValue codes4xx; private MetricDecimalValue codes5xx; + private MetricDecimalValue retries; private MetricLongValue lag; private Subscription.State state; private MetricDecimalValue rate; @@ -30,6 +31,7 @@ public SubscriptionMetrics(@JsonProperty("delivered") long delivered, @JsonProperty("codes2xx") MetricDecimalValue codes2xx, @JsonProperty("codes4xx") MetricDecimalValue codes4xx, @JsonProperty("codes5xx") MetricDecimalValue codes5xx, + @JsonProperty("retries") MetricDecimalValue retries, @JsonProperty("Subscription") Subscription.State state, @JsonProperty("rate") MetricDecimalValue rate, @JsonProperty("throughput") MetricDecimalValue throughput, @@ -42,6 +44,7 @@ public SubscriptionMetrics(@JsonProperty("delivered") long delivered, this.codes2xx = codes2xx; this.codes4xx = codes4xx; this.codes5xx = codes5xx; + this.retries = retries; this.state = state; this.rate = rate; this.throughput = throughput; @@ -84,6 +87,10 @@ public MetricDecimalValue getCodes5xx() { return codes5xx; } + public MetricDecimalValue getRetries() { + return retries; + } + public Subscription.State getState() { return state; } @@ -147,6 +154,11 @@ public Builder withCodes5xx(MetricDecimalValue count) { return this; } + public Builder withRetries(MetricDecimalValue retries) { + subscriptionMetrics.retries = retries; + return this; + } + public Builder withRate(MetricDecimalValue rate) { subscriptionMetrics.rate = rate; return this; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Counters.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Counters.java index 2b20ee366a..486c053e38 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Counters.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Counters.java @@ -9,6 +9,7 @@ public class Counters { public static final String PUBLISHED = "published." + GROUP + "." + TOPIC; public static final String DELIVERED = "delivered." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; public static final String DISCARDED = "discarded." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; + public static final String RETRIES = "retries." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; public static final String MAXRATE_RATE_HISTORY_FAILURES = "consumers-rate.max-rate.node." + GROUP + "." + TOPIC + "." + SUBSCRIPTION + ".history.failures"; public static final String MAXRATE_FETCH_FAILURES = diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Meters.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Meters.java index c30d031bba..ab18d669c1 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Meters.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Meters.java @@ -30,7 +30,9 @@ public class Meters { public static final String DISCARDED_METER = "discarded-meter"; public static final String DISCARDED_TOPIC_METER = DISCARDED_METER + "." + GROUP + "." + TOPIC; public static final String DISCARDED_SUBSCRIPTION_METER = DISCARDED_TOPIC_METER + "." + SUBSCRIPTION; - + public static final String RETRIES_METER = "retries-meter"; + public static final String RETRIES_TOPIC_METER = RETRIES_METER + "." + GROUP + "." + TOPIC; + public static final String RETRIES_SUBSCRIPTION_METER = RETRIES_TOPIC_METER + "." + SUBSCRIPTION; public static final String DELAYED_PROCESSING = "delayed-processing"; public static final String TOPIC_DELAYED_PROCESSING = DELAYED_PROCESSING + "." + GROUP + "." + TOPIC; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java index e3adfe3abc..016cf8436b 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java @@ -11,17 +11,20 @@ import static pl.allegro.tech.hermes.common.metric.Counters.DISCARDED; import static pl.allegro.tech.hermes.common.metric.Counters.MAXRATE_FETCH_FAILURES; import static pl.allegro.tech.hermes.common.metric.Counters.MAXRATE_RATE_HISTORY_FAILURES; +import static pl.allegro.tech.hermes.common.metric.Counters.RETRIES; import static pl.allegro.tech.hermes.common.metric.Gauges.MAX_RATE_ACTUAL_RATE_VALUE; import static pl.allegro.tech.hermes.common.metric.Gauges.MAX_RATE_VALUE; import static pl.allegro.tech.hermes.common.metric.Gauges.OUTPUT_RATE; import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_SUBSCRIPTION_METER; import static pl.allegro.tech.hermes.common.metric.Meters.FAILED_METER_SUBSCRIPTION; import static pl.allegro.tech.hermes.common.metric.Meters.FILTERED_METER; +import static pl.allegro.tech.hermes.common.metric.Meters.RETRIES_SUBSCRIPTION_METER; import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_BATCH_METER; import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_METER; import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_THROUGHPUT_BYTES; import static pl.allegro.tech.hermes.common.metric.SubscriptionTagsFactory.subscriptionTags; import static pl.allegro.tech.hermes.common.metric.Timers.CONSUMER_IDLE_TIME; +import static pl.allegro.tech.hermes.common.metric.Timers.RATE_LIMITER_ACQUIRE; import static pl.allegro.tech.hermes.common.metric.Timers.SUBSCRIPTION_LATENCY; public class MetricsFacade { @@ -132,11 +135,13 @@ public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) { meterRegistry.remove(meter); } hermesMetrics.unregister(DISCARDED_SUBSCRIPTION_METER, subscription); + hermesMetrics.unregister(RETRIES_SUBSCRIPTION_METER, subscription); hermesMetrics.unregister(FAILED_METER_SUBSCRIPTION, subscription); hermesMetrics.unregister(SUBSCRIPTION_BATCH_METER, subscription); hermesMetrics.unregister(SUBSCRIPTION_METER, subscription); hermesMetrics.unregister(DELIVERED, subscription); hermesMetrics.unregister(DISCARDED, subscription); + hermesMetrics.unregister(RETRIES, subscription); hermesMetrics.unregisterInflightGauge(subscription); hermesMetrics.unregisterInflightTimeHistogram(subscription); hermesMetrics.unregisterConsumerErrorsTimeoutMeter(subscription); @@ -150,6 +155,7 @@ public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) { hermesMetrics.unregister(CONSUMER_IDLE_TIME, subscription); hermesMetrics.unregister(FILTERED_METER, subscription); hermesMetrics.unregister(SUBSCRIPTION_LATENCY, subscription); + hermesMetrics.unregister(RATE_LIMITER_ACQUIRE, subscription); hermesMetrics.unregister(SUBSCRIPTION_THROUGHPUT_BYTES, subscription); } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java index ea41df5b58..4f4916bc05 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/SubscriptionMetrics.java @@ -57,6 +57,16 @@ public HermesCounter discarded(SubscriptionName subscription) { }; } + public HermesCounter retries(SubscriptionName subscription) { + return size -> { + hermesMetrics.meter(Meters.RETRIES_METER).mark(size); + hermesMetrics.meter(Meters.RETRIES_TOPIC_METER, subscription.getTopicName()).mark(size); + hermesMetrics.meter(Meters.RETRIES_SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size); + hermesMetrics.counter(Counters.RETRIES, subscription.getTopicName(), subscription.getName()).inc(size); + micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_RETRIES, subscription).increment(size); + }; + } + public HermesTimer latency(SubscriptionName subscription) { return HermesTimer.from( meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_LATENCY, subscriptionTags(subscription)), @@ -64,6 +74,13 @@ public HermesTimer latency(SubscriptionName subscription) { ); } + public HermesTimer rateLimiterAcquire(SubscriptionName subscription) { + return HermesTimer.from( + meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_RATE_LIMITER_ACQUIRE, subscriptionTags(subscription)), + hermesMetrics.timer(Timers.RATE_LIMITER_ACQUIRE, subscription.getTopicName(), subscription.getName()) + ); + } + public void registerInflightGauge(SubscriptionName subscription, T obj, ToDoubleFunction f) { hermesMetrics.registerInflightGauge(subscription, () -> (int) f.applyAsDouble(obj)); meterRegistry.gauge(SubscriptionMetricsNames.SUBSCRIPTION_INFLIGHT, subscriptionTags(subscription), obj, f); @@ -133,7 +150,9 @@ public static class SubscriptionMetricsNames { public static final String SUBSCRIPTION_THROUGHPUT = "subscription.throughput-bytes"; public static final String SUBSCRIPTION_BATCHES = "subscription.batches"; public static final String SUBSCRIPTION_DISCARDED = "subscription.discarded"; + public static final String SUBSCRIPTION_RETRIES = "subscription.retries"; public static final String SUBSCRIPTION_LATENCY = "subscription.latency"; + public static final String SUBSCRIPTION_RATE_LIMITER_ACQUIRE = "subscription.rate-limiter-acquire"; public static final String SUBSCRIPTION_INFLIGHT = "subscription.inflight"; public static final String SUBSCRIPTION_IDLE_DURATION = "subscription.idle-duration"; public static final String SUBSCRIPTION_FILTERED_OUT = "subscription.filtered-out"; diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java index 17d3572570..f130b21410 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/Timers.java @@ -20,6 +20,7 @@ public class Timers { public static final String LATENCY = "latency"; public static final String SUBSCRIPTION_LATENCY = LATENCY + "." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; + public static final String RATE_LIMITER_ACQUIRE = "rate-limiter-acquire" + "." + GROUP + "." + TOPIC + "." + SUBSCRIPTION; public static final String SCHEMA = "schema." + SCHEMA_REPO_TYPE; public static final String GET_SCHEMA_LATENCY = SCHEMA + ".get-schema"; diff --git a/hermes-console/json-server/db.json b/hermes-console/json-server/db.json index 313fd39fcd..607194a4a7 100644 --- a/hermes-console/json-server/db.json +++ b/hermes-console/json-server/db.json @@ -501,12 +501,13 @@ "delivered": 39099, "discarded": 2137086, "volume": 1288032256, - "timeouts": "0.0", - "otherErrors": "0.0", - "codes2xx": "0", - "codes4xx": "0.0", - "codes5xx": "0.01", - "rate": "0", + "timeouts": "12.3028857479387", + "otherErrors": "16.3028857479387", + "codes2xx": "1236.3028857479387", + "codes4xx": "123.3028857479387", + "codes5xx": "6.3028857479387", + "retries": "24.3028857479387", + "rate": "1319.6064543974392", "throughput": "8.31", "batchRate": "0.0", "lag": "9055513" @@ -521,6 +522,7 @@ "codes2xx": "0", "codes4xx": "0.0", "codes5xx": "0.01", + "retries": "0.01", "rate": "0", "throughput": "8.36", "batchRate": "0.0", diff --git a/hermes-console/src/api/subscription-metrics.ts b/hermes-console/src/api/subscription-metrics.ts index 13fec3351d..6a0b5fe737 100644 --- a/hermes-console/src/api/subscription-metrics.ts +++ b/hermes-console/src/api/subscription-metrics.ts @@ -7,6 +7,7 @@ export interface SubscriptionMetrics { codes2xx: string; codes4xx: string; codes5xx: string; + retries: string; lag: string; rate: string; throughput: string; diff --git a/hermes-console/src/dummy/subscription.ts b/hermes-console/src/dummy/subscription.ts index 49e26ab53b..8b31670201 100644 --- a/hermes-console/src/dummy/subscription.ts +++ b/hermes-console/src/dummy/subscription.ts @@ -83,10 +83,11 @@ export const dummySubscriptionMetrics: SubscriptionMetrics = { discarded: 2137086, volume: 1288032256, timeouts: '0.0', - otherErrors: '0.0', - codes2xx: '0', - codes4xx: '0.0', - codes5xx: '0.01', + otherErrors: '1.4', + codes2xx: '123', + codes4xx: '2.0', + codes5xx: '1.32', + retries: '2.03', rate: '0', throughput: '8.31', batchRate: '0.0', diff --git a/hermes-console/src/i18n/en-US/index.ts b/hermes-console/src/i18n/en-US/index.ts index 769c67b112..9386580a1b 100644 --- a/hermes-console/src/i18n/en-US/index.ts +++ b/hermes-console/src/i18n/en-US/index.ts @@ -472,8 +472,13 @@ const en_US = { subscriberLatency: 'Subscriber latency', delivered: 'Delivered', discarded: 'Discarded', + timeouts: 'Timeouts', + otherErrors: 'Other errors', + codes2xx: 'Codes 2xx', + codes4xx: 'Codes 4xx', + codes5xx: 'Codes 5xx', + retries: 'Retries', lag: 'Lag', - outputRate: 'Output rate', tooltips: { subscriberLatency: 'Latency of acknowledging messages by subscribing service as ' + @@ -481,10 +486,8 @@ const en_US = { lag: 'Total number of events waiting to be delivered. Each subscription ' + 'has a "natural" lag, which depends on production rate.', - outputRate: - 'Maximum sending rate calculated based on receiving service ' + - 'performance. For well-performing service output rate should be ' + - 'equal to rate limit.', + retries: + 'Total number of message sending retries. Retrying messages significantly reduces the rate on subscriptions.', }, }, propertiesCard: { diff --git a/hermes-console/src/views/subscription/metrics-card/MetricsCard.spec.ts b/hermes-console/src/views/subscription/metrics-card/MetricsCard.spec.ts index 5572f11637..8b78c7e09f 100644 --- a/hermes-console/src/views/subscription/metrics-card/MetricsCard.spec.ts +++ b/hermes-console/src/views/subscription/metrics-card/MetricsCard.spec.ts @@ -99,4 +99,79 @@ describe('MetricsCard', () => { )!; expect(within(deliveryRateRow).getByText('9,055,513')).toBeVisible(); }); + + it('should render subscription otherErrors', () => { + // when + vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); + const { getByText } = render(MetricsCard, { + props, + testPinia: createTestingPiniaWithState(), + }); + + // then + const deliveryRateRow = getByText( + 'subscription.metricsCard.otherErrors', + ).closest('tr')!; + expect(within(deliveryRateRow).getByText('1.40')).toBeVisible(); + }); + + it('should render subscription codes2xx', () => { + // when + vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); + const { getByText } = render(MetricsCard, { + props, + testPinia: createTestingPiniaWithState(), + }); + + // then + const deliveryRateRow = getByText( + 'subscription.metricsCard.codes2xx', + ).closest('tr')!; + expect(within(deliveryRateRow).getByText('123.00')).toBeVisible(); + }); + + it('should render subscription codes4xx', () => { + // when + vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); + const { getByText } = render(MetricsCard, { + props, + testPinia: createTestingPiniaWithState(), + }); + + // then + const deliveryRateRow = getByText( + 'subscription.metricsCard.codes4xx', + ).closest('tr')!; + expect(within(deliveryRateRow).getByText('2.00')).toBeVisible(); + }); + + it('should render subscription codes5xx', () => { + // when + vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); + const { getByText } = render(MetricsCard, { + props, + testPinia: createTestingPiniaWithState(), + }); + + // then + const deliveryRateRow = getByText( + 'subscription.metricsCard.codes5xx', + ).closest('tr')!; + expect(within(deliveryRateRow).getByText('1.32')).toBeVisible(); + }); + + it('should render subscription retries', () => { + // when + vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); + const { getByText } = render(MetricsCard, { + props, + testPinia: createTestingPiniaWithState(), + }); + + // then + const deliveryRateRow = getByText( + 'subscription.metricsCard.retries', + ).closest('tr')!; + expect(within(deliveryRateRow).getByText('2.03')).toBeVisible(); + }); }); diff --git a/hermes-console/src/views/subscription/metrics-card/MetricsCard.vue b/hermes-console/src/views/subscription/metrics-card/MetricsCard.vue index c20a827be5..210827133f 100644 --- a/hermes-console/src/views/subscription/metrics-card/MetricsCard.vue +++ b/hermes-console/src/views/subscription/metrics-card/MetricsCard.vue @@ -29,11 +29,6 @@ :name="$t('subscription.metricsCard.deliveryRate')" :value="formatNumber(props.subscriptionMetrics.rate, 2)" /> - + + + + + + - diff --git a/hermes-console/src/views/topic/metrics-list/MetricsList.spec.ts b/hermes-console/src/views/topic/metrics-list/MetricsList.spec.ts index 106de46baf..c507ce0b42 100644 --- a/hermes-console/src/views/topic/metrics-list/MetricsList.spec.ts +++ b/hermes-console/src/views/topic/metrics-list/MetricsList.spec.ts @@ -40,8 +40,6 @@ describe('MetricsList', () => { { property: 'topicView.metrics.rate', value: '3.40' }, { property: 'topicView.metrics.deliveryRate', value: '3.50' }, { property: 'topicView.metrics.published', value: 100 }, - { property: 'topicView.metrics.latency', value: '?' }, - { property: 'topicView.metrics.messageSize', value: '?' }, ])('should render all metrics properties %s', ({ property, value }) => { // given vi.mocked(useMetrics).mockReturnValueOnce(useMetricsStub); diff --git a/hermes-console/src/views/topic/metrics-list/MetricsList.vue b/hermes-console/src/views/topic/metrics-list/MetricsList.vue index d1745d9cb5..3fbecf9f4f 100644 --- a/hermes-console/src/views/topic/metrics-list/MetricsList.vue +++ b/hermes-console/src/views/topic/metrics-list/MetricsList.vue @@ -36,11 +36,6 @@ :name="$t('topicView.metrics.published')" :value="formatNumber(props.metrics.published)" /> - - diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java index 7172e944fd..4f46845e58 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSender.java @@ -16,6 +16,7 @@ import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResultLogInfo; import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout; +import pl.allegro.tech.hermes.metrics.HermesCounter; import pl.allegro.tech.hermes.metrics.HermesTimer; import pl.allegro.tech.hermes.metrics.HermesTimerContext; @@ -45,7 +46,9 @@ public class ConsumerMessageSender { private final InflightsPool inflight; private final SubscriptionLoadRecorder loadRecorder; private final HermesTimer consumerLatencyTimer; + private final HermesCounter retries; private final SerialConsumerRateLimiter rateLimiter; + private final HermesTimer rateLimiterAcquireTimer; private final FutureAsyncTimeout async; private final int asyncTimeoutMs; private final LongAdder inflightCount = new LongAdder(); @@ -82,6 +85,8 @@ public ConsumerMessageSender(Subscription subscription, this.inflight = inflight; this.consumerLatencyTimer = metrics.subscriptions().latency(subscription.getQualifiedName()); metrics.subscriptions().registerInflightGauge(subscription.getQualifiedName(), this, sender -> sender.inflightCount.doubleValue()); + this.retries = metrics.subscriptions().retries(subscription.getQualifiedName()); + this.rateLimiterAcquireTimer = metrics.subscriptions().rateLimiterAcquire(subscription.getQualifiedName()); } public void initialize() { @@ -131,6 +136,7 @@ private int calculateMessageDelay(long publishingMessageTimestamp) { */ private void sendMessage(final Message message) { loadRecorder.recordSingleOperation(); + acquireRateLimiterWithTimer(); HermesTimerContext timer = consumerLatencyTimer.time(); CompletableFuture response = messageSender.send(message); @@ -143,6 +149,12 @@ private void sendMessage(final Message message) { }); } + private void acquireRateLimiterWithTimer() { + HermesTimerContext acquireTimer = rateLimiterAcquireTimer.time(); + rateLimiter.acquire(); + acquireTimer.close(); + } + private MessageSender messageSender(Subscription subscription) { Integer requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout(); ResilientMessageSender resilientMessageSender = new ResilientMessageSender( @@ -200,6 +212,7 @@ private void retrySending(Message message, MessageSendingResult result) { long retryDelay = extractRetryDelay(message, result); if (shouldAttemptResending(message, result, retryDelay)) { + retries.increment(); retrySingleThreadExecutor.schedule(() -> resend(message, result), retryDelay, TimeUnit.MILLISECONDS); } else { handleMessageDiscarding(message, result); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java index 672a141eb7..7f09b6417d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSender.java @@ -46,7 +46,6 @@ public CompletableFuture send( Function exceptionMapper ) { try { - rateLimiter.acquire(); CompletableFuture resultFuture = new CompletableFuture<>(); resultFutureConsumer.accept(resultFuture); CompletableFuture timeoutGuardedResultFuture = async.within( diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy index 07eada26f4..5e9eff443d 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/ResilientMessageSenderTest.groovy @@ -14,8 +14,8 @@ import java.util.concurrent.Executors import java.util.function.Consumer import java.util.function.Function -import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; -import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS import static pl.allegro.tech.hermes.api.SubscriptionPolicy.Builder.subscriptionPolicy import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription @@ -63,7 +63,6 @@ class ResilientMessageSenderTest extends Specification { def "should report successful sending"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerSuccessfulSending() } ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( @@ -84,7 +83,6 @@ class ResilientMessageSenderTest extends Specification { def "should asynchronously time out send future and report failed sending"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( @@ -105,7 +103,6 @@ class ResilientMessageSenderTest extends Specification { def "should treat 4xx response for subscription with no 4xx retry as success"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerSuccessfulSending() } @@ -127,7 +124,6 @@ class ResilientMessageSenderTest extends Specification { def "should report failed sending on error response other than 4xx for subscription with no 4xx retry"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } @@ -149,7 +145,6 @@ class ResilientMessageSenderTest extends Specification { def "should report failed sending on 4xx response for subscription with 4xx retry"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } def subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")) @@ -175,7 +170,6 @@ class ResilientMessageSenderTest extends Specification { def "should report successful sending on retry after"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerSuccessfulSending() } @@ -197,7 +191,6 @@ class ResilientMessageSenderTest extends Specification { def "should report failed sending on service unavailable without retry after"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } @@ -219,7 +212,6 @@ class ResilientMessageSenderTest extends Specification { def "should not report failed sending on too many requests without retry after"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerSuccessfulSending() } @@ -241,7 +233,6 @@ class ResilientMessageSenderTest extends Specification { def "should report failed sending when future completes exceptionally"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( @@ -266,7 +257,6 @@ class ResilientMessageSenderTest extends Specification { def "should report failed sending when consumer throws exception"() { given: SerialConsumerRateLimiter serialConsumerRateLimiter = Mock(SerialConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } ResilientMessageSender rateLimitingMessageSender = new ResilientMessageSender( diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy index 9a444d1be5..79eec88352 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/SingleRecipientMessageSenderAdapterTest.groovy @@ -49,7 +49,6 @@ class SingleRecipientMessageSenderAdapterTest extends Specification { def "should register successful send in rate limiter"() { given: ConsumerRateLimiter consumerRateLimiter = Mock(ConsumerRateLimiter) { - 1 * acquire() 1 * registerSuccessfulSending() } ResilientMessageSender rateLimitingMessageSender = rateLimitingMessageSender(consumerRateLimiter) @@ -65,7 +64,6 @@ class SingleRecipientMessageSenderAdapterTest extends Specification { def "should register unsuccessful send in rate limiter"() { given: ConsumerRateLimiter consumerRateLimiter = Mock(ConsumerRateLimiter) { - 1 * acquire() 1 * registerFailedSending() } ResilientMessageSender rateLimitingMessageSender = rateLimitingMessageSender(consumerRateLimiter) @@ -77,7 +75,5 @@ class SingleRecipientMessageSenderAdapterTest extends Specification { then: !future.get().succeeded() - } - } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy index 7445574d47..06d1bf7832 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy @@ -71,7 +71,7 @@ class JettyBroadCastMessageSenderTest extends Specification { MessageSender getSender(ConsumerRateLimiter rateLimiter) { def address = new ResolvableEndpointAddress(endpoint, new MultiUrlEndpointAddressResolver(), - EndpointAddressResolverMetadata.empty()); + EndpointAddressResolverMetadata.empty()) def httpRequestFactory = new DefaultHttpRequestFactory(client, 1000, 1000, new DefaultHttpMetadataAppender()) Subscription subscription = subscription(SubscriptionName.fromString("group.topic\$subscription")).build() @@ -87,14 +87,13 @@ class JettyBroadCastMessageSenderTest extends Specification { def "should send message successfully in parallel to all urls"() { given: ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { - 4 * acquire() 4 * registerSuccessfulSending() } serviceEndpoints.forEach { endpoint -> endpoint.setDelay(300).expectMessages(TEST_MESSAGE_CONTENT) } when: - def future = getSender(rateLimiter).send(testMessage()); + def future = getSender(rateLimiter).send(testMessage()) then: future.get(10, TimeUnit.SECONDS).succeeded() @@ -107,7 +106,6 @@ class JettyBroadCastMessageSenderTest extends Specification { def "should return not succeeded when sending to one of urls fails"() { given: ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { - 4 * acquire() 3 * registerSuccessfulSending() 1 * registerFailedSending() } @@ -133,15 +131,14 @@ class JettyBroadCastMessageSenderTest extends Specification { def "should not send to already sent url on retry"() { given: ConsumerRateLimiter rateLimiter = Mock(ConsumerRateLimiter) { - 3 * acquire() 3 * registerSuccessfulSending() } serviceEndpoints.forEach { endpoint -> endpoint.expectMessages(TEST_MESSAGE_CONTENT) } def alreadySentServiceEndpoint = serviceEndpoints[0] - Message message = testMessage(); - message.incrementRetryCounter([alreadySentServiceEndpoint.url]); + Message message = testMessage() + message.incrementRetryCounter([alreadySentServiceEndpoint.url]) when: def future = getSender(rateLimiter).send(message) diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java index 5443e1421a..a630aafcc8 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/consumer/ConsumerMessageSenderTest.java @@ -69,9 +69,18 @@ public class ConsumerMessageSenderTest { @Mock private HermesTimer consumerLatencyTimer; + @Mock + private HermesCounter retries; + + @Mock + private HermesTimer rateLimiterAcquireTimer; + @Mock private HermesTimerContext consumerLatencyTimerContext; + @Mock + private HermesTimerContext rateLimiterAcquireTimerContext; + @Mock private HermesCounter failedMeter; @@ -99,9 +108,12 @@ public void setUp() { private void setUpMetrics(Subscription subscription) { when(metricsFacade.subscriptions().latency(subscription.getQualifiedName())).thenReturn(consumerLatencyTimer); + when(metricsFacade.subscriptions().rateLimiterAcquire(subscription.getQualifiedName())).thenReturn(rateLimiterAcquireTimer); when(metricsFacade.subscriptions().otherErrorsCounter(subscription.getQualifiedName())).thenReturn(errors); when(consumerLatencyTimer.time()).thenReturn(consumerLatencyTimerContext); + when(rateLimiterAcquireTimer.time()).thenReturn(rateLimiterAcquireTimerContext); when(metricsFacade.subscriptions().failuresCounter(subscription.getQualifiedName())).thenReturn(failedMeter); + when(metricsFacade.subscriptions().retries(subscription.getQualifiedName())).thenReturn(retries); } @Test @@ -117,8 +129,11 @@ public void shouldHandleSuccessfulSending() { // then verifySemaphoreReleased(); verifyLatencyTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquireTimersCountedTimes(subscription, 1, 1); verifyZeroInteractions(errorHandler); verifyZeroInteractions(failedMeter); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -134,7 +149,10 @@ public void shouldKeepTryingToSendMessageFailedSending() { // then verifySemaphoreReleased(); verifyLatencyTimersCountedTimes(subscription, 3, 3); + verifyRateLimiterAcquireTimersCountedTimes(subscription, 3, 3); verifyErrorHandlerHandleFailed(message, subscription, 2); + verifyRateLimiterAcquired(3); + verifyRetryCounted(2); } @Test @@ -151,6 +169,9 @@ public void shouldDiscardMessageWhenTTLIsExceeded() { verifySemaphoreReleased(); verifyZeroInteractions(successHandler); verifyLatencyTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquireTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -167,11 +188,15 @@ public void shouldNotKeepTryingToSendMessageFailedWithStatusCode4xx() { verifySemaphoreReleased(); verifyZeroInteractions(successHandler); verifyLatencyTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquireTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test public void shouldKeepTryingToSendMessageFailedWithStatusCode4xxForSubscriptionWith4xxRetry() { // given + final int expectedNumbersOfFailures = 3; ConsumerMessageSender sender = consumerMessageSender(subscriptionWith4xxRetry); Message message = message(); doReturn(failure(403)).doReturn(failure(403)).doReturn(failure(403)).doReturn(success()).when(messageSender).send(message); @@ -183,15 +208,18 @@ public void shouldKeepTryingToSendMessageFailedWithStatusCode4xxForSubscriptionW // then verifySemaphoreReleased(); verify(errorHandler, - timeout(1000).times(3)).handleFailed(eq(message), + timeout(1000).times(expectedNumbersOfFailures)).handleFailed(eq(message), eq(subscriptionWith4xxRetry), any(MessageSendingResult.class) ); + verifyRateLimiterAcquired(expectedNumbersOfFailures + 1); + verifyRetryCounted(expectedNumbersOfFailures); } @Test public void shouldRetryOn401UnauthorizedForOAuthSecuredSubscription() { // given + final int expectedNumbersOfFailures = 2; Subscription subscription = subscriptionWithout4xxRetryAndWithOAuthPolicy(); setUpMetrics(subscription); ConsumerMessageSender sender = consumerMessageSender(subscription); @@ -202,15 +230,18 @@ public void shouldRetryOn401UnauthorizedForOAuthSecuredSubscription() { sender.sendAsync(message); // then - verifyErrorHandlerHandleFailed(message, subscription, 2); + verifyErrorHandlerHandleFailed(message, subscription, expectedNumbersOfFailures); verify(successHandler, timeout(1000)).handleSuccess(eq(message), eq(subscription), any(MessageSendingResult.class)); + verifyRetryCounted(expectedNumbersOfFailures); + verifyRateLimiterAcquired(expectedNumbersOfFailures + 1); } @Test public void shouldBackoffRetriesWhenEndpointFails() throws InterruptedException { // given final int executionTime = 100; - int senderBackoffTime = 50; + final int senderBackoffTime = 50; + final int expectedNumberOfFailures = 1 + executionTime / senderBackoffTime; Subscription subscriptionWithBackoff = subscriptionWithBackoff(senderBackoffTime); setUpMetrics(subscriptionWithBackoff); @@ -223,7 +254,9 @@ public void shouldBackoffRetriesWhenEndpointFails() throws InterruptedException //then Thread.sleep(executionTime); - verifyErrorHandlerHandleFailed(message, subscriptionWithBackoff, 1 + executionTime / senderBackoffTime); + verifyErrorHandlerHandleFailed(message, subscriptionWithBackoff, expectedNumberOfFailures); + verifyRateLimiterAcquired(expectedNumberOfFailures); + verifyRetryCounted(expectedNumberOfFailures); } @Test @@ -241,6 +274,9 @@ public void shouldNotRetryOnRetryAfterAboveTtl() { verifySemaphoreReleased(); verifyZeroInteractions(successHandler); verifyLatencyTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquireTimersCountedTimes(subscription, 1, 1); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -260,6 +296,8 @@ public void shouldDeliverToModifiedEndpoint() { // then verify(otherMessageSender, timeout(1000)).send(message); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -279,6 +317,8 @@ public void shouldDeliverToNewSenderAfterModifiedTimeout() { // then verify(otherMessageSender, timeout(1000)).send(message); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -303,6 +343,8 @@ public void shouldDelaySendingMessageForHalfSecond() { // then long sendingTime = System.currentTimeMillis() - sendingStartTime; assertThat(sendingTime).isGreaterThanOrEqualTo(500); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test @@ -327,11 +369,14 @@ public void shouldCalculateSendingDelayBasingOnPublishingTimestamp() { // then long sendingTime = System.currentTimeMillis() - sendingStartTime; assertThat(sendingTime).isLessThan(300); + verifyRateLimiterAcquired(); + verifyZeroInteractions(retries); } @Test public void shouldIncreaseRetryBackoffExponentially() throws InterruptedException { // given + final int expectedNumberOfFailures = 2; final int backoff = 500; final double multiplier = 2; Subscription subscription = subscriptionWithExponentialRetryBackoff(backoff, multiplier); @@ -346,11 +391,14 @@ public void shouldIncreaseRetryBackoffExponentially() throws InterruptedExceptio // then verifyZeroInteractions(successHandler); + verifyRateLimiterAcquired(expectedNumberOfFailures); + verifyRetryCounted(expectedNumberOfFailures); } @Test public void shouldIgnoreExponentialRetryBackoffWithRetryAfter() { // given + final int expectedNumberOfRetries = 2; final int retrySeconds = 1; final int backoff = 5000; final double multiplier = 3; @@ -366,6 +414,8 @@ public void shouldIgnoreExponentialRetryBackoffWithRetryAfter() { //then verify(successHandler, timeout(retrySeconds * 1000 * 2 + 500)) .handleSuccess(eq(message), eq(subscription), any(MessageSendingResult.class)); + verifyRateLimiterAcquired(expectedNumberOfRetries + 1); + verifyRetryCounted(expectedNumberOfRetries); } @Test @@ -385,6 +435,8 @@ public void shouldIgnoreExponentialRetryBackoffAfterExceededTtl() throws Interru //then verifyZeroInteractions(successHandler); + verifyRateLimiterAcquired(2); + verifyRetryCounted(); } private ConsumerMessageSender consumerMessageSender(Subscription subscription) { @@ -422,6 +474,12 @@ private void verifyLatencyTimersCountedTimes(Subscription subscription, int time verify(consumerLatencyTimerContext, times(closeCount)).close(); } + private void verifyRateLimiterAcquireTimersCountedTimes(Subscription subscription, int timeCount, int closeCount) { + verify(metricsFacade.subscriptions(), times(1)).rateLimiterAcquire(subscription.getQualifiedName()); + verify(rateLimiterAcquireTimer, times(timeCount)).time(); + verify(rateLimiterAcquireTimerContext, times(closeCount)).close(); + } + private Subscription subscriptionWithTtl(int ttl) { return subscriptionBuilderWithTestValues() .withSubscriptionPolicy(subscriptionPolicy().applyDefaults() @@ -506,6 +564,21 @@ private void verifySemaphoreReleased() { assertThat(inflightSemaphore.availablePermits()).isEqualTo(1); } + private void verifyRateLimiterAcquired() { + verifyRateLimiterAcquired(1); + } + + private void verifyRateLimiterAcquired(int times) { + verify(rateLimiter, times(times)).acquire(); + } + + private void verifyRetryCounted() { + verifyRetryCounted(1); + } + + private void verifyRetryCounted(int times) { + verify(retries, times(times)).increment(); + } private Message message() { return messageWithTimestamp(System.currentTimeMillis()); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java index e3a828a463..accd325bf2 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ExternalMonitoringConfiguration.java @@ -15,7 +15,6 @@ import org.springframework.web.client.RestTemplate; import pl.allegro.tech.hermes.management.infrastructure.graphite.CachingGraphiteClient; import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteClient; -import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteMetricsProvider; import pl.allegro.tech.hermes.management.infrastructure.graphite.RestTemplateGraphiteClient; import pl.allegro.tech.hermes.management.infrastructure.prometheus.CachingPrometheusClient; import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient; @@ -29,13 +28,6 @@ @Configuration public class ExternalMonitoringConfiguration { - @Bean - @ConditionalOnProperty(value = "graphite.client.enabled", havingValue = "true") - public GraphiteMetricsProvider graphiteMetricsProvider(GraphiteClient graphiteClient, - GraphiteMonitoringMetricsProperties properties) { - return new GraphiteMetricsProvider(graphiteClient, properties.getPrefix()); - } - @Bean @ConditionalOnProperty(value = "graphite.client.enabled", havingValue = "true") public GraphiteClient graphiteClient(@Qualifier("monitoringRestTemplate") RestTemplate graphiteRestTemplate, diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/graphite/GraphiteMetricsProvider.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/graphite/GraphiteMetricsProvider.java deleted file mode 100644 index 3e355607ec..0000000000 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/graphite/GraphiteMetricsProvider.java +++ /dev/null @@ -1,110 +0,0 @@ -package pl.allegro.tech.hermes.management.infrastructure.graphite; - -import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.api.TopicName; -import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer; -import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringSubscriptionMetricsProvider; -import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringTopicMetricsProvider; - -import static pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots; - -public class GraphiteMetricsProvider implements MonitoringSubscriptionMetricsProvider, MonitoringTopicMetricsProvider { - - private static final String SUBSCRIPTION_PATH = "%s.%s.%s"; - - private static final String SUBSCRIPTION_RATE_PATTERN = "sumSeries(%s.consumer.*.meter.%s.m1_rate)"; - private static final String SUBSCRIPTION_THROUGHPUT_PATTERN = "sumSeries(%s.consumer.*.throughput.%s.m1_rate)"; - private static final String SUBSCRIPTION_HTTP_STATUSES_PATTERN = "sumSeries(%s.consumer.*.status.%s.%s.m1_rate)"; - private static final String SUBSCRIPTION_ERROR_TIMEOUT_PATTERN = "sumSeries(%s.consumer.*.status.%s.errors.timeout.m1_rate)"; - private static final String SUBSCRIPTION_ERROR_OTHER_PATTERN = "sumSeries(%s.consumer.*.status.%s.errors.other.m1_rate)"; - private static final String SUBSCRIPTION_BATCH_RATE_PATTERN = "sumSeries(%s.consumer.*.meter.%s.batch.m1_rate)"; - - private static final String TOPIC_RATE_PATTERN = "sumSeries(%s.producer.*.meter.%s.%s.m1_rate)"; - private static final String TOPIC_DELIVERY_RATE_PATTERN = "sumSeries(%s.consumer.*.meter.%s.%s.m1_rate)"; - private static final String TOPIC_THROUGHPUT_PATTERN = "sumSeries(%s.producer.*.throughput.%s.%s.m1_rate)"; - - private final GraphiteClient graphiteClient; - private final String prefix; - - public GraphiteMetricsProvider(GraphiteClient graphiteClient, String prefix) { - this.graphiteClient = graphiteClient; - this.prefix = prefix; - } - - @Override - public MonitoringSubscriptionMetrics subscriptionMetrics(SubscriptionName name) { - String rateMetric = metricPath(name); - String timeouts = metricPathTimeouts(name); - String throughput = metricPathThroughput(name); - String otherErrors = metricPathOtherErrors(name); - String codes2xxPath = metricPathHttpStatuses(name, "2xx"); - String codes4xxPath = metricPathHttpStatuses(name, "4xx"); - String codes5xxPath = metricPathHttpStatuses(name, "5xx"); - String batchPath = metricPathBatchRate(name); - - MonitoringMetricsContainer metricsContainer = graphiteClient.readMetrics(codes2xxPath, codes4xxPath, codes5xxPath, - rateMetric, throughput, timeouts, otherErrors, batchPath); - - return MonitoringSubscriptionMetricsProvider.metricsBuilder() - .withRate(metricsContainer.metricValue(rateMetric)) - .withTimeouts(metricsContainer.metricValue(timeouts)) - .withThroughput(metricsContainer.metricValue(throughput)) - .withOtherErrors(metricsContainer.metricValue(otherErrors)) - .withCodes2xx(metricsContainer.metricValue(codes2xxPath)) - .withCode4xx(metricsContainer.metricValue(codes4xxPath)) - .withCode5xx(metricsContainer.metricValue(codes5xxPath)) - .withMetricPathBatchRate(metricsContainer.metricValue(batchPath)) - .build(); - } - - @Override - public MonitoringTopicMetrics topicMetrics(TopicName topicName) { - String rateMetric = metricPath(TOPIC_RATE_PATTERN, topicName); - String deliveryRateMetric = metricPath(TOPIC_DELIVERY_RATE_PATTERN, topicName); - String throughputMetric = metricPath(TOPIC_THROUGHPUT_PATTERN, topicName); - - MonitoringMetricsContainer metrics = graphiteClient.readMetrics(rateMetric, deliveryRateMetric, throughputMetric); - return MonitoringTopicMetricsProvider.metricsBuilder() - .withRate(metrics.metricValue(rateMetric)) - .withDeliveryRate(metrics.metricValue(deliveryRateMetric)) - .withThroughput(metrics.metricValue(throughputMetric)) - .build(); - } - - private String metricPath(SubscriptionName name) { - return String.format(SUBSCRIPTION_RATE_PATTERN, prefix, subscriptionNameToPath(name) - ); - } - - private String metricPath(String pattern, TopicName topicName) { - return String.format(pattern, prefix, escapeDots(topicName.getGroupName()), - escapeDots(topicName.getName())); - } - - private String metricPathThroughput(SubscriptionName name) { - return String.format(SUBSCRIPTION_THROUGHPUT_PATTERN, prefix, subscriptionNameToPath(name)); - } - - private String metricPathHttpStatuses(SubscriptionName name, String statusCodeClass) { - return String.format(SUBSCRIPTION_HTTP_STATUSES_PATTERN, prefix, subscriptionNameToPath(name), statusCodeClass); - } - - private String metricPathTimeouts(SubscriptionName name) { - return String.format(SUBSCRIPTION_ERROR_TIMEOUT_PATTERN, prefix, subscriptionNameToPath(name) - ); - } - - private String metricPathOtherErrors(SubscriptionName name) { - return String.format(SUBSCRIPTION_ERROR_OTHER_PATTERN, prefix, subscriptionNameToPath(name)); - } - - private String metricPathBatchRate(SubscriptionName name) { - return String.format(SUBSCRIPTION_BATCH_RATE_PATTERN, prefix, subscriptionNameToPath(name)); - } - - private String subscriptionNameToPath(SubscriptionName name) { - return String.format(SUBSCRIPTION_PATH, - escapeDots(name.getTopicName().getGroupName()), name.getTopicName().getName(), escapeDots(name.getName()) - ); - } -} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java index ac14b42657..8b82e66be7 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java @@ -47,6 +47,7 @@ public SubscriptionMetrics loadMetrics(TopicName topicName, String subscriptionN .withCodes2xx(monitoringMetrics.codes2xx()) .withCodes4xx(monitoringMetrics.code4xx()) .withCodes5xx(monitoringMetrics.code5xx()) + .withRetries(monitoringMetrics.retries()) .withTimeouts(monitoringMetrics.timeouts()) .withOtherErrors(monitoringMetrics.otherErrors()) .withThroughput(monitoringMetrics.throughput()) diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/MonitoringSubscriptionMetricsProvider.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/MonitoringSubscriptionMetricsProvider.java index d98cccdb19..8745aa1d1c 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/MonitoringSubscriptionMetricsProvider.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/MonitoringSubscriptionMetricsProvider.java @@ -13,6 +13,7 @@ record MonitoringSubscriptionMetrics(MetricDecimalValue rate, MetricDecimalValue codes2xx, MetricDecimalValue code4xx, MetricDecimalValue code5xx, + MetricDecimalValue retries, MetricDecimalValue metricPathBatchRate) { } @@ -28,6 +29,7 @@ class MetricsBuilder { private MetricDecimalValue codes2xx; private MetricDecimalValue code4xx; private MetricDecimalValue code5xx; + private MetricDecimalValue retries; private MetricDecimalValue metricPathBatchRate; public MetricsBuilder withRate(MetricDecimalValue rate) { @@ -65,6 +67,11 @@ public MetricsBuilder withCode5xx(MetricDecimalValue code5xx) { return this; } + public MetricsBuilder withRetries(MetricDecimalValue retries) { + this.retries = retries; + return this; + } + public MetricsBuilder withMetricPathBatchRate(MetricDecimalValue metricPathBatchRate) { this.metricPathBatchRate = metricPathBatchRate; return this; @@ -72,7 +79,7 @@ public MetricsBuilder withMetricPathBatchRate(MetricDecimalValue metricPathBatch public MonitoringSubscriptionMetrics build() { return new MonitoringSubscriptionMetrics(rate, timeouts, throughput, otherErrors, codes2xx, - code4xx, code5xx, metricPathBatchRate); + code4xx, code5xx, retries, metricPathBatchRate); } } } \ No newline at end of file diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java index b2c1e3d008..48bcd67600 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/prometheus/VictoriaMetricsMetricsProvider.java @@ -21,6 +21,7 @@ public class VictoriaMetricsMetricsProvider implements MonitoringSubscriptionMet private static final String SUBSCRIPTION_STATUS_CODES_2XX = SUBSCRIPTION_STATUS_CODES + "_2xx"; private static final String SUBSCRIPTION_STATUS_CODES_4XX = SUBSCRIPTION_STATUS_CODES + "_4xx"; private static final String SUBSCRIPTION_STATUS_CODES_5XX = SUBSCRIPTION_STATUS_CODES + "_5xx"; + private static final String SUBSCRIPTION_RETRIES = "subscription_retries_total"; private static final String TOPIC_RATE = "topic_requests_total"; private static final String TOPIC_DELIVERY_RATE = "subscription_delivered_total"; @@ -39,9 +40,8 @@ public VictoriaMetricsMetricsProvider(PrometheusClient prometheusClient, String this.consumersMetricsPrefix = consumersMetricsPrefix.isEmpty() ? "" : consumersMetricsPrefix + "_"; this.frontendMetricsPrefix = frontendMetricsPrefix.isEmpty() ? "" : frontendMetricsPrefix + "_"; this.additionalFilters = additionalFilters; - this.subscriptionMetricsToQuery = Stream.of(SUBSCRIPTION_DELIVERED, SUBSCRIPTION_TIMEOUTS, - SUBSCRIPTION_THROUGHPUT, SUBSCRIPTION_OTHER_ERRORS, SUBSCRIPTION_BATCHES, - SUBSCRIPTION_STATUS_CODES) + this.subscriptionMetricsToQuery = Stream.of(SUBSCRIPTION_DELIVERED, SUBSCRIPTION_TIMEOUTS, SUBSCRIPTION_RETRIES, + SUBSCRIPTION_THROUGHPUT, SUBSCRIPTION_OTHER_ERRORS, SUBSCRIPTION_BATCHES, SUBSCRIPTION_STATUS_CODES) .map(this::consumerMetricName) .collect(Collectors.joining("|")); this.topicMetricsToQuery = String.join("|", List.of( @@ -72,6 +72,7 @@ public MonitoringSubscriptionMetrics subscriptionMetrics(SubscriptionName subscr .withCodes2xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_2XX))) .withCode4xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_4XX))) .withCode5xx(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_STATUS_CODES_5XX))) + .withRetries(prometheusMetricsContainer.metricValue(consumerMetricName(SUBSCRIPTION_RETRIES))) .build(); } diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedSubscriptionMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedSubscriptionMetricsRepositoryTest.groovy deleted file mode 100644 index 0e94f343ac..0000000000 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedSubscriptionMetricsRepositoryTest.groovy +++ /dev/null @@ -1,92 +0,0 @@ -package pl.allegro.tech.hermes.management.infrastructure.metrics - -import pl.allegro.tech.hermes.api.MetricLongValue -import pl.allegro.tech.hermes.api.PersistentSubscriptionMetrics -import pl.allegro.tech.hermes.api.SubscriptionMetrics -import pl.allegro.tech.hermes.api.TopicName -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths -import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionLagSource -import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteClient -import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteMetricsProvider -import spock.lang.Specification - -import static pl.allegro.tech.hermes.api.MetricDecimalValue.of - -class HybridGraphiteBasedSubscriptionMetricsRepositoryTest extends Specification { - - private GraphiteClient client = Stub(GraphiteClient) - - private SummedSharedCounter summedSharedCounter = Stub(SummedSharedCounter) - - private ZookeeperPaths zookeeperPaths = new ZookeeperPaths("/hermes") - - private SubscriptionLagSource lagSource = new NoOpSubscriptionLagSource() - - private GraphiteMetricsProvider graphiteMetricsProvider = new GraphiteMetricsProvider(client, "stats"); - - private HybridSubscriptionMetricsRepository repository = new HybridSubscriptionMetricsRepository(graphiteMetricsProvider, - summedSharedCounter, zookeeperPaths, lagSource) - - def "should read subscription metrics from multiple places"() { - given: - String rate = 'sumSeries(stats.consumer.*.meter.group.topic.subscription.m1_rate)' - String timeouts = 'sumSeries(stats.consumer.*.status.group.topic.subscription.errors.timeout.m1_rate)' - String otherErrors = 'sumSeries(stats.consumer.*.status.group.topic.subscription.errors.other.m1_rate)' - - client.readMetrics(_ as String, _ as String, _ as String, rate, _ as String, timeouts, otherErrors, _ as String) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue(rate, of('10')) - .addMetricValue(timeouts, of('100')) - .addMetricValue(otherErrors, of('1000')) - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/delivered') >> 100 - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/discarded') >> 1 - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/volume') >> 16 - - when: - SubscriptionMetrics metrics = repository.loadMetrics(new TopicName('group', 'topic'), 'subscription') - - then: - metrics.rate == of('10') - metrics.delivered == 100 - metrics.discarded == 1 - metrics.volume == 16 - metrics.timeouts == of("100") - metrics.otherErrors == of("1000") - metrics.lag == MetricLongValue.of(-1) - } - - def "should read subscription metrics for all http status codes"() { - given: - client.readMetrics(getHttpStatusCodeForFamily(2), getHttpStatusCodeForFamily(4), getHttpStatusCodeForFamily(5), - _ as String, _ as String, _ as String, _ as String, _ as String) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue(getHttpStatusCodeForFamily(2), of('2')) - .addMetricValue(getHttpStatusCodeForFamily(4), of('4')) - .addMetricValue(getHttpStatusCodeForFamily(5), of('5')) - - when: - SubscriptionMetrics metrics = repository.loadMetrics(new TopicName('group', 'topic'), 'subscription') - - then: - metrics.codes2xx == of('2') - metrics.codes4xx == of('4') - metrics.codes5xx == of('5') - } - - def "should read subscription zookeeper metrics"() { - given: - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/delivered') >> 1000 - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/discarded') >> 10 - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/volume') >> 16 - - when: - PersistentSubscriptionMetrics zookeeperMetrics = repository.loadZookeeperMetrics(new TopicName('group', 'topic'), 'subscription') - - then: - zookeeperMetrics.delivered == 1000 - zookeeperMetrics.discarded == 10 - zookeeperMetrics.volume == 16 - } - - private static String getHttpStatusCodeForFamily(int family) { - "sumSeries(stats.consumer.*.status.group.topic.subscription.${family}xx.m1_rate)" - } -} diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedTopicMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedTopicMetricsRepositoryTest.groovy deleted file mode 100644 index 4c792c7e36..0000000000 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridGraphiteBasedTopicMetricsRepositoryTest.groovy +++ /dev/null @@ -1,53 +0,0 @@ -package pl.allegro.tech.hermes.management.infrastructure.metrics - -import pl.allegro.tech.hermes.api.TopicMetrics -import pl.allegro.tech.hermes.api.TopicName -import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository -import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths -import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteClient -import pl.allegro.tech.hermes.management.infrastructure.graphite.GraphiteMetricsProvider -import spock.lang.Specification - -import static pl.allegro.tech.hermes.api.MetricDecimalValue.of - -class HybridGraphiteBasedTopicMetricsRepositoryTest extends Specification { - - private GraphiteClient client = Stub(GraphiteClient) - - private SummedSharedCounter summedSharedCounter = Stub(SummedSharedCounter) - - private ZookeeperPaths zookeeperPaths = new ZookeeperPaths("/hermes") - - private SubscriptionRepository subscriptionRepository = Mock(SubscriptionRepository) - - private GraphiteMetricsProvider graphiteMetricsProvider = new GraphiteMetricsProvider(client, "stats") - - private HybridTopicMetricsRepository repository = new HybridTopicMetricsRepository(graphiteMetricsProvider, - summedSharedCounter, zookeeperPaths, subscriptionRepository) - - def "should load metrics from graphite and zookeeper"() { - given: - String rate = 'sumSeries(stats.producer.*.meter.group.topic.m1_rate)' - String deliveryRate = 'sumSeries(stats.consumer.*.meter.group.topic.m1_rate)' - String throughput = 'sumSeries(stats.producer.*.throughput.group.topic.m1_rate)' - TopicName topic = new TopicName('group', 'topic') - - client.readMetrics(rate, deliveryRate, throughput) >> MonitoringMetricsContainer.createEmpty() - .addMetricValue(rate, of('10')) - .addMetricValue(deliveryRate, of('20')) - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/metrics/published') >> 100 - summedSharedCounter.getValue('/hermes/groups/group/topics/topic/metrics/volume') >> 1024 - subscriptionRepository.listSubscriptionNames(topic) >> ["subscription1", "subscription2"] - - when: - TopicMetrics metrics = repository.loadMetrics(topic) - - then: - metrics.rate == of('10') - metrics.deliveryRate == of('20') - metrics.published == 100 - metrics.subscriptions == 2 - metrics.volume == 1024 - } - -} diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy index 396d2330cf..6c18b04c48 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridPrometheusBasedSubscriptionMetricsRepositoryTest.groovy @@ -23,7 +23,7 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati private SubscriptionLagSource lagSource = new NoOpSubscriptionLagSource() private VictoriaMetricsMetricsProvider prometheusMetricsProvider = new VictoriaMetricsMetricsProvider( - client, "hermes_consumers", "hermes_frontend", "service=~'hermes'"); + client, "hermes_consumers", "hermes_frontend", "service=~'hermes'") private HybridSubscriptionMetricsRepository repository = new HybridSubscriptionMetricsRepository(prometheusMetricsProvider, summedSharedCounter, zookeeperPaths, lagSource) @@ -31,6 +31,7 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati private static final String query = "sum by (__name__, group, topic, subscription, status_code) " + "(irate({__name__=~'hermes_consumers_subscription_delivered_total" + "|hermes_consumers_subscription_timeouts_total" + + "|hermes_consumers_subscription_retries_total" + "|hermes_consumers_subscription_throughput_bytes_total" + "|hermes_consumers_subscription_other_errors_total" + "|hermes_consumers_subscription_batches_total" + @@ -42,6 +43,7 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati client.readMetrics(query) >> MonitoringMetricsContainer.createEmpty() .addMetricValue("hermes_consumers_subscription_delivered_total", of('10')) .addMetricValue("hermes_consumers_subscription_timeouts_total", of('100')) + .addMetricValue("hermes_consumers_subscription_retries_total", of('20')) .addMetricValue("hermes_consumers_subscription_other_errors_total", of('1000')) summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/delivered') >> 100 summedSharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/discarded') >> 1 @@ -57,6 +59,7 @@ class HybridPrometheusBasedSubscriptionMetricsRepositoryTest extends Specificati metrics.discarded == 1 metrics.volume == 16 metrics.timeouts == of("100") + metrics.retries == of("20") metrics.otherErrors == of("1000") metrics.lag == MetricLongValue.of(-1) } diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy index c44a9c8fa7..6f68f65075 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/prometheus/RestTemplatePrometheusClientTest.groovy @@ -21,6 +21,7 @@ class RestTemplatePrometheusClientTest extends Specification { private static final String query = "sum by (__name__,group,topic,subscription,status_code)" + "(irate({__name__=~'hermes_consumers_subscription_delivered_total" + "|hermes_consumers_subscription_timeouts_total" + + "|hermes_consumers_subscription_retries_total" + "|hermes_consumers_subscription_throughput_bytes_total" + "|hermes_consumers_subscription_other_errors_total" + "|hermes_consumers_subscription_batches_total" + @@ -34,13 +35,13 @@ class RestTemplatePrometheusClientTest extends Specification { private RestTemplatePrometheusClient client void setup() { - RestTemplate restTemplate = new RestTemplate(); - client = new RestTemplatePrometheusClient(restTemplate, URI.create("http://localhost:$PROMETHEUS_HTTP_PORT"),); + RestTemplate restTemplate = new RestTemplate() + client = new RestTemplatePrometheusClient(restTemplate, URI.create("http://localhost:$PROMETHEUS_HTTP_PORT")) } def "should get metrics for path"() { given: - mockPrometheus(query, "full_response.json"); + mockPrometheus(query, "full_response.json") when: MonitoringMetricsContainer metrics = client.readMetrics(query) @@ -48,6 +49,7 @@ class RestTemplatePrometheusClientTest extends Specification { then: metrics.metricValue("hermes_consumers_subscription_delivered_total") == of("1.0") metrics.metricValue("hermes_consumers_subscription_timeouts_total") == of("2.0") + metrics.metricValue("hermes_consumers_subscription_retries_total") == of("1.0") metrics.metricValue("hermes_consumers_subscription_throughput_bytes_total") == of("3.0") metrics.metricValue("hermes_consumers_subscription_other_errors_total") == of("4.0") metrics.metricValue("hermes_consumers_subscription_batches_total") == of("5.0") @@ -66,6 +68,7 @@ class RestTemplatePrometheusClientTest extends Specification { then: metrics.metricValue("hermes_consumers_subscription_delivered_total") == of("0.0") metrics.metricValue("hermes_consumers_subscription_timeouts_total") == of("2.0") + metrics.metricValue("hermes_consumers_subscription_retries_total") == of("1.0") metrics.metricValue("hermes_consumers_subscription_throughput_bytes_total") == of("3.0") metrics.metricValue("hermes_consumers_subscription_other_errors_total") == of("4.0") metrics.metricValue("hermes_consumers_subscription_batches_total") == of("5.0") diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json b/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json index 9a9f457be7..7f813819d0 100644 --- a/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/full_response.json @@ -29,6 +29,19 @@ ], "group": 1 }, + { + "metric": { + "__name__": "hermes_consumers_subscription_retries_total", + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "1" + ], + "group": 1 + }, { "metric": { "__name__": "hermes_consumers_subscription_throughput_bytes_total", diff --git a/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json b/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json index a0f61afb2a..a2da55c2b5 100644 --- a/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json +++ b/hermes-management/src/test/resources/prometheus-stubs/__files/partial_response.json @@ -16,6 +16,19 @@ ], "group": 1 }, + { + "metric": { + "__name__": "hermes_consumers_subscription_retries_total", + "group": "pl.allegro.tech.hermes", + "subscription": "hermesSubscription", + "topic": "hermesTopic" + }, + "value": [ + 1692281425.609, + "1" + ], + "group": 1 + }, { "metric": { "__name__": "hermes_consumers_subscription_throughput_bytes_total", diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java index 78a8bb38a3..8daf23be19 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/prometheus/PrometheusExtension.java @@ -52,7 +52,7 @@ public void stubSubscriptionMetrics(SubscriptionMetrics metrics) { String query = """ sum by (__name__, group, topic, subscription, status_code) ( irate( - {__name__=~'hermes_consumers_subscription_delivered_total|hermes_consumers_subscription_timeouts_total|hermes_consumers_subscription_throughput_bytes_total|hermes_consumers_subscription_other_errors_total|hermes_consumers_subscription_batches_total|hermes_consumers_subscription_http_status_codes_total', group='%s', topic='%s', subscription='%s', }[1m] + {__name__=~'hermes_consumers_subscription_delivered_total|hermes_consumers_subscription_timeouts_total|hermes_consumers_subscription_retries_total|hermes_consumers_subscription_throughput_bytes_total|hermes_consumers_subscription_other_errors_total|hermes_consumers_subscription_batches_total|hermes_consumers_subscription_http_status_codes_total', group='%s', topic='%s', subscription='%s', }[1m] ) keep_metric_names ) """