From b7f2f1bb2d5339d625bfefb27de7f4df03c4103d Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Tue, 24 Sep 2024 11:55:24 +0100 Subject: [PATCH] Support Spring Boot's KafkaConnectionDetails for Kafka connections - Integrate KafkaConnectionDetails, a Spring Boot component, in binder - Update KafkaBinderConfigurationProperties to use KafkaConnectionDetails - Modify KafkaTopicProvisioner to leverage KafkaConnectionDetails - Adjust Kafka binder configurations to pass KafkaConnectionDetails - Update tests to accommodate KafkaConnectionDetails changes - Add KafkaConnectionDetails to shared.beans for auto-configuration This change improves flexibility in configuring Kafka connections, allowing for better support of externalized configuration management and aligning with Spring Boot's connection abstraction model. --- .../KafkaBinderConfigurationProperties.java | 14 +++++- .../provisioning/KafkaTopicProvisioner.java | 45 ++++++++++++++++++- ...afkaBinderConfigurationPropertiesTest.java | 38 +++++++++++----- .../KafkaTopicProvisionerTests.java | 15 ++++--- .../ReactorKafkaBinderConfiguration.java | 12 ++--- .../reactorkafka/ReactorKafkaBinderTests.java | 9 ++-- .../GlobalKTableBinderConfiguration.java | 9 ++-- .../streams/KStreamBinderConfiguration.java | 8 ++-- .../streams/KTableBinderConfiguration.java | 9 ++-- ...StreamsBinderSupportAutoConfiguration.java | 37 ++++++++++++--- .../MultiBinderPropertiesConfiguration.java | 8 ++-- ...aStreamsBinderConfigurationProperties.java | 8 ++-- ...reamsInteractiveQueryIntegrationTests.java | 8 +++- .../config/KafkaBinderConfiguration.java | 12 ++--- .../stream/binder/kafka/AdminConfigTests.java | 2 +- .../kafka/AutoCreateTopicDisabledTests.java | 8 +++- ...BinderAutoConfigurationPropertiesTest.java | 2 +- .../kafka/KafkaBinderConfigurationTest.java | 2 +- .../stream/binder/kafka/KafkaBinderTests.java | 4 +- .../binder/kafka/KafkaBinderUnitTests.java | 9 ++-- .../binder/kafka/KafkaTransactionTests.java | 5 ++- .../KafkaBinderMeterRegistryTest.java | 4 +- .../src/main/resources/META-INF/shared.beans | 3 +- 23 files changed, 200 insertions(+), 71 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java index bc03862793..2ac961023e 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java @@ -36,6 +36,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.HeaderMode; import org.springframework.cloud.stream.binder.ProducerProperties; @@ -77,6 +79,8 @@ public class KafkaBinderConfigurationProperties { private final KafkaProperties kafkaProperties; + private final KafkaConnectionDetails kafkaConnectionDetails; + /** * Arbitrary kafka properties that apply to both producers and consumers. */ @@ -170,10 +174,12 @@ public class KafkaBinderConfigurationProperties { * https://github.com/spring-projects/spring-boot/issues/35564 * * @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot + * @param kafkaConnectionDetails Kafka connection details autoconfigured by Spring Boot */ - public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) { + public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null"); this.kafkaProperties = kafkaProperties; + this.kafkaConnectionDetails = kafkaConnectionDetails.getIfAvailable(); } public KafkaProperties getKafkaProperties() { @@ -395,6 +401,9 @@ public void setProducerProperties(Map producerProperties) { */ public Map mergedConsumerConfiguration() { Map consumerConfiguration = new HashMap<>(this.kafkaProperties.buildConsumerProperties(null)); + if (this.kafkaConnectionDetails != null) { + consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getConsumerBootstrapServers()); + } // Copy configured binder properties that apply to consumers // allow schema registry properties to be propagated to consumer configuration for (Map.Entry configurationEntry : this.configuration @@ -421,6 +430,9 @@ public Map mergedConsumerConfiguration() { */ public Map mergedProducerConfiguration() { Map producerConfiguration = new HashMap<>(this.kafkaProperties.buildProducerProperties(null)); + if (this.kafkaConnectionDetails != null) { + producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getProducerBootstrapServers()); + } // Copy configured binder properties that apply to producers for (Map.Entry configurationEntry : this.configuration .entrySet()) { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java index f236d0b7b7..99eb00eee9 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.BinderException; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -122,10 +123,26 @@ public KafkaTopicProvisioner( KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties, AdminClientConfigCustomizer adminClientConfigCustomizer) { - this(kafkaBinderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer != null ? + this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>()); } + /** + * Create an instance. + * @param kafkaBinderConfigurationProperties the binder configuration properties. + * @param kafkaProperties the boot Kafka properties used to build the instance. + * @parak kafkaConnectionDetails the Kafka connection details used to build the instance + * @param adminClientConfigCustomizer to customize {@link AdminClient}. + * @since 4.1.4 + */ + public KafkaTopicProvisioner( + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, + KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails, + AdminClientConfigCustomizer adminClientConfigCustomizer) { + this(kafkaBinderConfigurationProperties, kafkaProperties, kafkaConnectionDetails, + adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>()); + } + /** * Create an instance. * @@ -138,10 +155,26 @@ public KafkaTopicProvisioner( KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties, List adminClientConfigCustomizers) { + this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizers); + } + + /** + * Create an instance. + * + * @param kafkaBinderConfigurationProperties the binder configuration properties. + * @param kafkaProperties the boot Kafka properties used to build the instance. + * @param kafkaConnectionDetails the Kafka connection deatils used to build the instance. + * @param adminClientConfigCustomizers to customize {@link AdminClient}. + * @since 4.1.4 + */ + public KafkaTopicProvisioner( + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, + KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails, + List adminClientConfigCustomizers) { Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null"); this.configurationProperties = kafkaBinderConfigurationProperties; - this.adminClientProperties = kafkaProperties.buildAdminProperties(null); + this.adminClientProperties = createAdminClientProperties(kafkaProperties, kafkaConnectionDetails); normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties); // If the application provides AdminConfig customizers @@ -149,6 +182,14 @@ public KafkaTopicProvisioner( adminClientConfigCustomizers.forEach(customizer -> customizer.configure(this.adminClientProperties)); } + private Map createAdminClientProperties(KafkaProperties properties, KafkaConnectionDetails connectionDetails) { + Map adminProperties = properties.buildAdminProperties(null); + if (connectionDetails != null) { + adminProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdminBootstrapServers()); + } + return adminProperties; + } + /** * Return an unmodifiable map of merged admin properties. * @return the properties. diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java index d7184c1ea4..9a4360b464 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java @@ -28,19 +28,22 @@ import org.assertj.core.util.Files; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.core.io.ClassPathResource; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; class KafkaBinderConfigurationPropertiesTest { @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.getConsumer().setGroupId("group1"); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); Map mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration(); @@ -49,11 +52,12 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() { } @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.getConsumer().setEnableAutoCommit(true); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); Map mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration(); @@ -62,10 +66,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() { } @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); kafkaBinderConfigurationProperties .setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1")); @@ -75,10 +80,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper } @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); kafkaBinderConfigurationProperties .setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")); @@ -88,10 +94,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat } @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); kafkaBinderConfigurationProperties .setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1")); @@ -101,10 +108,11 @@ void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationProper } @Test + @SuppressWarnings("unchecked") void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); kafkaBinderConfigurationProperties .setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")); @@ -114,10 +122,11 @@ void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurat } @Test + @SuppressWarnings("unchecked") void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); @@ -132,6 +141,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() { } @Test + @SuppressWarnings("unchecked") void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOException { HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 5869), 0); createContextWithCertFileHandler(server, "testclient.truststore"); @@ -141,7 +151,7 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("ssl.truststore.location", "http://localhost:5869/testclient.truststore"); configuration.put("ssl.keystore.location", "http://localhost:5869/testclient.keystore"); @@ -164,10 +174,11 @@ void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOExc } @Test + @SuppressWarnings("unchecked") void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); @@ -182,10 +193,11 @@ void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() { } @Test + @SuppressWarnings("unchecked") void certificateFilesAreMovedForSchemaRegistryConfiguration() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore"); @@ -213,10 +225,11 @@ void certificateFilesAreMovedForSchemaRegistryConfiguration() { } @Test + @SuppressWarnings("unchecked") void schemaRegistryPropertiesPropagatedToMergedProducerProperties() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("schema.registry.url", "https://localhost:8081,https://localhost:8082"); @@ -254,10 +267,11 @@ void schemaRegistryPropertiesPropagatedToMergedProducerProperties() { } @Test + @SuppressWarnings("unchecked") public void testEmptyLocationsAreIgnored() { KafkaProperties kafkaProperties = new KafkaProperties(); KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = - new KafkaBinderConfigurationProperties(kafkaProperties); + new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); configuration.put("schema.registry.ssl.truststore.location", ""); configuration.put("schema.registry.ssl.keystore.location", ""); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java index bd76aa50e2..4d1abcc013 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisionerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.apache.kafka.common.network.SslChannelBuilder; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.core.io.ClassPathResource; @@ -34,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Mockito.mock; /** * @author Gary Russell @@ -44,7 +46,7 @@ class KafkaTopicProvisionerTests { AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar"); - @SuppressWarnings("rawtypes") + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void bootPropertiesOverriddenExceptServers() throws Exception { KafkaProperties bootConfig = new KafkaProperties(); @@ -52,7 +54,7 @@ void bootPropertiesOverriddenExceptServers() throws Exception { "PLAINTEXT"); bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234")); KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties( - bootConfig); + bootConfig, mock(ObjectProvider.class)); binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); ClassPathResource ts = new ClassPathResource("test.truststore.ks"); @@ -73,7 +75,7 @@ void bootPropertiesOverriddenExceptServers() throws Exception { adminClient.close(); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void bootPropertiesOverriddenIncludingServers() throws Exception { KafkaProperties bootConfig = new KafkaProperties(); @@ -81,7 +83,7 @@ void bootPropertiesOverriddenIncludingServers() throws Exception { "PLAINTEXT"); bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092")); KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties( - bootConfig); + bootConfig, mock(ObjectProvider.class)); binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); ClassPathResource ts = new ClassPathResource("test.truststore.ks"); @@ -102,10 +104,11 @@ void bootPropertiesOverriddenIncludingServers() throws Exception { } @Test + @SuppressWarnings("unchecked") void brokersInvalid() throws Exception { KafkaProperties bootConfig = new KafkaProperties(); KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties( - bootConfig); + bootConfig, mock(ObjectProvider.class)); binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); try { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java index e507b6a345..7aeef5220e 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -62,16 +63,17 @@ public class ReactorKafkaBinderConfiguration { @Bean @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder") KafkaBinderConfigurationProperties configurationProperties( - KafkaProperties kafkaProperties) { - return new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { + return new KafkaBinderConfigurationProperties(kafkaProperties, kafkaConnectionDetails); } @Bean KafkaTopicProvisioner provisioningProvider( KafkaBinderConfigurationProperties configurationProperties, - ObjectProvider adminClientConfigCustomizer, KafkaProperties kafkaProperties) { - return new KafkaTopicProvisioner(configurationProperties, - kafkaProperties, adminClientConfigCustomizer.getIfUnique()); + ObjectProvider adminClientConfigCustomizer, + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { + return new KafkaTopicProvisioner(configurationProperties, kafkaProperties, + kafkaConnectionDetails.getIfAvailable(), adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java index e34c7d704a..8209e79fd0 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java @@ -79,7 +79,7 @@ void consumerBinding() throws Exception { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.setBootstrapServers( Collections.singletonList(EmbeddedKafkaCondition.getBroker().getBrokersAsString())); - KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> { }); provisioner.setMetadataRetryOperations(new RetryTemplate()); @@ -144,7 +144,7 @@ void concurrency(String topic, String group, boolean atMostOnce) throws Exceptio KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.setBootstrapServers( Collections.singletonList(EmbeddedKafkaCondition.getBroker().getBrokersAsString())); - KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> { }); provisioner.setMetadataRetryOperations(new RetryTemplate()); @@ -225,7 +225,7 @@ void autoCommit() throws Exception { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.setBootstrapServers( Collections.singletonList(EmbeddedKafkaCondition.getBroker().getBrokersAsString())); - KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> { }); provisioner.setMetadataRetryOperations(new RetryTemplate()); @@ -290,11 +290,12 @@ public void onComplete() { } @Test + @SuppressWarnings("unchecked") void producerBinding() throws InterruptedException { KafkaProperties kafkaProperties = new KafkaProperties(); kafkaProperties.setBootstrapServers( Collections.singletonList(EmbeddedKafkaCondition.getBroker().getBrokersAsString())); - KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class)); KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> { }); provisioner.setMetadataRetryOperations(new RetryTemplate()); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java index 8a6cc6d55b..2c8c4c5b87 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; @@ -50,8 +51,10 @@ public class GlobalKTableBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { - return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails, + ObjectProvider adminClientConfigCustomizer) { + return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, kafkaConnectionDetails.getIfAvailable(), + adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java index 93d4e269f4..9d324178e5 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; @@ -48,9 +49,10 @@ public class KStreamBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails, + ObjectProvider adminClientConfigCustomizer) { return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties, - kafkaProperties, adminClientConfigCustomizer.getIfUnique()); + kafkaProperties, kafkaConnectionDetails.getIfAvailable(), adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java index 48f8543f9b..e76b9460ca 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; @@ -50,8 +51,10 @@ public class KTableBinderConfiguration { @Bean public KafkaTopicProvisioner provisioningProvider( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { - return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails, + ObjectProvider adminClientConfigCustomizer) { + return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, kafkaConnectionDetails.getIfAvailable(), + adminClientConfigCustomizer.getIfUnique()); } @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index fa65da9cfe..2af4d986c0 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.springframework.beans.BeanUtils; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -37,6 +38,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -75,6 +77,8 @@ import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; + + /** * Kafka Streams binder configuration. * @@ -101,7 +105,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration { @Bean @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder") public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties( - KafkaProperties kafkaProperties, ConfigurableEnvironment environment, + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails, ConfigurableEnvironment environment, BindingServiceProperties properties, ConfigurableApplicationContext context) throws Exception { final Map binderConfigurations = getBinderConfigurations( properties); @@ -121,16 +125,16 @@ public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties( new PropertySourcesPlaceholdersResolver(environment), IntegrationUtils.getConversionService(context.getBeanFactory()), null); final Constructor kafkaStreamsBinderConfigurationPropertiesConstructor = - ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class); + ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class, ObjectProvider.class); final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties = - BeanUtils.instantiateClass(kafkaStreamsBinderConfigurationPropertiesConstructor, kafkaProperties); + BeanUtils.instantiateClass(kafkaStreamsBinderConfigurationPropertiesConstructor, kafkaProperties, new EmptyObjectProvider<>()); final BindResult bind = binder.bind("spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(kafkaStreamsBinderConfigurationProperties)); context.getBeanFactory().registerSingleton( entry.getKey() + "-KafkaStreamsBinderConfigurationProperties", bind.get()); } } - return new KafkaStreamsBinderConfigurationProperties(kafkaProperties); + return new KafkaStreamsBinderConfigurationProperties(kafkaProperties, kafkaConnectionDetails); } // TODO: Lifted from core - good candidate for exposing as a utility method in core. @@ -449,4 +453,27 @@ public KafkaStreamsMicrometerListener binderStreamsListener(MeterRegistry meterR } } } + + private static class EmptyObjectProvider implements ObjectProvider { + + public T getObject() throws BeansException { + return null; + } + + @Override + public T getObject(Object... args) throws BeansException { + return null; + } + + @Override + public T getIfAvailable() throws BeansException { + return null; + } + + @Override + public T getIfUnique() throws BeansException { + return null; + } + + } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java index c0a07f7a92..11d134f0dd 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ package org.springframework.cloud.stream.binder.kafka.streams; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; @@ -34,7 +36,7 @@ public class MultiBinderPropertiesConfiguration { @Bean @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder") @ConditionalOnBean(name = "outerContext") - public KafkaBinderConfigurationProperties binderConfigurationProperties(KafkaProperties kafkaProperties) { - return new KafkaStreamsBinderConfigurationProperties(kafkaProperties); + public KafkaBinderConfigurationProperties binderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { + return new KafkaStreamsBinderConfigurationProperties(kafkaProperties, kafkaConnectionDetails); } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsBinderConfigurationProperties.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsBinderConfigurationProperties.java index 9698b0bc59..bfea3f638c 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsBinderConfigurationProperties.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsBinderConfigurationProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import java.util.HashMap; import java.util.Map; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.streams.DeserializationExceptionHandler; @@ -32,8 +34,8 @@ public class KafkaStreamsBinderConfigurationProperties extends KafkaBinderConfigurationProperties { - public KafkaStreamsBinderConfigurationProperties(KafkaProperties kafkaProperties) { - super(kafkaProperties); + public KafkaStreamsBinderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { + super(kafkaProperties, kafkaConnectionDetails); } private String applicationId; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index 0d86a52205..f6aac471bc 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -44,6 +44,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -64,6 +65,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; import static org.mockito.internal.verification.VerificationModeFactory.times; /** @@ -96,6 +98,7 @@ public static void tearDown() { } @Test + @SuppressWarnings("unchecked") void stateStoreRetrievalRetriedOnFailure() { StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class); @@ -108,7 +111,7 @@ void stateStoreRetrievalRetriedOnFailure() { mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "fooApp"); Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties); KafkaStreamsBinderConfigurationProperties binderConfigurationProperties = - new KafkaStreamsBinderConfigurationProperties(new KafkaProperties()); + new KafkaStreamsBinderConfigurationProperties(new KafkaProperties(), mock(ObjectProvider.class)); binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3); InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry, binderConfigurationProperties); @@ -125,6 +128,7 @@ void stateStoreRetrievalRetriedOnFailure() { } @Test + @SuppressWarnings("unchecked") void hostInfoRetrievalRetriedOnFailure() { StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class); KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class); @@ -136,7 +140,7 @@ void hostInfoRetrievalRetriedOnFailure() { mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123"); Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties); KafkaStreamsBinderConfigurationProperties binderConfigurationProperties = - new KafkaStreamsBinderConfigurationProperties(new KafkaProperties()); + new KafkaStreamsBinderConfigurationProperties(new KafkaProperties(), mock(ObjectProvider.class)); binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3); InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry, binderConfigurationProperties); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java index 0d43aabe40..6e7632d68d 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -106,16 +107,17 @@ public class KafkaBinderConfiguration { @Bean @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder") KafkaBinderConfigurationProperties configurationProperties( - KafkaProperties kafkaProperties) { - return new KafkaBinderConfigurationProperties(kafkaProperties); + KafkaProperties kafkaProperties, ObjectProvider kafkaConnectionDetails) { + return new KafkaBinderConfigurationProperties(kafkaProperties, kafkaConnectionDetails); } @Bean KafkaTopicProvisioner provisioningProvider( KafkaBinderConfigurationProperties configurationProperties, - ObjectProvider adminClientConfigCustomizers, KafkaProperties kafkaProperties) { + ObjectProvider adminClientConfigCustomizers, KafkaProperties kafkaProperties, + ObjectProvider kafkaConnectionDetails) { return new KafkaTopicProvisioner(configurationProperties, - kafkaProperties, adminClientConfigCustomizers.orderedStream().collect(Collectors.toList())); + kafkaProperties, kafkaConnectionDetails.getIfAvailable(), adminClientConfigCustomizers.orderedStream().collect(Collectors.toList())); } @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AdminConfigTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AdminConfigTests.java index 704b5b9a8a..9f54e0c15d 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AdminConfigTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AdminConfigTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java index 972c39b858..3b99b97c79 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AutoCreateTopicDisabledTests.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.BinderException; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -38,6 +39,7 @@ import org.springframework.retry.support.RetryTemplate; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; /** * @author Soby Chacko @@ -53,13 +55,14 @@ public static void setUp() { } @Test + @SuppressWarnings("unchecked") void autoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker() { KafkaProperties kafkaProperties = new TestKafkaProperties(); kafkaProperties.setBootstrapServers(Collections .singletonList(embeddedKafka.getBrokersAsString())); KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( - kafkaProperties); + kafkaProperties, mock(ObjectProvider.class)); // disable auto create topic on the binder. configurationProperties.setAutoCreateTopics(false); @@ -82,6 +85,7 @@ void autoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker() { } @Test + @SuppressWarnings("unchecked") void autoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker() { KafkaProperties kafkaProperties = new TestKafkaProperties(); @@ -89,7 +93,7 @@ void autoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker() { .singletonList(embeddedKafka.getBrokersAsString())); KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( - kafkaProperties); + kafkaProperties, mock(ObjectProvider.class)); // disable auto create topic on the binder. configurationProperties.setAutoCreateTopics(false); // reduce the wait time on the producer blocking operations. diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java index 2fdd6d6b8a..fd44e0cc6c 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java index 1a4ee2f960..d56dd4859d 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 3894e7378f..311499d382 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -76,6 +76,7 @@ import org.mockito.ArgumentMatchers; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderException; import org.springframework.cloud.stream.binder.BinderHeaders; @@ -263,9 +264,10 @@ kafkaBinderConfigurationProperties, new TestKafkaProperties(), prop -> { provisioningProvider, dlqPartitionFunction, dlqDestinationResolver); } + @SuppressWarnings("unchecked") private KafkaBinderConfigurationProperties createConfigurationProperties() { var binderConfiguration = new KafkaBinderConfigurationProperties( - new TestKafkaProperties()); + new TestKafkaProperties(), mock(ObjectProvider.class)); binderConfiguration.setBrokers(embeddedKafka.getBrokersAsString()); return binderConfiguration; } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java index ada5d12f1d..3499edb2c6 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.Binding; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -79,7 +80,7 @@ class KafkaBinderUnitTests { void propertyOverrides() throws Exception { KafkaProperties kafkaProperties = new TestKafkaProperties(); KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties( - kafkaProperties); + kafkaProperties, mock(ObjectProvider.class)); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( binderConfigurationProperties, kafkaProperties, prop -> { }); @@ -128,7 +129,7 @@ void mergedConsumerProperties() { bootProps.getConsumer().getProperties() .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "bar"); KafkaBinderConfigurationProperties props = new KafkaBinderConfigurationProperties( - bootProps); + bootProps, mock(ObjectProvider.class)); assertThat(props.mergedConsumerConfiguration() .get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("bar"); props.getConfiguration().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "baz"); @@ -144,7 +145,7 @@ void mergedProducerProperties() { KafkaProperties bootProps = new TestKafkaProperties(); bootProps.getProducer().getProperties().put(ProducerConfig.RETRIES_CONFIG, "bar"); KafkaBinderConfigurationProperties props = new KafkaBinderConfigurationProperties( - bootProps); + bootProps, mock(ObjectProvider.class)); assertThat(props.mergedProducerConfiguration().get(ProducerConfig.RETRIES_CONFIG)) .isEqualTo("bar"); props.getConfiguration().put(ProducerConfig.RETRIES_CONFIG, "baz"); @@ -186,7 +187,7 @@ void testOffsetResetWithGroupManagement(final boolean earliest, partitions.add(new TopicPartition(topic, 0)); partitions.add(new TopicPartition(topic, 1)); KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( - new TestKafkaProperties()); + new TestKafkaProperties(), mock(ObjectProvider.class)); KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class); ConsumerDestination dest = mock(ConsumerDestination.class); given(dest.getName()).willReturn(topic); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java index 5d7cf826b7..25a65a10b8 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import org.mockito.InOrder; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; @@ -75,7 +76,7 @@ void producerRunsInTx() { kafkaProperties.setBootstrapServers(Collections .singletonList(embeddedKafka.getBrokersAsString())); KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( - kafkaProperties); + kafkaProperties, mock(ObjectProvider.class)); configurationProperties.getTransaction().setTransactionIdPrefix("foo-"); configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true); KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java index 7ab25cf129..c6dc9fb4b4 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderMeterRegistryTest.java @@ -28,7 +28,9 @@ import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingListener; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.logging.LogLevel; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -102,7 +104,7 @@ void metricsWithSingleBinder() throws Exception { @Test void metricsWithMultiBinders() { ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class) - .web(WebApplicationType.NONE) + .web(WebApplicationType.NONE).initializers(ConditionEvaluationReportLoggingListener.forLogLevel(LogLevel.DEBUG)) .run("--spring.cloud.stream.bindings.uppercase-in-0.destination=inputTopic", "--spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup", "--spring.cloud.stream.bindings.uppercase-in-0.binder=kafka1", diff --git a/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans b/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans index fe46b0a02d..8e94ab3d94 100644 --- a/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans +++ b/core/spring-cloud-stream/src/main/resources/META-INF/shared.beans @@ -1,11 +1,12 @@ org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer +org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails +org.springframework.boot.autoconfigure.kafka.StreamsBuilderFactoryBeanCustomizer org.springframework.cloud.stream.config.SpelExpressionConverterConfiguration$SpelConverter org.springframework.cloud.stream.config.ListenerContainerCustomizer org.springframework.cloud.stream.binder.kafka.ListenerContainerWithDlqAndRetryCustomizer org.springframework.cloud.stream.binder.kafka.support.ConsumerConfigCustomizer org.springframework.cloud.stream.binder.kafka.support.ProducerConfigCustomizer org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer -org.springframework.boot.autoconfigure.kafka.StreamsBuilderFactoryBeanCustomizer org.springframework.kafka.config.KafkaStreamsCustomizer org.springframework.rabbit.stream.listener.ConsumerCustomizer org.springframework.amqp.core.DeclarableCustomizer