Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for REST based remote functions #10911

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,44 @@ jobs:
mv ./${MINIO_BINARY} /usr/local/bin/
fi

# Downloads the proxygen source archive for the tag in FB_OS_VERSION, runs its
# build.sh and moves the resulting ./proxygen file into /usr/local/bin. The
# whole step is skipped when /usr/local/bin/proxygen already exists.
# NOTE(review): this assumes build.sh sits at the archive root and leaves an
# executable named ./proxygen next to it. The build looks proxygen up with
# find_package()/find_library() (see the CMakeLists.txt files), i.e. as
# installed libraries and CMake config files - confirm this step provides them.
- name: Install Proxygen
run: |
FB_OS_VERSION="v2024.05.20.00"
PROXYGEN_BINARY="proxygen.tar.gz"
if [ ! -f /usr/local/bin/proxygen ]; then
wget https://github.com/facebook/proxygen/archive/refs/tags/${FB_OS_VERSION}.tar.gz -O ${PROXYGEN_BINARY}

# Extract the archive
tar -xzf ${PROXYGEN_BINARY}

# Check the actual directory name after extraction
DIR_NAME=$(tar -tf ${PROXYGEN_BINARY} | head -1 | cut -f1 -d"/")

# Verify that the directory exists before proceeding
if [ -d "${DIR_NAME}" ]; then
cd ${DIR_NAME}

# Build the project (adjust the build command as necessary)
if [ -f "./build.sh" ]; then
./build.sh
else
echo "build.sh not found, adjust the build command as needed."
exit 1
fi

# Make the binary executable and move to /usr/local/bin
chmod +x ./proxygen
mv ./proxygen /usr/local/bin/

# Clean up
cd ..
rm -rf ${DIR_NAME} ${PROXYGEN_BINARY}
else
echo "Directory ${DIR_NAME} not found after extraction."
exit 1
fi
fi

- uses: assignUser/stash/restore@v1
with:
path: '${{ env.CCACHE_DIR }}'
Expand Down
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@ if(${VELOX_BUILD_TESTING})
resolve_dependency(gRPC)
endif()

# When remote functions are enabled, require the fizz, wangle and proxygen
# CMake config packages; configuration fails if any of them is missing.
# NOTE(review): the block right below is guarded by the same option - consider
# merging the two so the dependency lookups live in one place.
if(VELOX_ENABLE_REMOTE_FUNCTIONS)
find_package(fizz CONFIG REQUIRED)
find_package(wangle CONFIG REQUIRED)
find_package(proxygen CONFIG REQUIRED)
endif()

if(VELOX_ENABLE_REMOTE_FUNCTIONS)
# TODO: Move this to use resolve_dependency(). For some reason, FBThrift
# requires clients to explicitly install fizz and wangle.
Expand Down
7 changes: 3 additions & 4 deletions velox/common/config/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if (${VELOX_BUILD_TESTING})
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif ()
endif()

velox_add_library(velox_common_config Config.cpp)
velox_link_libraries(
velox_common_config
PUBLIC velox_common_base
velox_exception
PUBLIC velox_common_base velox_exception
PRIVATE re2::re2)
21 changes: 21 additions & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Populate PROXYGEN_LIBRARIES (consumed by the REST client target) unless the
# including project has already defined it.
if(NOT DEFINED PROXYGEN_LIBRARIES)
# NOTE(review): nothing in this block uses the result of the Sodium lookup;
# presumably it is a transitive requirement of one of the libraries below -
# confirm.
find_package(Sodium REQUIRED)

# Each find_library() leaves <VAR>-NOTFOUND in its variable on failure, which
# evaluates to false in the check below.
find_library(PROXYGEN proxygen)
find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver)
find_library(FIZZ fizz)
find_library(WANGLE wangle)

# Abort configuration early if any of the four libraries is missing.
if(NOT PROXYGEN
OR NOT PROXYGEN_HTTP_SERVER
OR NOT FIZZ
OR NOT WANGLE)
message(
FATAL_ERROR
"One or more proxygen libraries were not found. Please ensure proxygen, proxygenhttpserver, fizz, and wangle are installed."
)
endif()

# NOTE(review): the list is presumably ordered from dependent to dependency
# for static linking - confirm before reordering.
set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ})
endif()

add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
5 changes: 5 additions & 0 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp)
velox_link_libraries(velox_functions_remote_thrift_client
PUBLIC remote_function_thrift FBThrift::thriftcpp2)

# REST client used by remote functions; links against the proxygen libraries
# collected in PROXYGEN_LIBRARIES. The PUBLIC keyword keeps this call on the
# same keyword signature as the other velox_link_libraries() calls in this
# file, avoiding a mix of plain and keyword signatures.
velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client
                     PUBLIC ${PROXYGEN_LIBRARIES} Folly::folly)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_expression
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
Expand Down
130 changes: 118 additions & 12 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <folly/io/async/EventBase.h>
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
Expand All @@ -33,17 +34,31 @@ std::string serializeType(const TypePtr& type) {
return type::fbhive::HiveTypeSerializer::serialize(type);
}

/// Copies the bytes of every buffer in the IOBuf chain, in chain order, into a
/// single contiguous std::string.
std::string iobufToString(const folly::IOBuf& buf) {
  std::string flattened;
  // Size the string up front so appending each chunk does not reallocate.
  flattened.reserve(buf.computeChainDataLength());

  for (auto it = buf.begin(); it != buf.end(); ++it) {
    const auto chunk = *it;
    flattened.append(
        reinterpret_cast<const char*>(chunk.data()), chunk.size());
  }

  return flattened;
}

class RemoteFunction : public exec::VectorFunction {
public:
RemoteFunction(
const std::string& functionName,
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
location_(metadata.location),
thriftClient_(getThriftClient(location_, &eventBase_)),
serdeFormat_(metadata.serdeFormat),
serde_(getSerde(serdeFormat_)) {
: functionName_(functionName), metadata_(metadata) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
} else if (metadata.location.type() == typeid(URL)) {
url_ = boost::get<URL>(metadata.location);
}

std::vector<TypePtr> types;
types.reserve(inputArgs.size());
serializedInputTypes_.reserve(inputArgs.size());
Expand All @@ -62,7 +77,11 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const override {
try {
applyRemote(rows, args, outputType, context, result);
if ((metadata_.location.type() == typeid(SocketAddress))) {
applyRemote(rows, args, outputType, context, result);
} else if (metadata_.location.type() == typeid(URL)) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
throw;
} catch (const std::exception&) {
Expand All @@ -71,6 +90,88 @@ class RemoteFunction : public exec::VectorFunction {
}

private:
void applyRestRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const {
try {
std::string responseBody;

// Create a RowVector for the remote function call
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

// Build the JSON request with function and input details
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
remoteFunctionHandle["argumentTypes"] = folly::dynamic::array;
for (const auto& value : serializedInputTypes_) {
remoteFunctionHandle["argumentTypes"].push_back(value);
}

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(metadata_.serdeFormat);
inputs["payload"] = iobufToString(rowVectorToIOBuf(
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get()));
inputs["rowCount"] = remoteRowVector->size();

// Create the final JSON object to be sent
folly::dynamic jsonObject = folly::dynamic::object;
jsonObject["remoteFunctionHandle"] = remoteFunctionHandle;
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// Construct the full URL for the REST request
std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
metadata_.schema.value_or("default_schema"),
functionName_,
metadata_.functionId.value_or("default_function_id"),
metadata_.version.value_or("default_version"));

// Invoke the remote function using RestClient
RestClient restClient_(fullUrl);
restClient_.invoke_function(folly::toJson(jsonObject), responseBody);
LOG(INFO) << responseBody;

// Parse the JSON response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_NYI(responseJsonObj["err"].asString());
}

// Deserialize the result payload
auto payloadIObuf = folly::IOBuf::copyBuffer(
responseJsonObj["result"]["payload"].asString());

auto outputRowVector = IOBufToRowVector(
*payloadIObuf,
ROW({outputType}),
*context.pool(),
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
// Log and throw an error if the remote call fails
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
functionName_,
url_.getUrl(),
e.what());
}
}

void applyRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
Expand All @@ -97,11 +198,14 @@ class RemoteFunction : public exec::VectorFunction {

auto requestInputs = request.inputs_ref();
requestInputs->rowCount_ref() = remoteRowVector->size();
requestInputs->pageFormat_ref() = serdeFormat_;
requestInputs->pageFormat_ref() = metadata_.serdeFormat;

// TODO: serialize only active rows.
requestInputs->payload_ref() = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get());
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get());

try {
thriftClient_->sync_invokeFunction(remoteResponse, request);
Expand All @@ -117,21 +221,23 @@ class RemoteFunction : public exec::VectorFunction {
remoteResponse.get_result().get_payload(),
ROW({outputType}),
*context.pool(),
serde_.get());
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);
}

const std::string functionName_;
folly::SocketAddress location_;

folly::EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;
remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde> serde_;
folly::SocketAddress location_;

proxygen::URL url_;

// Structures we construct once to cache:
RowTypePtr remoteInputType_;
std::vector<std::string> serializedInputTypes_;

const RemoteVectorFunctionMetadata metadata_;
};

std::shared_ptr<exec::VectorFunction> createRemoteFunction(
Expand Down
30 changes: 24 additions & 6 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,38 @@

#pragma once

#include <boost/variant.hpp>
#include <folly/SocketAddress.h>
#include <proxygen/lib/utils/URL.h>
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunction_types.h"

namespace facebook::velox::functions {

struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// Network address of the servr to communicate with. Note that this can hold
/// a network location (ip/port pair) or a unix domain socket path (see
/// Location of the remote function server: either the URL of an HTTP/REST
/// server, or the network address of the server to communicate with. The
/// network address can hold a network location (ip/port pair) or a unix
/// domain socket path (see SocketAddress::makeFromPath()).
folly::SocketAddress location;
boost::variant<folly::SocketAddress, proxygen::URL> location;

/// The serialization format to be used
/// The serialization format to be used when sending data to the remote.
remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE};

/// Optional schema the remote function is registered under. For REST
/// functions it is used as a path segment of the request URL;
/// "default_schema" is used when it is not set.
std::optional<std::string> schema;

/// Optional identifier for the specific remote function to be invoked.
/// This can be useful when the same server hosts multiple functions,
/// and the client needs to specify which function to call.
std::optional<std::string> functionId;

/// Optional version information to be used when calling the remote function.
/// This can help in ensuring compatibility with a particular version of the
/// function if multiple versions are available on the server.
std::optional<std::string> version;
};

/// Registers a new remote function. It will use the metadata defined in
Expand All @@ -38,8 +56,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
//
/// Remote functions are registered as regular stateful functions (using the
/// same internal catalog), and hence conflict if there already exists a
/// (non-remote) function registered with the same name. The `overwrite` flag
/// controls whether to overwrite in these cases.
/// (non-remote) function registered with the same name. The `overwrite`
/// flag controls whether to overwrite in these cases.
void registerRemoteFunction(
const std::string& name,
std::vector<exec::FunctionSignaturePtr> signatures,
Expand Down
Loading
Loading