From 07e7d81e2db5241d01d6c9ee3a96cead213a44b1 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 17 Jul 2024 16:19:12 +0200 Subject: [PATCH] GH-2971 Properly handle Kafka tombstone payload conversion See the corresponding commit in s-c-function Resolves #2971 --- .../function/FunctionBatchingTests.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionBatchingTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionBatchingTests.java index 641d6f28c..ede3ea9d1 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionBatchingTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/FunctionBatchingTests.java @@ -45,6 +45,23 @@ */ class FunctionBatchingTests { + @Test + void messageWithKafkaNull() { + TestChannelBinderConfiguration.applicationContextRunner(KafkaNullConfiguration.class) + .withPropertyValues("spring.cloud.stream.function.definition=myFunction").run(context -> { + InputDestination inputDestination = context.getBean(InputDestination.class); + OutputDestination outputDestination = context.getBean(OutputDestination.class); + + var message = MessageBuilder.withPayload(KafkaNull.INSTANCE).build(); + inputDestination.send(message); + + Object kn = outputDestination.receive().getPayload(); + + assertThat(kn).isInstanceOf(KafkaNull.class); + context.stop(); + }); + } + @Test void messageBatchConfigurationWithKafkaNull() { TestChannelBinderConfiguration.applicationContextRunner(MessageBatchConfiguration.class) @@ -300,4 +317,12 @@ public void setName(String name) { } + @EnableAutoConfiguration + public static class KafkaNullConfiguration { + @Bean + public Function, Message> myFunction() { + return v -> MessageBuilder.withPayload(KafkaNull.INSTANCE).build(); + } + } + }