From 37aac8a96f9beede1622ffc3dbf4b45edca849e8 Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Tue, 13 Aug 2024 22:17:16 -0700
Subject: [PATCH] Fix the SortBuffer's noMoreInput called twice when enable smj (#10614)

Summary:
We encountered an exception while executing Q22 on a 1TB TPC-H dataset using
sort merge join. The issue arises because the SortBuffer::noMoreInput() method
is invoked more than once. This change eliminates the redundant invocation:
for an anti join, MergeJoin::getOutput() now keeps fetching rows when
filterOutputForAntiJoin() returns no output, instead of returning that empty
result to the caller. With this change, Q22 executes successfully.

```
Caused by: org.apache.gluten.exception.GlutenException: Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Retriable: False
Expression: !noMoreInput_
Context: Operator: OrderBy[1] 1
Function: noMoreInput
File: /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/velox/exec/SortBuffer.cpp
Line: 101
Stack trace:
# 0 facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 1 void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, facebook::velox::detail::CompileTimeEmptyString>(facebook::velox::detail::VeloxCheckFailArgs const&, facebook::velox::detail::CompileTimeEmptyString)
# 2 facebook::velox::exec::SortBuffer::noMoreInput()
# 3 facebook::velox::exec::OrderBy::noMoreInput()
# 4 facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&)
# 5 facebook::velox::exec::Driver::next(std::shared_ptr<facebook::velox::exec::BlockingState>&)
# 6 facebook::velox::exec::Task::next(folly::SemiFuture<folly::Unit>*)
```

Pull Request resolved: https://github.com/facebookincubator/velox/pull/10614

Reviewed By: tanjialiang

Differential Revision: D61219673

Pulled By: xiaoxmeng

fbshipit-source-id: f854cccac3a4ee990b30a5ac5910658b55ea3a42
---
 velox/exec/MergeJoin.cpp           |  8 +++++++-
 velox/exec/tests/MergeJoinTest.cpp | 32 ++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/velox/exec/MergeJoin.cpp 
b/velox/exec/MergeJoin.cpp index 194be412df52..08359dd05ca9 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -646,7 +646,13 @@ RowVectorPtr MergeJoin::getOutput() { // No rows survived the filter. Get more rows. continue; } else if (isAntiJoin(joinType_)) { - return filterOutputForAntiJoin(output); + output = filterOutputForAntiJoin(output); + if (output) { + return output; + } + + // No rows survived the filter for anti join. Get more rows. + continue; } else { return output; } diff --git a/velox/exec/tests/MergeJoinTest.cpp b/velox/exec/tests/MergeJoinTest.cpp index a91e62ca7b17..48f1e5700a2d 100644 --- a/velox/exec/tests/MergeJoinTest.cpp +++ b/velox/exec/tests/MergeJoinTest.cpp @@ -770,6 +770,38 @@ TEST_F(MergeJoinTest, antiJoinWithFilter) { "SELECT t0 FROM t WHERE NOT exists (select 1 from u where t0 = u0 AND t.t0 > 2 ) "); } +TEST_F(MergeJoinTest, antiJoinFailed) { + auto size = 1'00; + auto left = makeRowVector( + {"t0"}, {makeFlatVector(size, [](auto row) { return row; })}); + + auto right = makeRowVector( + {"u0"}, {makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable("t", {left}); + createDuckDbTable("u", {right}); + + // Anti join. + auto planNodeIdGenerator = std::make_shared(); + auto plan = + PlanBuilder(planNodeIdGenerator) + .values(split(left, 10)) + .orderBy({"t0"}, false) + .mergeJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({right}).planNode(), + "", + {"t0"}, + core::JoinType::kAnti) + .planNode(); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kMaxOutputBatchRows, "10") + .assertResults( + "SELECT t0 FROM t WHERE NOT exists (select 1 from u where t0 = u0) "); +} + TEST_F(MergeJoinTest, antiJoinWithTwoJoinKeys) { auto left = makeRowVector( {"a", "b"},