Skip to content

Commit

Permalink
Enable more combinations in join fuzzer (#10676)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #10676

Refactor the isSupported() checks for MergeJoin and NestedLoopJoin so that each
lives in one central place, and enable a few join types that JoinFuzzer does not test today.

Reviewed By: xiaoxmeng

Differential Revision: D60797642

fbshipit-source-id: fcd4359777c4a7edbc67a1198142dba852bfc17b
  • Loading branch information
pedroerp authored and facebook-github-bot committed Aug 7, 2024
1 parent f40fa8a commit ad1f393
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 45 deletions.
59 changes: 56 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,10 +1093,50 @@ PlanNodePtr HashJoinNode::create(const folly::dynamic& obj, void* context) {
outputType);
}

/// Constructs a merge join plan node. All arguments are forwarded to the
/// AbstractJoinNode constructor; 'filter', 'left', 'right' and 'outputType'
/// are moved into it.
///
/// Fails with a user error (VELOX_USER_CHECK) if the join type is rejected by
/// MergeJoinNode::isSupported().
MergeJoinNode::MergeJoinNode(
    const PlanNodeId& id,
    JoinType joinType,
    const std::vector<FieldAccessTypedExprPtr>& leftKeys,
    const std::vector<FieldAccessTypedExprPtr>& rightKeys,
    TypedExprPtr filter,
    PlanNodePtr left,
    PlanNodePtr right,
    RowTypePtr outputType)
    : AbstractJoinNode(
          id,
          joinType,
          leftKeys,
          rightKeys,
          std::move(filter),
          std::move(left),
          std::move(right),
          std::move(outputType)) {
  // The message is a '{}'-style format string. Without a placeholder the join
  // type name passed as the trailing argument would be left out of the error
  // text.
  VELOX_USER_CHECK(
      isSupported(joinType_),
      "The join type is not supported by merge join: {}",
      joinTypeName(joinType_));
}

/// Serializes this node. MergeJoinNode adds nothing of its own here: the
/// result is exactly what serializeBase() produces.
folly::dynamic MergeJoinNode::serialize() const {
  folly::dynamic serialized = serializeBase();
  return serialized;
}

// static
// Returns true when 'joinType' is one of the join types accepted by merge
// join: inner, left, right, left semi filter, right semi filter or anti.
// Every other join type yields false.
bool MergeJoinNode::isSupported(core::JoinType joinType) {
  return joinType == core::JoinType::kInner ||
      joinType == core::JoinType::kLeft ||
      joinType == core::JoinType::kRight ||
      joinType == core::JoinType::kLeftSemiFilter ||
      joinType == core::JoinType::kRightSemiFilter ||
      joinType == core::JoinType::kAnti;
}

// static
PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) {
auto sources = deserializeSources(obj, context);
Expand Down Expand Up @@ -1136,9 +1176,8 @@ NestedLoopJoinNode::NestedLoopJoinNode(
sources_({std::move(left), std::move(right)}),
outputType_(std::move(outputType)) {
VELOX_USER_CHECK(
core::isInnerJoin(joinType_) || core::isLeftJoin(joinType_) ||
core::isRightJoin(joinType_) || core::isFullJoin(joinType_),
"{} unsupported, NestedLoopJoin only supports inner and outer join",
isSupported(joinType_),
"The join type is not supported by nested loop join: ",
joinTypeName(joinType_));

auto leftType = sources_[0]->outputType();
Expand Down Expand Up @@ -1170,6 +1209,20 @@ NestedLoopJoinNode::NestedLoopJoinNode(
right,
outputType) {}

// static
// Returns true when 'joinType' is one of the join types accepted by nested
// loop join: inner, left, right or full. Every other join type yields false.
bool NestedLoopJoinNode::isSupported(core::JoinType joinType) {
  return joinType == core::JoinType::kInner ||
      joinType == core::JoinType::kLeft ||
      joinType == core::JoinType::kRight ||
      joinType == core::JoinType::kFull;
}

void NestedLoopJoinNode::addDetails(std::stringstream& stream) const {
stream << joinTypeName(joinType_);
if (joinCondition_) {
Expand Down
23 changes: 9 additions & 14 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ namespace facebook::velox::core {

typedef std::string PlanNodeId;

/**
* Generic representation of InsertTable
*/
/// Generic representation of InsertTable
struct InsertTableHandle {
public:
InsertTableHandle(
Expand Down Expand Up @@ -1643,31 +1641,25 @@ class MergeJoinNode : public AbstractJoinNode {
TypedExprPtr filter,
PlanNodePtr left,
PlanNodePtr right,
RowTypePtr outputType)
: AbstractJoinNode(
id,
joinType,
leftKeys,
rightKeys,
std::move(filter),
std::move(left),
std::move(right),
std::move(outputType)) {}
RowTypePtr outputType);

/// Name of this plan node type: "MergeJoin".
std::string_view name() const override {
  return std::string_view{"MergeJoin"};
}

folly::dynamic serialize() const override;

/// Returns true if merge join supports 'joinType' (inner, left, right, left
/// semi filter, right semi filter or anti), false otherwise.
static bool isSupported(core::JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);
};

/// Represents inner/outer nested loop joins. Translates to an
/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate pipeline
/// is produced for the build side when generating exec::Operators.
///
/// Nested loop join supports both equal and non-equal joins. Expressions
/// Nested loop join (NLJ) supports both equal and non-equal joins. Expressions
/// specified in joinCondition are evaluated on every combination of left/right
/// tuple, to emit result. Results are emitted following the same input order of
/// probe rows for inner and left joins, for each thread of execution.
Expand Down Expand Up @@ -1712,6 +1704,9 @@ class NestedLoopJoinNode : public PlanNode {

folly::dynamic serialize() const override;

/// Returns true if nested loop join supports 'joinType' (inner, left, right
/// or full), false otherwise.
static bool isSupported(core::JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);

private:
Expand Down
18 changes: 1 addition & 17 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,11 @@ MergeJoin::MergeJoin(
numKeys_{joinNode->leftKeys().size()},
joinNode_(joinNode) {
VELOX_USER_CHECK(
isSupported(joinNode_->joinType()),
core::MergeJoinNode::isSupported(joinNode_->joinType()),
"The join type is not supported by merge join: ",
joinTypeName(joinNode_->joinType()));
}

// static
// Reports whether the merge join operator handles 'joinType'. Accepted types
// are inner, left, right, left semi filter, right semi filter and anti; any
// other join type returns false.
bool MergeJoin::isSupported(core::JoinType joinType) {
  const bool isInnerOrOuter = joinType == core::JoinType::kInner ||
      joinType == core::JoinType::kLeft || joinType == core::JoinType::kRight;
  const bool isSemiOrAnti = joinType == core::JoinType::kLeftSemiFilter ||
      joinType == core::JoinType::kRightSemiFilter ||
      joinType == core::JoinType::kAnti;
  return isInnerOrOuter || isSemiOrAnti;
}

void MergeJoin::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(joinNode_);
Expand Down
3 changes: 0 additions & 3 deletions velox/exec/MergeJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class MergeJoin : public Operator {
Operator::close();
}

/// If merge join supports this join type.
static bool isSupported(core::JoinType joinType);

private:
// Sets up 'filter_' and related member variables.
void initializeFilter(
Expand Down
13 changes: 5 additions & 8 deletions velox/exec/fuzzer/JoinFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/PartitionIdGenerator.h"
#include "velox/exec/MergeJoin.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include "velox/exec/fuzzer/ReferenceQueryRunner.h"
Expand Down Expand Up @@ -148,7 +147,7 @@ class JoinFuzzer {
const std::vector<std::string>& outputColumns);

// Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes.
// If withFilter is true, uses the equiality filter between probeKeys and
// If withFilter is true, uses the equality filter between probeKeys and
// buildKeys as the join filter. Uses empty join filter otherwise.
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan(
core::JoinType joinType,
Expand Down Expand Up @@ -860,7 +859,7 @@ void JoinFuzzer::makeAlternativePlans(
.planNode()});

// Use OrderBy + MergeJoin
if (exec::MergeJoin::isSupported(joinNode->joinType())) {
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeMergeJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
plans.push_back(planWithSplits);
Expand All @@ -869,8 +868,7 @@ void JoinFuzzer::makeAlternativePlans(
}

// Use NestedLoopJoin.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeNestedLoopJoinPlan(
joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns);
plans.push_back(planWithSplits);
Expand Down Expand Up @@ -1285,7 +1283,7 @@ void JoinFuzzer::addPlansWithTableScan(
}

// Add ungrouped MergeJoin with TableScan.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) {
if (core::MergeJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeMergeJoinPlanWithTableScan(
joinType,
probeType,
Expand All @@ -1307,8 +1305,7 @@ void JoinFuzzer::addPlansWithTableScan(
}

// Add ungrouped NestedLoopJoin with TableScan.
if (joinNode->isInnerJoin() || joinNode->isLeftJoin() ||
joinNode->isFullJoin()) {
if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) {
auto planWithSplits = makeNestedLoopJoinPlanWithTableScan(
joinType,
probeType,
Expand Down

0 comments on commit ad1f393

Please sign in to comment.