Skip to content

Commit

Permalink
Fix ParquetReader initialize schema failed for ARRAY/MAP colum (#10681)
Browse files Browse the repository at this point in the history
Summary:
Fix #10680

CC: Yuhta majetideepak yingsu00

Pull Request resolved: #10681

Reviewed By: DanielHunte

Differential Revision: D63261202

Pulled By: Yuhta

fbshipit-source-id: 7625c3caee84316a843a819eb1a1a76a1e3f0e3b
  • Loading branch information
wypb authored and facebook-github-bot committed Sep 25, 2024
1 parent 0dd3a5d commit 78b6c77
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 7 deletions.
43 changes: 36 additions & 7 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class ReaderBase {
uint32_t parentSchemaIdx,
uint32_t& schemaIdx,
uint32_t& columnIdx,
const TypePtr& requestedType) const;
const TypePtr& requestedType,
const TypePtr& parentRequestedType) const;

TypePtr convertType(
const thrift::SchemaElement& schemaElement,
Expand Down Expand Up @@ -237,7 +238,8 @@ void ReaderBase::initializeSchema() {
0,
schemaIdx,
columnIdx,
options_.fileSchema());
options_.fileSchema(),
nullptr);
schema_ = createRowType(
schemaWithId_->getChildren(), isFileColumnNamesReadAsLowerCase());
}
Expand All @@ -253,7 +255,8 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
uint32_t parentSchemaIdx,
uint32_t& schemaIdx,
uint32_t& columnIdx,
const TypePtr& requestedType) const {
const TypePtr& requestedType,
const TypePtr& parentRequestedType) const {
VELOX_CHECK(fileMetaData_ != nullptr);
VELOX_CHECK_LT(schemaIdx, fileMetaData_->schema.size());

Expand Down Expand Up @@ -287,7 +290,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
VELOX_CHECK(
schemaElement.__isset.num_children && schemaElement.num_children > 0,
"Node has no children but should");
VELOX_CHECK(!requestedType || requestedType->isRow());
VELOX_CHECK(
!requestedType || requestedType->isRow() || requestedType->isArray() ||
requestedType->isMap());

std::vector<std::unique_ptr<ParquetTypeWithId::TypeWithId>> children;

Expand All @@ -298,16 +303,40 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
if (isFileColumnNamesReadAsLowerCase()) {
folly::toLowerAscii(childName);
}
auto childRequestedType =
requestedType ? requestedType->asRow().findChild(childName) : nullptr;

TypePtr childRequestedType = nullptr;
if (requestedType && requestedType->isRow()) {
auto fileTypeIdx =
requestedType->asRow().getChildIdxIfExists(childName);
if (fileTypeIdx.has_value()) {
childRequestedType = requestedType->asRow().childAt(*fileTypeIdx);
}
}

// Handling elements of ARRAY/MAP
if (!requestedType && parentRequestedType) {
if (parentRequestedType->isArray()) {
childRequestedType = parentRequestedType->asArray().elementType();
} else if (parentRequestedType->isMap()) {
auto mapType = parentRequestedType->asMap();
// Processing map keys
if (i == 0) {
childRequestedType = mapType.keyType();
} else {
childRequestedType = mapType.valueType();
}
}
}

auto child = getParquetColumnInfo(
maxSchemaElementIdx,
maxRepeat,
maxDefine,
curSchemaIdx,
schemaIdx,
columnIdx,
childRequestedType);
childRequestedType,
requestedType);
children.push_back(std::move(child));
}
VELOX_CHECK(!children.empty());
Expand Down
Binary file not shown.
47 changes: 47 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,53 @@ TEST_F(ParquetReaderTest, readBinaryAsStringFromNation) {
nameVector->loadedVector()->asFlatVector<StringView>()->valueAt(0));
}

TEST_F(ParquetReaderTest, readComplexType) {
const std::string filename("complex_with_varchar_varbinary.parquet");
const std::string sample(getExampleFilePath(filename));

dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto outputRowType =
ROW({"a", "b", "c", "d"},
{ARRAY(VARCHAR()),
ARRAY(VARBINARY()),
MAP(VARCHAR(), BIGINT()),
MAP(VARBINARY(), BIGINT())});

readerOptions.setFileSchema(outputRowType);
auto reader = createReader(sample, readerOptions);
EXPECT_EQ(reader->numberOfRows(), 1);
auto rowType = reader->rowType();
EXPECT_EQ(rowType->kind(), TypeKind::ROW);
EXPECT_EQ(rowType->size(), 4);
EXPECT_EQ(*rowType, *outputRowType);

auto rowReaderOpts = getReaderOpts(outputRowType);
rowReaderOpts.setScanSpec(makeScanSpec(outputRowType));
auto rowReader = reader->createRowReader(rowReaderOpts);

VectorPtr result = BaseVector::create(outputRowType, 0, &(*leafPool_));
rowReader->next(1, result);
auto aColVector = result->as<RowVector>()
->childAt(0)
->loadedVector()
->as<ArrayVector>()
->elements();
EXPECT_EQ(aColVector->size(), 3);
EXPECT_EQ(aColVector->encoding(), VectorEncoding::Simple::DICTIONARY);
EXPECT_EQ(
aColVector->asUnchecked<DictionaryVector<StringView>>()->valueAt(0).str(),
"AAAA");

auto cColVector =
result->as<RowVector>()->childAt(2)->loadedVector()->as<MapVector>();
auto mapKeys = cColVector->mapKeys();
EXPECT_EQ(mapKeys->size(), 2);
EXPECT_EQ(mapKeys->encoding(), VectorEncoding::Simple::DICTIONARY);
EXPECT_EQ(
mapKeys->asUnchecked<DictionaryVector<StringView>>()->valueAt(0).str(),
"foo");
}

TEST_F(ParquetReaderTest, readFixedLenBinaryAsStringFromUuid) {
const std::string filename("uuid.parquet");
const std::string sample(getExampleFilePath(filename));
Expand Down
38 changes: 38 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
assertQuery(plan, splits_, sql);
}

void assertSelectWithDataColumns(
std::vector<std::string>&& outputColumnNames,
const RowTypePtr& dataColumns,
const std::string& sql) {
auto rowType = getRowType(std::move(outputColumnNames));
auto plan =
PlanBuilder().tableScan(rowType, {}, "", dataColumns).planNode();
assertQuery(plan, splits_, sql);
}

void assertSelectWithAssignments(
std::vector<std::string>&& outputColumnNames,
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>&
Expand Down Expand Up @@ -868,6 +878,34 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
assertEqualResults({expected}, result.second);
}

TEST_F(ParquetTableScanTest, testColumnNotExists) {
auto rowType =
ROW({"a", "b", "not_exists", "not_exists_array", "not_exists_map"},
{BIGINT(),
DOUBLE(),
BIGINT(),
ARRAY(VARBINARY()),
MAP(VARCHAR(), BIGINT())});
// message schema {
// optional int64 a;
// optional double b;
// }
loadData(
getExampleFilePath("sample.parquet"),
rowType,
makeRowVector(
{"a", "b"},
{
makeFlatVector<int64_t>(20, [](auto row) { return row + 1; }),
makeFlatVector<double>(20, [](auto row) { return row + 1; }),
}));

assertSelectWithDataColumns(
{"a", "b", "not_exists", "not_exists_array", "not_exists_map"},
rowType,
"SELECT a, b, NULL, NULL, NULL FROM tmp");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down

0 comments on commit 78b6c77

Please sign in to comment.