diff --git a/velox/common/memory/CMakeLists.txt b/velox/common/memory/CMakeLists.txt index a4ea2a847f2c..2bb6e2d2d154 100644 --- a/velox/common/memory/CMakeLists.txt +++ b/velox/common/memory/CMakeLists.txt @@ -34,6 +34,7 @@ velox_add_library( velox_link_libraries( velox_memory PUBLIC velox_common_base + velox_common_config velox_exception velox_flag_definitions velox_time diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 7c7ffd42574f..5b4e0eb42fe5 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -75,15 +75,15 @@ std::unique_ptr createArbitrator( // SharedArbitrator class. After Prestissimo switches, this part of the // code will be removed. extraArbitratorConfigs["reserved-capacity"] = - folly::to(options.arbitratorReservedCapacity); + folly::to(options.arbitratorReservedCapacity) + "B"; extraArbitratorConfigs["memory-pool-initial-capacity"] = - folly::to(options.memoryPoolInitCapacity); + folly::to(options.memoryPoolInitCapacity) + "B"; extraArbitratorConfigs["memory-pool-reserved-capacity"] = - folly::to(options.memoryPoolReservedCapacity); + folly::to(options.memoryPoolReservedCapacity) + "B"; extraArbitratorConfigs["memory-pool-transfer-capacity"] = - folly::to(options.memoryPoolTransferCapacity); + folly::to(options.memoryPoolTransferCapacity) + "B"; extraArbitratorConfigs["memory-reclaim-wait-ms"] = - folly::to(options.memoryReclaimWaitMs); + folly::to(options.memoryReclaimWaitMs) + "ms"; extraArbitratorConfigs["global-arbitration-enabled"] = folly::to(options.globalArbitrationEnabled); extraArbitratorConfigs["check-usage-leak"] = diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 8a71f2763ec9..37da20aa799f 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -19,6 +19,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/base/RuntimeMetrics.h" +#include "velox/common/config/Config.h" #include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" @@ -82,32 +83,50 @@ T getConfig( int64_t SharedArbitrator::ExtraConfig::getReservedCapacity( const std::unordered_map& configs) { - return getConfig( - configs, kReservedCapacity, kDefaultReservedCapacity); + return config::toCapacity( + getConfig( + configs, kReservedCapacity, std::string(kDefaultReservedCapacity)), + config::CapacityUnit::BYTE); } uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity( const std::unordered_map& configs) { - return getConfig( - configs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity); + return config::toCapacity( + getConfig( + configs, + kMemoryPoolInitialCapacity, + std::string(kDefaultMemoryPoolInitialCapacity)), + config::CapacityUnit::BYTE); } uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity( const std::unordered_map& configs) { - return getConfig( - configs, kMemoryPoolReservedCapacity, kDefaultMemoryPoolReservedCapacity); + return config::toCapacity( + getConfig( + configs, + kMemoryPoolReservedCapacity, + std::string(kDefaultMemoryPoolReservedCapacity)), + config::CapacityUnit::BYTE); } uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity( const std::unordered_map& configs) { - return getConfig( - configs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity); + return config::toCapacity( + getConfig( + configs, + kMemoryPoolTransferCapacity, + std::string(kDefaultMemoryPoolTransferCapacity)), + config::CapacityUnit::BYTE); } -uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs( +uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs( const std::unordered_map& configs) { - return getConfig( - configs, kMemoryReclaimWaitMs, kDefaultMemoryReclaimWaitMs); + return std::chrono::duration_cast( + config::toDuration(getConfig( + configs, + kMemoryReclaimMaxWaitTime, + std::string(kDefaultMemoryReclaimMaxWaitTime)))) + .count(); } bool SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled( @@ -131,7 +150,7 @@ SharedArbitrator::SharedArbitrator(const Config& config) memoryPoolTransferCapacity_( ExtraConfig::getMemoryPoolTransferCapacity(config.extraConfigs)), memoryReclaimWaitMs_( - ExtraConfig::getMemoryReclaimWaitMs(config.extraConfigs)), + ExtraConfig::getMemoryReclaimMaxWaitTimeMs(config.extraConfigs)), globalArbitrationEnabled_( ExtraConfig::getGlobalArbitrationEnabled(config.extraConfigs)), checkUsageLeak_(ExtraConfig::getCheckUsageLeak(config.extraConfigs)), diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index d1bc4d27da79..54cb81e86798 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -40,7 +40,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { /// The memory capacity reserved to ensure each running query has minimal /// capacity of 'memoryPoolReservedCapacity' to run. static constexpr std::string_view kReservedCapacity{"reserved-capacity"}; - static constexpr int64_t kDefaultReservedCapacity{0}; + static constexpr std::string_view kDefaultReservedCapacity{"0B"}; static int64_t getReservedCapacity( const std::unordered_map& configs); @@ -48,14 +48,15 @@ class SharedArbitrator : public memory::MemoryArbitrator { /// pool. static constexpr std::string_view kMemoryPoolInitialCapacity{ "memory-pool-initial-capacity"}; - static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; + static constexpr std::string_view kDefaultMemoryPoolInitialCapacity{ + "256MB"}; static uint64_t getMemoryPoolInitialCapacity( const std::unordered_map& configs); /// The minimal amount of memory capacity reserved for each query to run. static constexpr std::string_view kMemoryPoolReservedCapacity{ "memory-pool-reserved-capacity"}; - static constexpr uint64_t kDefaultMemoryPoolReservedCapacity{0}; + static constexpr std::string_view kDefaultMemoryPoolReservedCapacity{"0B"}; static uint64_t getMemoryPoolReservedCapacity( const std::unordered_map& configs); @@ -63,7 +64,8 @@ class SharedArbitrator : public memory::MemoryArbitrator { /// during the memory arbitration. static constexpr std::string_view kMemoryPoolTransferCapacity{ "memory-pool-transfer-capacity"}; - static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20}; + static constexpr std::string_view kDefaultMemoryPoolTransferCapacity{ + "128MB"}; static uint64_t getMemoryPoolTransferCapacity( const std::unordered_map& configs); @@ -72,10 +74,10 @@ class SharedArbitrator : public memory::MemoryArbitrator { /// the memory arbitration from getting stuck when the memory reclaim waits /// for a hanging query task to pause. If it is zero, then there is no /// timeout. - static constexpr std::string_view kMemoryReclaimWaitMs{ - "memory-reclaim-wait-ms"}; - static constexpr uint64_t kDefaultMemoryReclaimWaitMs{0}; - static uint64_t getMemoryReclaimWaitMs( + static constexpr std::string_view kMemoryReclaimMaxWaitTime{ + "memory-reclaim-max-wait-time"}; + static constexpr std::string_view kDefaultMemoryReclaimMaxWaitTime{"0ms"}; + static uint64_t getMemoryReclaimMaxWaitTimeMs( const std::unordered_map& configs); /// If true, it allows memory arbitrator to reclaim used memory cross query diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index ff037691e763..e90d6fbda31f 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -522,19 +522,20 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { // Testing default values std::unordered_map emptyConfigs; ASSERT_EQ( - SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs), - SharedArbitrator::ExtraConfig::kDefaultReservedCapacity); + SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs), 0); ASSERT_EQ( - SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity( - emptyConfigs), - SharedArbitrator::ExtraConfig::kDefaultMemoryPoolReservedCapacity); + SharedArbitrator::ExtraConfig::getReservedCapacity(emptyConfigs), 0); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(emptyConfigs), + 256 << 20); ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity( emptyConfigs), - SharedArbitrator::ExtraConfig::kDefaultMemoryPoolTransferCapacity); + 128 << 20); ASSERT_EQ( - SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(emptyConfigs), - SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimWaitMs); + SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs( + emptyConfigs), + 0); ASSERT_EQ( SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(emptyConfigs), SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationEnabled); @@ -545,27 +546,32 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { // Testing custom values std::unordered_map configs; configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] = - "100"; + "100B"; + configs[std::string( + SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "512MB"; configs[std::string( - SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200"; + SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200B"; configs[std::string( - SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = - "256000000"; - configs[std::string(SharedArbitrator::ExtraConfig::kMemoryReclaimWaitMs)] = - "5000"; + SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "256MB"; + configs[std::string( + SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "5000ms"; configs[std::string( SharedArbitrator::ExtraConfig::kGlobalArbitrationEnabled)] = "true"; configs[std::string(SharedArbitrator::ExtraConfig::kCheckUsageLeak)] = "false"; ASSERT_EQ(SharedArbitrator::ExtraConfig::getReservedCapacity(configs), 100); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(configs), + 512 << 20); ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs), 200); ASSERT_EQ( SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs), - 256000000); + 256 << 20); ASSERT_EQ( - SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(configs), 5000); + SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs), + 5000); ASSERT_TRUE( SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(configs)); ASSERT_FALSE(SharedArbitrator::ExtraConfig::getCheckUsageLeak(configs)); @@ -573,28 +579,33 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { // Testing invalid values configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] = "invalid"; + configs[std::string( + SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "invalid"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "invalid"; configs[std::string( SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "invalid"; - configs[std::string(SharedArbitrator::ExtraConfig::kMemoryReclaimWaitMs)] = - "invalid"; + configs[std::string( + SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "invalid"; configs[std::string( SharedArbitrator::ExtraConfig::kGlobalArbitrationEnabled)] = "invalid"; configs[std::string(SharedArbitrator::ExtraConfig::kCheckUsageLeak)] = "invalid"; VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getReservedCapacity(configs), - "Failed while parsing SharedArbitrator configs"); + "Invalid capacity string 'invalid'"); + VELOX_ASSERT_THROW( + SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(configs), + "Invalid capacity string 'invalid'"); VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs), - "Failed while parsing SharedArbitrator configs"); + "Invalid capacity string 'invalid'"); VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs), - "Failed while parsing SharedArbitrator configs"); + "Invalid capacity string 'invalid'"); VELOX_ASSERT_THROW( - SharedArbitrator::ExtraConfig::getMemoryReclaimWaitMs(configs), - "Failed while parsing SharedArbitrator configs"); + SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs), + "Invalid duration 'invalid'"); VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(configs), "Failed while parsing SharedArbitrator configs");