Skip to content

Commit

Permalink
Support Spring Boot's KafkaConnectionDetails for Kafka connections
Browse files Browse the repository at this point in the history
- Integrate KafkaConnectionDetails, a Spring Boot component, in binder
- Update KafkaBinderConfigurationProperties to use KafkaConnectionDetails
- Modify KafkaTopicProvisioner to leverage KafkaConnectionDetails
- Adjust Kafka binder configurations to pass KafkaConnectionDetails
- Update tests to accommodate KafkaConnectionDetails changes
- Add KafkaConnectionDetails to shared.beans for auto-configuration

This change improves flexibility in configuring Kafka connections,
allowing for better support of externalized configuration management
and aligning with Spring Boot's connection abstraction model.
  • Loading branch information
wilkinsona authored and sobychacko committed Sep 24, 2024
1 parent 4175513 commit b7f2f1b
Show file tree
Hide file tree
Showing 23 changed files with 200 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.ProducerProperties;
Expand Down Expand Up @@ -77,6 +79,8 @@ public class KafkaBinderConfigurationProperties {

private final KafkaProperties kafkaProperties;

private final KafkaConnectionDetails kafkaConnectionDetails;

/**
* Arbitrary kafka properties that apply to both producers and consumers.
*/
Expand Down Expand Up @@ -170,10 +174,12 @@ public class KafkaBinderConfigurationProperties {
* https://github.com/spring-projects/spring-boot/issues/35564
*
* @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot
* @param kafkaConnectionDetails Kafka connection details autoconfigured by Spring Boot
*/
public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) {
public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider<KafkaConnectionDetails> kafkaConnectionDetails) {
Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null");
this.kafkaProperties = kafkaProperties;
this.kafkaConnectionDetails = kafkaConnectionDetails.getIfAvailable();
}

public KafkaProperties getKafkaProperties() {
Expand Down Expand Up @@ -395,6 +401,9 @@ public void setProducerProperties(Map<String, String> producerProperties) {
*/
public Map<String, Object> mergedConsumerConfiguration() {
Map<String, Object> consumerConfiguration = new HashMap<>(this.kafkaProperties.buildConsumerProperties(null));
if (this.kafkaConnectionDetails != null) {
consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getConsumerBootstrapServers());
}
// Copy configured binder properties that apply to consumers
// allow schema registry properties to be propagated to consumer configuration
for (Map.Entry<String, String> configurationEntry : this.configuration
Expand All @@ -421,6 +430,9 @@ public Map<String, Object> mergedConsumerConfiguration() {
*/
public Map<String, Object> mergedProducerConfiguration() {
Map<String, Object> producerConfiguration = new HashMap<>(this.kafkaProperties.buildProducerProperties(null));
if (this.kafkaConnectionDetails != null) {
producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getProducerBootstrapServers());
}
// Copy configured binder properties that apply to producers
for (Map.Entry<String, String> configurationEntry : this.configuration
.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
Expand Down Expand Up @@ -122,10 +123,26 @@ public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
this(kafkaBinderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer != null ?
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizer != null ?
Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
}

/**
* Create an instance.
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the instance.
* @parak kafkaConnectionDetails the Kafka connection details used to build the instance
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
* @since 4.1.4
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
this(kafkaBinderConfigurationProperties, kafkaProperties, kafkaConnectionDetails,
adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
}

/**
* Create an instance.
*
Expand All @@ -138,17 +155,41 @@ public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties,
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizers);
}

/**
* Create an instance.
*
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the instance.
* @param kafkaConnectionDetails the Kafka connection deatils used to build the instance.
* @param adminClientConfigCustomizers to customize {@link AdminClient}.
* @since 4.1.4
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {

Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminClientProperties = kafkaProperties.buildAdminProperties(null);
this.adminClientProperties = createAdminClientProperties(kafkaProperties, kafkaConnectionDetails);
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaBinderConfigurationProperties);
// If the application provides AdminConfig customizers
// and overrides properties, those take precedence.
adminClientConfigCustomizers.forEach(customizer -> customizer.configure(this.adminClientProperties));
}

private Map<String, Object> createAdminClientProperties(KafkaProperties properties, KafkaConnectionDetails connectionDetails) {
Map<String, Object> adminProperties = properties.buildAdminProperties(null);
if (connectionDetails != null) {
adminProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdminBootstrapServers());
}
return adminProperties;
}

/**
* Return an unmodifiable map of merged admin properties.
* @return the properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
import org.assertj.core.util.Files;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.core.io.ClassPathResource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

class KafkaBinderConfigurationPropertiesTest {

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setGroupId("group1");
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));

Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
Expand All @@ -49,11 +52,12 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
}

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setEnableAutoCommit(true);
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));

Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
Expand All @@ -62,10 +66,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
}

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

Expand All @@ -75,10 +80,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper
}

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));

Expand All @@ -88,10 +94,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat
}

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

Expand All @@ -101,10 +108,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper
}

@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));

Expand All @@ -114,10 +122,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat
}

@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
Expand All @@ -132,6 +141,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() {
}

@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 5869), 0);
createContextWithCertFileHandler(server, "testclient.truststore");
Expand All @@ -141,7 +151,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc

KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "http://localhost:5869/testclient.truststore");
configuration.put("ssl.keystore.location", "http://localhost:5869/testclient.keystore");
Expand All @@ -164,10 +174,11 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc
}

@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
Expand All @@ -182,10 +193,11 @@ void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() {
}

@Test
@SuppressWarnings("unchecked")
void certificateFilesAreMovedForSchemaRegistryConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();

configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore");
Expand Down Expand Up @@ -213,10 +225,11 @@ void certificateFilesAreMovedForSchemaRegistryConfiguration() {
}

@Test
@SuppressWarnings("unchecked")
void schemaRegistryPropertiesPropagatedToMergedProducerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();

configuration.put("schema.registry.url", "https://localhost:8081,https://localhost:8082");
Expand Down Expand Up @@ -254,10 +267,11 @@ void schemaRegistryPropertiesPropagatedToMergedProducerProperties() {
}

@Test
@SuppressWarnings("unchecked")
public void testEmptyLocationsAreIgnored() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("schema.registry.ssl.truststore.location", "");
configuration.put("schema.registry.ssl.keystore.location", "");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,13 +27,15 @@
import org.apache.kafka.common.network.SslChannelBuilder;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;

/**
* @author Gary Russell
Expand All @@ -44,15 +46,15 @@ class KafkaTopicProvisionerTests {

AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");

@SuppressWarnings("rawtypes")
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void bootPropertiesOverriddenExceptServers() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"PLAINTEXT");
bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234"));
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
bootConfig);
bootConfig, mock(ObjectProvider.class));
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SSL");
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
Expand All @@ -73,15 +75,15 @@ void bootPropertiesOverriddenExceptServers() throws Exception {
adminClient.close();
}

@SuppressWarnings("rawtypes")
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void bootPropertiesOverriddenIncludingServers() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"PLAINTEXT");
bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092"));
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
bootConfig);
bootConfig, mock(ObjectProvider.class));
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SSL");
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
Expand All @@ -102,10 +104,11 @@ void bootPropertiesOverriddenIncludingServers() throws Exception {
}

@Test
@SuppressWarnings("unchecked")
void brokersInvalid() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
bootConfig);
bootConfig, mock(ObjectProvider.class));
binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"localhost:1234");
try {
Expand Down
Loading

0 comments on commit b7f2f1b

Please sign in to comment.