diff --git a/velox/functions/lib/SubscriptUtil.cpp b/velox/functions/lib/SubscriptUtil.cpp index 8c9b60ccdb0b..a31772d5de64 100644 --- a/velox/functions/lib/SubscriptUtil.cpp +++ b/velox/functions/lib/SubscriptUtil.cpp @@ -58,7 +58,7 @@ struct SimpleType { template VectorPtr applyMapTyped( bool triggerCaching, - std::shared_ptr& cachedLookupTablePtr, + std::shared_ptr& cachedLookupTablePtr, const SelectivityVector& rows, const VectorPtr& mapArg, const VectorPtr& indexArg, @@ -66,11 +66,11 @@ VectorPtr applyMapTyped( static constexpr vector_size_t kMinCachedMapSize = 100; using TKey = typename TypeTraits::NativeType; - LookupTable* typedLookupTable = nullptr; + detail::LookupTable* typedLookupTable = nullptr; if (triggerCaching) { if (!cachedLookupTablePtr) { cachedLookupTablePtr = - std::make_shared>(*context.pool()); + std::make_shared>(*context.pool()); } typedLookupTable = cachedLookupTablePtr->typedTable(); @@ -178,39 +178,13 @@ VectorPtr applyMapTyped( nullsBuilder.build(), indices, rows.end(), baseMap->mapValues()); } -// A flat vector of map keys, an index into that vector and an index into -// the original map keys vector that may have encodings. 
-struct MapKey { - const BaseVector* baseVector; - const vector_size_t baseIndex; - const vector_size_t index; - - size_t hash() const { - return baseVector->hashValueAt(baseIndex); - } - - bool operator==(const MapKey& other) const { - return baseVector->equalValueAt( - other.baseVector, baseIndex, other.baseIndex); - } - - bool operator<(const MapKey& other) const { - return baseVector->compare(other.baseVector, baseIndex, other.baseIndex) < - 0; - } -}; - -struct MapKeyHasher { - size_t operator()(const MapKey& key) const { - return key.hash(); - } -}; - VectorPtr applyMapComplexType( const SelectivityVector& rows, const VectorPtr& mapArg, const VectorPtr& indexArg, - exec::EvalCtx& context) { + exec::EvalCtx& context, + bool triggerCaching, + std::shared_ptr& cachedHashMap) { auto* pool = context.pool(); // Use indices with the mapValues wrapped in a dictionary vector. @@ -247,18 +221,32 @@ VectorPtr applyMapComplexType( // Fast path for the case of a single map. It may be constant or dictionary // encoded. Use hash table for quick search. 
if (baseMap->size() == 1) { - folly::F14FastSet set; - auto numKeys = rawSizes[0]; - set.reserve(numKeys * 1.3); - for (auto i = 0; i < numKeys; ++i) { - set.insert(MapKey{mapKeysBase, mapKeysIndices[i], i}); + detail::ComplexKeyHashMap hashMap{detail::MapKeyAllocator(*pool)}; + detail::ComplexKeyHashMap* hashMapPtr = &hashMap; + + if (triggerCaching) { + if (cachedHashMap == nullptr) { + cachedHashMap = std::make_shared( + detail::MapKeyAllocator(*pool)); + } + + hashMapPtr = cachedHashMap.get(); } + + if (hashMapPtr->empty()) { + auto numKeys = rawSizes[0]; + hashMapPtr->reserve(numKeys * 1.3); + for (auto i = 0; i < numKeys; ++i) { + hashMapPtr->insert(detail::MapKey{mapKeysBase, mapKeysIndices[i], i}); + } + } + rows.applyToSelected([&](vector_size_t row) { VELOX_CHECK_EQ(0, mapIndices[row]); auto searchIndex = searchIndices[row]; - auto it = set.find(MapKey{searchBase, searchIndex, row}); - if (it != set.end()) { + auto it = hashMapPtr->find(detail::MapKey{searchBase, searchIndex, row}); + if (it != hashMapPtr->end()) { rawIndices[row] = it->index; } else { nullsBuilder.setNull(row); @@ -302,6 +290,8 @@ VectorPtr applyMapComplexType( } // namespace +namespace detail { + VectorPtr MapSubscript::applyMap( const SelectivityVector& rows, std::vector& args, @@ -312,20 +302,20 @@ VectorPtr MapSubscript::applyMap( // Ensure map key type and second argument are the same. 
VELOX_CHECK(mapArg->type()->childAt(0)->equivalent(*indexArg->type())); + bool triggerCaching = shouldTriggerCaching(mapArg); if (indexArg->type()->isPrimitiveType()) { - bool triggerCaching = shouldTriggerCaching(mapArg); - return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( applyMapTyped, indexArg->typeKind(), triggerCaching, - const_cast&>(lookupTable_), + primitiveKeyLookupTable_, rows, mapArg, indexArg, context); } else { - return applyMapComplexType(rows, mapArg, indexArg, context); + return applyMapComplexType( + rows, mapArg, indexArg, context, triggerCaching, complexKeyHashMap_); } } @@ -369,5 +359,6 @@ const std::exception_ptr& negativeSubscriptError() { static std::exception_ptr error = makeNegativeSubscriptError(); return error; } +} // namespace detail } // namespace facebook::velox::functions diff --git a/velox/functions/lib/SubscriptUtil.h b/velox/functions/lib/SubscriptUtil.h index 18daf68e03a9..550b90c6367b 100644 --- a/velox/functions/lib/SubscriptUtil.h +++ b/velox/functions/lib/SubscriptUtil.h @@ -30,6 +30,7 @@ namespace facebook::velox::functions { +namespace detail { // Below functions return a stock instance of each of the possible errors in // SubscriptImpl const std::exception_ptr& zeroSubscriptError(); @@ -96,6 +97,42 @@ class LookupTable : public LookupTableBase { std::unique_ptr map_; }; +// A flat vector of map keys, an index into that vector and an index into +// the original map keys vector that may have encodings. 
+struct MapKey { + const BaseVector* baseVector; + const vector_size_t baseIndex; + const vector_size_t index; + + size_t hash() const { + return baseVector->hashValueAt(baseIndex); + } + + bool operator==(const MapKey& other) const { + return baseVector->equalValueAt( + other.baseVector, baseIndex, other.baseIndex); + } + + bool operator<(const MapKey& other) const { + return baseVector->compare(other.baseVector, baseIndex, other.baseIndex) < + 0; + } +}; + +struct MapKeyHasher { + size_t operator()(const MapKey& key) const { + return key.hash(); + } +}; + +using MapKeyAllocator = memory::StlAllocator; + +using ComplexKeyHashMap = folly::F14FastSet< + detail::MapKey, + detail::MapKeyHasher, + folly::f14::DefaultKeyEqual, + MapKeyAllocator>; + class MapSubscript { public: explicit MapSubscript(bool allowCaching) : allowCaching_(allowCaching) {} @@ -109,8 +146,12 @@ class MapSubscript { return allowCaching_; } - auto& lookupTable() const { - return lookupTable_; + auto& primitiveKeyLookupTable() const { + return primitiveKeyLookupTable_; + } + + auto& complexKeyHashMap() const { + return complexKeyHashMap_; } auto& firstSeenMap() const { @@ -123,9 +164,8 @@ class MapSubscript { return false; } - if (!mapArg->type()->childAt(0)->isPrimitiveType() && - !!mapArg->type()->childAt(0)->isBoolean()) { - // Disable caching if the key type is not primitive or is boolean. + if (mapArg->type()->childAt(0)->isBoolean()) { + // Disable caching if the key type is boolean. allowCaching_ = false; return false; } @@ -141,7 +181,8 @@ class MapSubscript { // Disable caching forever. allowCaching_ = false; - lookupTable_.reset(); + primitiveKeyLookupTable_.reset(); + complexKeyHashMap_.reset(); firstSeenMap_.reset(); return false; } @@ -155,9 +196,15 @@ class MapSubscript { // seen again then it was not modified. mutable VectorPtr firstSeenMap_; - // Materialized cached version of firstSeenMap_ used to optimize the lookup. 
- mutable std::shared_ptr lookupTable_; + // Materialized cached version of firstSeenMap_ when the keys are primitive + // used to optimize the lookup. + mutable std::shared_ptr primitiveKeyLookupTable_; + + // Materialized cached version of firstSeenMap_ when the keys are complex used + // to optimize the lookup. + mutable std::shared_ptr complexKeyHashMap_; }; +} // namespace detail /// Generic subscript/element_at implementation for both array and map data /// types. @@ -179,7 +226,7 @@ template < class SubscriptImpl : public exec::Subscript { public: explicit SubscriptImpl(bool allowCaching) - : mapSubscript_(MapSubscript(allowCaching)) {} + : mapSubscript_(detail::MapSubscript(allowCaching)) {} void apply( const SelectivityVector& rows, @@ -286,7 +333,7 @@ class SubscriptImpl : public exec::Subscript { const auto adjustedIndex = adjustIndex(decodedIndices->valueAt(0), isZeroSubscriptError); if (isZeroSubscriptError) { - context.setErrors(rows, zeroSubscriptError()); + context.setErrors(rows, detail::zeroSubscriptError()); allFailed = true; } @@ -307,7 +354,7 @@ class SubscriptImpl : public exec::Subscript { const auto adjustedIndex = adjustIndex(originalIndex, isZeroSubscriptError); if (isZeroSubscriptError) { - context.setVeloxExceptionError(row, zeroSubscriptError()); + context.setVeloxExceptionError(row, detail::zeroSubscriptError()); return; } const auto elementIndex = getIndex( @@ -372,7 +419,7 @@ class SubscriptImpl : public exec::Subscript { index += arraySize; } } else { - context.setVeloxExceptionError(row, negativeSubscriptError()); + context.setVeloxExceptionError(row, detail::negativeSubscriptError()); return -1; } } @@ -383,7 +430,7 @@ class SubscriptImpl : public exec::Subscript { if constexpr (allowOutOfBound) { return -1; } else { - context.setVeloxExceptionError(row, badSubscriptError()); + context.setVeloxExceptionError(row, detail::badSubscriptError()); return -1; } } @@ -394,7 +441,7 @@ class SubscriptImpl : public exec::Subscript { } 
private: - MapSubscript mapSubscript_; + detail::MapSubscript mapSubscript_; }; } // namespace facebook::velox::functions diff --git a/velox/functions/prestosql/benchmarks/MapSubscriptCachingBenchmark.cpp b/velox/functions/prestosql/benchmarks/MapSubscriptCachingBenchmark.cpp index e46b9d01d66b..63568002bb60 100644 --- a/velox/functions/prestosql/benchmarks/MapSubscriptCachingBenchmark.cpp +++ b/velox/functions/prestosql/benchmarks/MapSubscriptCachingBenchmark.cpp @@ -37,6 +37,7 @@ extern void registerSubscriptFunction( int main(int argc, char** argv) { folly::Init init(&argc, &argv); + memory::MemoryManager::testingSetInstance({}); ExpressionBenchmarkBuilder benchmarkBuilder; facebook::velox::functions::prestosql::registerAllScalarFunctions(); @@ -53,7 +54,8 @@ int main(int argc, char** argv) { VectorFuzzer::Options options; options.vectorSize = 1000; options.containerLength = mapLength; - options.complexElementsMaxSize = 10000000000; + // Make sure it's big enough for nested complex types. + options.complexElementsMaxSize = baseVectorSize * mapLength * mapLength; options.containerVariableLength = false; VectorFuzzer fuzzer(options, pool); @@ -112,6 +114,23 @@ int main(int argc, char** argv) { createSetsForType(INTEGER()); createSetsForType(VARCHAR()); + // For complex types, caching only applies if the Vector has a single constant + // value, so we only run with a baseVectorSize of 1. Also, due to the + // cost of the cardinality explosion from having nested complex types, we + // limit the number of iterations to 100. 
+ auto createSetsForComplexType = [&](const auto& keyType) { + createSet(MAP(keyType, INTEGER()), 10, 1, 100); + + createSet(MAP(keyType, INTEGER()), 100, 1, 100); + + createSet(MAP(keyType, INTEGER()), 1000, 1, 100); + + createSet(MAP(keyType, INTEGER()), 10000, 1, 100); + }; + + createSetsForComplexType(ARRAY(BIGINT())); + createSetsForComplexType(MAP(INTEGER(), VARCHAR())); + benchmarkBuilder.registerBenchmarks(); benchmarkBuilder.testBenchmarks(); diff --git a/velox/functions/prestosql/tests/ElementAtTest.cpp b/velox/functions/prestosql/tests/ElementAtTest.cpp index 384ab8e58070..a711b56fc1c4 100644 --- a/velox/functions/prestosql/tests/ElementAtTest.cpp +++ b/velox/functions/prestosql/tests/ElementAtTest.cpp @@ -143,7 +143,8 @@ class ElementAtTest : public FunctionBaseTest { auto keys = makeFlatVector(std::vector({kSNaN})); std::vector args = {inputMap, keys}; - facebook::velox::functions::MapSubscript mapSubscriptWithCaching(true); + facebook::velox::functions::detail::MapSubscript mapSubscriptWithCaching( + true); auto checkStatus = [&](bool cachingEnabled, bool materializedMapIsNull, @@ -152,7 +153,7 @@ class ElementAtTest : public FunctionBaseTest { EXPECT_EQ(firstSeen, mapSubscriptWithCaching.firstSeenMap()); EXPECT_EQ( materializedMapIsNull, - nullptr == mapSubscriptWithCaching.lookupTable()); + nullptr == mapSubscriptWithCaching.primitiveKeyLookupTable()); }; // Initial state. 
@@ -1030,7 +1031,7 @@ TEST_F(ElementAtTest, errorStatesArray) { [](auto row) { return row == 40; }); } -TEST_F(ElementAtTest, testCachingOptimzation) { +TEST_F(ElementAtTest, testCachingOptimization) { std::vector>>> inputMapVectorData; inputMapVectorData.push_back({}); @@ -1068,7 +1069,8 @@ TEST_F(ElementAtTest, testCachingOptimzation) { auto keys = makeFlatVector({0, 0, 0}); std::vector args = {inputMap, keys}; - facebook::velox::functions::MapSubscript mapSubscriptWithCaching(true); + facebook::velox::functions::detail::MapSubscript mapSubscriptWithCaching( + true); auto checkStatus = [&](bool cachingEnabled, bool materializedMapIsNull, @@ -1077,7 +1079,8 @@ TEST_F(ElementAtTest, testCachingOptimzation) { EXPECT_EQ(firtSeen, mapSubscriptWithCaching.firstSeenMap()); EXPECT_EQ( materializedMapIsNull, - nullptr == mapSubscriptWithCaching.lookupTable()); + nullptr == mapSubscriptWithCaching.primitiveKeyLookupTable()); + EXPECT_EQ(nullptr, mapSubscriptWithCaching.complexKeyHashMap()); }; // Initial state. @@ -1100,9 +1103,9 @@ TEST_F(ElementAtTest, testCachingOptimzation) { // Test the cached map content. auto verfyCachedContent = [&]() { auto& cachedMapTyped = - *static_cast< - facebook::velox::functions::LookupTable*>( - mapSubscriptWithCaching.lookupTable().get()) + *static_cast*>( + mapSubscriptWithCaching.primitiveKeyLookupTable().get()) ->map(); EXPECT_TRUE(cachedMapTyped.count(0)); @@ -1182,3 +1185,148 @@ TEST_F(ElementAtTest, floatingPointCornerCases) { testFloatingPointCornerCases(); testFloatingPointCornerCases(); } + +TEST_F(ElementAtTest, testCachingOptimizationComplexKey) { + std::vector> keys; + std::vector values; + for (int i = 0; i < 999; i += 3) { + // [0, 1, 2] -> 1000 + // [3, 4, 5] -> 1003 + // ... + keys.push_back({i, i + 1, i + 2}); + values.push_back(i + 1000); + } + + // Make a dummy eval context. 
+ exec::ExprSet exprSet({}, &execCtx_); + auto inputs = makeRowVector({}); + exec::EvalCtx evalCtx(&execCtx_, &exprSet, inputs.get()); + + SelectivityVector rows(1); + auto keysVector = makeArrayVector(keys); + auto valuesVector = makeFlatVector(values); + auto inputMap = makeMapVector({0}, keysVector, valuesVector); + + auto inputKeys = makeArrayVector({{0, 1, 2}}); + std::vector args{inputMap, inputKeys}; + + facebook::velox::functions::detail::MapSubscript mapSubscriptWithCaching( + true); + + auto checkStatus = [&](bool cachingEnabled, + bool materializedMapIsNull, + const VectorPtr& firtSeen) { + EXPECT_EQ(cachingEnabled, mapSubscriptWithCaching.cachingEnabled()); + EXPECT_EQ(firtSeen, mapSubscriptWithCaching.firstSeenMap()); + EXPECT_EQ( + materializedMapIsNull, + nullptr == mapSubscriptWithCaching.complexKeyHashMap()); + EXPECT_EQ(nullptr, mapSubscriptWithCaching.primitiveKeyLookupTable()); + }; + + // Initial state. + checkStatus(true, true, nullptr); + + auto result1 = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + // Nothing has been materialized yet since the input is seen only once. + checkStatus(true, true, args[0]); + + auto result2 = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(true, false, args[0]); + + auto result3 = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(true, false, args[0]); + + // All the results should be the same. + test::assertEqualVectors(result1, result2); + test::assertEqualVectors(result2, result3); + + // Verify that every key of the input map is present in the cached set. + auto verfyCachedContent = [&]() { + auto& cachedMap = mapSubscriptWithCaching.complexKeyHashMap(); + + for (int i = 0; i < keysVector->size(); ++i) { + EXPECT_NE( + cachedMap->end(), + cachedMap->find(facebook::velox::functions::detail::MapKey{ + keysVector.get(), i, i})); + } + }; + + verfyCachedContent(); + // Pass a different map with the same base. 
+ { + auto dictInput = BaseVector::wrapInDictionary( + nullptr, makeIndices({0, 0, 0}), 1, inputMap); + + SelectivityVector rows(3); + std::vector args{ + dictInput, makeArrayVector({{0, 1, 2}, {0, 1, 2}, {0, 1, 2}})}; + auto result = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + // Last seen map will keep pointing to the original map since both have + // the same base. + checkStatus(true, false, inputMap); + + auto expectedResult = makeFlatVector({1000, 1000, 1000}); + test::assertEqualVectors(expectedResult, result); + verfyCachedContent(); + } + + { + auto constantInput = BaseVector::wrapInConstant(3, 0, inputMap); + + SelectivityVector rows(3); + std::vector args{ + constantInput, + makeArrayVector({{0, 1, 2}, {0, 1, 2}, {0, 1, 2}})}; + auto result = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + // Last seen map will keep pointing to the original map since both have + // the same base. + checkStatus(true, false, inputMap); + + auto expectedResult = makeFlatVector({1000, 1000, 1000}); + test::assertEqualVectors(expectedResult, result); + verfyCachedContent(); + } + + // Pass a different map, caching will be disabled. + { + args[0] = makeMapVector({0}, keysVector, valuesVector); + auto result = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(false, true, nullptr); + test::assertEqualVectors(result, result1); + } + + { + args[0] = makeMapVector({0}, keysVector, valuesVector); + auto result = mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(false, true, nullptr); + test::assertEqualVectors(result, result1); + } + + for (int i = 0; i < 999; i += 3) { + // [0, 1, 2] -> 0 + // [6, 7, 8] -> 3 + // ... + keys.push_back({i * 2, i * 2 + 1, i * 2 + 2}); + values.push_back(i); + } + + for (int i = 0; i < 30; i += 3) { + // [0, 1, 2] -> 0 + // [3, 4, 5] -> 3 + // ... 
+ keys.push_back({i, i + 1, i + 2}); + values.push_back(i); + } + + args[0] = makeMapVector({0, 333, 666}, keysVector, valuesVector); + auto resultWithMoreVectors = + mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(false, true, nullptr); + + auto resultWithMoreVectors1 = + mapSubscriptWithCaching.applyMap(rows, args, evalCtx); + checkStatus(false, true, nullptr); + test::assertEqualVectors(resultWithMoreVectors, resultWithMoreVectors1); +} diff --git a/velox/vector/fuzzer/VectorFuzzer.cpp b/velox/vector/fuzzer/VectorFuzzer.cpp index df7457ea3787..1170ea98989a 100644 --- a/velox/vector/fuzzer/VectorFuzzer.cpp +++ b/velox/vector/fuzzer/VectorFuzzer.cpp @@ -135,9 +135,10 @@ Timestamp randTimestamp(FuzzerGenerator& rng, VectorFuzzer::Options opts) { size_t getElementsVectorLength( const VectorFuzzer::Options& opts, vector_size_t size) { - if (opts.containerVariableLength == false && - size * opts.containerLength > opts.complexElementsMaxSize) { - VELOX_USER_FAIL( + if (!opts.containerVariableLength) { + VELOX_USER_CHECK_LE( + size * opts.containerLength, + opts.complexElementsMaxSize, "Requested fixed opts.containerVariableLength can't be satisfied: " "increase opts.complexElementsMaxSize, reduce opts.containerLength" " or make opts.containerVariableLength=true");