Skip to content

Commit

Permalink
Add separate writer threads config for bucketed table write to avoid …
Browse files Browse the repository at this point in the history
…max open file exceeded error (#11087)

Summary:
Pull Request resolved: #11087

The bucket table write could run into max open file exceeding errors. There are problems here:
(1) on GBM cluster which only has 300 nodes, so the query will run on 300 nodes and the bucket table writer has 4 driver threads
per node. And the remote exchange and local exchange both uses the same hive partition function but the remote exchange has 300
partitions while the local has 4 partitions and 300 is a multiple of 4 which could cause the data skew which cause one driver thread has
received all the data which exceeds the open file limit. Confirmed with both 9 and 7 writer threads work
(2) native cluster is generally 1/2 of the java cluster size so it is more easily run into the max open file limit so we need to bump up the
write driver threads for native cluster.

This PR adds a separate config at the native side to bump up the bucket table write driver threads to solve the problem. We don't want to
bump up the partition writer threads as non-bucketed partition table won't have any open files (partitions) and we don't want to
create too many small files unnecessarily. This PR only adds support at Velox side: extends the plan node to include a bucket table
property flag to indicate if this is bucketed table as well as configure that table writer threads based on in local query planner.
The followups will add support at Prestissimo side.

Differential Revision: D63325016
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 25, 2024
1 parent 0dd3a5d commit 3f781b7
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 139 deletions.
3 changes: 3 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,7 @@ folly::dynamic TableWriteNode::serialize() const {
obj["connectorInsertTableHandle"] =
insertTableHandle_->connectorInsertTableHandle()->serialize();
obj["hasPartitioningScheme"] = hasPartitioningScheme_;
obj["hasBucketProperty"] = hasBucketProperty_;
obj["outputType"] = outputType_->serialize();
obj["commitStrategy"] = connector::commitStrategyToString(commitStrategy_);
return obj;
Expand All @@ -1875,6 +1876,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
ISerializable::deserialize<connector::ConnectorInsertTableHandle>(
obj["connectorInsertTableHandle"]));
const bool hasPartitioningScheme = obj["hasPartitioningScheme"].asBool();
const bool hasBucketProperty = obj["hasBucketProperty"].asBool();
auto outputType = deserializeRowType(obj["outputType"]);
auto commitStrategy =
connector::stringToCommitStrategy(obj["commitStrategy"].asString());
Expand All @@ -1887,6 +1889,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
std::make_shared<InsertTableHandle>(
connectorId, connectorInsertTableHandle),
hasPartitioningScheme,
hasBucketProperty,
outputType,
commitStrategy,
source);
Expand Down
39 changes: 37 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ class TableWriteNode : public PlanNode {
std::shared_ptr<AggregationNode> aggregationNode,
std::shared_ptr<InsertTableHandle> insertTableHandle,
bool hasPartitioningScheme,
bool hasBucketProperty,
RowTypePtr outputType,
connector::CommitStrategy commitStrategy,
const PlanNodePtr& source)
Expand All @@ -681,6 +682,7 @@ class TableWriteNode : public PlanNode {
aggregationNode_(std::move(aggregationNode)),
insertTableHandle_(std::move(insertTableHandle)),
hasPartitioningScheme_(hasPartitioningScheme),
hasBucketProperty_(hasBucketProperty),
outputType_(std::move(outputType)),
commitStrategy_(commitStrategy) {
VELOX_USER_CHECK_EQ(columns->size(), columnNames.size());
Expand All @@ -693,6 +695,30 @@ class TableWriteNode : public PlanNode {
}
}

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
TableWriteNode(
const PlanNodeId& id,
const RowTypePtr& columns,
const std::vector<std::string>& columnNames,
std::shared_ptr<AggregationNode> aggregationNode,
std::shared_ptr<InsertTableHandle> insertTableHandle,
bool hasPartitioningScheme,
RowTypePtr outputType,
connector::CommitStrategy commitStrategy,
const PlanNodePtr& source)
: TableWriteNode(
id,
columns,
columnNames,
std::move(aggregationNode),
std::move(insertTableHandle),
hasPartitioningScheme,
false,
std::move(outputType),
commitStrategy,
source) {}
#endif

const std::vector<PlanNodePtr>& sources() const override {
return sources_;
}
Expand Down Expand Up @@ -720,12 +746,20 @@ class TableWriteNode : public PlanNode {
/// Indicates if this table write has specified partitioning scheme. If true,
/// the task creates a number of table write operators based on the query
/// config 'task_partitioned_writer_count', otherwise based on
/// 'task__writer_count'. As for now, this is only true for hive bucketed
/// table write.
/// 'task_writer_count'.
bool hasPartitioningScheme() const {
return hasPartitioningScheme_;
}

/// Indicates if this table write has specified bucket property. If true, the
/// task creates a number of table write operators based on the query config
/// 'task_partitioned_bucket_writer_count', otherwise based on
/// 'task_partitioned_writer_count' or 'task__writer_count' depending on
/// whether paritition scheme is specified or not.
bool hasBucketProperty() const {
return hasBucketProperty_;
}

connector::CommitStrategy commitStrategy() const {
return commitStrategy_;
}
Expand Down Expand Up @@ -756,6 +790,7 @@ class TableWriteNode : public PlanNode {
const std::shared_ptr<AggregationNode> aggregationNode_;
const std::shared_ptr<InsertTableHandle> insertTableHandle_;
const bool hasPartitioningScheme_;
const bool hasBucketProperty_;
const RowTypePtr outputType_;
const connector::CommitStrategy commitStrategy_;
};
Expand Down
11 changes: 11 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ class QueryConfig {
static constexpr const char* kTaskPartitionedWriterCount =
"task_partitioned_writer_count";

/// The number of local parallel table writer operators per task for
/// partitioned bucket writes. If not set, use
/// "task_partitioned_writer_count".
static constexpr const char* kTaskPartitionedBucketWriterCount =
"task_partitioned_bucket_writer_count";

/// If true, finish the hash probe on an empty build table for a specific set
/// of hash joins.
static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild =
Expand Down Expand Up @@ -767,6 +773,11 @@ class QueryConfig {
.value_or(taskWriterCount());
}

uint32_t taskPartitionedBucketWriterCount() const {
return get<uint32_t>(kTaskPartitionedBucketWriterCount)
.value_or(taskPartitionedWriterCount());
}

bool hashProbeFinishEarlyOnEmptyBuild() const {
return get<bool>(kHashProbeFinishEarlyOnEmptyBuild, false);
}
Expand Down
45 changes: 34 additions & 11 deletions velox/core/tests/QueryConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,42 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
struct {
std::optional<int> numWriterCounter;
std::optional<int> numPartitionedWriterCounter;
std::optional<int> numPartitionedBucketWriterCounter;
int expectedWriterCounter;
int expectedPartitionedWriterCounter;
int expectedPartitionedBucketWriterCounter;

std::string debugString() const {
return fmt::format(
"numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}]",
"numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}] expectedPartitionedBucketWriterCounter[{}]",
numWriterCounter.value_or(0),
numPartitionedWriterCounter.value_or(0),
numPartitionedBucketWriterCounter.value_or(0),
expectedWriterCounter,
expectedPartitionedWriterCounter);
expectedPartitionedWriterCounter,
expectedPartitionedBucketWriterCounter);
}
} testSettings[] = {
{std::nullopt, std::nullopt, 4, 4},
{std::nullopt, 1, 4, 1},
{std::nullopt, 6, 4, 6},
{2, 4, 2, 4},
{4, 2, 4, 2},
{4, 6, 4, 6},
{6, 5, 6, 5},
{6, 4, 6, 4},
{6, std::nullopt, 6, 6}};
{std::nullopt, std::nullopt, std::nullopt, 4, 4, 4},
{std::nullopt, 1, std::nullopt, 4, 1, 1},
{std::nullopt, 6, std::nullopt, 4, 6, 6},
{2, 4, std::nullopt, 2, 4, 4},
{4, 2, std::nullopt, 4, 2, 2},
{4, 6, std::nullopt, 4, 6, 6},
{6, 5, std::nullopt, 6, 5, 5},
{6, 4, std::nullopt, 6, 4, 4},
{6, std::nullopt, 6, 6, 6, 6},
{6, std::nullopt, 1, 6, 6, 1},
{std::nullopt, std::nullopt, 4, 4, 4, 4},
{std::nullopt, std::nullopt, 1, 4, 4, 1},
{std::nullopt, 1, 1, 4, 1, 1},
{std::nullopt, 1, 2, 4, 1, 2},
{std::nullopt, 6, 6, 4, 6, 6},
{std::nullopt, 6, 3, 4, 6, 3},
{2, 4, 3, 2, 4, 3},
{4, 2, 1, 4, 2, 1},
{4, 6, 7, 4, 6, 7},
{6, std::nullopt, 4, 6, 6, 4}};
for (const auto& testConfig : testSettings) {
SCOPED_TRACE(testConfig.debugString());
std::unordered_map<std::string, std::string> configData;
Expand All @@ -91,13 +106,21 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
QueryConfig::kTaskPartitionedWriterCount,
std::to_string(testConfig.numPartitionedWriterCounter.value()));
}
if (testConfig.numPartitionedBucketWriterCounter.has_value()) {
configData.emplace(
QueryConfig::kTaskPartitionedBucketWriterCount,
std::to_string(testConfig.numPartitionedBucketWriterCounter.value()));
}
auto queryCtx =
QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
const QueryConfig& config = queryCtx->queryConfig();
ASSERT_EQ(config.taskWriterCount(), testConfig.expectedWriterCounter);
ASSERT_EQ(
config.taskPartitionedWriterCount(),
testConfig.expectedPartitionedWriterCounter);
ASSERT_EQ(
config.taskPartitionedBucketWriterCount(),
testConfig.expectedPartitionedBucketWriterCounter);
}
}

Expand Down
6 changes: 5 additions & 1 deletion velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,11 @@ Table Writer
* - task_partitioned_writer_count
- integer
- task_writer_count
- The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_writer_count' as default.
- The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
* - task_partitioned_bucket_writer_count
- integer
- task_partitioned_writer_count
- The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_partitioned_writer_count' as default.

Hive Connector
--------------
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ uint32_t maxDrivers(
if (!connectorInsertHandle->supportsMultiThreading()) {
return 1;
} else {
if (tableWrite->hasPartitioningScheme()) {
if (tableWrite->hasBucketProperty()) {
return queryConfig.taskPartitionedBucketWriterCount();
} else if (tableWrite->hasPartitioningScheme()) {
return queryConfig.taskPartitionedWriterCount();
} else {
return queryConfig.taskWriterCount();
Expand Down
Loading

0 comments on commit 3f781b7

Please sign in to comment.