Skip to content

Commit

Permalink
Remove redundant delaySubscription from FunctionConfiguration
Browse files Browse the repository at this point in the history
Related to: spring-projects/spring-integration#9362

After the fix in Spring Integration: spring-projects/spring-integration@bdcb856
we ended up in a deadlock situation with a `beginPublishingTrigger` in the `FunctionConfiguration`
used for the `delaySubscription()` on an original `Publisher`.
The `FluxMessageChannel` uses its own `delaySubscription()` until the channel has its subscribers.
Apparently the logic before was with errors, so the `FluxMessageChannel` was marked as active
even if its subscriber is not ready yet, leading to famous `Dispatcher does not have subscribers` error.
So, looks like this `beginPublishingTrigger` was introduced back in days in Spring Cloud Stream
to mitigate that situation until we really emit a `BindingCreatedEvent`.

The deadlock (and the flaw, respectively) is with the `setupBindingTrigger()` method implementation
where `FluxMessageChannel` now "really" delays a subscription to the provided `Publisher`,
therefore not triggering that `Mono.create()` fulfilment immediately.
The `BindingCreatedEvent` arrives earlier, than we have a subscriber on the channel,
but `triggerRef.get()` is `null`, so we don't `success()` it and in the end don't subscribe
to an original `Publisher` since `delaySubscription()` on it is never completed.

Since `FunctionConfiguration` fully relies on `IntegrationFlow.from(Publisher)`,
which ends up with the mentioned  `FluxMessageChannel.subscribeTo()` and its own `delaySubscription()`
(which, in turn, apparently fixed now), we don't need our own `delaySubscription()` any more.
Therefore the fix in this PR is to propose to remove `beginPublishingTrigger` logic altogether.
  • Loading branch information
artembilan committed Jul 26, 2024
1 parent f356599 commit 3b5bb2e
Showing 1 changed file with 6 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuples;

import org.springframework.beans.BeansException;
Expand All @@ -65,7 +64,6 @@
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.ProducerProperties;
Expand Down Expand Up @@ -129,6 +127,7 @@
* @author Byungjun You
* @author Ivan Shapoval
* @author Patrik Péter Süli
* @author Artem Bilan
* @since 2.1
*/
@Lazy(false)
Expand Down Expand Up @@ -224,8 +223,6 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
}

Publisher<Object> beginPublishingTrigger = setupBindingTrigger(context);

if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) {
String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";

Expand All @@ -239,7 +236,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc

if (functionWrapper != null) {
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties),
beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName)
pollable, context, taskScheduler, producerProperties, outputName)
.route(Message.class, message -> {
if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination");
Expand All @@ -253,7 +250,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
}
else {
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties),
beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName)
pollable, context, taskScheduler, producerProperties, outputName)
.channel(c -> c.direct())
.fluxTransform((Function<? super Flux<Message<Object>>, ? extends Publisher<Object>>) function)
.route(Message.class, message -> {
Expand All @@ -274,26 +271,9 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc
};
}


/*
* Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created
*/
private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
AtomicReference<MonoSink<Object>> triggerRef = new AtomicReference<>();
Publisher<Object> beginPublishingTrigger = Mono.create(triggerRef::set);
context.addApplicationListener(event -> {
if (event instanceof BindingCreatedEvent) {
if (triggerRef.get() != null) {
triggerRef.get().success();
}
}
});
return beginPublishingTrigger;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier,
Publisher<Object> beginPublishingTrigger, PollableBean pollable, GenericApplicationContext context,
PollableBean pollable, GenericApplicationContext context,
TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) {

IntegrationFlowBuilder integrationFlowBuilder;
Expand All @@ -309,8 +289,8 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> s
if (pollable == null && reactive) {
Publisher publisher = (Publisher) supplier.get();
publisher = publisher instanceof Mono
? ((Mono) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary)
: ((Flux) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
? ((Mono) publisher).map(this::wrapToMessageIfNecessary)
: ((Flux) publisher).map(this::wrapToMessageIfNecessary);

integrationFlowBuilder = IntegrationFlow.from(publisher);

Expand Down

0 comments on commit 3b5bb2e

Please sign in to comment.