From c19e6d3dbc2d40535f5dfe0aa0462a24676a8486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Thu, 28 Oct 2021 16:13:07 +0200 Subject: [PATCH] Respect remote MAX_CONCURRENT_STREAMS If the limit has been reached, new requests are failed immediately, so that the application can retry them on a different connection. --- src/gun_http2.erl | 56 ++++++++++++++++++++++++++---------------- test/rfc7540_SUITE.erl | 33 ++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index ec512351..29a1df3a 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -904,7 +904,41 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, CookieStore0, EvHandlerState0} end. -request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, +request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) + when is_reference(StreamRef) -> + case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of + true -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The maximum number of concurrent streams is reached."}}, + {State, CookieStore, EvHandlerState}; + false -> + request1(State, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, CookieStore, + EvHandler, EvHandlerState) + end; +%% Tunneled request. +request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, + Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + origin_host := OriginHost, origin_port := OriginPort}}} -> + {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef, + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, + InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + CookieStore, EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, CookieStore0, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), + CookieStore0, EvHandlerState0} + end. + +request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) when is_reference(StreamRef) -> @@ -949,26 +983,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, {StateRet, EvHandlerStateRet} = maybe_send_data( State, StreamID, fin, Body, EvHandler, EvHandlerState), {StateRet, CookieStore, EvHandlerStateRet} - end; -%% Tunneled request. -request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ - origin_host := OriginHost, origin_port := OriginPort}}} -> - {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef, - ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - CookieStore, EvHandlerState}; - #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, - {State, CookieStore0, EvHandlerState0}; - error -> - {error_stream_not_found(State, StreamRef, ReplyTo), - CookieStore0, EvHandlerState0} end. initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index 56555c19..7c0e087f 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -22,7 +22,8 @@ -import(gun_test, [receive_from/1]). all() -> - ct_helper:all(?MODULE). + [respect_max_concurrent_streams]. + %%ct_helper:all(?MODULE). %% Proxy helpers. @@ -374,6 +375,36 @@ lingering_data_counts_toward_connection_window(_) -> timer:sleep(300), gun:close(ConnPid). +respect_max_concurrent_streams(_) -> + doc("The SETTINGS_MAX_CONCURRENT_STREAMS setting can be used to " + "restrict the number of concurrent streams. (RFC7540 5.1.2, RFC7540 6.5.2)"), + {ok, Ref, Port} = do_cowboy_max_councurrent_streams(1), + try + {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}), + {ok, http2} = gun:await_up(ConnPid), + StreamRef1 = gun:get(ConnPid, "/"), + timer:sleep(100), + StreamRef2 = gun:get(ConnPid, "/"), + {error, {stream_error, {badstate, Message}}} = gun:await(ConnPid, StreamRef2), + "The maximum number of concurrent streams is reached." = Message, + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + gun:close(ConnPid) + after + cowboy:stop_listener(Ref) + end. + +do_cowboy_max_councurrent_streams(MaxConcurrentStreams) -> + Ref = make_ref(), + Routes = [{'_', [{"/", delayed_hello_h, 500}]}], + ProtoOpts = #{ + env => #{dispatch => cowboy_router:compile(Routes)}, + tcp => #{protocols => [http2]}, + max_concurrent_streams => MaxConcurrentStreams + }, + [{ref, _}, {port, Port}] = gun_test:init_cowboy_tcp(Ref, ProtoOpts, []), + {ok, Ref, Port}. + headers_priority_flag(_) -> doc("HEADERS frames may include a PRIORITY flag indicating " "that stream dependency information is attached. (RFC7540 6.2)"),