Fix producer with incorrect ACK level being returned for remote sender (#1859)

* fix incorrect producer being returned for remote

* use remote dc name constant
moscicky authored May 10, 2024
1 parent 0a6cfd7 commit 69df8b0
Showing 2 changed files with 33 additions and 16 deletions.
@@ -54,7 +54,7 @@ KafkaMessageSender<byte[], byte[]> get(Topic topic) {
     }
 
     List<KafkaMessageSender<byte[], byte[]>> getRemote(Topic topic) {
-        return topic.isReplicationConfirmRequired() ? remoteAckLeader : remoteAckAll;
+        return topic.isReplicationConfirmRequired() ? remoteAckAll : remoteAckLeader;
     }
 
     List<String> getDatacenters() {
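The fix swaps the two branches of the ternary: a topic that requires replication confirmation now gets the remote senders configured for ack-all, while all other topics get the ack-leader senders. Below is a minimal standalone sketch of the corrected selection logic; the class and field names (RemoteSenderSelector, Sender) are illustrative, not the actual Hermes types.

import java.util.List;

// Illustrative sketch only -- mirrors the corrected mapping from the hunk above.
class RemoteSenderSelector {

    record Sender(String acks) {}

    private final List<Sender> remoteAckAll = List.of(new Sender("all"));
    private final List<Sender> remoteAckLeader = List.of(new Sender("1"));

    // Topics that require replication confirmation must wait for all in-sync
    // replicas (acks=all); the rest only need the partition leader's ack.
    List<Sender> getRemote(boolean replicationConfirmRequired) {
        return replicationConfirmRequired ? remoteAckAll : remoteAckLeader;
    }

    public static void main(String[] args) {
        RemoteSenderSelector selector = new RemoteSenderSelector();
        System.out.println(selector.getRemote(true).get(0).acks());   // all
        System.out.println(selector.getRemote(false).get(0).acks());  // 1
    }
}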
@@ -5,12 +5,12 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.springframework.test.web.reactive.server.WebTestClient;
 import org.testcontainers.lifecycle.Startable;
 import pl.allegro.tech.hermes.api.PublishingChaosPolicy;
 import pl.allegro.tech.hermes.api.PublishingChaosPolicy.ChaosMode;
 import pl.allegro.tech.hermes.api.PublishingChaosPolicy.ChaosPolicy;
 import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.integrationtests.assertions.PrometheusMetricsAssertion;
 import pl.allegro.tech.hermes.integrationtests.setup.HermesConsumersTestApp;
 import pl.allegro.tech.hermes.integrationtests.setup.HermesFrontendTestApp;
 import pl.allegro.tech.hermes.integrationtests.setup.HermesManagementTestApp;
@@ -47,6 +47,7 @@ public class RemoteDatacenterProduceFallbackTest {
     private static HermesConsumersTestApp consumerDC2;
 
     private static HermesTestClient DC1;
+    private static final String REMOTE_DC_NAME = "dc2";
 
     @BeforeAll
     public static void setup() {
@@ -55,13 +56,13 @@ public static void setup() {
                 .forEach(HermesDatacenter::startKafkaAndZookeeper);
         schemaRegistry.start();
         management = new HermesManagementTestApp(
-                Map.of(DEFAULT_DC_NAME, dc1.hermesZookeeper, "dc2", dc2.hermesZookeeper),
-                Map.of(DEFAULT_DC_NAME, dc1.kafka, "dc2", dc2.kafka),
+                Map.of(DEFAULT_DC_NAME, dc1.hermesZookeeper, REMOTE_DC_NAME, dc2.hermesZookeeper),
+                Map.of(DEFAULT_DC_NAME, dc1.kafka, REMOTE_DC_NAME, dc2.kafka),
                 schemaRegistry
         );
        management.start();
         frontendDC1 = new HermesFrontendTestApp(dc1.hermesZookeeper,
-                Map.of("dc", dc1.kafka, "dc2", dc2.kafka),
+                Map.of("dc", dc1.kafka, REMOTE_DC_NAME, dc2.kafka),
                 schemaRegistry
         );
         frontendDC1.start();
@@ -92,7 +93,7 @@ public static void clean() {
     public void afterEach() {
         Stream.of(dc1, dc2).forEach(dc -> dc.kafka.restoreConnectionsBetweenBrokersAndClients());
         DC1.setReadiness(DEFAULT_DC_NAME, true);
-        DC1.setReadiness("dc2", true);
+        DC1.setReadiness(REMOTE_DC_NAME, true);
     }
 
     @Test
@@ -104,6 +105,8 @@ public void shouldPublishAndConsumeViaRemoteDCWhenLocalKafkaIsUnavailable() {
                 subscription(topic.getQualifiedName(), "subscription", subscriber.getEndpoint()).build()
         );
 
+        double remoteDCInitialSendTotal = assertRemoteDCSendTotalMetric().withInitialValue();
+
         // when dc1 is not available
         dc1.kafka.cutOffConnectionsBetweenBrokersAndClients();
 
@@ -119,14 +122,17 @@ public void shouldPublishAndConsumeViaRemoteDCWhenLocalKafkaIsUnavailable() {
                 .expectStatus()
                 .isOk()
                 .expectBody(String.class)
-                .value((body) -> assertThatMetrics(body)
-                        .contains("hermes_frontend_topic_published_total")
-                        .withLabels(
-                                "group", topic.getName().getGroupName(),
-                                "topic", topic.getName().getName(),
-                                "storageDc", "dc2"
-                        )
-                        .withValue(1.0)
+                .value((body) -> {
+                            assertThatMetrics(body)
+                                    .contains("hermes_frontend_topic_published_total")
+                                    .withLabels(
+                                            "group", topic.getName().getGroupName(),
+                                            "topic", topic.getName().getName(),
+                                            "storageDc", REMOTE_DC_NAME
+                                    )
+                                    .withValue(1.0);
+                            assertRemoteDCSendTotalMetric().withValueGreaterThan(remoteDCInitialSendTotal);
+                        }
                 );
     }
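The updated assertion uses a capture-then-compare pattern for a monotonically increasing counter: the test reads the remote-DC send counter before the fault is injected and verifies it grew after publishing through the fallback path. A minimal standalone sketch of that pattern follows; the AtomicLong merely stands in for the Prometheus counter and is not part of the Hermes test API.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the capture-then-compare pattern used in the test above.
class CounterGrowthCheck {

    // Stands in for the remote-DC "record send total" counter.
    static final AtomicLong remoteSendTotal = new AtomicLong();

    public static void main(String[] args) {
        long initial = remoteSendTotal.get();  // capture before acting
        remoteSendTotal.incrementAndGet();     // stands in for a publish routed to the remote DC
        long current = remoteSendTotal.get();
        if (current <= initial) {
            throw new AssertionError("expected the counter to grow, but it stayed at " + current);
        }
        System.out.println("counter grew: " + initial + " -> " + current);
    }
}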

@@ -162,7 +168,7 @@ public void shouldNotFallBackToNotReadyDatacenter() {
 
         // when local datacenter is not available and remote is not ready
         dc1.kafka.cutOffConnectionsBetweenBrokersAndClients();
-        DC1.setReadiness("dc2", false);
+        DC1.setReadiness(REMOTE_DC_NAME, false);
 
         // and message is published
         TestMessage message = TestMessage.of("key1", "value1");
@@ -204,7 +210,7 @@ public void shouldPublishAndConsumeViaRemoteDCWhenChaosExperimentIsEnabledForLoc
                         .withLabels(
                                 "group", topic.getName().getGroupName(),
                                 "topic", topic.getName().getName(),
-                                "storageDc", "dc2"
+                                "storageDc", REMOTE_DC_NAME
                         )
                         .withValue(1.0)
                 );
@@ -279,4 +285,15 @@ void stop() {
                     .forEach(Startable::stop);
         }
     }
+
+    PrometheusMetricsAssertion.PrometheusMetricAssertion assertRemoteDCSendTotalMetric() {
+        return assertThatMetrics(DC1
+                .getFrontendMetrics().expectStatus().isOk()
+                .expectBody(String.class).returnResult().getResponseBody())
+                .contains("hermes_frontend_kafka_producer_ack_leader_record_send_total")
+                .withLabels(
+                        "storageDc", REMOTE_DC_NAME,
+                        "sender", "failFast"
+                );
+    }
 }
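The helper asserts against the frontend's Prometheus endpoint, which exposes counters in the text exposition format. A line the assertion would match could look like the following; the value and any labels beyond storageDc and sender are illustrative, not taken from the commit.

hermes_frontend_kafka_producer_ack_leader_record_send_total{storageDc="dc2",sender="failFast"} 42.0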
