diff --git a/velox/common/base/SpillStats.cpp b/velox/common/base/SpillStats.cpp index 442af9c76502..4056f5c41163 100644 --- a/velox/common/base/SpillStats.cpp +++ b/velox/common/base/SpillStats.cpp @@ -299,10 +299,11 @@ void updateGlobalSpillWriteStats( } void updateGlobalSpillReadStats( + uint64_t spillReads, uint64_t spillReadBytes, uint64_t spillRadTimeUs) { auto statsLocked = localSpillStats().wlock(); - ++statsLocked->spillReads; + statsLocked->spillReads += spillReads; statsLocked->spillReadBytes += spillReadBytes; statsLocked->spillReadTimeUs += spillRadTimeUs; } diff --git a/velox/common/base/SpillStats.h b/velox/common/base/SpillStats.h index c2aa0f3ee578..8b9ad7d11cac 100644 --- a/velox/common/base/SpillStats.h +++ b/velox/common/base/SpillStats.h @@ -142,6 +142,7 @@ void updateGlobalSpillWriteStats( /// Updates the stats for disk read including the number of disk reads, the /// amount of data read in bytes, and the time it takes to read from the disk. void updateGlobalSpillReadStats( + uint64_t spillReads, uint64_t spillReadBytes, uint64_t spillRadTimeUs); diff --git a/velox/common/file/CMakeLists.txt b/velox/common/file/CMakeLists.txt index b68e32322fe6..31d9f1ebe0ff 100644 --- a/velox/common/file/CMakeLists.txt +++ b/velox/common/file/CMakeLists.txt @@ -14,11 +14,16 @@ # for generated headers include_directories(.) -velox_add_library(velox_file File.cpp FileSystems.cpp Utils.cpp) +velox_add_library( + velox_file + File.cpp + FileInputStream.cpp + FileSystems.cpp + Utils.cpp) velox_link_libraries( velox_file PUBLIC velox_exception Folly::folly - PRIVATE velox_common_base fmt::fmt glog::glog) + PRIVATE velox_buffer velox_common_base fmt::fmt glog::glog) if(${VELOX_BUILD_TESTING} OR ${VELOX_BUILD_TEST_UTILS}) add_subdirectory(tests) diff --git a/velox/common/file/FileInputStream.cpp b/velox/common/file/FileInputStream.cpp new file mode 100644 index 000000000000..73fdb2692c24 --- /dev/null +++ b/velox/common/file/FileInputStream.cpp @@ -0,0 +1,257 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileInputStream.h" + +namespace facebook::velox::common { + +FileInputStream::FileInputStream( + std::unique_ptr&& file, + uint64_t bufferSize, + memory::MemoryPool* pool) + : file_(std::move(file)), + fileSize_(file_->size()), + bufferSize_(std::min(fileSize_, bufferSize)), + pool_(pool), + readAheadEnabled_((bufferSize_ < fileSize_) && file_->hasPreadvAsync()) { + VELOX_CHECK_NOT_NULL(pool_); + VELOX_CHECK_GT(fileSize_, 0, "Empty FileInputStream"); + + buffers_.push_back(AlignedBuffer::allocate(bufferSize_, pool_)); + if (readAheadEnabled_) { + buffers_.push_back(AlignedBuffer::allocate(bufferSize_, pool_)); + } + readNextRange(); +} + +FileInputStream::~FileInputStream() { + if (!readAheadWait_.valid()) { + return; + } + try { + readAheadWait_.wait(); + } catch (const std::exception& ex) { + // ignore any prefetch error when query has failed. + LOG(WARNING) << "FileInputStream read-ahead failed on destruction " + << ex.what(); + } +} + +void FileInputStream::readNextRange() { + VELOX_CHECK(current_ == nullptr || current_->availableBytes() == 0); + ranges_.clear(); + current_ = nullptr; + + int32_t readBytes{0}; + uint64_t readTimeUs{0}; + { + MicrosecondTimer timer{&readTimeUs}; + if (readAheadWait_.valid()) { + readBytes = std::move(readAheadWait_) + .via(&folly::QueuedImmediateExecutor::instance()) + .wait() + .value(); + VELOX_CHECK(!readAheadWait_.valid()); + VELOX_CHECK_LT( + 0, readBytes, "Read past end of FileInputStream {}", fileSize_); + advanceBuffer(); + } else { + readBytes = readSize(); + VELOX_CHECK_LT( + 0, readBytes, "Read past end of FileInputStream {}", fileSize_); + MicrosecondTimer timer{&readTimeUs}; + file_->pread(fileOffset_, readBytes, buffer()->asMutable()); + } + } + + ranges_.resize(1); + ranges_[0] = {buffer()->asMutable(), readBytes, 0}; + current_ = ranges_.data(); + fileOffset_ += readBytes; + + updateStats(readBytes, readTimeUs); + + maybeIssueReadahead(); +} + +size_t FileInputStream::size() const { + return fileSize_; +} + +bool FileInputStream::atEnd() const { + return tellp() >= fileSize_; +} + +std::streampos FileInputStream::tellp() const { + if (current_ == nullptr) { + VELOX_CHECK_EQ(fileOffset_, fileSize_); + return fileOffset_; + } + return fileOffset_ - current_->availableBytes(); +} + +void FileInputStream::seekp(std::streampos position) { + static_assert(sizeof(std::streamsize) <= sizeof(int64_t)); + const int64_t seekPos = position; + const int64_t curPos = tellp(); + VELOX_CHECK_GE( + seekPos, curPos, "Backward seek is not supported by FileInputStream"); + + const int64_t toSkip = seekPos - curPos; + if (toSkip == 0) { + return; + } + doSeek(toSkip); +} + +void FileInputStream::skip(int32_t size) { + doSeek(size); +} + +void FileInputStream::doSeek(int64_t skipBytes) { + VELOX_CHECK_GE(skipBytes, 0, "Attempting to skip negative number of bytes"); + if (skipBytes == 0) { + return; + } + + VELOX_CHECK_LE( + skipBytes, + remainingSize(), + "Skip past the end of FileInputStream: {}", + fileSize_); + + for (;;) { + const int64_t skippedBytes = + std::min(current_->availableBytes(), skipBytes); + skipBytes -= skippedBytes; + current_->position += skippedBytes; + if (skipBytes == 0) { + return; + } + readNextRange(); + } +} + +size_t FileInputStream::remainingSize() const { + return fileSize_ - tellp(); +} + +uint8_t FileInputStream::readByte() { + VELOX_CHECK_GT( + remainingSize(), 0, "Read past the end of input file {}", fileSize_); + + if (current_->availableBytes() > 0) { + return current_->buffer[current_->position++]; + } + readNextRange(); + return readByte(); +} + +void FileInputStream::readBytes(uint8_t* bytes, int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + if (size == 0) { + return; + } + + VELOX_CHECK_LE( + size, remainingSize(), "Read past the end of input file {}", fileSize_); + + int32_t offset{0}; + for (;;) { + const int32_t readBytes = + std::min(current_->availableBytes(), size); + simd::memcpy( + bytes + offset, current_->buffer + current_->position, readBytes); + offset += readBytes; + size -= readBytes; + current_->position += readBytes; + if (size == 0) { + return; + } + readNextRange(); + } +} + +std::string_view FileInputStream::nextView(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); + if (remainingSize() == 0) { + return std::string_view(nullptr, 0); + } + + if (current_->availableBytes() == 0) { + readNextRange(); + } + + VELOX_CHECK_GT(current_->availableBytes(), 0); + const auto position = current_->position; + const auto viewSize = std::min(current_->availableBytes(), size); + current_->position += viewSize; + return std::string_view( + reinterpret_cast(current_->buffer) + position, viewSize); +} + +uint64_t FileInputStream::readSize() const { + return std::min(fileSize_ - fileOffset_, bufferSize_); +} + +void FileInputStream::maybeIssueReadahead() { + VELOX_CHECK(!readAheadWait_.valid()); + if (!readAheadEnabled_) { + return; + } + const auto size = readSize(); + if (size == 0) { + return; + } + std::vector> ranges; + ranges.emplace_back(nextBuffer()->asMutable(), size); + readAheadWait_ = file_->preadvAsync(fileOffset_, ranges); + VELOX_CHECK(readAheadWait_.valid()); +} + +void FileInputStream::updateStats(uint64_t readBytes, uint64_t readTimeUs) { + stats_.readBytes += readBytes; + stats_.readTimeUs += readTimeUs; + ++stats_.numReads; +} + +std::string FileInputStream::toString() const { + return fmt::format( + "file (offset {}/size {}) current (position {}/ size {})", + succinctBytes(fileOffset_), + succinctBytes(fileSize_), + current_ == nullptr ? "NULL" : succinctBytes(current_->position), + current_ == nullptr ? "NULL" : succinctBytes(current_->size)); +} + +FileInputStream::Stats FileInputStream::stats() const { + return stats_; +} + +bool FileInputStream::Stats::operator==( + const FileInputStream::Stats& other) const { + return std::tie(numReads, readBytes, readTimeUs) == + std::tie(other.numReads, other.readBytes, other.readTimeUs); +} + +std::string FileInputStream::Stats::toString() const { + return fmt::format( + "numReads: {}, readBytes: {}, readTimeUs: {}", + numReads, + succinctBytes(readBytes), + succinctMicros(readTimeUs)); +} +} // namespace facebook::velox::common diff --git a/velox/common/file/FileInputStream.h b/velox/common/file/FileInputStream.h new file mode 100644 index 000000000000..4d9b7d83b9ac --- /dev/null +++ b/velox/common/file/FileInputStream.h @@ -0,0 +1,128 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/buffer/Buffer.h" +#include "velox/common/file/File.h" +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox::common { + +/// Readonly byte input stream backed by file. +class FileInputStream : public ByteInputStream { + public: + FileInputStream( + std::unique_ptr&& file, + uint64_t bufferSize, + memory::MemoryPool* pool); + + ~FileInputStream() override; + + FileInputStream(const FileInputStream&) = delete; + FileInputStream& operator=(const FileInputStream& other) = delete; + FileInputStream(FileInputStream&& other) noexcept = delete; + FileInputStream& operator=(FileInputStream&& other) noexcept = delete; + + size_t size() const override; + + bool atEnd() const override; + + std::streampos tellp() const override; + + void seekp(std::streampos pos) override; + + void skip(int32_t size) override; + + size_t remainingSize() const override; + + uint8_t readByte() override; + + void readBytes(uint8_t* bytes, int32_t size) override; + + std::string_view nextView(int32_t size) override; + + std::string toString() const override; + + /// Records the file read stats. + struct Stats { + uint32_t numReads{0}; + uint64_t readBytes{0}; + uint64_t readTimeUs{0}; + + bool operator==(const Stats& other) const; + + std::string toString() const; + }; + Stats stats() const; + + private: + void doSeek(int64_t skipBytes); + + // Invoked to read the next byte range from the file in a buffer. + void readNextRange(); + + // Issues readahead if underlying file system supports async mode read. + // + // TODO: we might consider to use AsyncSource to support read-ahead on + // filesystem which doesn't support async mode read. + void maybeIssueReadahead(); + + inline uint64_t readSize() const; + + inline uint32_t bufferIndex() const { + return bufferIndex_; + } + + inline uint32_t nextBufferIndex() const { + return (bufferIndex_ + 1) % buffers_.size(); + } + + // Advances buffer index to point to the next buffer for read. + inline void advanceBuffer() { + bufferIndex_ = nextBufferIndex(); + } + + inline Buffer* buffer() const { + return buffers_[bufferIndex()].get(); + } + + inline Buffer* nextBuffer() const { + return buffers_[nextBufferIndex()].get(); + } + + void updateStats(uint64_t readBytes, uint64_t readTimeUs); + + const std::unique_ptr file_; + const uint64_t fileSize_; + const uint64_t bufferSize_; + memory::MemoryPool* const pool_; + const bool readAheadEnabled_; + + // Offset of the next byte to read from file. + uint64_t fileOffset_ = 0; + + std::vector buffers_; + uint32_t bufferIndex_{0}; + // Sets to read-ahead future if valid. + folly::SemiFuture readAheadWait_{ + folly::SemiFuture::makeEmpty()}; + + Stats stats_; +}; +} // namespace facebook::velox::common diff --git a/velox/common/file/Utils.cpp b/velox/common/file/Utils.cpp index ae2d3ced7dd8..bd9bfea9dcb8 100644 --- a/velox/common/file/Utils.cpp +++ b/velox/common/file/Utils.cpp @@ -34,5 +34,4 @@ bool CoalesceIfDistanceLE::operator()( } return shouldCoalesce; } - } // namespace facebook::velox::file::utils diff --git a/velox/common/file/Utils.h b/velox/common/file/Utils.h index b8532b3416c5..b19468fbe1fe 100644 --- a/velox/common/file/Utils.h +++ b/velox/common/file/Utils.h @@ -165,5 +165,4 @@ class ReadToIOBufs { OutputIter output_; Reader reader_; }; - } // namespace facebook::velox::file::utils diff --git a/velox/common/file/tests/CMakeLists.txt b/velox/common/file/tests/CMakeLists.txt index 20245002e0a1..2c6aa72af6db 100644 --- a/velox/common/file/tests/CMakeLists.txt +++ b/velox/common/file/tests/CMakeLists.txt @@ -19,7 +19,8 @@ target_link_libraries( velox_file_test_utils PUBLIC velox_file) -add_executable(velox_file_test FileTest.cpp UtilsTest.cpp) +add_executable(velox_file_test FileTest.cpp FileInputStreamTest.cpp + UtilsTest.cpp) add_test(velox_file_test velox_file_test) target_link_libraries( velox_file_test diff --git a/velox/common/file/tests/FileInputStreamTest.cpp b/velox/common/file/tests/FileInputStreamTest.cpp new file mode 100644 index 000000000000..73af3375dd1b --- /dev/null +++ b/velox/common/file/tests/FileInputStreamTest.cpp @@ -0,0 +1,115 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/common/memory/ByteStream.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/common/file/FileInputStream.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/MmapAllocator.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::memory; + +class FileInputStreamTest : public testing::Test { + protected: + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + } + + void SetUp() override { + constexpr uint64_t kMaxMappedMemory = 64 << 20; + MemoryManagerOptions options; + options.useMmapAllocator = true; + options.allocatorCapacity = kMaxMappedMemory; + options.arbitratorCapacity = kMaxMappedMemory; + options.arbitratorReservedCapacity = 0; + memoryManager_ = std::make_unique(options); + mmapAllocator_ = static_cast(memoryManager_->allocator()); + pool_ = memoryManager_->addLeafPool("ByteStreamTest"); + rng_.seed(124); + tempDirPath_ = exec::test::TempDirectoryPath::create(); + fs_ = filesystems::getFileSystem(tempDirPath_->getPath(), nullptr); + } + + void TearDown() override {} + + std::unique_ptr createStream( + uint64_t streamSize, + uint32_t bufferSize = 1024) { + const auto filePath = + fmt::format("{}/{}", tempDirPath_->getPath(), fileId_++); + auto writeFile = fs_->openFileForWrite(filePath); + std::uint8_t buffer[streamSize]; + for (int i = 0; i < streamSize; ++i) { + buffer[i] = i % 256; + } + writeFile->append( + std::string_view(reinterpret_cast(buffer), streamSize)); + writeFile->close(); + return std::make_unique( + fs_->openFileForRead(filePath), bufferSize, pool_.get()); + } + + folly::Random::DefaultGenerator rng_; + std::unique_ptr memoryManager_; + MmapAllocator* mmapAllocator_; + std::shared_ptr pool_; + std::atomic_uint64_t fileId_{0}; + std::shared_ptr tempDirPath_; + std::shared_ptr fs_; +}; + +TEST_F(FileInputStreamTest, stats) { + struct { + size_t streamSize; + size_t bufferSize; + + std::string debugString() const { + return fmt::format( + "streamSize {}, bufferSize {}", streamSize, bufferSize); + } + } testSettings[] = { + {4096, 1024}, {4096, 4096}, {4096, 8192}, {4096, 4096 + 1024}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto byteStream = createStream(testData.streamSize, testData.bufferSize); + ASSERT_EQ(byteStream->stats().numReads, 1); + ASSERT_EQ( + byteStream->stats().readBytes, + std::min(testData.streamSize, testData.bufferSize)); + ASSERT_GT(byteStream->stats().readTimeUs, 0); + uint8_t buffer[testData.streamSize / 8]; + for (int offset = 0; offset < testData.streamSize;) { + byteStream->readBytes(buffer, testData.streamSize / 8); + for (int i = 0; i < testData.streamSize / 8; ++i, ++offset) { + ASSERT_EQ(buffer[i], offset % 256); + } + } + ASSERT_TRUE(byteStream->atEnd()); + ASSERT_EQ( + byteStream->stats().numReads, + bits::roundUp(testData.streamSize, testData.bufferSize) / + testData.bufferSize); + ASSERT_EQ(byteStream->stats().readBytes, testData.streamSize); + ASSERT_GT(byteStream->stats().readTimeUs, 0); + } +} diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index 43616e1c2574..e7802f477823 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -18,11 +18,15 @@ namespace facebook::velox { +uint32_t ByteRange::availableBytes() const { + return std::max(0, size - position); +} + std::string ByteRange::toString() const { return fmt::format("[{} starting at {}]", succinctBytes(size), position); } -std::string ByteInputStream::toString() const { +std::string BufferInputStream::toString() const { std::stringstream oss; oss << ranges_.size() << " ranges (position/size) ["; for (const auto& range : ranges_) { @@ -36,8 +40,8 @@ std::string ByteInputStream::toString() const { return oss.str(); } -bool ByteInputStream::atEnd() const { - if (!current_) { +bool BufferInputStream::atEnd() const { + if (current_ == nullptr) { return false; } if (current_->position < current_->size) { @@ -48,7 +52,7 @@ bool ByteInputStream::atEnd() const { return current_ == &ranges_.back(); } -size_t ByteInputStream::size() const { +size_t BufferInputStream::size() const { size_t total = 0; for (const auto& range : ranges_) { total += range.size; @@ -56,20 +60,20 @@ size_t ByteInputStream::size() const { return total; } -size_t ByteInputStream::remainingSize() const { +size_t BufferInputStream::remainingSize() const { if (ranges_.empty()) { return 0; } - const auto* lastRange = &ranges_[ranges_.size() - 1]; - auto cur = current_; - size_t total = cur->size - cur->position; + const auto* lastRange = &ranges_.back(); + auto* cur = current_; + size_t remainingBytes = cur->availableBytes(); while (++cur <= lastRange) { - total += cur->size; + remainingBytes += cur->size; } - return total; + return remainingBytes; } -std::streampos ByteInputStream::tellp() const { +std::streampos BufferInputStream::tellp() const { if (ranges_.empty()) { return 0; } @@ -81,10 +85,10 @@ std::streampos ByteInputStream::tellp() const { } size += range.size; } - VELOX_FAIL("ByteInputStream 'current_' is not in 'ranges_'."); + VELOX_FAIL("BufferInputStream 'current_' is not in 'ranges_'."); } -void ByteInputStream::seekp(std::streampos position) { +void BufferInputStream::seekp(std::streampos position) { if (ranges_.empty() && position == 0) { return; } @@ -99,77 +103,72 @@ void ByteInputStream::seekp(std::streampos position) { } static_assert(sizeof(std::streamsize) <= sizeof(long long)); VELOX_FAIL( - "Seeking past end of ByteInputStream: {}", + "Seeking past end of BufferInputStream: {}", static_cast(position)); } -void ByteInputStream::next(bool throwIfPastEnd) { +void BufferInputStream::nextRange() { VELOX_CHECK(current_ >= &ranges_[0]); - size_t position = current_ - &ranges_[0]; - VELOX_CHECK_LT(position, ranges_.size()); - if (position == ranges_.size() - 1) { - if (throwIfPastEnd) { - VELOX_FAIL("Reading past end of ByteInputStream"); - } - return; - } + const size_t rangeIndex = current_ - &ranges_[0]; + VELOX_CHECK_LT( + rangeIndex + 1, ranges_.size(), "Reading past end of BufferInputStream"); ++current_; current_->position = 0; } -uint8_t ByteInputStream::readByte() { +uint8_t BufferInputStream::readByte() { if (current_->position < current_->size) { return current_->buffer[current_->position++]; } - next(); + nextRange(); return readByte(); } -void ByteInputStream::readBytes(uint8_t* bytes, int32_t size) { +void BufferInputStream::readBytes(uint8_t* bytes, int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); int32_t offset = 0; for (;;) { - int32_t available = current_->size - current_->position; - int32_t numUsed = std::min(available, size); + const int32_t availableBytes = current_->size - current_->position; + const int32_t readBytes = std::min(availableBytes, size); simd::memcpy( - bytes + offset, current_->buffer + current_->position, numUsed); - offset += numUsed; - size -= numUsed; - current_->position += numUsed; - if (!size) { + bytes + offset, current_->buffer + current_->position, readBytes); + offset += readBytes; + size -= readBytes; + current_->position += readBytes; + if (size == 0) { return; } - next(); + nextRange(); } } -std::string_view ByteInputStream::nextView(int32_t size) { +std::string_view BufferInputStream::nextView(int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); if (current_->position == current_->size) { if (current_ == &ranges_.back()) { return std::string_view(nullptr, 0); } - next(); + nextRange(); } - VELOX_CHECK(current_->size); - auto position = current_->position; - auto viewSize = std::min(current_->size - current_->position, size); + VELOX_CHECK_GT(current_->size, 0); + const auto position = current_->position; + const auto viewSize = std::min(current_->size - current_->position, size); current_->position += viewSize; return std::string_view( reinterpret_cast(current_->buffer) + position, viewSize); } -void ByteInputStream::skip(int32_t size) { +void BufferInputStream::skip(int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); for (;;) { - int32_t available = current_->size - current_->position; - int32_t numUsed = std::min(available, size); - size -= numUsed; - current_->position += numUsed; - if (!size) { + const int32_t numSkipped = + std::min(current_->availableBytes(), size); + size -= numSkipped; + current_->position += numSkipped; + if (size == 0) { return; } - next(); + nextRange(); } } @@ -378,12 +377,12 @@ void ByteOutputStream::ensureSpace(int32_t bytes) { current_->position = originalPosition; } -ByteInputStream ByteOutputStream::inputStream() const { +std::unique_ptr ByteOutputStream::inputStream() const { VELOX_CHECK(!ranges_.empty()); updateEnd(); auto rangeCopy = ranges_; rangeCopy.back().size = lastRangeEnd_; - return ByteInputStream(std::move(rangeCopy)); + return std::make_unique(std::move(rangeCopy)); } std::string ByteOutputStream::toString() const { diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 80e89d0ccb8c..288de7080ded 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -35,6 +35,9 @@ struct ByteRange { /// Index of next byte/bit to be read/written in 'buffer'. int32_t position; + /// Returns the available bytes left in this range. + uint32_t availableBytes() const; + std::string toString() const; }; @@ -91,64 +94,29 @@ class OStreamOutputStream : public OutputStream { std::ostream* out_; }; -/// Read-only stream over one or more byte buffers. +/// Read-only byte input stream interface. class ByteInputStream { - protected: - /// TODO Remove after refactoring SpillInput. - ByteInputStream() {} - public: - explicit ByteInputStream(std::vector ranges) - : ranges_{std::move(ranges)} { - VELOX_CHECK(!ranges_.empty()); - current_ = &ranges_[0]; - } - - /// Disable copy constructor. - ByteInputStream(const ByteInputStream&) = delete; - - /// Disable copy assignment operator. - ByteInputStream& operator=(const ByteInputStream& other) = delete; - - /// Enable move constructor. - ByteInputStream(ByteInputStream&& other) noexcept - : ranges_{std::move(other.ranges_)}, current_{other.current_} {} - - /// Enable move assignment operator. - ByteInputStream& operator=(ByteInputStream&& other) noexcept { - if (this != &other) { - ranges_ = std::move(other.ranges_); - current_ = other.current_; - other.current_ = nullptr; - } - return *this; - } - - /// TODO Remove after refactoring SpillInput. virtual ~ByteInputStream() = default; /// Returns total number of bytes available in the stream. - size_t size() const; + virtual size_t size() const = 0; /// Returns true if all input has been read. - /// - /// TODO: Remove 'virtual' after refactoring SpillInput. - virtual bool atEnd() const; + virtual bool atEnd() const = 0; /// Returns current position (number of bytes from the start) in the stream. - std::streampos tellp() const; + virtual std::streampos tellp() const = 0; /// Moves current position to specified one. - void seekp(std::streampos pos); + virtual void seekp(std::streampos pos) = 0; /// Returns the remaining size left from current reading position. - size_t remainingSize() const; + virtual size_t remainingSize() const = 0; - std::string toString() const; - - uint8_t readByte(); + virtual uint8_t readByte() = 0; - void readBytes(uint8_t* bytes, int32_t size); + virtual void readBytes(uint8_t* bytes, int32_t size) = 0; template T read() { @@ -157,9 +125,9 @@ class ByteInputStream { return *reinterpret_cast( current_->buffer + current_->position - sizeof(T)); } - // The number straddles two buffers. We read byte by byte and make - // a little-endian uint64_t. The bytes can be cast to any integer - // or floating point type since the wire format has the machine byte order. + // The number straddles two buffers. We read byte by byte and make a + // little-endian uint64_t. The bytes can be cast to any integer or floating + // point type since the wire format has the machine byte order. static_assert(sizeof(T) <= sizeof(uint64_t)); uint64_t value = 0; for (int32_t i = 0; i < sizeof(T); ++i) { @@ -173,39 +141,64 @@ class ByteInputStream { readBytes(reinterpret_cast(data), size); } - /// Returns a view over the read buffer for up to 'size' next - /// bytes. The size of the value may be less if the current byte - /// range ends within 'size' bytes from the current position. The - /// size will be 0 if at end. - std::string_view nextView(int32_t size); + /// Returns a view over the read buffer for up to 'size' next bytes. The size + /// of the value may be less if the current byte range ends within 'size' + /// bytes from the current position. The size will be 0 if at end. + virtual std::string_view nextView(int32_t size) = 0; + + virtual void skip(int32_t size) = 0; - void skip(int32_t size); + virtual std::string toString() const = 0; protected: - /// Sets 'current_' to point to the next range of input. // The - /// input is consecutive ByteRanges in 'ranges_' for the base class - /// but any view over external buffers can be made by specialization. - /// - /// TODO: Remove 'virtual' after refactoring SpillInput. - virtual void next(bool throwIfPastEnd = true); - - // TODO: Remove after refactoring SpillInput. - const std::vector& ranges() const { - return ranges_; - } + // Points to the current buffered byte range. + ByteRange* current_{nullptr}; + std::vector ranges_; +}; - // TODO: Remove after refactoring SpillInput. - void setRange(ByteRange range) { - ranges_.resize(1); - ranges_[0] = range; - current_ = ranges_.data(); +/// Read-only input stream backed by a set of buffers. +class BufferInputStream : public ByteInputStream { + public: + explicit BufferInputStream(std::vector ranges) { + VELOX_CHECK(!ranges.empty(), "Empty BufferInputStream"); + ranges_ = std::move(ranges); + current_ = &ranges_[0]; } + BufferInputStream(const BufferInputStream&) = delete; + BufferInputStream& operator=(const BufferInputStream& other) = delete; + BufferInputStream(BufferInputStream&& other) noexcept = delete; + BufferInputStream& operator=(BufferInputStream&& other) noexcept = delete; + + size_t size() const override; + + bool atEnd() const override; + + std::streampos tellp() const override; + + void seekp(std::streampos pos) override; + + size_t remainingSize() const override; + + uint8_t readByte() override; + + void readBytes(uint8_t* bytes, int32_t size) override; + + std::string_view nextView(int32_t size) override; + + void skip(int32_t size) override; + + std::string toString() const override; + private: - std::vector ranges_; + // Sets 'current_' to the next range of input. The input is consecutive + // ByteRanges in 'ranges_' for the base class but any view over external + // buffers can be made by specialization. + void nextRange(); - // Pointer to the current element of 'ranges_'. - ByteRange* current_{nullptr}; + const std::vector& ranges() const { + return ranges_; + } }; /// Stream over a chain of ByteRanges. Provides read, write and @@ -268,9 +261,8 @@ class ByteOutputStream { void seekp(std::streampos position); - /// Returns the size written into ranges_. This is the sum of the - /// capacities of non-last ranges + the greatest write position of - /// the last range. + /// Returns the size written into ranges_. This is the sum of the capacities + /// of non-last ranges + the greatest write position of the last range. size_t size() const; int32_t lastRangeEnd() const { @@ -346,7 +338,7 @@ class ByteOutputStream { /// Returns a ByteInputStream to range over the current content of 'this'. The /// result is valid as long as 'this' is live and not changed. - ByteInputStream inputStream() const; + std::unique_ptr inputStream() const; std::string toString() const; diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp index 908ae2b1c043..1a938a0dd571 100644 --- a/velox/common/memory/HashStringAllocator.cpp +++ b/velox/common/memory/HashStringAllocator.cpp @@ -166,7 +166,7 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) { } // static -ByteInputStream HashStringAllocator::prepareRead( +std::unique_ptr HashStringAllocator::prepareRead( const Header* begin, size_t maxBytes) { std::vector ranges; @@ -187,7 +187,7 @@ ByteInputStream HashStringAllocator::prepareRead( header = header->nextContinued(); } - return ByteInputStream(std::move(ranges)); + return std::make_unique(std::move(ranges)); } HashStringAllocator::Position HashStringAllocator::newWrite( @@ -365,7 +365,7 @@ StringView HashStringAllocator::contiguousString( auto stream = prepareRead(headerOf(view.data())); storage.resize(view.size()); - stream.readBytes(storage.data(), view.size()); + stream->readBytes(storage.data(), view.size()); return StringView(storage); } diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index a3a56b3363a6..4604b7565c4f 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -237,7 +237,7 @@ class HashStringAllocator : public StreamArena { /// possible continuation ranges. /// @param maxBytes If provided, the returned stream will cover at most that /// many bytes. - static ByteInputStream prepareRead( + static std::unique_ptr prepareRead( const Header* header, size_t maxBytes = std::numeric_limits::max()); diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 14d7a6016cf2..ec9e43941b32 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -14,8 +14,12 @@ * limitations under the License. */ #include "velox/common/memory/ByteStream.h" -#include "velox/common/memory/MemoryAllocator.h" + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileInputStream.h" +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/MmapAllocator.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" #include #include @@ -97,73 +101,6 @@ TEST_F(ByteStreamTest, outputStream) { EXPECT_EQ(0, mmapAllocator_->numAllocated()); } -TEST_F(ByteStreamTest, inputStream) { - uint8_t* const kFakeBuffer = reinterpret_cast(this); - std::vector byteRanges; - size_t totalBytes{0}; - for (int32_t i = 0; i < 32; ++i) { - byteRanges.push_back(ByteRange{kFakeBuffer, 4096 + i, 0}); - totalBytes += 4096 + i; - } - ByteInputStream byteStream(std::move(byteRanges)); - ASSERT_EQ(byteStream.size(), totalBytes); -} - -TEST_F(ByteStreamTest, remainingSize) { - const int32_t kSize = 100; - const int32_t kBufferSize = 4096; - std::vector buffers; - std::vector byteRanges; - for (int32_t i = 0; i < kSize; i++) { - buffers.push_back(pool_->allocate(kBufferSize)); - byteRanges.push_back( - ByteRange{reinterpret_cast(buffers.back()), kBufferSize, 0}); - } - ByteInputStream byteStream(std::move(byteRanges)); - const int32_t kReadBytes = 2048; - int32_t remainingSize = kSize * kBufferSize; - uint8_t* tempBuffer = reinterpret_cast(pool_->allocate(kReadBytes)); - while (byteStream.remainingSize() > 0) { - byteStream.readBytes(tempBuffer, kReadBytes); - remainingSize -= kReadBytes; - ASSERT_EQ(remainingSize, byteStream.remainingSize()); - } - ASSERT_EQ(0, byteStream.remainingSize()); - for (int32_t i = 0; i < kSize; i++) { - pool_->free(buffers[i], kBufferSize); - } - pool_->free(tempBuffer, kReadBytes); -} - -TEST_F(ByteStreamTest, toString) { - const int32_t kSize = 10; - const int32_t kBufferSize = 4096; - std::vector buffers; - std::vector byteRanges; - for (int32_t i = 0; i < kSize; i++) { - buffers.push_back(pool_->allocate(kBufferSize)); - byteRanges.push_back( - ByteRange{reinterpret_cast(buffers.back()), kBufferSize, 0}); - } - ByteInputStream byteStream(std::move(byteRanges)); - const int32_t kReadBytes = 2048; - uint8_t* tempBuffer = reinterpret_cast(pool_->allocate(kReadBytes)); - for (int32_t i = 0; i < kSize / 2; i++) { - byteStream.readBytes(tempBuffer, kReadBytes); - } - - EXPECT_EQ( - byteStream.toString(), - "10 ranges " - "(position/size) [(4096/4096),(4096/4096),(2048/4096 current)," - "(0/4096),(0/4096),(0/4096),(0/4096),(0/4096),(0/4096),(0/4096)]"); - - for (int32_t i = 0; i < kSize; i++) { - pool_->free(buffers[i], kBufferSize); - } - pool_->free(tempBuffer, kReadBytes); -} - TEST_F(ByteStreamTest, newRangeAllocation) { const int kPageSize = AllocationTraits::kPageSize; struct { @@ -360,36 +297,279 @@ TEST_F(ByteStreamTest, appendWindow) { EXPECT_EQ(0, memcmp(stringStream.str().data(), words.data(), words.size())); } -TEST_F(ByteStreamTest, readBytesNegativeSize) { +TEST_F(ByteStreamTest, byteRange) { + ByteRange range; + range.size = 0; + range.position = 1; + ASSERT_EQ(range.availableBytes(), 0); + range.size = 1; + ASSERT_EQ(range.availableBytes(), 0); + range.size = 2; + ASSERT_EQ(range.availableBytes(), 1); +} + +TEST_F(ByteStreamTest, reuse) { + auto arena = newArena(); + ByteOutputStream stream(arena.get()); + char bytes[10000] = {}; + for (auto i = 0; i < 10; ++i) { + arena->clear(); + stream.startWrite(i * 100); + stream.appendStringView(std::string_view(bytes, sizeof(bytes))); + EXPECT_EQ(sizeof(bytes), stream.size()); + } +} + +class InputByteStreamTest : public ByteStreamTest, + public testing::WithParamInterface { + protected: + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + } + + void SetUp() override { + ByteStreamTest::SetUp(); + tempDirPath_ = exec::test::TempDirectoryPath::create(); + fs_ = filesystems::getFileSystem(tempDirPath_->getPath(), nullptr); + } + + std::unique_ptr createStream( + const std::vector& byteRanges, + uint32_t bufferSize = 1024) { + if (GetParam()) { + return std::make_unique(std::move(byteRanges)); + } else { + const auto filePath = + fmt::format("{}/{}", tempDirPath_->getPath(), fileId_++); + auto writeFile = fs_->openFileForWrite(filePath); + for (auto& byteRange : byteRanges) { + writeFile->append(std::string_view( + reinterpret_cast(byteRange.buffer), byteRange.size)); + } + writeFile->close(); + return std::make_unique( + fs_->openFileForRead(filePath), bufferSize, pool_.get()); + } + } + + std::atomic_uint64_t fileId_{0}; + std::shared_ptr tempDirPath_; + std::shared_ptr fs_; +}; + +TEST_P(InputByteStreamTest, inputStream) { + uint8_t kFakeBuffer[8192]; + std::vector byteRanges; + size_t totalBytes{0}; + for (int32_t i = 0; i < 32; ++i) { + byteRanges.push_back(ByteRange{kFakeBuffer, 4096 + i, 0}); + totalBytes += 4096 + i; + } + auto byteStream = createStream(byteRanges); + ASSERT_EQ(byteStream->size(), totalBytes); + ASSERT_FALSE(byteStream->atEnd()); + byteStream->skip(totalBytes); + ASSERT_TRUE(byteStream->atEnd()); + if (GetParam()) { + VELOX_ASSERT_THROW( + byteStream->skip(1), + "(32 vs. 32) Reading past end of BufferInputStream"); + } else { + VELOX_ASSERT_THROW( + byteStream->skip(1), + "(1 vs. 0) Skip past the end of FileInputStream: 131568"); + } + ASSERT_TRUE(byteStream->atEnd()); +} + +TEST_P(InputByteStreamTest, emptyInputStreamError) { + if (GetParam()) { + VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream"); + } else { + VELOX_ASSERT_THROW(createStream({}), "(0 vs. 0) Empty FileInputStream"); + } +} + +TEST_P(InputByteStreamTest, remainingSize) { + const int32_t kSize = 100; + const int32_t kBufferSize = 4096; + std::vector buffers; + std::vector byteRanges; + for (int32_t i = 0; i < kSize; i++) { + buffers.push_back(pool_->allocate(kBufferSize)); + byteRanges.push_back( + ByteRange{reinterpret_cast(buffers.back()), kBufferSize, 0}); + } + auto byteStream = createStream(byteRanges); + const int32_t kReadBytes = 2048; + int32_t remainingSize = kSize * kBufferSize; + ASSERT_EQ(byteStream->remainingSize(), remainingSize); + uint8_t* tempBuffer = reinterpret_cast(pool_->allocate(kReadBytes)); + while (byteStream->remainingSize() > 0) { + byteStream->readBytes(tempBuffer, kReadBytes); + remainingSize -= kReadBytes; + ASSERT_EQ(remainingSize, byteStream->remainingSize()); + } + ASSERT_EQ(byteStream->remainingSize(), 0); + for (int32_t i = 0; i < kSize; i++) { + pool_->free(buffers[i], kBufferSize); + } + pool_->free(tempBuffer, kReadBytes); +} + +TEST_P(InputByteStreamTest, toString) { + const int32_t kSize = 10; + const int32_t kBufferSize = 4096; + std::vector buffers; + std::vector byteRanges; + for (int32_t i = 0; i < kSize; i++) { + buffers.push_back(pool_->allocate(kBufferSize)); + byteRanges.push_back( + ByteRange{reinterpret_cast(buffers.back()), kBufferSize, 0}); + } + auto byteStream = createStream(std::move(byteRanges)); + const int32_t kReadBytes = 2048; + uint8_t* tempBuffer = reinterpret_cast(pool_->allocate(kReadBytes)); + for (int32_t i = 0; i < kSize / 2; i++) { + byteStream->readBytes(tempBuffer, kReadBytes); + } + + if (GetParam()) { + ASSERT_EQ( + byteStream->toString(), + "10 ranges " + "(position/size) [(4096/4096),(4096/4096),(2048/4096 current)," + "(0/4096),(0/4096),(0/4096),(0/4096),(0/4096),(0/4096),(0/4096)]"); + } else { + ASSERT_EQ( + byteStream->toString(), + "file (offset 10.00KB/size 40.00KB) current (position 1.00KB/ size 1.00KB)"); + } + + for (int32_t i = 0; i < kSize; i++) { + pool_->free(buffers[i], kBufferSize); + } + pool_->free(tempBuffer, kReadBytes); +} + +TEST_P(InputByteStreamTest, readBytesNegativeSize) { constexpr int32_t kBufferSize = 4096; uint8_t buffer[kBufferSize]; - ByteInputStream byteStream({ByteRange{buffer, kBufferSize, 0}}); - std::string output; - EXPECT_THROW(byteStream.readBytes(output.data(), -100), VeloxRuntimeError); + auto byteStream = + createStream(std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + uint8_t outputBuffer[kBufferSize]; + VELOX_ASSERT_THROW( + byteStream->readBytes(outputBuffer, -100), + "(-100 vs. 0) Attempting to read negative number of byte"); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); } -TEST_F(ByteStreamTest, skipNegativeSize) { +TEST_P(InputByteStreamTest, skipNegativeSize) { constexpr int32_t kBufferSize = 4096; uint8_t buffer[kBufferSize]; - ByteInputStream byteStream({ByteRange{buffer, kBufferSize, 0}}); - EXPECT_THROW(byteStream.skip(-100), VeloxRuntimeError); + auto byteStream = std::make_unique( + std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + VELOX_ASSERT_THROW( + byteStream->skip(-100), + "(-100 vs. 0) Attempting to skip negative number of bytes"); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); } -TEST_F(ByteStreamTest, nextViewNegativeSize) { +TEST_P(InputByteStreamTest, nextViewNegativeSize) { constexpr int32_t kBufferSize = 4096; uint8_t buffer[kBufferSize]; - ByteInputStream byteStream({ByteRange{buffer, kBufferSize, 0}}); - EXPECT_THROW(byteStream.nextView(-100), VeloxRuntimeError); + auto byteStream = + createStream(std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + VELOX_ASSERT_THROW( + byteStream->nextView(-100), + "(-100 vs. 0) Attempting to view negative number of bytes"); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); } -TEST_F(ByteStreamTest, reuse) { - auto arena = newArena(); - ByteOutputStream stream(arena.get()); - char bytes[10000] = {}; - for (auto i = 0; i < 10; ++i) { - arena->clear(); - stream.startWrite(i * 100); - stream.appendStringView(std::string_view(bytes, sizeof(bytes))); - EXPECT_EQ(sizeof(bytes), stream.size()); +TEST_P(InputByteStreamTest, view) { + SCOPED_TRACE(fmt::format("BufferInputStream: {}", GetParam())); + constexpr int32_t kBufferSize = 1024; + uint8_t buffer[kBufferSize]; + constexpr int32_t kNumRanges = 10; + std::vector fakeRanges; + fakeRanges.reserve(kNumRanges); + for (int i = 0; i < kNumRanges; ++i) { + fakeRanges.push_back(ByteRange{buffer, kBufferSize, 0}); } + auto byteStream = createStream(fakeRanges); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize * kNumRanges); + ASSERT_EQ(byteStream->nextView(kBufferSize / 2).size(), kBufferSize / 2); + ASSERT_EQ(byteStream->nextView(kBufferSize).size(), kBufferSize / 2); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize * (kNumRanges - 1)); + byteStream->skip(byteStream->remainingSize()); + ASSERT_EQ(byteStream->remainingSize(), 0); + ASSERT_TRUE(byteStream->atEnd()); + ASSERT_EQ(byteStream->nextView(100).size(), 0); +} + +TEST_P(InputByteStreamTest, tellP) { + constexpr int32_t kBufferSize = 4096; + uint8_t buffer[kBufferSize]; + auto byteStream = + createStream(std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + byteStream->readBytes(buffer, kBufferSize / 2); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize / 2); + ASSERT_EQ(byteStream->tellp(), kBufferSize / 2); + byteStream->skip(kBufferSize / 2); + ASSERT_EQ(byteStream->remainingSize(), 0); + ASSERT_EQ(byteStream->tellp(), kBufferSize); } + +TEST_P(InputByteStreamTest, skip) { + constexpr int32_t kBufferSize = 4096; + uint8_t buffer[kBufferSize]; + auto byteStream = + createStream(std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + byteStream->skip(0); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + byteStream->skip(1); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize - 1); + if (GetParam()) { + VELOX_ASSERT_THROW( + byteStream->skip(kBufferSize), + "(1 vs. 1) Reading past end of BufferInputStream"); + ASSERT_EQ(byteStream->remainingSize(), 0); + ASSERT_TRUE(byteStream->atEnd()); + } else { + VELOX_ASSERT_THROW( + byteStream->skip(kBufferSize), + "(4096 vs. 4095) Skip past the end of FileInputStream: 4096"); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize - 1); + ASSERT_FALSE(byteStream->atEnd()); + } +} + +TEST_P(InputByteStreamTest, seekp) { + constexpr int32_t kBufferSize = 4096; + uint8_t buffer[kBufferSize]; + auto byteStream = + createStream(std::vector{ByteRange{buffer, kBufferSize, 0}}); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize); + byteStream->seekp(kBufferSize / 2); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize / 2); + byteStream->seekp(kBufferSize / 2); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize / 2); + if (GetParam()) { + byteStream->seekp(kBufferSize / 4); + ASSERT_EQ(byteStream->remainingSize(), kBufferSize / 2 + kBufferSize / 4); + } else { + VELOX_ASSERT_THROW( + byteStream->seekp(kBufferSize / 4), + "(1024 vs. 2048) Backward seek is not supported by FileInputStream"); + } +} + +VELOX_INSTANTIATE_TEST_SUITE_P( + InputByteStreamTest, + InputByteStreamTest, + testing::ValuesIn({false, true})); diff --git a/velox/common/memory/tests/HashStringAllocatorTest.cpp b/velox/common/memory/tests/HashStringAllocatorTest.cpp index f99358769154..124220d4d57f 100644 --- a/velox/common/memory/tests/HashStringAllocatorTest.cpp +++ b/velox/common/memory/tests/HashStringAllocatorTest.cpp @@ -258,11 +258,11 @@ TEST_F(HashStringAllocatorTest, finishWrite) { std::string copy; copy.resize(longString.size()); - inputStream.readBytes(copy.data(), copy.size()); + inputStream->readBytes(copy.data(), copy.size()); ASSERT_EQ(copy, longString); copy.resize(4); - inputStream.readBytes(copy.data(), 4); + inputStream->readBytes(copy.data(), 4); ASSERT_EQ(copy, "abcd"); auto allocatedBytes = allocator_->checkConsistency(); @@ -280,7 +280,7 @@ TEST_F(HashStringAllocatorTest, finishWrite) { auto inStream = HSA::prepareRead(start.header); std::string copy; copy.resize(largeString.size()); - inStream.readBytes(copy.data(), copy.size()); + inStream->readBytes(copy.data(), copy.size()); ASSERT_EQ(copy, largeString); allocatedBytes = allocator_->checkConsistency(); ASSERT_EQ(allocatedBytes, allocator_->currentBytes()); @@ -397,9 +397,9 @@ TEST_F(HashStringAllocatorTest, rewrite) { position = allocator_->finishWrite(stream, 0).second; EXPECT_EQ(3 * sizeof(int64_t), HSA::offset(header, position)); auto inStream = HSA::prepareRead(header); - EXPECT_EQ(123456789012345LL, inStream.read()); - EXPECT_EQ(12345LL, inStream.read()); - EXPECT_EQ(67890LL, inStream.read()); + EXPECT_EQ(123456789012345LL, inStream->read()); + EXPECT_EQ(12345LL, inStream->read()); + EXPECT_EQ(67890LL, inStream->read()); } // The stream contains 3 int64_t's. auto end = HSA::seek(header, 3 * sizeof(int64_t)); @@ -694,20 +694,20 @@ TEST_F(HashStringAllocatorTest, sizeAndPosition) { stream.seekp(start); EXPECT_EQ(start, stream.tellp()); EXPECT_EQ(kUnitSize * 10, stream.size()); - ByteInputStream input = stream.inputStream(); - input.seekp(start); - EXPECT_EQ(kUnitSize * 10 - start, input.remainingSize()); + auto input = stream.inputStream(); + input->seekp(start); + EXPECT_EQ(kUnitSize * 10 - start, input->remainingSize()); for (auto c = 0; c < 10; ++c) { - uint8_t byte = input.readByte(); + uint8_t byte = input->readByte(); EXPECT_EQ(byte, (start + c) % kUnitSize); } // Overwrite the bytes just read. stream.seekp(start); stream.appendStringView(std::string_view(allChars.data(), 100)); input = stream.inputStream(); - input.seekp(start); + input->seekp(start); for (auto c = 0; c < 100; ++c) { - uint8_t byte = input.readByte(); + uint8_t byte = input->readByte(); EXPECT_EQ(byte, c % kUnitSize); } } diff --git a/velox/exec/AddressableNonNullValueList.cpp b/velox/exec/AddressableNonNullValueList.cpp index eebb39c6a3ed..5e4cc84d3691 100644 --- a/velox/exec/AddressableNonNullValueList.cpp +++ b/velox/exec/AddressableNonNullValueList.cpp @@ -84,12 +84,13 @@ HashStringAllocator::Position AddressableNonNullValueList::appendSerialized( namespace { -ByteInputStream prepareRead(const AddressableNonNullValueList::Entry& entry) { +std::unique_ptr prepareRead( + const AddressableNonNullValueList::Entry& entry) { auto header = entry.offset.header; auto seek = entry.offset.position - header->begin(); auto stream = HashStringAllocator::prepareRead(header, entry.size + seek); - stream.seekp(seek); + stream->seekp(seek); return stream; } } // namespace @@ -109,7 +110,7 @@ bool AddressableNonNullValueList::equalTo( CompareFlags compareFlags = CompareFlags::equality(CompareFlags::NullHandlingMode::kNullAsValue); return exec::ContainerRowSerde::compare( - leftStream, rightStream, type.get(), compareFlags) == 0; + *leftStream, *rightStream, type.get(), compareFlags) == 0; } // static @@ -118,7 +119,7 @@ void AddressableNonNullValueList::read( BaseVector& result, vector_size_t index) { auto stream = prepareRead(position); - exec::ContainerRowSerde::deserialize(stream, index, &result); + exec::ContainerRowSerde::deserialize(*stream, index, &result); } // static @@ -126,7 +127,7 @@ void AddressableNonNullValueList::readSerialized( const Entry& position, char* dest) { auto stream = prepareRead(position); - stream.readBytes(dest, position.size); + stream->readBytes(dest, position.size); } } // namespace facebook::velox::aggregate::prestosql diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 78f91843a4a2..7551764f06dd 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -121,9 +121,14 @@ RowVectorPtr Exchange::getOutput() { auto inputStream = page->prepareStreamForDeserialize(); - while (!inputStream.atEnd()) { + while (!inputStream->atEnd()) { getSerde()->deserialize( - &inputStream, pool(), outputType_, &result_, resultOffset, &options_); + inputStream.get(), + pool(), + outputType_, + &result_, + resultOffset, + &options_); resultOffset = result_->size(); } } diff --git a/velox/exec/ExchangeQueue.cpp b/velox/exec/ExchangeQueue.cpp index 1567fc12f054..f19745191d47 100644 --- a/velox/exec/ExchangeQueue.cpp +++ b/velox/exec/ExchangeQueue.cpp @@ -41,8 +41,8 @@ SerializedPage::~SerializedPage() { } } -ByteInputStream SerializedPage::prepareStreamForDeserialize() { - return ByteInputStream(std::move(ranges_)); +std::unique_ptr SerializedPage::prepareStreamForDeserialize() { + return std::make_unique(std::move(ranges_)); } void ExchangeQueue::noMoreSources() { diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h index c757f7f8af7e..91e3a663aa06 100644 --- a/velox/exec/ExchangeQueue.h +++ b/velox/exec/ExchangeQueue.h @@ -19,11 +19,11 @@ namespace facebook::velox::exec { -// Corresponds to Presto SerializedPage, i.e. a container for -// serialize vectors in Presto wire format. +/// Corresponds to Presto SerializedPage, i.e. a container for serialize vectors +/// in Presto wire format. class SerializedPage { public: - // Construct from IOBuf chain. + /// Construct from IOBuf chain. explicit SerializedPage( std::unique_ptr iobuf, std::function onDestructionCb = nullptr, @@ -31,7 +31,7 @@ class SerializedPage { ~SerializedPage(); - // Returns the size of the serialized data in bytes. + /// Returns the size of the serialized data in bytes. uint64_t size() const { return iobufBytes_; } @@ -40,9 +40,9 @@ class SerializedPage { return numRows_; } - // Makes 'input' ready for deserializing 'this' with - // VectorStreamGroup::read(). - ByteInputStream prepareStreamForDeserialize(); + /// Makes 'input' ready for deserializing 'this' with + /// VectorStreamGroup::read(). + std::unique_ptr prepareStreamForDeserialize(); std::unique_ptr getIOBuf() const { return iobuf_->clone(); diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index cbffaace2a97..3c888d8e1bf3 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -153,14 +153,14 @@ class MergeExchangeSource : public MergeSource { return BlockingReason::kWaitForProducer; } } - if (!inputStream_.has_value()) { + if (inputStream_ == nullptr) { mergeExchange_->stats().wlock()->rawInputBytes += currentPage_->size(); - inputStream_.emplace(currentPage_->prepareStreamForDeserialize()); + inputStream_ = currentPage_->prepareStreamForDeserialize(); } if (!inputStream_->atEnd()) { VectorStreamGroup::read( - &inputStream_.value(), + inputStream_.get(), mergeExchange_->pool(), mergeExchange_->outputType(), &data); @@ -191,7 +191,7 @@ class MergeExchangeSource : public MergeSource { private: MergeExchange* const mergeExchange_; std::shared_ptr client_; - std::optional inputStream_; + std::unique_ptr inputStream_; std::unique_ptr currentPage_; bool atEnd_ = false; diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 1fe6bbc68584..7b8a36ed46ac 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -537,7 +537,9 @@ void RowContainer::store( } } -ByteInputStream RowContainer::prepareRead(const char* row, int32_t offset) { +std::unique_ptr RowContainer::prepareRead( + const char* row, + int32_t offset) { const auto& view = reinterpret_cast(row + offset); // We set 'stream' to range over the ranges that start at the Header // immediately below the first character in the std::string_view. @@ -591,7 +593,7 @@ int32_t RowContainer::extractVariableSizeAt( } else { auto stream = HashStringAllocator::prepareRead( HashStringAllocator::headerOf(value.data())); - stream.readBytes(output + 4, size); + stream->readBytes(output + 4, size); } return 4 + size; } @@ -602,7 +604,7 @@ int32_t RowContainer::extractVariableSizeAt( auto stream = prepareRead(row, rowColumn.offset()); ::memcpy(output, &size, 4); - stream.readBytes(output + 4, size); + stream->readBytes(output + 4, size); return 4 + size; } @@ -750,7 +752,7 @@ void RowContainer::extractString( auto rawBuffer = values->getRawStringBufferWithSpace(value.size()); auto stream = HashStringAllocator::prepareRead( HashStringAllocator::headerOf(value.data())); - stream.readBytes(rawBuffer, value.size()); + stream->readBytes(rawBuffer, value.size()); values->setNoCopy(index, StringView(rawBuffer, value.size())); } @@ -799,7 +801,7 @@ int RowContainer::compareComplexType( VELOX_DCHECK(flags.nullAsValue(), "not supported null handling mode"); auto stream = prepareRead(row, offset); - return ContainerRowSerde::compare(stream, decoded, index, flags); + return ContainerRowSerde::compare(*stream, decoded, index, flags); } int32_t RowContainer::compareStringAsc(StringView left, StringView right) { @@ -820,7 +822,7 @@ int32_t RowContainer::compareComplexType( auto leftStream = prepareRead(left, leftOffset); auto rightStream = prepareRead(right, rightOffset); - return ContainerRowSerde::compare(leftStream, rightStream, type, flags); + return ContainerRowSerde::compare(*leftStream, *rightStream, type, flags); } int32_t RowContainer::compareComplexType( @@ -860,7 +862,7 @@ void RowContainer::hashTyped( Kind == TypeKind::ROW || Kind == TypeKind::ARRAY || Kind == TypeKind::MAP) { auto in = prepareRead(row, offset); - hash = ContainerRowSerde::hash(in, type); + hash = ContainerRowSerde::hash(*in, type); } else if constexpr (std::is_floating_point_v) { hash = util::floating_point::NaNAwareHash()(valueAt(row, offset)); } else { diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index dc8c96845689..a2c092bbfa60 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -986,7 +986,9 @@ class RowContainer { } } - static ByteInputStream prepareRead(const char* row, int32_t offset); + static std::unique_ptr prepareRead( + const char* row, + int32_t offset); template void hashTyped( @@ -1153,7 +1155,7 @@ class RowContainer { result->setNull(resultIndex, true); } else { auto stream = prepareRead(row, offset); - ContainerRowSerde::deserialize(stream, resultIndex, result.get()); + ContainerRowSerde::deserialize(*stream, resultIndex, result.get()); } } } diff --git a/velox/exec/SortedAggregations.cpp b/velox/exec/SortedAggregations.cpp index f5271139c472..13d0a2edaced 100644 --- a/velox/exec/SortedAggregations.cpp +++ b/velox/exec/SortedAggregations.cpp @@ -54,7 +54,7 @@ struct RowPointers { auto stream = HashStringAllocator::prepareRead(firstBlock); for (auto i = 0; i < size; ++i) { - rows[i] = reinterpret_cast(stream.read()); + rows[i] = reinterpret_cast(stream->read()); } } }; diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index f014a4d5f0ae..912a11f0c482 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -28,96 +28,6 @@ namespace { static const bool kDefaultUseLosslessTimestamp = true; } // namespace -SpillInputStream::SpillInputStream( - std::unique_ptr&& file, - uint64_t bufferSize, - memory::MemoryPool* pool, - folly::Synchronized* stats) - : file_(std::move(file)), - fileSize_(file_->size()), - bufferSize_(std::min(fileSize_, bufferSize - AlignedBuffer::kPaddedSize)), - pool_(pool), - readaEnabled_((bufferSize_ < fileSize_) && file_->hasPreadvAsync()), - stats_(stats) { - VELOX_CHECK_NOT_NULL(pool_); - VELOX_CHECK_GT( - bufferSize, AlignedBuffer::kPaddedSize, "Buffer size is too small"); - buffers_.push_back(AlignedBuffer::allocate(bufferSize_, pool_)); - if (readaEnabled_) { - buffers_.push_back(AlignedBuffer::allocate(bufferSize_, pool_)); - } - next(/*throwIfPastEnd=*/true); -} - -SpillInputStream::~SpillInputStream() { - if (!readaWait_.valid()) { - return; - } - try { - readaWait_.wait(); - } catch (const std::exception& ex) { - // ignore any prefetch error when query has failed. - LOG(WARNING) << "Spill read-ahead failed on destruction " << ex.what(); - } -} - -void SpillInputStream::next(bool /*throwIfPastEnd*/) { - int32_t readBytes{0}; - uint64_t readTimeUs{0}; - if (readaWait_.valid()) { - { - MicrosecondTimer timer{&readTimeUs}; - readBytes = std::move(readaWait_) - .via(&folly::QueuedImmediateExecutor::instance()) - .wait() - .value(); - VELOX_CHECK(!readaWait_.valid()); - } - VELOX_CHECK_LT(0, readBytes, "Reading past end of spill file"); - advanceBuffer(); - } else { - readBytes = readSize(); - VELOX_CHECK_LT(0, readBytes, "Reading past end of spill file"); - { - MicrosecondTimer timer{&readTimeUs}; - file_->pread(offset_, readBytes, buffer()->asMutable()); - } - } - setRange({buffer()->asMutable(), readBytes, 0}); - updateSpillStats(readBytes, readTimeUs); - - offset_ += readBytes; - maybeIssueReadahead(); -} - -uint64_t SpillInputStream::readSize() const { - return std::min(fileSize_ - offset_, bufferSize_); -} - -void SpillInputStream::maybeIssueReadahead() { - VELOX_CHECK(!readaWait_.valid()); - if (!readaEnabled_) { - return; - } - const auto size = readSize(); - if (size == 0) { - return; - } - std::vector> ranges; - ranges.emplace_back(nextBuffer()->asMutable(), size); - readaWait_ = file_->preadvAsync(offset_, ranges); - VELOX_CHECK(readaWait_.valid()); -} - -void SpillInputStream::updateSpillStats(uint64_t readBytes, uint64_t readTimeUs) - const { - auto lockedStats = stats_->wlock(); - lockedStats->spillReadBytes += readBytes; - lockedStats->spillReadTimeUs += readTimeUs; - ++(lockedStats->spillReads); - common::updateGlobalSpillReadStats(readBytes, readTimeUs); -} - std::unique_ptr SpillWriteFile::create( uint32_t id, const std::string& pathPrefix, @@ -393,12 +303,13 @@ SpillReadFile::SpillReadFile( stats_(stats) { auto fs = filesystems::getFileSystem(path_, nullptr); auto file = fs->openFileForRead(path_); - input_ = std::make_unique( - std::move(file), bufferSize, pool_, stats_); + input_ = std::make_unique( + std::move(file), bufferSize, pool_); } bool SpillReadFile::nextBatch(RowVectorPtr& rowVector) { if (input_->atEnd()) { + recordSpillStats(); return false; } @@ -410,7 +321,17 @@ bool SpillReadFile::nextBatch(RowVectorPtr& rowVector) { } stats_->wlock()->spillDeserializationTimeUs += timeUs; common::updateGlobalSpillDeserializationTimeUs(timeUs); - return true; } + +void SpillReadFile::recordSpillStats() { + VELOX_CHECK(input_->atEnd()); + const auto readStats = input_->stats(); + common::updateGlobalSpillReadStats( + readStats.numReads, readStats.readBytes, readStats.readTimeUs); + auto lockedSpillStats = stats_->wlock(); + lockedSpillStats->spillReads += readStats.numReads; + lockedSpillStats->spillReadTimeUs += readStats.readTimeUs; + lockedSpillStats->spillReadBytes += readStats.readBytes; +} } // namespace facebook::velox::exec diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index 8163cb53b52b..6310d1a9fb43 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -22,6 +22,7 @@ #include "velox/common/base/SpillStats.h" #include "velox/common/compression/Compression.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/TreeOfLosers.h" #include "velox/exec/UnorderedStreamReader.h" @@ -199,78 +200,6 @@ class SpillWriter { SpillFiles finishedFiles_; }; -/// Input stream backed by spill file. -/// -/// TODO Usage of ByteInputStream as base class is hacky and just happens to -/// work. For example, ByteInputStream::size(), seekp(), tellp(), -/// remainingSize() APIs do not work properly. -class SpillInputStream : public ByteInputStream { - public: - /// Reads from 'input' using 'buffer' for buffering reads. - SpillInputStream( - std::unique_ptr&& file, - uint64_t bufferSize, - memory::MemoryPool* pool, - folly::Synchronized* stats); - - ~SpillInputStream() override; - - /// True if all of the file has been read into vectors. - bool atEnd() const override { - return offset_ >= fileSize_ && ranges()[0].position >= ranges()[0].size; - } - - private: - void updateSpillStats(uint64_t readBytes, uint64_t readTimeUs) const; - - void next(bool throwIfPastEnd) override; - - // Issues readahead if underlying fs supports async mode read. - // - // TODO: we might consider to use AsyncSource to support read-ahead on - // filesystem which doesn't support async mode read. - void maybeIssueReadahead(); - - inline uint32_t bufferIndex() const { - return bufferIndex_; - } - - inline uint32_t nextBufferIndex() const { - return (bufferIndex_ + 1) % buffers_.size(); - } - - // Advances buffer index to point to the next buffer for read. - inline void advanceBuffer() { - bufferIndex_ = nextBufferIndex(); - } - - inline Buffer* buffer() const { - return buffers_[bufferIndex()].get(); - } - - inline Buffer* nextBuffer() const { - return buffers_[nextBufferIndex()].get(); - } - - // Returns the next read size in bytes. - inline uint64_t readSize() const; - - const std::unique_ptr file_; - const uint64_t fileSize_; - const uint64_t bufferSize_; - memory::MemoryPool* const pool_; - const bool readaEnabled_; - folly::Synchronized* const stats_; - - std::vector buffers_; - uint32_t bufferIndex_{0}; - // Sets to read-ahead future if valid. - folly::SemiFuture readaWait_{ - folly::SemiFuture::makeEmpty()}; - // Offset of first byte not in 'buffer()'. - uint64_t offset_ = 0; -}; - /// Represents a spill file for read which turns the serialized spilled data on /// disk back into a sequence of spilled row vectors. /// @@ -322,6 +251,9 @@ class SpillReadFile { memory::MemoryPool* pool, folly::Synchronized* stats); + // Invoked to record spill read stats at the end of read input. + void recordSpillStats(); + // The spill file id which is monotonically increasing and unique for each // associated spill partition. const uint32_t id_; @@ -337,6 +269,6 @@ class SpillReadFile { memory::MemoryPool* const pool_; folly::Synchronized* const stats_; - std::unique_ptr input_; + std::unique_ptr input_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index d256042e2599..89740c83f3e8 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -59,13 +59,13 @@ void writeToFile( writer->close(); } -ByteInputStream toByteStream(const std::string& input) { +std::unique_ptr toByteStream(const std::string& input) { std::vector ranges; ranges.push_back( {reinterpret_cast(const_cast(input.data())), (int32_t)input.length(), 0}); - return ByteInputStream(std::move(ranges)); + return std::make_unique(std::move(ranges)); } RowVectorPtr deserialize( @@ -75,7 +75,7 @@ RowVectorPtr deserialize( auto byteStream = toByteStream(input); auto serde = std::make_unique(); RowVectorPtr result; - serde->deserialize(&byteStream, pool, rowType, &result, nullptr); + serde->deserialize(byteStream.get(), pool, rowType, &result, nullptr); return result; } diff --git a/velox/exec/tests/ContainerRowSerdeTest.cpp b/velox/exec/tests/ContainerRowSerdeTest.cpp index 0dea90920241..8663d75287b7 100644 --- a/velox/exec/tests/ContainerRowSerdeTest.cpp +++ b/velox/exec/tests/ContainerRowSerdeTest.cpp @@ -82,7 +82,7 @@ class ContainerRowSerdeTest : public testing::Test, auto in = HashStringAllocator::prepareRead(position.header); for (auto i = 0; i < numRows; ++i) { - ContainerRowSerde::deserialize(in, i, data.get()); + ContainerRowSerde::deserialize(*in, i, data.get()); } return data; } @@ -117,13 +117,13 @@ class ContainerRowSerdeTest : public testing::Test, !equalsOnly) { VELOX_ASSERT_THROW( ContainerRowSerde::compareWithNulls( - stream, decodedVector, i, compareFlags), + *stream, decodedVector, i, compareFlags), "Ordering nulls is not supported"); } else { ASSERT_EQ( expected.at(i), ContainerRowSerde::compareWithNulls( - stream, decodedVector, i, compareFlags)); + *stream, decodedVector, i, compareFlags)); } } } @@ -154,13 +154,13 @@ class ContainerRowSerdeTest : public testing::Test, !equalsOnly) { VELOX_ASSERT_THROW( ContainerRowSerde::compareWithNulls( - leftStream, rightStream, type.get(), compareFlags), + *leftStream, *rightStream, type.get(), compareFlags), "Ordering nulls is not supported"); } else { ASSERT_EQ( expected.at(i), ContainerRowSerde::compareWithNulls( - leftStream, rightStream, type.get(), compareFlags)); + *leftStream, *rightStream, type.get(), compareFlags)); } } } @@ -176,7 +176,8 @@ class ContainerRowSerdeTest : public testing::Test, for (auto i = 0; i < positions.size(); ++i) { auto stream = HashStringAllocator::prepareRead(positions.at(i).header); ASSERT_EQ( - 0, ContainerRowSerde::compare(stream, decodedVector, i, compareFlags)) + 0, + ContainerRowSerde::compare(*stream, decodedVector, i, compareFlags)) << "at " << i << ": " << vector->toString(i); } } @@ -593,13 +594,13 @@ TEST_F(ContainerRowSerdeTest, nans) { for (auto i = 0; i < positions.size(); ++i) { auto stream = HashStringAllocator::prepareRead(positions.at(i).header); ASSERT_EQ( - 0, ContainerRowSerde::compare(stream, decodedVector, i, compareFlags)) + 0, ContainerRowSerde::compare(*stream, decodedVector, i, compareFlags)) << "at " << i << ": " << vector->toString(i); stream = HashStringAllocator::prepareRead(positions.at(i).header); ASSERT_EQ( expected->hashValueAt(i), - ContainerRowSerde::hash(stream, vector->type().get())) + ContainerRowSerde::hash(*stream, vector->type().get())) << "at " << i << ": " << vector->toString(i); } } diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 0023a8ec92a9..9f5690ca9aa7 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -238,12 +238,13 @@ class SpillerTest : public exec::test::RowContainerTestBase { setupSpiller(2'000'000, 0, makeError, 0, readBufferSize); // We spill spillPct% of the data in 10% increments. + auto stats = spiller_->stats(); runSpill(makeError); if (makeError) { return; } // Verify the spilled file exist on file system. - auto stats = spiller_->stats(); + stats = spiller_->stats(); const auto numSpilledFiles = stats.spilledFiles; if (type_ == Spiller::Type::kAggregateOutput) { ASSERT_EQ(numSpilledFiles, 1); @@ -287,6 +288,11 @@ class SpillerTest : public exec::test::RowContainerTestBase { ASSERT_EQ(stats.spilledFiles, spilledFileSet.size()); ASSERT_EQ(stats.spilledPartitions, numPartitions_); ASSERT_EQ(stats.spilledRows, kNumRows); + if (stats.spilledBytes != totalSpilledBytes) { + LOG(ERROR) << "bad"; + } else { + LOG(ERROR) << "good"; + } ASSERT_EQ(stats.spilledBytes, totalSpilledBytes); ASSERT_EQ(stats.spillReadBytes, totalSpilledBytes); ASSERT_GT(stats.spillWriteTimeUs, 0); @@ -1190,13 +1196,8 @@ TEST_P(AllTypes, readaheadTest) { if (type_ == Spiller::Type::kOrderByInput || type_ == Spiller::Type::kAggregateInput) { testSortedSpill(10, 10, false, false, 512); - VELOX_ASSERT_THROW( - testSortedSpill(10, 1, false, false, 1), "Buffer size is too small"); return; } - VELOX_ASSERT_THROW( - testNonSortedSpill(1, 100, 3, 1'000'000'000, maxSpillRunRows_, 1), - "Buffer size is too small"); testNonSortedSpill(1, 5'000, 0, 1'000'000'000, maxSpillRunRows_, 512); } diff --git a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp index 3ae3c8e2afec..2e47cbf382b3 100644 --- a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp +++ b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp @@ -42,7 +42,7 @@ void SingleValueAccumulator::read(const VectorPtr& vector, vector_size_t index) VELOX_CHECK_NOT_NULL(start_.header); auto stream = HashStringAllocator::prepareRead(start_.header); - exec::ContainerRowSerde::deserialize(stream, index, vector.get()); + exec::ContainerRowSerde::deserialize(*stream, index, vector.get()); } bool SingleValueAccumulator::hasValue() const { @@ -57,7 +57,7 @@ std::optional SingleValueAccumulator::compare( auto stream = HashStringAllocator::prepareRead(start_.header); return exec::ContainerRowSerde::compareWithNulls( - stream, decoded, index, compareFlags); + *stream, decoded, index, compareFlags); } void SingleValueAccumulator::destroy(HashStringAllocator* allocator) { diff --git a/velox/functions/lib/aggregates/ValueList.cpp b/velox/functions/lib/aggregates/ValueList.cpp index 216d665287d0..c3e7e97a421a 100644 --- a/velox/functions/lib/aggregates/ValueList.cpp +++ b/velox/functions/lib/aggregates/ValueList.cpp @@ -113,13 +113,13 @@ bool ValueListReader::next(BaseVector& output, vector_size_t outputIndex) { if (pos_ == lastNullsStart_) { nulls_ = lastNulls_; } else if (pos_ % 64 == 0) { - nulls_ = nullsStream_.read(); + nulls_ = nullsStream_->read(); } if (nulls_ & (1UL << (pos_ % 64))) { output.setNull(outputIndex, true); } else { - exec::ContainerRowSerde::deserialize(dataStream_, outputIndex, &output); + exec::ContainerRowSerde::deserialize(*dataStream_, outputIndex, &output); } pos_++; diff --git a/velox/functions/lib/aggregates/ValueList.h b/velox/functions/lib/aggregates/ValueList.h index 27dcce7853c2..74aa4731e921 100644 --- a/velox/functions/lib/aggregates/ValueList.h +++ b/velox/functions/lib/aggregates/ValueList.h @@ -127,8 +127,8 @@ class ValueListReader { const vector_size_t size_; const vector_size_t lastNullsStart_; const uint64_t lastNulls_; - ByteInputStream dataStream_; - ByteInputStream nullsStream_; + std::unique_ptr dataStream_; + std::unique_ptr nullsStream_; uint64_t nulls_; vector_size_t pos_{0}; }; diff --git a/velox/functions/lib/aggregates/ValueSet.cpp b/velox/functions/lib/aggregates/ValueSet.cpp index 57d93068a024..24450a086866 100644 --- a/velox/functions/lib/aggregates/ValueSet.cpp +++ b/velox/functions/lib/aggregates/ValueSet.cpp @@ -55,7 +55,7 @@ void ValueSet::read( VELOX_CHECK_NOT_NULL(header); auto stream = HashStringAllocator::prepareRead(header); - exec::ContainerRowSerde::deserialize(stream, index, vector); + exec::ContainerRowSerde::deserialize(*stream, index, vector); } void ValueSet::free(HashStringAllocator::Header* header) const { diff --git a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp index 3f59de1a814c..388908ecd718 100644 --- a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp +++ b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp @@ -101,7 +101,7 @@ class SerializeBenchmark { auto in = HashStringAllocator::prepareRead(position.header); for (auto i = 0; i < data->size(); ++i) { - exec::ContainerRowSerde::deserialize(in, i, copy.get()); + exec::ContainerRowSerde::deserialize(*in, i, copy.get()); } VELOX_CHECK_EQ(copy->size(), data->size()); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 602301f82599..4036e4791266 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4224,10 +4224,10 @@ void PrestoVectorSerde::deserialize( codec->uncompress(compressBuf.get(), header.uncompressedSize); ByteRange byteRange{ uncompress->writableData(), (int32_t)uncompress->length(), 0}; - ByteInputStream uncompressedSource({byteRange}); - + auto uncompressedSource = + std::make_unique(std::vector{byteRange}); readTopColumns( - uncompressedSource, type, pool, *result, resultOffset, prestoOptions); + *uncompressedSource, type, pool, *result, resultOffset, prestoOptions); } } diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 50fcd8d2c91f..7dc49e83a49c 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -54,7 +54,7 @@ class CompactRowSerializerTest : public ::testing::Test, ASSERT_EQ(size, output->tellp()); } - ByteInputStream toByteStream( + std::unique_ptr toByteStream( const std::string_view& input, size_t pageSize = 32) { auto rawBytes = reinterpret_cast(const_cast(input.data())); @@ -71,7 +71,7 @@ class CompactRowSerializerTest : public ::testing::Test, offset += pageSize; } - return ByteInputStream(std::move(ranges)); + return std::make_unique(std::move(ranges)); } RowVectorPtr deserialize( @@ -80,7 +80,7 @@ class CompactRowSerializerTest : public ::testing::Test, auto byteStream = toByteStream(input); RowVectorPtr result; - serde_->deserialize(&byteStream, pool_.get(), rowType, &result); + serde_->deserialize(byteStream.get(), pool_.get(), rowType, &result); return result; } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 951d89b393a7..9f4817a00c76 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -151,12 +151,13 @@ class PrestoSerializerTest return {static_cast(size), sizeEstimate}; } - ByteInputStream toByteStream(const std::string& input) { + std::unique_ptr toByteStream(const std::string& input) { ByteRange byteRange{ reinterpret_cast(const_cast(input.data())), (int32_t)input.length(), 0}; - return ByteInputStream({byteRange}); + return std::make_unique( + std::vector{{byteRange}}); } void validateLexer( @@ -199,7 +200,7 @@ class PrestoSerializerTest validateLexer(input, paramOptions); RowVectorPtr result; serde_->deserialize( - &byteStream, pool_.get(), rowType, &result, 0, ¶mOptions); + byteStream.get(), pool_.get(), rowType, &result, 0, ¶mOptions); return result; } @@ -276,7 +277,12 @@ class PrestoSerializerTest for (auto i = 0; i < serialized.size(); ++i) { auto byteStream = toByteStream(serialized[i]); serde_->deserialize( - &byteStream, pool_.get(), rowType, &result, offset, ¶mOptions); + byteStream.get(), + pool_.get(), + rowType, + &result, + offset, + ¶mOptions); offset = result->size(); } @@ -386,7 +392,12 @@ class PrestoSerializerTest for (auto i = 0; i < 3; ++i) { auto byteStream = toByteStream(serialized); serde_->deserialize( - &byteStream, pool_.get(), rowType, &result, offset, ¶mOptions); + byteStream.get(), + pool_.get(), + rowType, + &result, + offset, + ¶mOptions); offset = result->size(); } @@ -445,7 +456,7 @@ class PrestoSerializerTest auto piece = pieces[pieceIdx]; auto byteStream = toByteStream(piece); serde_->deserialize( - &byteStream, + byteStream.get(), pool_.get(), rowType, &deserialized, @@ -456,14 +467,14 @@ class PrestoSerializerTest BaseVector::create(rowType, 0, pool_.get()); byteStream = toByteStream(piece); serde_->deserialize( - &byteStream, pool_.get(), rowType, &single, 0, ¶mOptions); + byteStream.get(), pool_.get(), rowType, &single, 0, ¶mOptions); assertEqualVectors(single->childAt(0), vectors[pieceIdx]); RowVectorPtr single2 = BaseVector::create(rowType, 0, pool_.get()); byteStream = toByteStream(reusedPieces[pieceIdx]); serde_->deserialize( - &byteStream, pool_.get(), rowType, &single2, 0, ¶mOptions); + byteStream.get(), pool_.get(), rowType, &single2, 0, ¶mOptions); assertEqualVectors(single2->childAt(0), vectors[pieceIdx]); } assertEqualVectors(concatenation, deserialized); @@ -931,7 +942,12 @@ TEST_P(PrestoSerializerTest, unknown) { for (auto i = 0; i < serialized.size(); ++i) { auto byteStream = toByteStream(serialized[i]); serde_->deserialize( - &byteStream, pool_.get(), rowType, &result, offset, ¶mOptions); + byteStream.get(), + pool_.get(), + rowType, + &result, + offset, + ¶mOptions); offset = result->size(); } @@ -974,11 +990,16 @@ TEST_P(PrestoSerializerTest, multiPage) { for (int i = 0; i < testVectors.size(); i++) { RowVectorPtr& vec = testVectors[i]; serde_->deserialize( - &byteStream, pool_.get(), rowType, &deserialized, 0, ¶mOptions); + byteStream.get(), + pool_.get(), + rowType, + &deserialized, + 0, + ¶mOptions); if (i < testVectors.size() - 1) { - ASSERT_FALSE(byteStream.atEnd()); + ASSERT_FALSE(byteStream->atEnd()); } else { - ASSERT_TRUE(byteStream.atEnd()); + ASSERT_TRUE(byteStream->atEnd()); } assertEqualVectors(deserialized, vec); deserialized->validate({}); @@ -1434,7 +1455,7 @@ TEST_P(PrestoSerializerTest, checksum) { // This should fail because the checksums don't match. VELOX_ASSERT_THROW( serde_->deserialize( - &byteStream, + byteStream.get(), pool->addLeafChild("child").get(), ROW({BIGINT()}), &result, @@ -1484,7 +1505,7 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) { auto byteStream = toByteStream(input); VectorPtr deserialized; serde_->deserializeSingleColumn( - &byteStream, pool(), vector->type(), &deserialized, nullptr); + byteStream.get(), pool(), vector->type(), &deserialized, nullptr); assertEqualVectors(vector, deserialized); }; diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index ce6ad23aa1f5..cfcb2a61f8bd 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -54,7 +54,8 @@ class UnsafeRowSerializerTest : public ::testing::Test, ASSERT_EQ(size, output->tellp()); } - ByteInputStream toByteStream(const std::vector& inputs) { + std::unique_ptr toByteStream( + const std::vector& inputs) { std::vector ranges; ranges.reserve(inputs.size()); @@ -64,7 +65,7 @@ class UnsafeRowSerializerTest : public ::testing::Test, (int32_t)input.length(), 0}); } - return ByteInputStream(std::move(ranges)); + return std::make_unique(std::move(ranges)); } RowVectorPtr deserialize( @@ -73,7 +74,7 @@ class UnsafeRowSerializerTest : public ::testing::Test, auto byteStream = toByteStream(input); RowVectorPtr result; - serde_->deserialize(&byteStream, pool_.get(), rowType, &result); + serde_->deserialize(byteStream.get(), pool_.get(), rowType, &result); return result; } @@ -284,7 +285,7 @@ TEST_F(UnsafeRowSerializerTest, incompleteRow) { buffers = {{rawData, 2}}; VELOX_ASSERT_RUNTIME_THROW( testDeserialize(buffers, expected), - "Reading past end of ByteInputStream"); + "(1 vs. 1) Reading past end of BufferInputStream"); } TEST_F(UnsafeRowSerializerTest, types) { diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index 119611047146..18bdb103f357 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -242,14 +242,15 @@ RowVectorPtr IOBufToRowVector( const_cast(range.data()), (int32_t)range.size(), 0}); } - ByteInputStream byteStream(std::move(ranges)); + auto byteStream = std::make_unique(std::move(ranges)); RowVectorPtr outputVector; // If not supplied, use the default one. if (serde == nullptr) { serde = getVectorSerde(); } - serde->deserialize(&byteStream, &pool, outputType, &outputVector, nullptr); + serde->deserialize( + byteStream.get(), &pool, outputType, &outputVector, nullptr); return outputVector; } diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 11e1ff3a4fca..b42c35c6267e 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -754,7 +754,7 @@ class VectorTest : public testing::Test, public test::VectorTestBase { } } - ByteInputStream prepareInput(std::string& string) { + std::unique_ptr prepareInput(std::string& string) { // Put 'string' in 'input' in many pieces. const int32_t size = string.size(); std::vector ranges; @@ -767,7 +767,7 @@ class VectorTest : public testing::Test, public test::VectorTestBase { ranges.back().position = 0; } - return ByteInputStream(std::move(ranges)); + return std::make_unique(std::move(ranges)); } void checkSizes( @@ -874,7 +874,7 @@ class VectorTest : public testing::Test, public test::VectorTestBase { auto evenInput = prepareInput(evenString); RowVectorPtr resultRow; - VectorStreamGroup::read(&evenInput, pool(), sourceRowType, &resultRow); + VectorStreamGroup::read(evenInput.get(), pool(), sourceRowType, &resultRow); VectorPtr result = resultRow->childAt(0); switch (source->encoding()) { case VectorEncoding::Simple::FLAT: @@ -903,7 +903,7 @@ class VectorTest : public testing::Test, public test::VectorTestBase { auto oddString = oddStream.str(); auto oddInput = prepareInput(oddString); - VectorStreamGroup::read(&oddInput, pool(), sourceRowType, &resultRow); + VectorStreamGroup::read(oddInput.get(), pool(), sourceRowType, &resultRow); result = resultRow->childAt(0); for (int32_t i = 0; i < oddIndices.size(); ++i) { EXPECT_TRUE(result->equalValueAt(source.get(), i, oddIndices[i].begin))