From 2ee7f309af079bb60e8939e146d8991780a55c4a Mon Sep 17 00:00:00 2001 From: Satadru Pan Date: Thu, 3 Oct 2024 12:46:01 -0700 Subject: [PATCH] De-flake MultiFragmentTest::exchangeStatsOnFailure (#11094) Summary: The test is flaky for two independent reasons: a) The producer task writes a large payload, which may take more than 3 sec (the timeout value) to finish. The solution is to decrease the payload. b) The second reason for the flakiness is a bug in ExchangeClient. ExchangeClient::close() was not correctly closing all the ExchangeSources (it keeps shared_ptrs to ExchangeSource in a queue). As a result, Task::Close()-->ExchangeClient::close() doesn't clean up the memory if another shared_ptr to the same ExchangeClient is still alive. Another ExchangeClient ptr can be alive because ExchangeClient::request() passes shared_from_this() to a future. Reviewed By: xiaoxmeng Differential Revision: D63374267 --- velox/exec/ExchangeClient.cpp | 4 ++++ velox/exec/tests/MultiFragmentTest.cpp | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/velox/exec/ExchangeClient.cpp b/velox/exec/ExchangeClient.cpp index 98d4369582b2..b37b50cca361 100644 --- a/velox/exec/ExchangeClient.cpp +++ b/velox/exec/ExchangeClient.cpp @@ -70,6 +70,8 @@ void ExchangeClient::noMoreRemoteTasks() { void ExchangeClient::close() { std::vector> sources; + std::queue producingSources; + std::queue> emptySources; { std::lock_guard l(queue_->mutex()); if (closed_) { @@ -77,6 +79,8 @@ void ExchangeClient::close() { } closed_ = true; sources = std::move(sources_); + producingSources = std::move(producingSources_); + emptySources = std::move(emptySources_); } // Outside of mutex. 
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 57e42dfb64e4..0beaba37d2a0 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -1891,7 +1891,7 @@ DEBUG_ONLY_TEST_F(MultiFragmentTest, exchangeStatsOnFailure) { }); auto producerPlan = PlanBuilder() - .values({data}, false, 100) + .values({data}, false, 30) .partitionedOutput({}, 1) .planNode();