Skip to content

Commit

Permalink
Add shrink target adjustment mechanism in memory arbitration
Browse files Browse the repository at this point in the history
Summary:
Switch to string-based configs for the extra configs in the shared arbitrator, for better consistency and fewer back-and-forth conversions.

Pull Request resolved: facebookincubator#10685

Reviewed By: xiaoxmeng

Differential Revision: D60920773

Pulled By: tanjialiang

fbshipit-source-id: 215d4e45d89c0b319f883a7c4b1ed27abd1c3e9f
  • Loading branch information
tanjialiang committed Aug 18, 2024
1 parent d805d31 commit 5d2d8c3
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 15 deletions.
4 changes: 4 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
"B";
extraArbitratorConfigs["slow-capacity-grow-pct"] =
folly::to<std::string>(options.slowCapacityGrowPct);
extraArbitratorConfigs["memory-pool-min-free-capacity"] =
folly::to<std::string>(options.memoryPoolMinFreeCapacity) + "B";
extraArbitratorConfigs["memory-pool-min-free-capacity-pct"] =
folly::to<std::string>(options.memoryPoolMinFreeCapacityPct);
extraArbitratorConfigs["memory-reclaim-max-wait-time"] =
folly::to<std::string>(options.memoryReclaimWaitMs) + "ms";
extraArbitratorConfigs["global-arbitration-enabled"] =
Expand Down
18 changes: 18 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ struct MemoryManagerOptions {
bool coreOnAllocationFailureEnabled{false};

/// ================== 'MemoryAllocator' settings ==================

/// Specifies the max memory allocation capacity in bytes enforced by
/// MemoryAllocator, default unlimited.
int64_t allocatorCapacity{kMaxMemory};
Expand Down Expand Up @@ -196,6 +197,23 @@ struct MemoryManagerOptions {
uint64_t fastExponentialGrowthCapacityLimit{512 << 20};
double slowCapacityGrowPct{0.25};

/// Shrink capacity adjustment. When shrinking a pool's capacity, the number
/// of bytes to shrink is capped so that, AFTER the shrink, the pool keeps at
/// least the smaller of the following two amounts as free capacity, in order
/// to better fit the pool's current memory usage:
/// - the pool's current (pre-shrink) capacity *
/// 'memoryPoolMinFreeCapacityPct'
/// - 'memoryPoolMinFreeCapacity'
///
/// NOTE: if the originally requested shrink bytes already leaves more free
/// capacity than the amount above, the request is used as is and no
/// adjustment is applied.
///
/// NOTE: capacity shrink adjustment is enabled only when both
/// 'memoryPoolMinFreeCapacityPct' and 'memoryPoolMinFreeCapacity' are set
/// (non-zero). The shared arbitrator rejects a configuration that sets only
/// one of them, and requires 'memoryPoolMinFreeCapacityPct' to be within
/// [0, 1].
uint64_t memoryPoolMinFreeCapacity{128 << 20};
double memoryPoolMinFreeCapacityPct{0.25};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max wait time has exceeded. If it is
/// zero, then there is no timeout. The default is 5 mins.
Expand Down
76 changes: 67 additions & 9 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,24 @@ uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
.count();
}

// Looks up 'kMemoryPoolMinFreeCapacity' in 'configs', passing
// 'kDefaultMemoryPoolMinFreeCapacity' as the default value, and converts the
// resulting capacity string into a number of bytes.
uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolMinFreeCapacity(
    const std::unordered_map<std::string, std::string>& configs) {
  const auto capacityStr = getConfig<std::string>(
      configs,
      kMemoryPoolMinFreeCapacity,
      std::string(kDefaultMemoryPoolMinFreeCapacity));
  return config::toCapacity(capacityStr, config::CapacityUnit::BYTE);
}

// Looks up 'kMemoryPoolMinFreeCapacityPct' in 'configs' as a double, passing
// 'kDefaultMemoryPoolMinFreeCapacityPct' as the default value.
double SharedArbitrator::ExtraConfig::getMemoryPoolMinFreeCapacityPct(
    const std::unordered_map<std::string, std::string>& configs) {
  const double minFreeCapacityPct = getConfig<double>(
      configs,
      kMemoryPoolMinFreeCapacityPct,
      kDefaultMemoryPoolMinFreeCapacityPct);
  return minFreeCapacityPct;
}

bool SharedArbitrator::ExtraConfig::getGlobalArbitrationEnabled(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<bool>(
Expand Down Expand Up @@ -176,18 +194,33 @@ SharedArbitrator::SharedArbitrator(const Config& config)
config.extraConfigs)),
slowCapacityGrowPct_(
ExtraConfig::getSlowCapacityGrowPct(config.extraConfigs)),
memoryPoolMinFreeCapacity_(
ExtraConfig::getMemoryPoolMinFreeCapacity(config.extraConfigs)),
memoryPoolMinFreeCapacityPct_(
ExtraConfig::getMemoryPoolMinFreeCapacityPct(config.extraConfigs)),
freeReservedCapacity_(reservedCapacity_),
freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) {
VELOX_CHECK_EQ(kind_, config.kind);
VELOX_CHECK_LE(reservedCapacity_, capacity_);
VELOX_CHECK_GE(slowCapacityGrowPct_, 0);
VELOX_CHECK_GE(memoryPoolMinFreeCapacityPct_, 0);
VELOX_CHECK_LE(memoryPoolMinFreeCapacityPct_, 1);
VELOX_CHECK_EQ(
fastExponentialGrowthCapacityLimit_ == 0,
slowCapacityGrowPct_ == 0,
"fastExponentialGrowthCapacityLimit_ {} and slowCapacityGrowPct_ {} "
"both need to be set at the same time to enable growth capacity "
"both need to be set (non-zero) at the same time to enable growth capacity "
"adjustment.",
fastExponentialGrowthCapacityLimit_,
slowCapacityGrowPct_);
VELOX_CHECK_EQ(
memoryPoolMinFreeCapacity_ == 0,
memoryPoolMinFreeCapacityPct_ == 0,
"memoryPoolMinFreeCapacity_ {} and memoryPoolMinFreeCapacityPct_ {} both "
"need to be set (non-zero) at the same time to enable shrink capacity "
"adjustment.",
memoryPoolMinFreeCapacity_,
memoryPoolMinFreeCapacityPct_);
}

std::string SharedArbitrator::Candidate::toString() const {
Expand Down Expand Up @@ -238,7 +271,7 @@ void SharedArbitrator::addPool(const std::shared_ptr<MemoryPool>& pool) {

void SharedArbitrator::removePool(MemoryPool* pool) {
VELOX_CHECK_EQ(pool->reservedBytes(), 0);
shrinkCapacity(pool, pool->capacity());
shrinkCapacity(pool);

std::unique_lock guard{poolLock_};
const auto ret = candidates_.erase(pool);
Expand Down Expand Up @@ -359,11 +392,13 @@ void SharedArbitrator::updateArbitrationFailureStats() {
int64_t SharedArbitrator::maxReclaimableCapacity(
const MemoryPool& pool,
bool isSelfReclaim) const {
// Checks if a query memory pool has finished processing or not. If it has
// finished, then we don't have to respect the memory pool reserved capacity
// limit check.
// NOTE: for query system like Prestissimo, it holds a finished query
// state in minutes for query stats fetch request from the Presto coordinator.
// Checks if a query memory pool has likely finished processing. It is likely
// this pool has finished when it has 0 current usage and non-0 past usage. If
// there is a high chance this pool finished, then we don't have to respect
// the memory pool reserved capacity limit check.
//
// NOTE: for query system like Prestissimo, it holds a finished query state in
// minutes for query stats fetch request from the Presto coordinator.
if (isSelfReclaim || (pool.reservedBytes() == 0 && pool.peakBytes() != 0)) {
return pool.capacity();
}
Expand All @@ -373,8 +408,13 @@ int64_t SharedArbitrator::maxReclaimableCapacity(
// Returns the amount of free capacity that can be reclaimed from 'pool'.
//
// The result is the smaller of:
//  - the pool's free bytes, capped by getCapacityShrinkTarget() unless this is
//    a self reclaim ('isSelfReclaim' is true), in which case all free bytes
//    count; and
//  - maxReclaimableCapacity() for the pool.
int64_t SharedArbitrator::reclaimableFreeCapacity(
    const MemoryPool& pool,
    bool isSelfReclaim) const {
  const auto freeBytes = pool.freeBytes();
  // getCapacityShrinkTarget() requires a non-zero request, so handle the
  // nothing-to-reclaim case up front.
  if (freeBytes == 0) {
    return 0;
  }
  const auto shrinkableBytes =
      isSelfReclaim ? freeBytes : getCapacityShrinkTarget(pool, freeBytes);
  return std::min<int64_t>(
      shrinkableBytes, maxReclaimableCapacity(pool, isSelfReclaim));
}

int64_t SharedArbitrator::reclaimableUsedCapacity(
Expand Down Expand Up @@ -419,12 +459,30 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked(
return allocatedBytes;
}

// Computes how many bytes to shrink from 'pool' for a shrink request of
// 'requestBytes', which must be non-zero.
//
// When shrink capacity adjustment is enabled ('memoryPoolMinFreeCapacity_' is
// non-zero), the request is capped so that the pool keeps at least
// min(capacity * 'memoryPoolMinFreeCapacityPct_', 'memoryPoolMinFreeCapacity_')
// bytes of free capacity. Otherwise 'requestBytes' is returned unchanged.
uint64_t SharedArbitrator::getCapacityShrinkTarget(
    const MemoryPool& pool,
    uint64_t requestBytes) const {
  VELOX_CHECK_NE(requestBytes, 0);
  if (memoryPoolMinFreeCapacity_ == 0) {
    // Adjustment disabled: honor the request as is.
    return requestBytes;
  }

  // Free capacity to leave in the pool: the smaller of the two thresholds.
  const uint64_t pctHeadroomBytes =
      static_cast<uint64_t>(pool.capacity() * memoryPoolMinFreeCapacityPct_);
  const uint64_t headroomBytes =
      std::min(pctHeadroomBytes, memoryPoolMinFreeCapacity_);

  // Whatever free capacity exceeds the headroom may be shrunk; never negative.
  const int64_t shrinkableBytes = std::max<int64_t>(
      0LL, pool.freeBytes() - static_cast<int64_t>(headroomBytes));
  return std::min(requestBytes, static_cast<uint64_t>(shrinkableBytes));
}

// Shrinks the capacity of 'pool' and returns the freed bytes to the
// arbitrator's free capacity. Returns the number of bytes actually freed.
//
// A non-zero 'requestBytes' is first capped by getCapacityShrinkTarget() so
// the pool keeps some free headroom. A zero 'requestBytes' is forwarded to
// shrinkPool() unchanged, since getCapacityShrinkTarget() rejects a zero
// request.
// NOTE(review): zero presumably means "shrink all free capacity" — confirm
// against shrinkPool().
uint64_t SharedArbitrator::shrinkCapacity(
    MemoryPool* pool,
    uint64_t requestBytes) {
  std::lock_guard<std::mutex> l(stateLock_);
  ++numShrinks_;
  const uint64_t targetBytes =
      requestBytes == 0 ? 0 : getCapacityShrinkTarget(*pool, requestBytes);
  const uint64_t freedBytes = shrinkPool(pool, targetBytes);
  incrementFreeCapacityLocked(freedBytes);
  return freedBytes;
}
Expand Down
39 changes: 38 additions & 1 deletion velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,33 @@ class SharedArbitrator : public memory::MemoryArbitrator {
static uint64_t getMemoryReclaimMaxWaitTimeMs(
const std::unordered_map<std::string, std::string>& configs);

/// Shrink capacity adjustment. When shrinking a pool's capacity, the number
/// of bytes to shrink is capped so that, AFTER the shrink, the pool keeps at
/// least the smaller of the following two amounts as free capacity, in order
/// to better fit the pool's current memory usage:
/// - the pool's current (pre-shrink) capacity *
/// 'memoryPoolMinFreeCapacityPct'
/// - 'memoryPoolMinFreeCapacity'
///
/// NOTE: if the originally requested shrink bytes already leaves more free
/// capacity than the amount above, the request is used as is and no
/// adjustment is applied.
///
/// NOTE: capacity shrink adjustment is enabled only when both
/// 'memoryPoolMinFreeCapacityPct' and 'memoryPoolMinFreeCapacity' are set
/// (non-zero). Setting only one of them fails the arbitrator's construction
/// check.
static constexpr std::string_view kMemoryPoolMinFreeCapacity{
"memory-pool-min-free-capacity"};
static constexpr std::string_view kDefaultMemoryPoolMinFreeCapacity{
"128MB"};
static uint64_t getMemoryPoolMinFreeCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// Fraction of the pool's capacity used as the percentage-based threshold of
/// the shrink capacity adjustment described above. The arbitrator requires
/// the value to be within [0, 1].
static constexpr std::string_view kMemoryPoolMinFreeCapacityPct{
"memory-pool-min-free-capacity-pct"};
static constexpr double kDefaultMemoryPoolMinFreeCapacityPct{0.25};
static double getMemoryPoolMinFreeCapacityPct(
const std::unordered_map<std::string, std::string>& configs);

/// If true, it allows memory arbitrator to reclaim used memory cross query
/// memory pools.
static constexpr std::string_view kGlobalArbitrationEnabled{
Expand Down Expand Up @@ -143,7 +170,7 @@ class SharedArbitrator : public memory::MemoryArbitrator {

bool growCapacity(MemoryPool* pool, uint64_t requestBytes) final;

uint64_t shrinkCapacity(MemoryPool* pool, uint64_t requestBytes) final;
uint64_t shrinkCapacity(MemoryPool* pool, uint64_t requestBytes = 0) final;

uint64_t shrinkCapacity(
uint64_t requestBytes,
Expand Down Expand Up @@ -470,6 +497,14 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const MemoryPool& pool,
uint64_t requestBytes) const;

// Returns the number of bytes to actually shrink from 'pool' for a shrink
// request of 'requestBytes', which must be non-zero. The request is capped so
// that the memory pool keeps some headroom free capacity after the shrink. It
// can help to reduce the number of future grow calls, and hence the number of
// unnecessary memory arbitration requests. The result never exceeds
// 'requestBytes', and equals 'requestBytes' when shrink capacity adjustment
// is disabled ('memoryPoolMinFreeCapacity_' is zero).
uint64_t getCapacityShrinkTarget(
const MemoryPool& pool,
uint64_t requestBytes) const;

// Returns true if 'pool' is under memory arbitration.
bool isUnderArbitrationLocked(MemoryPool* pool) const;

Expand All @@ -487,6 +522,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {

const uint64_t fastExponentialGrowthCapacityLimit_;
const double slowCapacityGrowPct_;
const uint64_t memoryPoolMinFreeCapacity_;
const double memoryPoolMinFreeCapacityPct_;

mutable folly::SharedMutex poolLock_;
std::unordered_map<MemoryPool*, std::weak_ptr<MemoryPool>> candidates_;
Expand Down
83 changes: 79 additions & 4 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ class MockSharedArbitrationTest : public testing::Test {
uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity,
uint64_t fastExponentialGrowthCapacityLimit = 0,
double slowCapacityGrowPct = 0,
uint64_t memoryPoolMinFreeCapacity = 0,
double memoryPoolMinFreeCapacityPct = 0,
std::function<void(MemoryPool&)> arbitrationStateCheckCb = nullptr,
bool globalArtbitrationEnabled = true) {
MemoryManagerOptions options;
Expand All @@ -440,6 +442,8 @@ class MockSharedArbitrationTest : public testing::Test {
options.fastExponentialGrowthCapacityLimit =
fastExponentialGrowthCapacityLimit;
options.slowCapacityGrowPct = slowCapacityGrowPct;
options.memoryPoolMinFreeCapacity = memoryPoolMinFreeCapacity;
options.memoryPoolMinFreeCapacityPct = memoryPoolMinFreeCapacityPct;
options.globalArbitrationEnabled = globalArtbitrationEnabled;
options.arbitrationStateCheckCb = std::move(arbitrationStateCheckCb);
options.checkUsageLeak = true;
Expand Down Expand Up @@ -656,7 +660,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) {
ASSERT_TRUE(RE2::FullMatch(pool.name(), re)) << pool.name();
++checkCount;
};
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, checkCountCb);
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, checkCountCb);

const int numTasks{5};
std::vector<std::shared_ptr<MockTask>> tasks;
Expand All @@ -681,7 +685,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) {
MemoryArbitrationStateCheckCB badCheckCb = [&](MemoryPool& /*unused*/) {
VELOX_FAIL("bad check");
};
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, badCheckCb);
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, badCheckCb);
std::shared_ptr<MockTask> task = addTask(kMemoryCapacity);
ASSERT_EQ(task->capacity(), 0);
MockMemoryOperator* memOp = task->addMemoryOp();
Expand Down Expand Up @@ -1698,6 +1702,8 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationEnableCheck) {
memoryPoolTransferCapacity,
0,
0,
0,
0,
nullptr,
globalArbitrationEnabled);

Expand Down Expand Up @@ -1904,9 +1910,78 @@ DEBUG_ONLY_TEST_F(
ASSERT_EQ(waitTask->capacity(), memoryCapacity / 2);
}

// Verifies shrink capacity adjustment on a single pool, without any
// arbitration involved: for each parameter row, either the arbitrator setup is
// expected to throw (only one of the two min-free-capacity settings is set), or
// the pool capacity after an explicit shrinkCapacity() call is checked.
TEST_F(MockSharedArbitrationTest, singlePoolShrinkWithoutArbitration) {
  const int64_t memoryCapacity = 512 * MB;
  struct TestParam {
    uint64_t memoryPoolReservedBytes;
    uint64_t memoryPoolMinFreeCapacity;
    double memoryPoolMinFreeCapacityPct;
    uint64_t requestBytes;
    bool expectThrow;
    uint64_t expectedCapacity;

    std::string debugString() const {
      return fmt::format(
          "memoryPoolReservedBytes {}, "
          "memoryPoolMinFreeCapacity {}, "
          "memoryPoolMinFreeCapacityPct {}, "
          "requestBytes {}, ",
          succinctBytes(memoryPoolReservedBytes),
          succinctBytes(memoryPoolMinFreeCapacity),
          memoryPoolMinFreeCapacityPct,
          succinctBytes(requestBytes));
    }
  } testParams[] = {
      // Only one of the two settings is non-zero: setup must throw.
      {0, 128 * MB, 0, 256 * MB, true, 0},
      {0, 0, 0.1, 256 * MB, true, 0},
      // Fixed threshold (128MB) is the smaller one.
      {256 * MB, 128 * MB, 0.5, 256 * MB, false, 384 * MB},
      // Percentage threshold (64MB) is the smaller one.
      {256 * MB, 128 * MB, 0.125, 256 * MB, false, 320 * MB},
      // NOTE(review): going by getCapacityShrinkTarget() alone, an idle 512MB
      // pool would keep 128MB of headroom here — confirm how this row ends up
      // at zero capacity.
      {0, 128 * MB, 0.25, 512 * MB, false, 0},
      {256 * MB, 128 * MB, 0.125, 512 * MB, false, 320 * MB}};

  for (const auto& param : testParams) {
    SCOPED_TRACE(param.debugString());

    // Both the throwing and the non-throwing paths use the same setup call.
    const auto setup = [&]() {
      setupMemory(
          memoryCapacity,
          0,
          memoryCapacity,
          0,
          0,
          0,
          0,
          param.memoryPoolMinFreeCapacity,
          param.memoryPoolMinFreeCapacityPct);
    };

    if (param.expectThrow) {
      VELOX_ASSERT_THROW(
          setup(),
          "both need to be set (non-zero) at the same time to enable shrink "
          "capacity adjustment.");
      continue;
    }
    setup();

    auto* memOp = addMemoryOp();
    memOp->allocate(param.memoryPoolReservedBytes);

    ASSERT_EQ(memOp->pool()->reservedBytes(), param.memoryPoolReservedBytes);
    arbitrator_->shrinkCapacity(memOp->pool(), param.requestBytes);
    ASSERT_EQ(memOp->pool()->capacity(), param.expectedCapacity);
    clearTasks();
  }
}

TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) {
int64_t memoryCapacity = 512 << 20;
uint64_t memoryPoolInitCapacity = 32 << 20;
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = 32 << 20;
struct TestParam {
uint64_t memoryPoolTransferCapacity;
uint64_t fastExponentialGrowthCapacityLimit;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class SharedArbitrationTestBase : public exec::test::HiveConnectorTestBase {
VELOX_CHECK_EQ(
stats.customStats.count(SharedArbitrator::kGlobalArbitrationCount),
1);
VELOX_CHECK_EQ(
VELOX_CHECK_GE(
stats.customStats.at(SharedArbitrator::kGlobalArbitrationCount).sum,
1);
VELOX_CHECK_EQ(
Expand Down

0 comments on commit 5d2d8c3

Please sign in to comment.