Respect remote MAX_CONCURRENT_STREAMS
If the limit has been reached, new requests are failed immediately,
so that the application can retry them on a different connection.
zuiderkwast committed Oct 28, 2021
1 parent f917599 commit c19e6d3
Showing 2 changed files with 67 additions and 22 deletions.
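With this change, a request issued after the remote peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS limit has been reached is rejected by Gun on the client side rather than queued. The following is a minimal sketch of how an application might act on that, assuming a hypothetical gun_retry_example module and a single-retry policy (neither is part of this commit); the error tuple it matches is the one asserted by the new test below:

-module(gun_retry_example).
-export([get_with_retry/3]).

%% Issue a GET over HTTP/2; if Gun rejects the request because the remote
%% peer's MAX_CONCURRENT_STREAMS limit has been reached, close the
%% connection and retry once on a fresh one.
get_with_retry(Host, Port, Path) ->
    get_with_retry(Host, Port, Path, 1).

get_with_retry(Host, Port, Path, Retries) ->
    {ok, ConnPid} = gun:open(Host, Port, #{protocols => [http2]}),
    {ok, http2} = gun:await_up(ConnPid),
    StreamRef = gun:get(ConnPid, Path),
    case gun:await(ConnPid, StreamRef) of
        {error, {stream_error, {badstate,
                "The maximum number of concurrent streams is reached."}}}
                when Retries > 0 ->
            %% Nothing was sent on the wire for this stream, so it is
            %% safe to retry it on a different connection.
            gun:close(ConnPid),
            get_with_retry(Host, Port, Path, Retries - 1);
        Response ->
            {ConnPid, Response}
    end.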
56 changes: 35 additions & 21 deletions src/gun_http2.erl
@@ -904,7 +904,41 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
CookieStore0, EvHandlerState0}
end.

request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState)
when is_reference(StreamRef) ->
case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
true ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The maximum number of concurrent streams is reached."}},
{State, CookieStore, EvHandlerState};
false ->
request1(State, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers, Body, InitialFlow, CookieStore,
EvHandler, EvHandlerState)
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, CookieStore0, EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
CookieStore0, EvHandlerState0}
end.

request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
@@ -949,26 +983,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
{StateRet, EvHandlerStateRet} = maybe_send_data(
State, StreamID, fin, Body, EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
{ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, CookieStore0, EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
CookieStore0, EvHandlerState0}
end.

initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
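The new clause above reports the failure asynchronously, as a gun_error message sent to the ReplyTo process; gun:await/2 converts it into the {error, {stream_error, ...}} tuple asserted in the test below. A minimal sketch of matching the raw message directly, assuming the caller is the ReplyTo process and that a one-second timeout is acceptable (both are assumptions for the example, not part of the commit):

%% Hypothetical helper: returns rejected if Gun reports that StreamRef was
%% refused because the remote concurrency limit has been reached, and
%% not_rejected if no such message arrives within one second.
await_rejected(ConnPid, StreamRef) ->
    receive
        {gun_error, ConnPid, StreamRef, {badstate,
                "The maximum number of concurrent streams is reached."}} ->
            rejected
    after 1000 ->
        not_rejected
    end.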
33 changes: 32 additions & 1 deletion test/rfc7540_SUITE.erl
@@ -22,7 +22,8 @@
-import(gun_test, [receive_from/1]).

all() ->
ct_helper:all(?MODULE).
[respect_max_concurrent_streams].
%%ct_helper:all(?MODULE).

%% Proxy helpers.

@@ -374,6 +375,36 @@ lingering_data_counts_toward_connection_window(_) ->
timer:sleep(300),
gun:close(ConnPid).

respect_max_concurrent_streams(_) ->
doc("The SETTINGS_MAX_CONCURRENT_STREAMS setting can be used to "
"restrict the number of concurrent streams. (RFC7540 5.1.2, RFC7540 6.5.2)"),
{ok, Ref, Port} = do_cowboy_max_councurrent_streams(1),
try
{ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}),
{ok, http2} = gun:await_up(ConnPid),
StreamRef1 = gun:get(ConnPid, "/"),
timer:sleep(100),
StreamRef2 = gun:get(ConnPid, "/"),
{error, {stream_error, {badstate, Message}}} = gun:await(ConnPid, StreamRef2),
"The maximum number of concurrent streams is reached." = Message,
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
{ok, _} = gun:await_body(ConnPid, StreamRef1),
gun:close(ConnPid)
after
cowboy:stop_listener(Ref)
end.

do_cowboy_max_councurrent_streams(MaxConcurrentStreams) ->
Ref = make_ref(),
Routes = [{'_', [{"/", delayed_hello_h, 500}]}],
ProtoOpts = #{
env => #{dispatch => cowboy_router:compile(Routes)},
tcp => #{protocols => [http2]},
max_concurrent_streams => MaxConcurrentStreams
},
[{ref, _}, {port, Port}] = gun_test:init_cowboy_tcp(Ref, ProtoOpts, []),
{ok, Ref, Port}.

headers_priority_flag(_) ->
doc("HEADERS frames may include a PRIORITY flag indicating "
"that stream dependency information is attached. (RFC7540 6.2)"),
