Skip to content

Commit

Permalink
Change SharedArbitrator extra config string based
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Aug 14, 2024
1 parent 38cc99d commit 1f5cec5
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 48 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ velox_add_library(
velox_link_libraries(
velox_memory
PUBLIC velox_common_base
velox_common_config
velox_exception
velox_flag_definitions
velox_time
Expand Down
10 changes: 5 additions & 5 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
// SharedArbitrator class. After Prestissimo switches, this part of the
// code will be removed.
extraArbitratorConfigs["reserved-capacity"] =
folly::to<std::string>(options.arbitratorReservedCapacity);
folly::to<std::string>(options.arbitratorReservedCapacity) + "B";
extraArbitratorConfigs["memory-pool-initial-capacity"] =
folly::to<std::string>(options.memoryPoolInitCapacity);
folly::to<std::string>(options.memoryPoolInitCapacity) + "B";
extraArbitratorConfigs["memory-pool-reserved-capacity"] =
folly::to<std::string>(options.memoryPoolReservedCapacity);
folly::to<std::string>(options.memoryPoolReservedCapacity) + "B";
extraArbitratorConfigs["memory-pool-transfer-capacity"] =
folly::to<std::string>(options.memoryPoolTransferCapacity);
folly::to<std::string>(options.memoryPoolTransferCapacity) + "B";
extraArbitratorConfigs["memory-reclaim-wait-ms"] =
folly::to<std::string>(options.memoryReclaimWaitMs);
folly::to<std::string>(options.memoryReclaimWaitMs) + "ms";
extraArbitratorConfigs["global-arbitration-enabled"] =
folly::to<std::string>(options.globalArbitrationEnabled);
extraArbitratorConfigs["check-usage-leak"] =
Expand Down
43 changes: 31 additions & 12 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/config/Config.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"
Expand Down Expand Up @@ -82,32 +83,50 @@ T getConfig(

int64_t SharedArbitrator::ExtraConfig::getReservedCapacity(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<int64_t>(
configs, kReservedCapacity, kDefaultReservedCapacity);
return config::toCapacity(
getConfig<std::string>(
configs, kReservedCapacity, std::string(kDefaultReservedCapacity)),
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<uint64_t>(
configs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity);
return config::toCapacity(
getConfig<std::string>(
configs,
kMemoryPoolInitialCapacity,
std::string(kDefaultMemoryPoolInitialCapacity)),
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<uint64_t>(
configs, kMemoryPoolReservedCapacity, kDefaultMemoryPoolReservedCapacity);
return config::toCapacity(
getConfig<std::string>(
configs,
kMemoryPoolReservedCapacity,
std::string(kDefaultMemoryPoolReservedCapacity)),
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<uint64_t>(
configs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity);
return config::toCapacity(
getConfig<std::string>(
configs,
kMemoryPoolTransferCapacity,
std::string(kDefaultMemoryPoolTransferCapacity)),
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(
uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<uint64_t>(
configs, kMemoryReclaimWaitMs, kDefaultMemoryReclaimWaitMs);
return std::chrono::duration_cast<std::chrono::milliseconds>(
config::toDuration(getConfig<std::string>(
configs,
kMemoryReclaimMaxWaitTime,
std::string(kDefaultMemoryReclaimMaxWaitTime))))
.count();
}

bool SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(
Expand All @@ -131,7 +150,7 @@ SharedArbitrator::SharedArbitrator(const Config& config)
memoryPoolTransferCapacity_(
ExtraConfig::getMemoryPoolTransferCapacity(config.extraConfigs)),
memoryReclaimWaitMs_(
ExtraConfig::getMemoryReclaimWaitMs(config.extraConfigs)),
ExtraConfig::getMemoryReclaimMaxWaitTimeMs(config.extraConfigs)),
globalArbitrationEnabled_(
ExtraConfig::getGlobalArbitrationEnabled(config.extraConfigs)),
checkUsageLeak_(ExtraConfig::getCheckUsageLeak(config.extraConfigs)),
Expand Down
18 changes: 10 additions & 8 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,32 @@ class SharedArbitrator : public memory::MemoryArbitrator {
/// The memory capacity reserved to ensure each running query has minimal
/// capacity of 'memoryPoolReservedCapacity' to run.
static constexpr std::string_view kReservedCapacity{"reserved-capacity"};
static constexpr int64_t kDefaultReservedCapacity{0};
static constexpr std::string_view kDefaultReservedCapacity{"0B"};
static int64_t getReservedCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// The initial memory capacity to reserve for a newly created query memory
/// pool.
static constexpr std::string_view kMemoryPoolInitialCapacity{
"memory-pool-initial-capacity"};
static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
static constexpr std::string_view kDefaultMemoryPoolInitialCapacity{
"256MB"};
static uint64_t getMemoryPoolInitialCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// The minimal amount of memory capacity reserved for each query to run.
static constexpr std::string_view kMemoryPoolReservedCapacity{
"memory-pool-reserved-capacity"};
static constexpr uint64_t kDefaultMemoryPoolReservedCapacity{0};
static constexpr std::string_view kDefaultMemoryPoolReservedCapacity{"0B"};
static uint64_t getMemoryPoolReservedCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
static constexpr std::string_view kMemoryPoolTransferCapacity{
"memory-pool-transfer-capacity"};
static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
static constexpr std::string_view kDefaultMemoryPoolTransferCapacity{
"128MB"};
static uint64_t getMemoryPoolTransferCapacity(
const std::unordered_map<std::string, std::string>& configs);

Expand All @@ -72,10 +74,10 @@ class SharedArbitrator : public memory::MemoryArbitrator {
/// the memory arbitration from getting stuck when the memory reclaim waits
/// for a hanging query task to pause. If it is zero, then there is no
/// timeout.
static constexpr std::string_view kMemoryReclaimWaitMs{
"memory-reclaim-wait-ms"};
static constexpr uint64_t kDefaultMemoryReclaimWaitMs{0};
static uint64_t getMemoryReclaimWaitMs(
static constexpr std::string_view kMemoryReclaimMaxWaitTime{
"memory-reclaim-max-wait-time"};
static constexpr std::string_view kDefaultMemoryReclaimMaxWaitTime{"0ms"};
static uint64_t getMemoryReclaimMaxWaitTimeMs(
const std::unordered_map<std::string, std::string>& configs);

/// If true, it allows memory arbitrator to reclaim used memory cross query
Expand Down
57 changes: 34 additions & 23 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,19 +522,20 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
// Testing default values
std::unordered_map<std::string, std::string> emptyConfigs;
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultReservedCapacity);
SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs), 0);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultMemoryPoolReservedCapacity);
SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs), 0);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(emptyConfigs),
256 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultMemoryPoolTransferCapacity);
128 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimWaitMs);
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
emptyConfigs),
0);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationEnabled);
Expand All @@ -545,56 +546,66 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
// Testing custom values
std::unordered_map<std::string, std::string> configs;
configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] =
"100";
"100B";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "512MB";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200";
SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200B";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] =
"256000000";
configs[std::string(SharedArbitrator::ExtraConfig::kMemoryReclaimWaitMs)] =
"5000";
SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "256MB";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "5000ms";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationEnabled)] = "true";
configs[std::string(SharedArbitrator::ExtraConfig::kCheckUsageLeak)] =
"false";
ASSERT_EQ(SharedArbitrator::ExtraConfig::getReservedCapacity(configs), 100);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(configs),
512 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs),
200);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs),
256000000);
256 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(configs), 5000);
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs),
5000);
ASSERT_TRUE(
SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(configs));
ASSERT_FALSE(SharedArbitrator::ExtraConfig::getCheckUsageLeak(configs));

// Testing invalid values
configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "invalid";
configs[std::string(SharedArbitrator::ExtraConfig::kMemoryReclaimWaitMs)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationEnabled)] = "invalid";
configs[std::string(SharedArbitrator::ExtraConfig::kCheckUsageLeak)] =
"invalid";
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getReservedCapacity(configs),
"Failed while parsing SharedArbitrator configs");
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(configs),
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs),
"Failed while parsing SharedArbitrator configs");
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs),
"Failed while parsing SharedArbitrator configs");
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(configs),
"Failed while parsing SharedArbitrator configs");
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs),
"Invalid duration 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(configs),
"Failed while parsing SharedArbitrator configs");
Expand Down

0 comments on commit 1f5cec5

Please sign in to comment.