From f7bde8cac75015a35303eb566c240b8aca692c11 Mon Sep 17 00:00:00 2001 From: "Adi (Suissa) Peleg" Date: Mon, 30 Sep 2024 09:45:13 -0400 Subject: [PATCH 1/5] srds: minor perf: move scoped_route_config to ScopedRouteInfo (#36270) Signed-off-by: Adi Suissa-Peleg --- source/common/router/scoped_config_impl.cc | 6 +++--- source/common/router/scoped_config_impl.h | 6 +++--- source/common/router/scoped_rds.cc | 2 +- test/common/router/scoped_config_impl_test.cc | 16 ++++++++++------ 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/source/common/router/scoped_config_impl.cc b/source/common/router/scoped_config_impl.cc index e6566a2f87..c400649efe 100644 --- a/source/common/router/scoped_config_impl.cc +++ b/source/common/router/scoped_config_impl.cc @@ -77,10 +77,10 @@ HeaderValueExtractorImpl::computeFragment(const Http::HeaderMap& headers) const return nullptr; } -ScopedRouteInfo::ScopedRouteInfo(envoy::config::route::v3::ScopedRouteConfiguration config_proto, +ScopedRouteInfo::ScopedRouteInfo(envoy::config::route::v3::ScopedRouteConfiguration&& config_proto, ConfigConstSharedPtr route_config) - : config_proto_(config_proto), route_config_(route_config), - config_hash_(MessageUtil::hash(config_proto)) { + : config_proto_(std::move(config_proto)), route_config_(route_config), + config_hash_(MessageUtil::hash(config_proto_)) { // TODO(stevenzzzz): Maybe worth a KeyBuilder abstraction when there are more than one type of // Fragment. for (const auto& fragment : config_proto_.key().fragments()) { diff --git a/source/common/router/scoped_config_impl.h b/source/common/router/scoped_config_impl.h index d7f8bd158f..ee134aaf66 100644 --- a/source/common/router/scoped_config_impl.h +++ b/source/common/router/scoped_config_impl.h @@ -77,7 +77,7 @@ class ScopeKeyBuilderImpl : public ScopeKeyBuilderBase { // ScopedRouteConfiguration and corresponding RouteConfigProvider. class ScopedRouteInfo { public: - ScopedRouteInfo(envoy::config::route::v3::ScopedRouteConfiguration config_proto, + ScopedRouteInfo(envoy::config::route::v3::ScopedRouteConfiguration&& config_proto, ConfigConstSharedPtr route_config); const ConfigConstSharedPtr& routeConfig() const { return route_config_; } @@ -89,9 +89,9 @@ class ScopedRouteInfo { uint64_t configHash() const { return config_hash_; } private: - envoy::config::route::v3::ScopedRouteConfiguration config_proto_; + const envoy::config::route::v3::ScopedRouteConfiguration config_proto_; ScopeKey scope_key_; - ConfigConstSharedPtr route_config_; + const ConfigConstSharedPtr route_config_; const uint64_t config_hash_; }; using ScopedRouteInfoConstSharedPtr = std::shared_ptr; diff --git a/source/common/router/scoped_rds.cc b/source/common/router/scoped_rds.cc index e459d0dc6b..b600825934 100644 --- a/source/common/router/scoped_rds.cc +++ b/source/common/router/scoped_rds.cc @@ -103,7 +103,7 @@ makeScopedRouteInfos(ProtobufTypes::ConstMessagePtrVector&& config_protos, config_provider_manager.routeConfigProviderManager().createStaticRouteConfigProvider( scoped_route_config.route_configuration(), factory_context, factory_context.messageValidationContext().staticValidationVisitor()); - scopes.push_back(std::make_shared(scoped_route_config, + scopes.push_back(std::make_shared(std::move(scoped_route_config), route_config_provider->configCast())); } diff --git a/test/common/router/scoped_config_impl_test.cc b/test/common/router/scoped_config_impl_test.cc index 37d8a7fa65..004b7b2562 100644 --- a/test/common/router/scoped_config_impl_test.cc +++ b/test/common/router/scoped_config_impl_test.cc @@ -384,19 +384,23 @@ TEST_F(ScopedRouteInfoTest, Creation) { // Tests that config hash changes if ScopedRouteConfiguration of the ScopedRouteInfo changes. TEST_F(ScopedRouteInfoTest, Hash) { - const envoy::config::route::v3::ScopedRouteConfiguration config_copy = scoped_route_config_; - info_ = std::make_unique(scoped_route_config_, route_config_); + envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config1 = scoped_route_config_; + info_ = std::make_unique(std::move(scoped_route_config1), route_config_); EXPECT_EQ(info_->routeConfig().get(), route_config_.get()); - EXPECT_TRUE(TestUtility::protoEqual(info_->configProto(), config_copy)); + EXPECT_TRUE(TestUtility::protoEqual(info_->configProto(), scoped_route_config_)); EXPECT_EQ(info_->scopeName(), "foo_scope"); EXPECT_EQ(info_->scopeKey(), makeKey({"foo", "bar"})); - const auto info2 = std::make_unique(scoped_route_config_, route_config_); + envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config2 = scoped_route_config_; + const auto info2 = + std::make_unique(std::move(scoped_route_config2), route_config_); ASSERT_EQ(info2->configHash(), info_->configHash()); // Mutate the config and hash should be different now. - scoped_route_config_.set_on_demand(true); - const auto info3 = std::make_unique(scoped_route_config_, route_config_); + envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config3 = scoped_route_config_; + scoped_route_config3.set_on_demand(true); + const auto info3 = + std::make_unique(std::move(scoped_route_config3), route_config_); ASSERT_NE(info3->configHash(), info_->configHash()); } From e3528a78e1402be4d8ee1b23d01556b207e4fdce Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Mon, 30 Sep 2024 17:21:40 -0400 Subject: [PATCH 2/5] mobile: Fix the thread priority expectation in InternalEngineTest (#36390) Signed-off-by: Ali Beyad --- mobile/test/common/internal_engine_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mobile/test/common/internal_engine_test.cc b/mobile/test/common/internal_engine_test.cc index c15cf0c434..be479a821b 100644 --- a/mobile/test/common/internal_engine_test.cc +++ b/mobile/test/common/internal_engine_test.cc @@ -503,8 +503,8 @@ TEST_F(ThreadPriorityInternalEngineTest, SetThreadPriority) { } TEST_F(ThreadPriorityInternalEngineTest, SetOutOfRangeThreadPriority) { - // 42 is outside the range of acceptable thread priorities. - const int expected_thread_priority = 42; + // 102 is outside the range of acceptable thread priorities on all platforms. + const int expected_thread_priority = 102; const int actual_thread_priority = startEngineWithPriority(expected_thread_priority); // The `setpriority` system call doesn't define what happens when the thread priority is out of // range, and the behavior could be system dependent. On Linux, if the supplied priority value From 877293076e18c9a6e18c9f83bfd5be4a47719871 Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Mon, 30 Sep 2024 18:13:25 -0400 Subject: [PATCH 3/5] mobile: Mark the BidirectionalStreamTest as flaky (#36391) Signed-off-by: Ali Beyad --- mobile/test/java/org/chromium/net/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/mobile/test/java/org/chromium/net/BUILD b/mobile/test/java/org/chromium/net/BUILD index aefe1cb015..3b4b49a444 100644 --- a/mobile/test/java/org/chromium/net/BUILD +++ b/mobile/test/java/org/chromium/net/BUILD @@ -305,6 +305,7 @@ envoy_mobile_android_test( srcs = [ "BidirectionalStreamTest.java", ], + flaky = True, # TODO(fredyw): Debug the reason for it being flaky. native_deps = [ "//test/jni:libenvoy_jni_with_test_extensions.so", ] + select({ From a3e32c92c5ae699a4daad094c6a87b58e1e84ec2 Mon Sep 17 00:00:00 2001 From: yanjunxiang-google <78807980+yanjunxiang-google@users.noreply.github.com> Date: Mon, 30 Sep 2024 19:58:32 -0400 Subject: [PATCH 4/5] Ext proc http functionality support (#35740) Risk Level: low Testing: n/a Docs Changes: n/a Release Notes: inline Fixes: Description: This is to address the issue: https://github.com/envoyproxy/envoy/issues/35488, i.e, integrate the ext_proc HTTP client to ext_proc filter. With this PR, the basic functionalities to have Envoy ext_proc filter talk to a HTTP server using HTTP messages are accomplished. This is the follow up of PR: https://github.com/envoyproxy/envoy/pull/35676 --------- Signed-off-by: Yanjun Xiang --- .../extensions/filters/http/ext_proc/v3/BUILD | 1 + .../filters/http/ext_proc/v3/ext_proc.proto | 37 +- source/extensions/filters/http/ext_proc/BUILD | 13 + .../extensions/filters/http/ext_proc/client.h | 9 +- .../filters/http/ext_proc/client_base.h | 47 ++ .../filters/http/ext_proc/client_impl.cc | 7 + .../filters/http/ext_proc/client_impl.h | 9 +- .../filters/http/ext_proc/config.cc | 50 +- .../filters/http/ext_proc/ext_proc.cc | 144 +++-- .../filters/http/ext_proc/ext_proc.h | 26 +- .../filters/http/ext_proc/http_client/BUILD | 13 +- .../http/ext_proc/http_client/client_base.h | 33 -- .../ext_proc/http_client/http_client_impl.cc | 94 +++- .../ext_proc/http_client/http_client_impl.h | 13 +- .../filters/http/ext_proc/client_test.cc | 2 + .../filters/http/ext_proc/config_test.cc | 29 +- .../filters/http/ext_proc/filter_test.cc | 89 +++- .../filters/http/ext_proc/http_client/BUILD | 21 + .../ext_proc_http_integration_test.cc | 493 ++++++++++++++++++ .../ext_proc/http_client/http_client_test.cc | 51 +- .../filters/http/ext_proc/mock_server.cc | 13 +- .../filters/http/ext_proc/mock_server.h | 9 +- .../filters/http/ext_proc/ordering_test.cc | 2 +- .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 2 +- test/extensions/filters/http/ext_proc/utils.h | 4 + 25 files changed, 1077 insertions(+), 134 deletions(-) create mode 100644 source/extensions/filters/http/ext_proc/client_base.h delete mode 100644 source/extensions/filters/http/ext_proc/http_client/client_base.h create mode 100644 test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/BUILD b/api/envoy/extensions/filters/http/ext_proc/v3/BUILD index 8322f99fa7..5bfeeda1b7 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/BUILD +++ b/api/envoy/extensions/filters/http/ext_proc/v3/BUILD @@ -10,5 +10,6 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/type/matcher/v3:pkg", "@com_github_cncf_xds//udpa/annotations:pkg", + "@com_github_cncf_xds//xds/annotations/v3:pkg", ], ) diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index 13a24ad9fc..83b15731b9 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -12,6 +12,8 @@ import "envoy/type/matcher/v3/string.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; +import "xds/annotations/v3/status.proto"; + import "udpa/annotations/migrate.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -131,8 +133,39 @@ message ExternalProcessor { // Only one of ``http_service`` or // :ref:`grpc_service `. // can be set. It is required that one of them must be set. - ExtProcHttpService http_service = 20 - [(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type"]; + // + // If ``http_service`` is set, the + // :ref:`processing_mode ` + // can not be configured to send any body or trailers. i.e, http_service only supports + // sending request or response headers to the side stream server. + // + // With this configuration, Envoy behavior: + // + // 1. The headers are first put in a proto message + // :ref:`ProcessingRequest `. + // + // 2. This proto message is then transcoded into a JSON text. + // + // 3. Envoy then sends a HTTP POST message with content-type as "application/json", + // and this JSON text as body to the side stream server. + // + // After the side-stream receives this HTTP request message, it is expected to do as follows: + // + // 1. It converts the body, which is a JSON string, into a ``ProcessingRequest`` + // proto message to examine and mutate the headers. + // + // 2. It then sets the mutated headers into a new proto message + // :ref:`ProcessingResponse `. + // + // 3. It converts ``ProcessingResponse`` proto message into a JSON text. + // + // 4. It then sends a HTTP response back to Envoy with status code as "200", + // content-type as "application/json" and sets the JSON text as the body. + // + ExtProcHttpService http_service = 20 [ + (udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type", + (xds.annotations.v3.field_status).work_in_progress = true + ]; // By default, if the gRPC stream cannot be established, or if it is closed // prematurely with an error, the filter will fail. Specifically, if the diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index c6cb55ffb2..2fafe8df47 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -9,6 +9,15 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "client_base_interface", + hdrs = ["client_base.h"], + tags = ["skip_on_windows"], + deps = [ + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) + envoy_cc_library( name = "ext_proc", srcs = [ @@ -21,6 +30,7 @@ envoy_cc_library( ], tags = ["skip_on_windows"], deps = [ + ":client_base_interface", ":client_interface", ":matching_utils_lib", ":mutation_utils_lib", @@ -34,6 +44,7 @@ envoy_cc_library( "//source/common/runtime:runtime_features_lib", "//source/extensions/filters/common/mutation_rules:mutation_rules_lib", "//source/extensions/filters/http/common:pass_through_filter_lib", + "//source/extensions/filters/http/ext_proc/http_client:http_client_lib", "@com_google_absl//absl/status", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/strings:string_view", @@ -53,6 +64,7 @@ envoy_cc_extension( ":client_lib", ":ext_proc", "//source/extensions/filters/http/common:factory_base_lib", + "//source/extensions/filters/http/ext_proc/http_client:http_client_lib", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", ], ) @@ -62,6 +74,7 @@ envoy_cc_library( hdrs = ["client.h"], tags = ["skip_on_windows"], deps = [ + ":client_base_interface", "//envoy/grpc:async_client_manager_interface", "//envoy/grpc:status", "//envoy/stream_info:stream_info_interface", diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 413bbcac77..d60f417051 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -10,13 +10,14 @@ #include "envoy/stream_info/stream_info.h" #include "source/common/http/sidestream_watermark.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" namespace Envoy { namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -class ExternalProcessorStream { +class ExternalProcessorStream : public StreamBase { public: virtual ~ExternalProcessorStream() = default; virtual void send(envoy::service::ext_proc::v3::ProcessingRequest&& request, @@ -30,7 +31,7 @@ class ExternalProcessorStream { using ExternalProcessorStreamPtr = std::unique_ptr; -class ExternalProcessorCallbacks { +class ExternalProcessorCallbacks : public RequestCallbacks { public: virtual ~ExternalProcessorCallbacks() = default; virtual void onReceiveMessage( @@ -40,7 +41,7 @@ class ExternalProcessorCallbacks { virtual void logGrpcStreamInfo() PURE; }; -class ExternalProcessorClient { +class ExternalProcessorClient : public ClientBase { public: virtual ~ExternalProcessorClient() = default; virtual ExternalProcessorStreamPtr @@ -48,8 +49,6 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; - virtual ExternalProcessorStream* stream() PURE; - virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_base.h b/source/extensions/filters/http/ext_proc/client_base.h new file mode 100644 index 0000000000..d37fd0c1f5 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/client_base.h @@ -0,0 +1,47 @@ +#pragma once + +#include + +#include "envoy/service/ext_proc/v3/external_processor.pb.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +/** + * Async callbacks used during external processing. + */ +class RequestCallbacks { +public: + virtual ~RequestCallbacks() = default; + virtual void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) PURE; + virtual void onError() PURE; +}; + +/** + * Stream base class used during external processing. + */ +class StreamBase { +public: + virtual ~StreamBase() = default; +}; + +/** + * Async client base class used during external processing. + */ +class ClientBase { +public: + virtual ~ClientBase() = default; + virtual void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, + bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) PURE; + virtual void cancel() PURE; +}; + +using ClientBasePtr = std::unique_ptr; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index fef44968dc..b52c919699 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -24,6 +24,13 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start( sidestream_watermark_callbacks); } +void ExternalProcessorClientImpl::sendRequest( + envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, const uint64_t, + RequestCallbacks*, StreamBase* stream) { + ExternalProcessorStream* grpc_stream = dynamic_cast(stream); + grpc_stream->send(std::move(request), end_stream); +} + ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create( Grpc::AsyncClient&& client, ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options, diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 8ef177cda0..d4ec819c73 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,15 +32,14 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; - ExternalProcessorStream* stream() override { return stream_; } - void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) override; + void cancel() override {} private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index 134fa8d190..076325b2d7 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -3,6 +3,7 @@ #include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/http/ext_proc/client_impl.h" #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" namespace Envoy { namespace Extensions { @@ -22,15 +23,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped( proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, dual_info.scope, stats_prefix, dual_info.is_upstream, Envoy::Extensions::Filters::Common::Expr::getBuilder(context), context); - - return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(), - &context, dual_info](Http::FilterChainFactoryCallbacks& callbacks) { - auto client = std::make_unique( - context.clusterManager().grpcAsyncClientManager(), dual_info.scope); - - callbacks.addStreamFilter(Http::StreamFilterSharedPtr{ - std::make_shared(filter_config, std::move(client), grpc_service)}); - }; + if (proto_config.has_grpc_service()) { + return [filter_config = std::move(filter_config), &context, + dual_info](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique( + context.clusterManager().grpcAsyncClientManager(), dual_info.scope); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } else { + return [proto_config = std::move(proto_config), filter_config = std::move(filter_config), + &context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique(proto_config, context); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } } Router::RouteSpecificFilterConfigConstSharedPtr @@ -54,14 +62,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp server_context.scope(), stats_prefix, false, Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), server_context); - return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(), - &server_context](Http::FilterChainFactoryCallbacks& callbacks) { - auto client = std::make_unique( - server_context.clusterManager().grpcAsyncClientManager(), server_context.scope()); - - callbacks.addStreamFilter(Http::StreamFilterSharedPtr{ - std::make_shared(filter_config, std::move(client), grpc_service)}); - }; + if (proto_config.has_grpc_service()) { + return [filter_config = std::move(filter_config), + &server_context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique( + server_context.clusterManager().grpcAsyncClientManager(), server_context.scope()); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } else { + return [proto_config = std::move(proto_config), filter_config = std::move(filter_config), + &server_context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique(proto_config, server_context); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } } LEGACY_REGISTER_FACTORY(ExternalProcessingFilterConfig, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 160f51910f..e087cdb159 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -9,6 +9,7 @@ #include "source/common/http/utility.h" #include "source/common/protobuf/utility.h" #include "source/common/runtime/runtime_features.h" +#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" #include "source/extensions/filters/http/ext_proc/mutation_utils.h" #include "absl/strings/str_format.h" @@ -21,6 +22,7 @@ namespace ExternalProcessing { namespace { using envoy::config::common::mutation_rules::v3::HeaderMutationRules; +using envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor; using envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute; using envoy::extensions::filters::http::ext_proc::v3::ProcessingMode; using envoy::type::v3::StatusCode; @@ -49,6 +51,19 @@ absl::optional initProcessingMode(const ExtProcPerRoute& config) return absl::nullopt; } +absl::optional +getFilterGrpcService(const ExternalProcessor& config) { + if (config.has_grpc_service() != config.has_http_service()) { + if (config.has_grpc_service()) { + return config.grpc_service(); + } + } else { + throw EnvoyException("One and only one of grpc_service or http_service must be configured"); + } + + return absl::nullopt; +} + absl::optional initGrpcService(const ExtProcPerRoute& config) { if (config.has_overrides() && config.overrides().has_grpc_service()) { @@ -177,18 +192,19 @@ ProcessingMode allDisabledMode() { } // namespace -FilterConfig::FilterConfig( - const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config, - const std::chrono::milliseconds message_timeout, const uint32_t max_message_timeout_ms, - Stats::Scope& scope, const std::string& stats_prefix, bool is_upstream, - Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, - Server::Configuration::CommonFactoryContext& context) +FilterConfig::FilterConfig(const ExternalProcessor& config, + const std::chrono::milliseconds message_timeout, + const uint32_t max_message_timeout_ms, Stats::Scope& scope, + const std::string& stats_prefix, bool is_upstream, + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, + Server::Configuration::CommonFactoryContext& context) : failure_mode_allow_(config.failure_mode_allow()), observability_mode_(config.observability_mode()), route_cache_action_(config.route_cache_action()), deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout, DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), + grpc_service_(getFilterGrpcService(config)), send_body_without_waiting_for_header_response_( config.send_body_without_waiting_for_header_response()), stats_(generateStats(stats_prefix, config.stat_prefix(), scope)), @@ -215,14 +231,22 @@ FilterConfig::FilterConfig( config.response_attributes()), immediate_mutation_checker_(context.regexEngine()), thread_local_stream_manager_slot_(context.threadLocal().allocateSlot()) { - if (config.disable_clear_route_cache() && - (route_cache_action_ != - envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::DEFAULT)) { + if (!grpc_service_.has_value()) { + // In case http_service configured, the processing mode can only support sending headers. + if (processing_mode_.request_body_mode() != ProcessingMode::NONE || + processing_mode_.response_body_mode() != ProcessingMode::NONE || + processing_mode_.request_trailer_mode() == ProcessingMode::SEND || + processing_mode_.response_trailer_mode() == ProcessingMode::SEND) { + throw EnvoyException( + "If http_service is configured, processing modes can not send any body or trailer."); + } + } + if (config.disable_clear_route_cache() && (route_cache_action_ != ExternalProcessor::DEFAULT)) { throw EnvoyException("disable_clear_route_cache and route_cache_action can not " "be set to none-default at the same time."); } if (config.disable_clear_route_cache()) { - route_cache_action_ = envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::RETAIN; + route_cache_action_ = ExternalProcessor::RETAIN; } thread_local_stream_manager_slot_->set( [](Envoy::Event::Dispatcher&) { return std::make_shared(); }); @@ -333,6 +357,44 @@ void Filter::setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callb watermark_callbacks_.setEncoderFilterCallbacks(&callbacks); } +void Filter::sendRequest(ProcessingRequest&& req, bool end_stream) { + // Calling the client send function to send the request. + client_->sendRequest(std::move(req), end_stream, filter_callbacks_->streamId(), this, stream_); +} + +void Filter::onComplete(ProcessingResponse& response) { + ENVOY_LOG(debug, "Received successful response from server"); + std::unique_ptr resp_ptr = std::make_unique(response); + onReceiveMessage(std::move(resp_ptr)); +} + +void Filter::onError() { + ENVOY_LOG(debug, "Received Error response from server"); + stats_.http_not_ok_resp_received_.inc(); + + if (processing_complete_) { + ENVOY_LOG(debug, "Ignoring stream message received after processing complete"); + return; + } + + if (config_->failureModeAllow()) { + // The user would like a none-200-ok response to not cause message processing to fail. + // Close the external processing. + processing_complete_ = true; + stats_.failure_mode_allowed_.inc(); + clearAsyncState(); + } else { + // Return an error and stop processing the current stream. + processing_complete_ = true; + decoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); + encoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); + ImmediateResponse errorResponse; + errorResponse.mutable_status()->set_code(StatusCode::InternalServerError); + errorResponse.set_details(absl::StrCat(ErrorPrefix, "_HTTP_ERROR")); + sendImmediateResponse(errorResponse); + } +} + Filter::StreamOpenState Filter::openStream() { // External processing is completed. This means there is no need to send any further // message to the server for processing. Just return IgnoreError so the filter @@ -341,7 +403,12 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!client_->stream()) { + + if (!config().grpcService().has_value()) { + return StreamOpenState::Ok; + } + + if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -351,8 +418,9 @@ Filter::StreamOpenState Filter::openStream() { .setParentContext(grpc_context) .setBufferBodyForRetry(grpc_service_.has_retry_policy()); + ExternalProcessorClient* grpc_client = dynamic_cast(client_.get()); ExternalProcessorStreamPtr stream_object = - client_->start(*this, config_with_hash_key_, options, watermark_callbacks_); + grpc_client->start(*this, config_with_hash_key_, options, watermark_callbacks_); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called @@ -363,26 +431,29 @@ Filter::StreamOpenState Filter::openStream() { } stats_.streams_started_.inc(); - ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( - std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); - client_->setStream(stream); + stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), + config_->deferredCloseTimeout()); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (client_->stream()) { + if (!config_->grpcService().has_value()) { + return; + } + + if (stream_) { ENVOY_LOG(debug, "Calling close on stream"); - if (client_->stream()->close()) { + if (stream_->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(client_->stream()); - client_->setStream(nullptr); + config_->threadLocalStreamManager().erase(stream_); + stream_ = nullptr; } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -390,8 +461,7 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(client_->stream(), - filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -402,6 +472,11 @@ void Filter::onDestroy() { decoding_state_.stopMessageTimer(); encoding_state_.stopMessageTimer(); + if (!config_->grpcService().has_value()) { + client_->cancel(); + return; + } + if (config_->observabilityMode()) { // In observability mode where the main stream processing and side stream processing are // asynchronous, it is possible that filter instance is destroyed before the side stream request @@ -409,8 +484,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (client_->stream() != nullptr) { - client_->stream()->notifyFilterDestroy(); + if (stream_ != nullptr) { + stream_->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -440,7 +515,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -673,7 +748,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -698,7 +773,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -890,7 +965,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -906,21 +981,24 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); + if (!config().grpcService().has_value()) { + return; + } + + if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost( - client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); } } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index ce5a9ed4b1..240ffc505e 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -24,6 +24,7 @@ #include "source/extensions/filters/common/mutation_rules/mutation_rules.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "source/extensions/filters/http/ext_proc/client.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" #include "source/extensions/filters/http/ext_proc/matching_utils.h" #include "source/extensions/filters/http/ext_proc/processor_state.h" @@ -47,7 +48,8 @@ namespace ExternalProcessing { COUNTER(clear_route_cache_ignored) \ COUNTER(clear_route_cache_disabled) \ COUNTER(clear_route_cache_upstream_ignored) \ - COUNTER(send_immediate_resp_upstream_ignored) + COUNTER(send_immediate_resp_upstream_ignored) \ + COUNTER(http_not_ok_resp_received) struct ExtProcFilterStats { ALL_EXT_PROC_FILTER_STATS(GENERATE_COUNTER_STRUCT) @@ -274,6 +276,10 @@ class FilterConfig { return thread_local_stream_manager_slot_->getTyped(); } + const absl::optional grpcService() const { + return grpc_service_; + } + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { @@ -287,6 +293,7 @@ class FilterConfig { const std::chrono::milliseconds deferred_close_timeout_; const std::chrono::milliseconds message_timeout_; const uint32_t max_message_timeout_ms_; + const absl::optional grpc_service_; const bool send_body_without_waiting_for_header_response_; ExtProcFilterStats stats_; @@ -379,10 +386,11 @@ class Filter : public Logger::Loggable, }; public: - Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client, - const envoy::config::core::v3::GrpcService& grpc_service) + Filter(const FilterConfigSharedPtr& config, ClientBasePtr&& client) : config_(config), client_(std::move(client)), stats_(config->stats()), - grpc_service_(grpc_service), config_with_hash_key_(grpc_service), + grpc_service_(config->grpcService().has_value() ? config->grpcService().value() + : envoy::config::core::v3::GrpcService()), + config_with_hash_key_(grpc_service_), decoding_state_(*this, config->processingMode(), config->untypedForwardingMetadataNamespaces(), config->typedForwardingMetadataNamespaces(), @@ -444,6 +452,8 @@ class Filter : public Logger::Loggable, const ProcessorState& encodingState() { return encoding_state_; } const ProcessorState& decodingState() { return decoding_state_; } + void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) override; + void onError() override; private: void mergePerRouteConfig(); @@ -480,8 +490,10 @@ class Filter : public Logger::Loggable, buildHeaderRequest(ProcessorState& state, Http::RequestOrResponseHeaderMap& headers, bool end_stream, bool observability_mode); + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool end_stream); + const FilterConfigSharedPtr config_; - const ExternalProcessorClientPtr client_; + const ClientBasePtr client_; ExtProcFilterStats stats_; ExtProcLoggingInfo* logging_info_; envoy::config::core::v3::GrpcService grpc_service_; @@ -491,6 +503,10 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; + // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/source/extensions/filters/http/ext_proc/http_client/BUILD b/source/extensions/filters/http/ext_proc/http_client/BUILD index 16afa5f698..681afb3bfd 100644 --- a/source/extensions/filters/http/ext_proc/http_client/BUILD +++ b/source/extensions/filters/http/ext_proc/http_client/BUILD @@ -8,22 +8,17 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() -envoy_cc_library( - name = "client_base_interface", - hdrs = ["client_base.h"], - tags = ["skip_on_windows"], - deps = [], -) - envoy_cc_library( name = "http_client_lib", srcs = ["http_client_impl.cc"], hdrs = ["http_client_impl.h"], tags = ["skip_on_windows"], deps = [ - "client_base_interface", "//source/common/common:enum_to_int", + "//source/common/http:header_map_lib", "//source/common/http:utility_lib", - "//source/extensions/filters/http/ext_proc", + "//source/extensions/filters/http/ext_proc:client_base_interface", + "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/ext_proc/http_client/client_base.h b/source/extensions/filters/http/ext_proc/http_client/client_base.h deleted file mode 100644 index fd9ae5ce7c..0000000000 --- a/source/extensions/filters/http/ext_proc/http_client/client_base.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExternalProcessing { - -/** - * Async callbacks used during external processing. - */ -class RequestCallbacks { -public: - virtual ~RequestCallbacks() = default; - virtual void onComplete() PURE; -}; - -/** - * Async client base class used during external processing. - */ -class ClientBase { -public: - virtual ~ClientBase() = default; - - virtual void sendRequest() PURE; - virtual void cancel() PURE; -}; - -} // namespace ExternalProcessing -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc index abc1721923..b333ea513c 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc @@ -1,6 +1,8 @@ #include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" #include "source/common/common/enum_to_int.h" +#include "source/common/http/header_map_impl.h" +#include "source/common/http/message_impl.h" #include "source/common/http/utility.h" namespace Envoy { @@ -8,13 +10,90 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +namespace { +Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t stream_id, + absl::string_view req_in_json) { + absl::string_view host, path; + Envoy::Http::Utility::extractHostPathFromUri(uri, host, path); + ENVOY_LOG_MISC(debug, " Ext_Proc HTTP client send request to uri {}, host {}, path {}", uri, host, + path); + + // Construct a HTTP POST message. + const Envoy::Http::HeaderValues& header_values = Envoy::Http::Headers::get(); + Http::RequestHeaderMapPtr headers = + Envoy::Http::createHeaderMap( + {{header_values.Method, "POST"}, + {header_values.Scheme, "http"}, + {header_values.Path, std::string(path)}, + {header_values.ContentType, "application/json"}, + {header_values.RequestId, std::to_string(stream_id)}, + {header_values.Host, std::string(host)}}); + Http::RequestMessagePtr message = + std::make_unique(std::move(headers)); + message->body().add(req_in_json); + return message; +} + +} // namespace + +void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase*) { + // Cancel any active requests. + cancel(); + callbacks_ = callbacks; + + // Transcode req message into JSON string. + auto req_in_json = MessageUtil::getJsonStringFromMessage(req); + if (req_in_json.ok()) { + const auto http_uri = config_.http_service().http_service().http_uri(); + Http::RequestMessagePtr message = + buildHttpRequest(http_uri.uri(), stream_id, req_in_json.value()); + auto options = Http::AsyncClient::RequestOptions() + .setTimeout(std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(http_uri.timeout()))) + .setSendInternal(false) + .setSendXff(false); + const std::string cluster = http_uri.cluster(); + const auto thread_local_cluster = context().clusterManager().getThreadLocalCluster(cluster); + if (thread_local_cluster) { + active_request_ = + thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options); + } else { + ENVOY_LOG(error, "ext_proc cluster {} does not exist in the config", cluster); + } + } +} + void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) { auto status = Envoy::Http::Utility::getResponseStatusOrNullopt(response->headers()); + active_request_ = nullptr; if (status.has_value()) { uint64_t status_code = status.value(); if (status_code == Envoy::enumToInt(Envoy::Http::Code::OK)) { - ENVOY_LOG(error, "Response status is OK"); + std::string msg_body = response->body().toString(); + ENVOY_LOG(debug, "Response status is OK, message body length {}", msg_body.size()); + envoy::service::ext_proc::v3::ProcessingResponse response_msg; + if (!msg_body.empty()) { + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(msg_body, response_msg, has_unknown_field); + if (!status.ok()) { + ENVOY_LOG( + error, + "The HTTP response body can not be decoded into a ProcessResponse proto message"); + onError(); + return; + } + } else { + ENVOY_LOG(error, "Response body is empty"); + onError(); + return; + } + if (callbacks_) { + callbacks_->onComplete(response_msg); + callbacks_ = nullptr; + } } else { ENVOY_LOG(error, "Response status is not OK, status: {}", status_code); onError(); @@ -31,12 +110,25 @@ void ExtProcHttpClient::onFailure(const Http::AsyncClient::Request&, ASSERT(reason == Http::AsyncClient::FailureReason::Reset || reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); ENVOY_LOG(error, "Request failed: stream has been reset"); + active_request_ = nullptr; onError(); } void ExtProcHttpClient::onError() { // Cancel if the request is active. cancel(); + ENVOY_LOG(error, "ext_proc HTTP client error condition happens."); + if (callbacks_) { + callbacks_->onError(); + callbacks_ = nullptr; + } +} + +void ExtProcHttpClient::cancel() { + if (active_request_) { + active_request_->cancel(); + active_request_ = nullptr; + } } } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h index fa2df5afd1..6d9aace5ad 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h @@ -2,11 +2,12 @@ #include +#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/http/async_client.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" #include "source/common/common/logger.h" -#include "source/extensions/filters/http/ext_proc/ext_proc.h" -#include "source/extensions/filters/http/ext_proc/http_client/client_base.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" namespace Envoy { namespace Extensions { @@ -23,8 +24,10 @@ class ExtProcHttpClient : public ClientBase, ~ExtProcHttpClient() { cancel(); } - void sendRequest() override {} - void cancel() override {} + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool end_stream, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) override; + void cancel() override; void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {} // Http::AsyncClient::Callbacks implemented by this class. @@ -39,6 +42,8 @@ class ExtProcHttpClient : public ClientBase, void onError(); envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_; Server::Configuration::ServerFactoryContext& context_; + Http::AsyncClient::Request* active_request_{}; + RequestCallbacks* callbacks_{}; }; } // namespace ExternalProcessing diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 590cd2e05e..6e910c367c 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -66,6 +66,8 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback void onGrpcClose() override { grpc_closed_ = true; } void logGrpcStreamInfo() override {} + void onComplete(envoy::service::ext_proc::v3::ProcessingResponse&) override {} + void onError() override {} std::unique_ptr last_response_; Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok; diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 5c5196af65..b635fc9e30 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -60,7 +60,7 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { cb(filter_callback); } -TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { +TEST(HttpExtProcConfigTest, CorrectGrpcServiceConfigServerContext) { std::string yaml = R"EOF( grpc_service: google_grpc: @@ -106,6 +106,33 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { cb(filter_callback); } +TEST(HttpExtProcConfigTest, CorrectHttpServiceConfigServerContext) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + failure_mode_allow: true + processing_mode: + request_header_mode: send + )EOF"; + + ExternalProcessingFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyConfigProto(); + TestUtility::loadFromYaml(yaml, *proto_config); + + testing::NiceMock context; + EXPECT_CALL(context, messageValidationVisitor()); + Http::FilterFactoryCb cb = + factory.createFilterFactoryFromProtoWithServerContext(*proto_config, "stats", context); + Http::MockFilterChainFactoryCallbacks filter_callback; + EXPECT_CALL(filter_callback, addStreamFilter(_)); + cb(filter_callback); +} + TEST(HttpExtProcConfigTest, CorrectRouteMetadataOnlyConfig) { std::string yaml = R"EOF( overrides: diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index edb3c2b664..148d47e2de 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -135,7 +135,7 @@ class HttpFilterTest : public testing::Test { std::make_shared( Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), factory_context_); - filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); + filter_ = std::make_unique(config_, std::move(client_)); filter_->setEncoderFilterCallbacks(encoder_callbacks_); EXPECT_CALL(encoder_callbacks_, encoderBufferLimit()).WillRepeatedly(Return(BufferSize)); filter_->setDecoderFilterCallbacks(decoder_callbacks_); @@ -2750,6 +2750,93 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +TEST_F(HttpFilterTest, GrpcServiceHttpServiceBothSet) { + std::string yaml = R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + response_body_mode: "BUFFERED" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, "One and only one of grpc_service or http_service must be configured"); +} + +TEST_F(HttpFilterTest, HttpServiceBodyProcessingModeNotNone) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + response_header_mode: "SEND" + request_body_mode: "BUFFERED" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If http_service is configured, processing modes can not send any body or trailer."); +} + +TEST_F(HttpFilterTest, HttpServiceTrailerProcessingModeNotSKIP) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SEND" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If http_service is configured, processing modes can not send any body or trailer."); +} + // Using the default configuration, verify that the "clear_route_cache" flag makes the appropriate // callback on the filter for inbound traffic when header modifications are also present. // Also verify it does not make the callback for outbound traffic. diff --git a/test/extensions/filters/http/ext_proc/http_client/BUILD b/test/extensions/filters/http/ext_proc/http_client/BUILD index 0cc7d2bdab..1a073794cc 100644 --- a/test/extensions/filters/http/ext_proc/http_client/BUILD +++ b/test/extensions/filters/http/ext_proc/http_client/BUILD @@ -25,3 +25,24 @@ envoy_extension_cc_test( "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", ], ) + +envoy_extension_cc_test( + name = "ext_proc_http_integration_test", + srcs = ["ext_proc_http_integration_test.cc"], + extension_names = ["envoy.filters.http.ext_proc"], + rbe_pool = "2core", + shard_count = 8, + tags = [ + "cpu:3", + "skip_on_windows", + ], + deps = [ + "//source/extensions/filters/http/ext_proc:config", + "//test/common/http:common_lib", + "//test/extensions/filters/http/ext_proc:utils_lib", + "//test/integration:http_protocol_integration_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc new file mode 100644 index 0000000000..1b8e9899fc --- /dev/null +++ b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc @@ -0,0 +1,493 @@ +#include +#include + +#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" + +#include "source/extensions/filters/http/ext_proc/config.h" +#include "source/extensions/filters/http/ext_proc/ext_proc.h" + +#include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/utils.h" +#include "test/integration/http_protocol_integration.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { +namespace { + +using envoy::extensions::filters::http::ext_proc::v3::ProcessingMode; +using envoy::service::ext_proc::v3::BodyResponse; +using envoy::service::ext_proc::v3::CommonResponse; +using envoy::service::ext_proc::v3::HeadersResponse; +using envoy::service::ext_proc::v3::HttpBody; +using envoy::service::ext_proc::v3::HttpHeaders; +using envoy::service::ext_proc::v3::HttpTrailers; +using envoy::service::ext_proc::v3::ProcessingRequest; +using envoy::service::ext_proc::v3::ProcessingResponse; +using envoy::service::ext_proc::v3::TrailersResponse; +using Extensions::HttpFilters::ExternalProcessing::HasHeader; +using Extensions::HttpFilters::ExternalProcessing::HasNoHeader; +using Extensions::HttpFilters::ExternalProcessing::HeaderProtosEqual; +using Extensions::HttpFilters::ExternalProcessing::SingleHeaderValueIs; + +using Http::LowerCaseString; + +struct ConfigOptions { + bool downstream_filter = true; + bool failure_mode_allow = false; + int64_t timeout = 900000000; + std::string cluster = "ext_proc_server_0"; +}; + +struct ExtProcHttpTestParams { + Network::Address::IpVersion version; + Http::CodecType downstream_protocol; + Http::CodecType upstream_protocol; +}; + +class ExtProcHttpClientIntegrationTest : public testing::TestWithParam, + public HttpIntegrationTest { +public: + ExtProcHttpClientIntegrationTest() + : HttpIntegrationTest(GetParam().downstream_protocol, GetParam().version) {} + void createUpstreams() override { + HttpIntegrationTest::createUpstreams(); + + // Create separate "upstreams" for ExtProc side stream servers + for (int i = 0; i < side_stream_count_; ++i) { + http_side_upstreams_.push_back(&addFakeUpstream(Http::CodecType::HTTP2)); + } + } + + void TearDown() override { + if (processor_connection_) { + ASSERT_TRUE(processor_connection_->close()); + ASSERT_TRUE(processor_connection_->waitForDisconnect()); + } + cleanupUpstreamAndDownstream(); + } + + void initializeConfig(ConfigOptions config_option = {}, + const std::vector>& cluster_endpoints = {{0, 1}, + {1, 1}}) { + int total_cluster_endpoints = 0; + std::for_each( + cluster_endpoints.begin(), cluster_endpoints.end(), + [&total_cluster_endpoints](const auto& item) { total_cluster_endpoints += item.second; }); + ASSERT_EQ(total_cluster_endpoints, side_stream_count_); + + config_helper_.addConfigModifier([this, cluster_endpoints, config_option]( + envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Ensure "HTTP2 with no prior knowledge." + ConfigHelper::setHttp2( + *(bootstrap.mutable_static_resources()->mutable_clusters()->Mutable(0))); + + // Clusters for ExtProc servers, starting by copying an existing cluster. + for (const auto& [cluster_id, endpoint_count] : cluster_endpoints) { + auto* server_cluster = bootstrap.mutable_static_resources()->add_clusters(); + server_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + std::string cluster_name = absl::StrCat("ext_proc_server_", cluster_id); + server_cluster->set_name(cluster_name); + server_cluster->mutable_load_assignment()->set_cluster_name(cluster_name); + ASSERT_EQ(server_cluster->load_assignment().endpoints_size(), 1); + auto* endpoints = server_cluster->mutable_load_assignment()->mutable_endpoints(0); + ASSERT_EQ(endpoints->lb_endpoints_size(), 1); + for (int i = 1; i < endpoint_count; ++i) { + auto* new_lb_endpoint = endpoints->add_lb_endpoints(); + new_lb_endpoint->MergeFrom(endpoints->lb_endpoints(0)); + } + } + + auto* http_uri = + proto_config_.mutable_http_service()->mutable_http_service()->mutable_http_uri(); + http_uri->set_uri("ext_proc_server_0:9000"); + http_uri->set_cluster(config_option.cluster); + http_uri->mutable_timeout()->set_nanos(config_option.timeout); + + if (config_option.failure_mode_allow) { + proto_config_.set_failure_mode_allow(true); + } + std::string ext_proc_filter_name = "envoy.filters.http.ext_proc"; + if (config_option.downstream_filter) { + // Construct a configuration proto for our filter and then re-write it + // to JSON so that we can add it to the overall config + envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter + ext_proc_filter; + ext_proc_filter.set_name(ext_proc_filter_name); + ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); + config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); + } + }); + + setUpstreamProtocol(GetParam().upstream_protocol); + setDownstreamProtocol(GetParam().downstream_protocol); + } + + IntegrationStreamDecoderPtr sendDownstreamRequest( + absl::optional> modify_headers) { + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + if (modify_headers) { + (*modify_headers)(headers); + } + return codec_client_->makeHeaderOnlyRequest(headers); + } + + IntegrationStreamDecoderPtr sendDownstreamRequestWithBodyAndTrailer(absl::string_view body) { + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, body, false); + Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + return response; + } + + void getAndCheckHttpRequest(FakeUpstream* side_stream, bool first_message = false) { + if (first_message) { + ASSERT_TRUE(side_stream->waitForHttpConnection(*dispatcher_, processor_connection_)); + } + + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + ASSERT_TRUE(processor_stream_->waitForEndStream(*dispatcher_)); + EXPECT_THAT(processor_stream_->headers(), + SingleHeaderValueIs("content-type", "application/json")); + EXPECT_THAT(processor_stream_->headers(), SingleHeaderValueIs(":method", "POST")); + EXPECT_THAT(processor_stream_->headers(), HasHeader("x-request-id")); + } + + void sendHttpResponse(ProcessingResponse& response) { + // Sending 200 response with the ProcessingResponse JSON encoded in the body. + std::string response_str = MessageUtil::getJsonStringFromMessageOrError(response, true, true); + processor_stream_->encodeHeaders( + Http::TestResponseHeaderMapImpl{{":status", "200"}, {"content-type", "application/json"}}, + false); + processor_stream_->encodeData(response_str, true); + } + + void processRequestHeadersMessage( + FakeUpstream* side_stream, bool first_message, + absl::optional> cb, + bool send_bad_resp = false) { + getAndCheckHttpRequest(side_stream, first_message); + + if (send_bad_resp) { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + return; + } + // The ext_proc ProcessingRequest message is JSON encoded in the body of the HTTP message. + std::string body = processor_stream_->body().toString(); + ProcessingRequest request; + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(body, request, has_unknown_field); + if (status.ok()) { + ProcessingResponse response; + auto* headers = response.mutable_request_headers(); + const bool sendReply = !cb || (*cb)(request.request_headers(), *headers); + if (sendReply) { + sendHttpResponse(response); + } + } else { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + } + } + + void processResponseHeadersMessage( + FakeUpstream* side_stream, bool first_message, + absl::optional> cb) { + getAndCheckHttpRequest(side_stream, first_message); + + std::string body = processor_stream_->body().toString(); + ProcessingRequest request; + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(body, request, has_unknown_field); + if (status.ok()) { + ProcessingResponse response; + auto* headers = response.mutable_response_headers(); + const bool sendReply = !cb || (*cb)(request.response_headers(), *headers); + if (sendReply) { + sendHttpResponse(response); + } + } else { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + } + } + + void handleUpstreamRequest() { + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + } + + void handleUpstreamRequestWithTrailer() { + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, false); + upstream_request_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"x-test-trailers", "Yes"}}); + } + + void verifyDownstreamResponse(IntegrationStreamDecoder& response, int status_code) { + ASSERT_TRUE(response.waitForEndStream()); + EXPECT_TRUE(response.complete()); + EXPECT_EQ(std::to_string(status_code), response.headers().getStatusValue()); + } + + static std::vector getValuesForExtProcHttpTest() { + std::vector ret; + for (auto ip_version : TestEnvironment::getIpVersionsForTest()) { + for (auto downstream_protocol : {Http::CodecType::HTTP1, Http::CodecType::HTTP2}) { + for (auto upstream_protocol : {Http::CodecType::HTTP1, Http::CodecType::HTTP2}) { + ExtProcHttpTestParams params; + params.version = ip_version; + params.downstream_protocol = downstream_protocol; + params.upstream_protocol = upstream_protocol; + ret.push_back(params); + } + } + } + return ret; + } + + static std::string + ExtProcHttpTestParamsToString(const ::testing::TestParamInfo& params) { + return absl::StrCat( + (params.param.version == Network::Address::IpVersion::v4 ? "IPv4_" : "IPv6_"), + (params.param.downstream_protocol == Http::CodecType::HTTP1 ? "HTTP1_DS_" : "HTTP2_DS_"), + (params.param.upstream_protocol == Http::CodecType::HTTP1 ? "HTTP1_US" : "HTTP2_US")); + } + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config_{}; + std::vector http_side_upstreams_; + FakeHttpConnectionPtr processor_connection_; + FakeStreamPtr processor_stream_; + // Number of side stream servers in the test. + int side_stream_count_ = 2; +}; + +INSTANTIATE_TEST_SUITE_P( + Protocols, ExtProcHttpClientIntegrationTest, + testing::ValuesIn(ExtProcHttpClientIntegrationTest::getValuesForExtProcHttpTest()), + ExtProcHttpClientIntegrationTest::ExtProcHttpTestParamsToString); + +// Side stream server does not mutate the request header. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoRequestHeaderMutation) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); }); + + // The side stream get the request and sends back the response. + processRequestHeadersMessage(http_side_upstreams_[0], true, absl::nullopt); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server does not mutate the response header. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoResponseHeaderMutation) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); }); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes")); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + processResponseHeadersMessage(http_side_upstreams_[0], true, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server adds and removes headers from the header request. +TEST_P(ExtProcHttpClientIntegrationTest, GetAndSetHeadersWithMutation) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("x-remove-this"), "yes"); }); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{ + {":scheme", "http"}, {":method", "GET"}, {"host", "host"}, + {":path", "/"}, {"x-remove-this", "yes"}, {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + response_header_mutation->add_remove_headers("x-remove-this"); + return true; + }); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server does not send response trigger timeout. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoResponseFilterTimeout) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage(http_side_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 400 ms exceeding 200ms filter timeout. + timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(400)); + return false; + }); + // ext_proc filter timeouts sends a 504 local reply depending on runtime flag. + verifyDownstreamResponse(*response, 504); +} + +// Http timeout value set to 10ms. Test HTTP timeout. +TEST_P(ExtProcHttpClientIntegrationTest, ServerResponseHttpClientTimeout) { + ConfigOptions config_option = {}; + config_option.timeout = 10000000; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage(http_side_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 50 ms exceeding 10ms HTTP URI timeout setting. + timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(50)); + return true; + }); + + // HTTP client timeouts sends a 500 local reply. + verifyDownstreamResponse(*response, 500); +} + +// Side stream server sends back 400 with fail-mode-allow set to false. +TEST_P(ExtProcHttpClientIntegrationTest, ServerSendsBackBadRequestFailClose) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; }, + true); + + verifyDownstreamResponse(*response, 500); +} + +// Side stream server sends back 400 with fail-mode-allow set to true. +TEST_P(ExtProcHttpClientIntegrationTest, ServerSendsBackBadRequestFailOpen) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = true; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; }, + true); + + // The request is sent to the upstream. + handleUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Send headers in both directions. +TEST_P(ExtProcHttpClientIntegrationTest, SentHeadersInBothDirection) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBodyAndTrailer("foo"); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{{":scheme", "http"}, + {":method", "GET"}, + {"host", "host"}, + {":path", "/"}, + {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + return true; + }); + + // The request is sent to the upstream. + handleUpstreamRequestWithTrailer(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), "foo"); + + processResponseHeadersMessage(http_side_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Wrong ext_proc filter cluster config with fail close. +TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailClose) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = false; + config_option.cluster = "foo"; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + verifyDownstreamResponse(*response, 504); +} + +// Wrong ext_proc filter cluster config with fail open +TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailOpen) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = true; + config_option.cluster = "foo"; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + // The request is sent to the upstream. + handleUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +} // namespace +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc b/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc index 27a6d83c9e..faca0383df 100644 --- a/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc +++ b/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc @@ -18,10 +18,26 @@ class ExtProcHttpClientTest : public testing::Test { public: ~ExtProcHttpClientTest() override = default; - void SetUp() override { client_ = std::make_unique(config_, context_); } + const std::string default_http_config_ = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SKIP" + )EOF"; + + void SetUp() override { + TestUtility::loadFromYaml(default_http_config_, config_); + client_ = std::make_unique(config_, context_); + } protected: - envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_; + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_{}; testing::NiceMock context_; Upstream::MockClusterManager& cm_{context_.cluster_manager_}; std::unique_ptr client_; @@ -31,30 +47,49 @@ class ExtProcHttpClientTest : public testing::Test { TEST_F(ExtProcHttpClientTest, Basic) { SetUp(); - client_->sendRequest(); + client_->cancel(); client_->context(); Tracing::MockSpan parent_span; client_->onBeforeFinalizeUpstreamSpan(parent_span, nullptr); Http::AsyncClient::FailureReason reason = Envoy::Http::AsyncClient::FailureReason::Reset; client_->onFailure(async_request_, reason); + reason = Envoy::Http::AsyncClient::FailureReason::ExceedResponseBufferLimit; + client_->onFailure(async_request_, reason); +} - Http::ResponseHeaderMapPtr resp_headers_ok(new Http::TestResponseHeaderMapImpl({ +TEST_F(ExtProcHttpClientTest, JsonDecodingErrorTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "200"}, })); - Http::ResponseMessagePtr response_ok(new Http::ResponseMessageImpl(std::move(resp_headers_ok))); - client_->onSuccess(async_request_, std::move(response_ok)); + Http::ResponseMessagePtr response_ok_with_bad_body( + new Http::ResponseMessageImpl(std::move(resp_headers))); + response_ok_with_bad_body->body().add("foo-bar"); + client_->onSuccess(async_request_, std::move(response_ok_with_bad_body)); +} +TEST_F(ExtProcHttpClientTest, EmptyResponseBodyTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ + {":status", "200"}, + })); + Http::ResponseMessagePtr response_ok_with_empty_body( + new Http::ResponseMessageImpl(std::move(resp_headers))); + client_->onSuccess(async_request_, std::move(response_ok_with_empty_body)); +} + +TEST_F(ExtProcHttpClientTest, ResponseStatusNotOkTest) { Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "403"}, })); Http::ResponseMessagePtr response(new Http::ResponseMessageImpl(std::move(resp_headers))); client_->onSuccess(async_request_, std::move(response)); +} - Http::ResponseHeaderMapPtr resp_headers_foo(new Http::TestResponseHeaderMapImpl({ +TEST_F(ExtProcHttpClientTest, WrongResponseStatusTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "foo"}, })); - Http::ResponseMessagePtr response_foo(new Http::ResponseMessageImpl(std::move(resp_headers_foo))); + Http::ResponseMessagePtr response_foo(new Http::ResponseMessageImpl(std::move(resp_headers))); client_->onSuccess(async_request_, std::move(response_foo)); } diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 29637f793f..35c91b238e 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,12 +5,17 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() { - EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); +using ::testing::_; +using ::testing::Invoke; - EXPECT_CALL(*this, setStream(testing::_)) +MockClient::MockClient() { + EXPECT_CALL(*this, sendRequest(_, _, _, _, _)) .WillRepeatedly( - testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); + Invoke([](envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, + const uint64_t, RequestCallbacks*, StreamBase* stream) { + ExternalProcessorStream* grpc_stream = dynamic_cast(stream); + grpc_stream->send(std::move(request), end_stream); + })); } MockClient::~MockClient() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 12c9d7308a..a4962acb4e 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -13,14 +13,15 @@ class MockClient : public ExternalProcessorClient { public: MockClient(); ~MockClient() override; + MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); - MOCK_METHOD(ExternalProcessorStream*, stream, ()); - MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); - - ExternalProcessorStream* stream_ = nullptr; + MOCK_METHOD(void, sendRequest, + (envoy::service::ext_proc::v3::ProcessingRequest&&, bool, const uint64_t, + RequestCallbacks*, StreamBase*)); + MOCK_METHOD(void, cancel, ()); }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index 4b1e4cf133..a12be075b0 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -76,7 +76,7 @@ class OrderingTest : public testing::Test { std::make_shared( Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), factory_context_); - filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); + filter_ = std::make_unique(config_, std::move(client_)); filter_->setEncoderFilterCallbacks(encoder_callbacks_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); } diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 25c6b3ac2d..f8635e1859 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -98,7 +98,7 @@ DEFINE_PROTO_FUZZER( ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( - config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); + config, ExternalProcessing::ExternalProcessorClientPtr{client}); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); diff --git a/test/extensions/filters/http/ext_proc/utils.h b/test/extensions/filters/http/ext_proc/utils.h index 858e8e7cb0..351d40bd74 100644 --- a/test/extensions/filters/http/ext_proc/utils.h +++ b/test/extensions/filters/http/ext_proc/utils.h @@ -31,6 +31,10 @@ MATCHER_P(HasNoHeader, key, absl::StrFormat("Headers have no value for \"%s\"", return arg.get(::Envoy::Http::LowerCaseString(std::string(key))).empty(); } +MATCHER_P(HasHeader, key, absl::StrFormat("There exists a header for \"%s\"", key)) { + return !arg.get(::Envoy::Http::LowerCaseString(std::string(key))).empty(); +} + MATCHER_P2(SingleHeaderValueIs, key, value, absl::StrFormat("Header \"%s\" equals \"%s\"", key, value)) { const auto hdr = arg.get(::Envoy::Http::LowerCaseString(std::string(key))); From b67e3b4491c995d226f3da35972f707bac743fcf Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Tue, 1 Oct 2024 00:10:59 -0500 Subject: [PATCH 5/5] Quiche roll 20240930141637 (#36385) Update QUICHE from 42b2e66c7 to 171f6f89a https://github.com/google/quiche/compare/42b2e66c7..171f6f89a ``` $ git log 42b2e66c7..171f6f89a --date=short --no-merges --format="%ad %al %s" 2024-09-26 birenroy Migrates code from `//third_party/spdy/core:spdy_protocol_lib` to `//third_party/http2/core:spdy_protocol`. 2024-09-26 martinduke fix asan error because the log message uses the variable about to be initialized, rather than the value being assigned. 2024-09-26 birenroy Deletes unused artifacts from //third_party/spdy. 2024-09-26 martinduke Complete processing of RESET_STREAM_AT frames. Negotiate with transport parameters and pass the frame from QuicConnection to QuicStream. 2024-09-26 birenroy Moves //third_party/spdy/core:http2_frame_decoder_adapter_fuzzer to //third_party/http2/core. 2024-09-26 birenroy Migrates code from `//third_party/spdy/core:spdy_protocol_lib` to `//third_party/http2/core:spdy_protocol`. 2024-09-26 birenroy Moves libraries from //third_party/spdy/core/hpack to //third_party/http2/hpack. 2024-09-26 rch No public description 2024-09-26 birenroy Migrates the old cc_fuzz_target in //third_party/spdy to the modern go/fuzztest framework. 2024-09-25 birenroy Migrates code from //third_party/spdy/core:spdy_framer_lib to //third_party/http2/core:spdy_framer. 2024-09-25 wub If QUIC server closes connection due to invalid hostname in SNI, use new connection close code `QUIC_HANDSHAKE_FAILED_INVALID_HOSTNAME` instead of the generic `QUIC_HANDSHAKE_FAILED`. 2024-09-25 birenroy Removes the now-unused build target //third_party/spdy/core:http2_header_block_lib. 2024-09-25 birenroy Migrates targets from `//third_party/spdy/core:spdy_alt_svc_wire_format_lib` to `//third_party/http2/core:spdy_alt_svc_wire_format`. 2024-09-25 birenroy Migrates remaining google3 users of spdy::Http2HeaderBlock to quiche::HttpHeaderBlock. 2024-09-25 birenroy Migrates code from `//third_party/spdy/core:http2_deframer_lib` to `//third_party/http2/core:http2_frame_decoder_adapter`. 2024-09-25 martinduke Roll the version number to draft-06. This will break interoperability with other MoQT implementations until we update the wire image. 2024-09-25 vasilvv Refactor MoQT interfaces to consistently pass FullTrackName around. 2024-09-24 martinduke MOQT MAX_SUBSCRIBE_ID implementation 2024-09-24 wub No public description 2024-09-24 elburrito BlindSignAuth: Update error messages to return constant strings. Messages now return at Anonymous Tokens-level granularity. 2024-09-24 wub Deprecate --quic_dispatcher_replace_cid_on_first_packet. 2024-09-23 quiche-dev No public description 2024-09-23 quiche-dev Remove quic::QuicServerId::privacy_mode_enabled() 2024-09-23 quiche-dev Enabling rolled out flags. 2024-09-23 wub No public description 2024-09-22 fayang Postpone removing handshake timeout from transport params getting negotiated to handshake complete. 2024-09-22 fayang Deliver STOP_SENDING to zombie streams. 2024-09-20 martinduke No public description 2024-09-19 vasilvv Introduce padding streams. 2024-09-19 quiche-dev QUICHE: always print ACK ranges as [a...b], except if the range is a single packet. 2024-09-19 vasilvv Split MoqtParser into MoqtControlParser and MoqtDataParser. 2024-09-18 birenroy Adds `^` and `|` to the set of allowed characters in HTTP/2 request paths. 2024-09-18 rch Add an IPv6 flow label field to QuicPacketWriterParams so that packet writers can set this field when writing packets. Currently nothing sets this field. 2024-09-18 quiche-dev No public description 2024-09-18 vasilvv Clean up MoqtTestMessage 2024-09-18 martinduke Update QuicStream to handle RESET_STREAM_AT frames. ``` --------- Signed-off-by: Fredy Wijaya --- bazel/external/quiche.BUILD | 49 ++++++++----------- bazel/repository_locations.bzl | 6 +-- source/common/http/http3/conn_pool.cc | 2 +- .../client_connection_factory_impl_test.cc | 8 +-- .../quic/envoy_quic_client_session_test.cc | 2 +- test/common/quic/test_utils.h | 2 +- .../integration/quic_http_integration_test.cc | 5 +- 7 files changed, 33 insertions(+), 41 deletions(-) diff --git a/bazel/external/quiche.BUILD b/bazel/external/quiche.BUILD index ccd6261ad6..3a334c4d4e 100644 --- a/bazel/external/quiche.BUILD +++ b/bazel/external/quiche.BUILD @@ -400,12 +400,12 @@ envoy_cc_library( copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_adapter_data_source", ":http2_adapter_http2_protocol", ":http2_adapter_http2_visitor_interface", ":http2_adapter_nghttp2_include", ":quiche_common_platform_export", - ":spdy_core_http2_header_block_lib", ], ) @@ -435,6 +435,7 @@ envoy_cc_library( copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_adapter_chunked_buffer", ":http2_adapter_data_source", ":http2_adapter_event_forwarder", @@ -453,7 +454,6 @@ envoy_cc_library( ":http2_header_byte_listener_interface_lib", ":http2_no_op_headers_handler_lib", ":quiche_common_callbacks", - ":spdy_core_http2_header_block_lib", "@com_google_absl//absl/algorithm", "@com_google_absl//absl/cleanup", ], @@ -498,9 +498,9 @@ envoy_cc_library( copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_adapter_http2_protocol", ":quiche_common_platform_export", - ":spdy_core_http2_header_block_lib", ], ) @@ -570,6 +570,7 @@ envoy_cc_test_library( copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_adapter_chunked_buffer", ":http2_adapter_data_source", ":http2_adapter_http2_protocol", @@ -578,7 +579,6 @@ envoy_cc_test_library( ":http2_core_protocol_lib", ":http2_hpack_hpack_lib", ":quiche_common_platform_test", - ":spdy_core_http2_header_block_lib", ], ) @@ -1320,7 +1320,6 @@ envoy_cc_library( srcs = ["quiche/http2/core/spdy_alt_svc_wire_format.cc"], hdrs = [ "quiche/http2/core/spdy_alt_svc_wire_format.h", - "quiche/spdy/core/spdy_alt_svc_wire_format.h", ], copts = quiche_copts, repository = "@envoy", @@ -1337,25 +1336,23 @@ envoy_cc_library( hdrs = [ "quiche/http2/core/spdy_frame_builder.h", "quiche/http2/core/spdy_framer.h", - "quiche/spdy/core/spdy_frame_builder.h", - "quiche/spdy/core/spdy_framer.h", ], copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_core_alt_svc_wire_format_lib", ":http2_core_headers_handler_interface_lib", ":http2_core_protocol_lib", ":http2_core_zero_copy_output_buffer_lib", ":http2_hpack_hpack_lib", ":quiche_common_platform", - ":spdy_core_http2_header_block_lib", ], ) envoy_cc_library( - name = "spdy_core_http2_header_block_lib", - hdrs = ["quiche/spdy/core/http2_header_block.h"], + name = "common_http_http_header_block_lib", + hdrs = ["quiche/common/http/http_header_block.h"], copts = quiche_copts, repository = "@envoy", visibility = ["//visibility:public"], @@ -1381,13 +1378,11 @@ envoy_cc_library( envoy_cc_library( name = "http2_core_http2_deframer_lib", srcs = ["quiche/http2/core/http2_frame_decoder_adapter.cc"], - hdrs = [ - "quiche/http2/core/http2_frame_decoder_adapter.h", - "quiche/spdy/core/http2_frame_decoder_adapter.h", - ], + hdrs = ["quiche/http2/core/http2_frame_decoder_adapter.h"], copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_constants_lib", ":http2_core_alt_svc_wire_format_lib", ":http2_core_headers_handler_interface_lib", @@ -1396,11 +1391,10 @@ envoy_cc_library( ":http2_decoder_decode_status_lib", ":http2_decoder_frame_decoder_lib", ":http2_decoder_frame_decoder_listener_lib", + ":http2_hpack_hpack_decoder_adapter_lib", ":http2_hpack_hpack_lib", ":http2_structures_lib", ":quiche_common_platform", - ":spdy_core_hpack_hpack_decoder_adapter_lib", - ":spdy_core_http2_header_block_lib", ], ) @@ -1427,7 +1421,6 @@ envoy_cc_library( "quiche/http2/hpack/hpack_header_table.h", "quiche/http2/hpack/hpack_output_stream.h", "quiche/http2/hpack/hpack_static_table.h", - "quiche/spdy/core/hpack/hpack_encoder.h", ], copts = quiche_copts, repository = "@envoy", @@ -1442,12 +1435,13 @@ envoy_cc_library( ) envoy_cc_library( - name = "spdy_core_hpack_hpack_decoder_adapter_lib", - srcs = ["quiche/spdy/core/hpack/hpack_decoder_adapter.cc"], - hdrs = ["quiche/spdy/core/hpack/hpack_decoder_adapter.h"], + name = "http2_hpack_hpack_decoder_adapter_lib", + srcs = ["quiche/http2/hpack/hpack_decoder_adapter.cc"], + hdrs = ["quiche/http2/hpack/hpack_decoder_adapter.h"], copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_core_headers_handler_interface_lib", ":http2_decoder_decode_buffer_lib", ":http2_decoder_decode_status_lib", @@ -1458,7 +1452,6 @@ envoy_cc_library( ":http2_hpack_hpack_lib", ":http2_no_op_headers_handler_lib", ":quiche_common_platform", - ":spdy_core_http2_header_block_lib", ], ) @@ -1482,16 +1475,14 @@ envoy_cc_library( hdrs = [ "quiche/http2/core/spdy_bitmasks.h", "quiche/http2/core/spdy_protocol.h", - "quiche/spdy/core/spdy_bitmasks.h", - "quiche/spdy/core/spdy_protocol.h", ], copts = quiche_copts, repository = "@envoy", visibility = ["//visibility:public"], deps = [ + ":common_http_http_header_block_lib", ":http2_core_alt_svc_wire_format_lib", ":quiche_common_platform", - ":spdy_core_http2_header_block_lib", ], ) @@ -1501,8 +1492,8 @@ envoy_cc_library( hdrs = ["quiche/http2/core/recording_headers_handler.h"], repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_core_headers_handler_interface_lib", - ":spdy_core_http2_header_block_lib", ], ) @@ -1513,11 +1504,11 @@ envoy_cc_test_library( copts = quiche_copts, repository = "@envoy", deps = [ + ":common_http_http_header_block_lib", ":http2_core_headers_handler_interface_lib", ":http2_core_protocol_lib", ":quiche_common_platform", ":quiche_common_test_tools_test_utils_lib", - ":spdy_core_http2_header_block_lib", ], ) @@ -3088,13 +3079,13 @@ envoy_quic_cc_library( srcs = ["quiche/quic/core/http/quic_header_list.cc"], hdrs = ["quiche/quic/core/http/quic_header_list.h"], deps = [ + ":common_http_http_header_block_lib", ":http2_core_headers_handler_interface_lib", ":http2_core_protocol_lib", ":quic_core_packets_lib", ":quic_core_qpack_qpack_header_table_lib", ":quic_platform_base", ":quiche_common_circular_deque_lib", - ":spdy_core_http2_header_block_lib", ], ) @@ -3830,8 +3821,8 @@ envoy_quic_cc_library( srcs = ["quiche/quic/core/qpack/value_splitting_header_list.cc"], hdrs = ["quiche/quic/core/qpack/value_splitting_header_list.h"], deps = [ + ":common_http_http_header_block_lib", ":quic_platform_base", - ":spdy_core_http2_header_block_lib", ], ) @@ -5600,10 +5591,10 @@ envoy_cc_library( repository = "@envoy", tags = ["nofips"], deps = [ + ":common_http_http_header_block_lib", ":quiche_common_callbacks", ":quiche_common_platform_export", ":quiche_common_quiche_stream_lib", - ":spdy_core_http2_header_block_lib", "@com_google_absl//absl/strings", "@com_google_absl//absl/time", "@com_google_absl//absl/types:span", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index 5eada04195..b5233988c6 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -1208,12 +1208,12 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "QUICHE", project_desc = "QUICHE (QUIC, HTTP/2, Etc) is Google‘s implementation of QUIC and related protocols", project_url = "https://github.com/google/quiche", - version = "42b2e66c721f442bb439b40a1e037897360cf1b2", - sha256 = "f72f78d7fa57154ad302d559fee6b72e0695d51391684891ec991b2b5d90491f", + version = "171f6f89a6a119e8763f1216f8d85347f997cd3b", + sha256 = "3e0fec32dfa9c7568d4703516ee14c9e2316379e0a35f723d17a988be178e532", urls = ["https://github.com/google/quiche/archive/{version}.tar.gz"], strip_prefix = "quiche-{version}", use_category = ["controlplane", "dataplane_core"], - release_date = "2024-09-17", + release_date = "2024-09-26", cpe = "N/A", license = "BSD-3-Clause", license_url = "https://github.com/google/quiche/blob/{version}/LICENSE", diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index 761a5048db..55963e30a0 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -115,7 +115,7 @@ Http3ConnPoolImpl::Http3ConnPoolImpl( random_generator, state, client_fn, codec_fn, protocol, {}, nullptr), quic_info_(dynamic_cast(quic_info)), server_id_(sni(transport_socket_options, host), - static_cast(host_->address()->ip()->port()), false), + static_cast(host_->address()->ip()->port())), connect_callback_(connect_callback), attempt_happy_eyeballs_(attempt_happy_eyeballs), network_observer_registry_(network_observer_registry) {} diff --git a/test/common/quic/client_connection_factory_impl_test.cc b/test/common/quic/client_connection_factory_impl_test.cc index 455a819bcb..5f505584b9 100644 --- a/test/common/quic/client_connection_factory_impl_test.cc +++ b/test/common/quic/client_connection_factory_impl_test.cc @@ -98,7 +98,7 @@ TEST_P(QuicNetworkConnectionTest, BufferLimits) { initialize(); std::unique_ptr client_connection = createQuicNetworkConnection( *quic_info_, crypto_config_, - quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT, false}, + quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT}, dispatcher_, test_address_, test_address_, quic_stat_names_, {}, *store_.rootScope(), nullptr, nullptr, connection_id_generator_, *factory_); EnvoyQuicClientSession* session = static_cast(client_connection.get()); @@ -125,7 +125,7 @@ TEST_P(QuicNetworkConnectionTest, SocketOptions) { std::unique_ptr client_connection = createQuicNetworkConnection( *quic_info_, crypto_config_, - quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT, false}, + quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT}, dispatcher_, test_address_, test_address_, quic_stat_names_, {}, *store_.rootScope(), socket_options, nullptr, connection_id_generator_, *factory_); EnvoyQuicClientSession* session = static_cast(client_connection.get()); @@ -142,7 +142,7 @@ TEST_P(QuicNetworkConnectionTest, LocalAddress) { : Network::Utility::getCanonicalIpv4LoopbackAddress(); std::unique_ptr client_connection = createQuicNetworkConnection( *quic_info_, crypto_config_, - quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT, false}, + quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT}, dispatcher_, test_address_, local_addr, quic_stat_names_, {}, *store_.rootScope(), nullptr, nullptr, connection_id_generator_, *factory_); EnvoyQuicClientSession* session = static_cast(client_connection.get()); @@ -164,7 +164,7 @@ TEST_P(QuicNetworkConnectionTest, Srtt) { std::unique_ptr client_connection = createQuicNetworkConnection( info, crypto_config_, - quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT, false}, + quic::QuicServerId{factory_->clientContextConfig()->serverNameIndication(), PEER_PORT}, dispatcher_, test_address_, test_address_, quic_stat_names_, rtt_cache, *store_.rootScope(), nullptr, nullptr, connection_id_generator_, *factory_); diff --git a/test/common/quic/envoy_quic_client_session_test.cc b/test/common/quic/envoy_quic_client_session_test.cc index d757f03a0b..decd040b5b 100644 --- a/test/common/quic/envoy_quic_client_session_test.cc +++ b/test/common/quic/envoy_quic_client_session_test.cc @@ -137,7 +137,7 @@ class EnvoyQuicClientSessionTest : public testing::TestWithParam( quic_config_, quic_version_, std::unique_ptr(quic_connection_), - quic::QuicServerId("example.com", 443, false), crypto_config_, *dispatcher_, + quic::QuicServerId("example.com", 443), crypto_config_, *dispatcher_, /*send_buffer_limit*/ 1024 * 1024, crypto_stream_factory_, quic_stat_names_, cache, *store_.rootScope(), transport_socket_options_, uts_factory); diff --git a/test/common/quic/test_utils.h b/test/common/quic/test_utils.h index c9c1d04a70..058f9d6d5c 100644 --- a/test/common/quic/test_utils.h +++ b/test/common/quic/test_utils.h @@ -205,7 +205,7 @@ class MockEnvoyQuicClientSession : public IsolatedStoreProvider, public EnvoyQui Event::Dispatcher& dispatcher, uint32_t send_buffer_limit, EnvoyQuicCryptoClientStreamFactoryInterface& crypto_stream_factory) : EnvoyQuicClientSession(config, supported_versions, std::move(connection), - quic::QuicServerId("example.com", 443, false), + quic::QuicServerId("example.com", 443), std::make_shared( quic::test::crypto_test_utils::ProofVerifierForTesting()), dispatcher, send_buffer_limit, crypto_stream_factory, diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index 8be734d474..f859f5d2b0 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -241,7 +241,7 @@ class QuicHttpIntegrationTestBase : public HttpIntegrationTest { quic::QuicServerId{ (host.empty() ? transport_socket_factory_->clientContextConfig()->serverNameIndication() : host), - static_cast(port), false}, + static_cast(port)}, transport_socket_factory_->getCryptoConfig(), *dispatcher_, // Use smaller window than the default one to have test coverage of client codec buffer // exceeding high watermark. @@ -1089,7 +1089,8 @@ TEST_P(QuicHttpIntegrationTest, CertVerificationFailure) { std::string failure_reason = "QUIC_TLS_CERTIFICATE_UNKNOWN with details: TLS handshake failure " "(ENCRYPTION_HANDSHAKE) 46: " "certificate unknown. SSLErrorStack:"; - EXPECT_EQ(failure_reason, codec_client_->connection()->transportFailureReason()); + EXPECT_THAT(codec_client_->connection()->transportFailureReason(), + testing::HasSubstr(failure_reason)); } TEST_P(QuicHttpIntegrationTest, ResetRequestWithoutAuthorityHeader) {