Skip to content

Commit

Permalink
Fix the right join result mismatch issue (#11027)
Browse files Browse the repository at this point in the history
Summary:
Right join currently matches rows in the same way as left join does, which can lead to missing rows from the right side when a filter is applied (see https://github.com/facebookincubator/velox/blob/main/velox/exec/MergeJoin.h#L326-L345). This PR adds addToOutputForRightJoin and addToOutputForLeftJoin so that the left join and right join cases are handled separately.

Pull Request resolved: #11027

Reviewed By: pedroerp

Differential Revision: D63344019

Pulled By: Yuhta

fbshipit-source-id: 18dfcb6cceb78b729af8df721a52d993318d76d3
  • Loading branch information
JkSelf authored and facebook-github-bot committed Sep 25, 2024
1 parent 94b485f commit 7483a76
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 10 deletions.
100 changes: 96 additions & 4 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,14 @@ bool MergeJoin::prepareOutput(
}

// Routes the current key match to the join-type-specific implementation:
// right and right semi filter joins go through addToOutputForRightJoin();
// every other join type goes through addToOutputForLeftJoin(). The result of
// the selected implementation is returned unchanged.
bool MergeJoin::addToOutput() {
  const bool rightSideDriven =
      isRightJoin(joinType_) || isRightSemiFilterJoin(joinType_);
  return rightSideDriven ? addToOutputForRightJoin()
                         : addToOutputForLeftJoin();
}

bool MergeJoin::addToOutputForLeftJoin() {
size_t firstLeftBatch;
vector_size_t leftStartIndex;
if (leftMatch_->cursor) {
Expand Down Expand Up @@ -552,10 +560,8 @@ bool MergeJoin::addToOutput() {
// one match on the other side, we could explore specialized algorithms
// or data structures that short-circuit the join process once a match
// is found.
if (isLeftSemiFilterJoin(joinType_) ||
isRightSemiFilterJoin(joinType_)) {
if (isLeftSemiFilterJoin(joinType_)) {
// LeftSemiFilter produce each row from the left at most once.
// RightSemiFilter produce each row from the right at most once.
rightEnd = rightStart + 1;
}

Expand Down Expand Up @@ -587,6 +593,84 @@ bool MergeJoin::addToOutput() {
return outputSize_ == outputBatchSize_;
}

bool MergeJoin::addToOutputForRightJoin() {
size_t firstRightBatch;
vector_size_t rightStartIndex;
if (rightMatch_->cursor) {
firstRightBatch = rightMatch_->cursor->batchIndex;
rightStartIndex = rightMatch_->cursor->index;
} else {
firstRightBatch = 0;
rightStartIndex = rightMatch_->startIndex;
}

size_t numRights = rightMatch_->inputs.size();
for (size_t r = firstRightBatch; r < numRights; ++r) {
auto right = rightMatch_->inputs[r];
auto rightStart = r == firstRightBatch ? rightStartIndex : 0;
auto rightEnd = r == numRights - 1 ? rightMatch_->endIndex : right->size();

for (auto i = rightStart; i < rightEnd; ++i) {
auto firstLeftBatch =
(r == firstRightBatch && i == rightStart && leftMatch_->cursor)
? leftMatch_->cursor->batchIndex
: 0;

auto leftStartIndex =
(r == firstRightBatch && i == rightStart && leftMatch_->cursor)
? leftMatch_->cursor->index
: leftMatch_->startIndex;

auto numLefts = leftMatch_->inputs.size();
for (size_t l = firstLeftBatch; l < numLefts; ++l) {
auto left = leftMatch_->inputs[l];
auto leftStart = l == firstLeftBatch ? leftStartIndex : 0;
auto leftEnd = l == numLefts - 1 ? leftMatch_->endIndex : left->size();

if (prepareOutput(left, right)) {
output_->resize(outputSize_);
leftMatch_->setCursor(l, leftStart);
rightMatch_->setCursor(r, i);
return true;
}

// TODO: Since semi joins only require determining if there is at least
// one match on the other side, we could explore specialized algorithms
// or data structures that short-circuit the join process once a match
// is found.
if (isRightSemiFilterJoin(joinType_)) {
// RightSemiFilter produce each row from the right at most once.
leftEnd = leftStart + 1;
}

for (auto j = leftStart; j < leftEnd; ++j) {
if (outputSize_ == outputBatchSize_) {
// If we run out of space in the current output_, we will need to
// produce a buffer and continue processing left later. In this
// case, we cannot leave left as a lazy vector, since we cannot have
// two dictionaries wrapping the same lazy vector.
loadColumns(currentLeft_, *operatorCtx_->execCtx());
rightMatch_->setCursor(r, i);
leftMatch_->setCursor(l, j);
return true;
}
addOutputRow(left, j, right, i);
}
}
}
}

leftMatch_.reset();
rightMatch_.reset();

// If the current key match finished, but there are still records to be
// processed in the left, we need to load lazy vectors (see comment above).
if (rightInput_ && rightIndex_ != rightInput_->size()) {
loadColumns(currentLeft_, *operatorCtx_->execCtx());
}
return outputSize_ == outputBatchSize_;
}

namespace {
vector_size_t firstNonNull(
const RowVectorPtr& rowVector,
Expand Down Expand Up @@ -649,6 +733,7 @@ RowVectorPtr MergeJoin::getOutput() {
if (output != nullptr && output->size() > 0) {
if (filter_) {
output = applyFilter(output);

if (output != nullptr) {
for (const auto [channel, _] : filterInputToOutputChannel_) {
filterInput_->childAt(channel).reset();
Expand Down Expand Up @@ -689,7 +774,14 @@ RowVectorPtr MergeJoin::getOutput() {
if (isFullJoin(joinType_)) {
rightIndex_ = 0;
} else {
rightIndex_ = firstNonNull(rightInput_, rightKeys_);
auto firstNonNullIndex = firstNonNull(rightInput_, rightKeys_);
if (isRightJoin(joinType_) && firstNonNullIndex > 0) {
prepareOutput(nullptr, rightInput_);
for (auto i = 0; i < firstNonNullIndex; ++i) {
addOutputRowForRightJoin(rightInput_, i);
}
}
rightIndex_ = firstNonNullIndex;
if (rightIndex_ == rightInput_->size()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
Expand Down
21 changes: 15 additions & 6 deletions velox/exec/MergeJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,23 @@ class MergeJoin : public Operator {
bool prepareOutput(const RowVectorPtr& left, const RowVectorPtr& right);

// Appends a cartesian product of the current set of matching rows to
// output_. Right and right semi filter joins are handled by
// addToOutputForRightJoin() (rightMatch_ x leftMatch_); all other join types
// are handled by addToOutputForLeftJoin() (leftMatch_ x rightMatch_).
//
// Returns true if output_ is full. If output_ filled up before all the rows
// were added, the cursors of leftMatch_ and rightMatch_ are set to the
// position to resume from, and the next call fills up output starting from
// those cursor positions. Clears leftMatch_ and rightMatch_ if all rows were
// added.
bool addToOutput();

// Appends the current set of matching rows to output_ as leftMatch_ x
// rightMatch_. Called by addToOutput() for every join type other than right
// and right semi filter joins; its return value is passed through to the
// caller of addToOutput().
bool addToOutputForLeftJoin();

// Appends the current set of matching rows to output_ as rightMatch_ x
// leftMatch_: each row of rightMatch_ is paired with the rows of leftMatch_
// before moving on to the next right row. Called by addToOutput() for right
// and right semi filter joins; its return value is passed through to the
// caller of addToOutput().
bool addToOutputForRightJoin();

// Adds one row of output by writing to the indices of the output
// dictionaries. By default, this operator returns dictionaries wrapped around
// the input columns from the left and right. If `isRightFlattened_`, the
Expand Down
76 changes: 76 additions & 0 deletions velox/exec/tests/MergeJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,82 @@ TEST_F(MergeJoinTest, leftAndRightJoinFilter) {
}
}

TEST_F(MergeJoinTest, rightJoinWithDuplicateMatch) {
  // Join key 2 is repeated on both sides (three times on the left, four times
  // on the right), so each right row with that key is compared against
  // several left rows under the filter "b < d". Both sides also end with a
  // null key.
  const auto leftVector = makeRowVector(
      {"a", "b"},
      {
          makeNullableFlatVector<int32_t>({1, 2, 2, 2, 3, 5, 6, std::nullopt}),
          makeNullableFlatVector<double>(
              {2.0, 100.0, 1.0, 1.0, 3.0, 1.0, 6.0, std::nullopt}),
      });

  const auto rightVector = makeRowVector(
      {"c", "d"},
      {
          makeNullableFlatVector<int32_t>(
              {0, 2, 2, 2, 2, 3, 4, 5, 7, std::nullopt}),
          makeNullableFlatVector<double>(
              {0.0, 3.0, -1.0, -1.0, 3.0, 2.0, 1.0, 3.0, 7.0, std::nullopt}),
      });

  // Reference tables for the DuckDB query the result is compared against.
  createDuckDbTable("t", {leftVector});
  createDuckDbTable("u", {rightVector});

  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  // Right merge join on a = c with the extra filter b < d.
  auto plan = PlanBuilder(idGenerator)
                  .values({leftVector})
                  .mergeJoin(
                      {"a"},
                      {"c"},
                      PlanBuilder(idGenerator).values({rightVector}).planNode(),
                      "b < d",
                      {"a", "b", "c", "d"},
                      core::JoinType::kRight)
                  .planNode();

  AssertQueryBuilder(plan, duckDbQueryRunner_)
      .assertResults("SELECT * from t RIGHT JOIN u ON a = c AND b < d");
}

TEST_F(MergeJoinTest, rightJoinFilterWithNull) {
  // Every key and every payload value is null on both sides: two rows on the
  // left, three rows on the right. The result is checked against the same
  // RIGHT JOIN executed by DuckDB.
  const auto leftVector = makeRowVector(
      {"a", "b"},
      {
          makeNullableFlatVector<int32_t>({std::nullopt, std::nullopt}),
          makeNullableFlatVector<double>({std::nullopt, std::nullopt}),
      });

  const auto rightVector = makeRowVector(
      {"c", "d"},
      {
          makeNullableFlatVector<int32_t>(
              {std::nullopt, std::nullopt, std::nullopt}),
          makeNullableFlatVector<double>(
              {std::nullopt, std::nullopt, std::nullopt}),
      });

  // Reference tables for the DuckDB query the result is compared against.
  createDuckDbTable("t", {leftVector});
  createDuckDbTable("u", {rightVector});

  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  // Right merge join on a = c with the extra filter b < d.
  auto plan = PlanBuilder(idGenerator)
                  .values({leftVector})
                  .mergeJoin(
                      {"a"},
                      {"c"},
                      PlanBuilder(idGenerator).values({rightVector}).planNode(),
                      "b < d",
                      {"a", "b", "c", "d"},
                      core::JoinType::kRight)
                  .planNode();

  AssertQueryBuilder(plan, duckDbQueryRunner_)
      .assertResults("SELECT * from t RIGHT JOIN u ON a = c AND b < d");
}

// Verify that both left-side and right-side pipelines feeding the merge join
// always run single-threaded.
TEST_F(MergeJoinTest, numDrivers) {
Expand Down

0 comments on commit 7483a76

Please sign in to comment.