Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into respect-remote-conc…
Browse files Browse the repository at this point in the history
…urrency-limit
  • Loading branch information
zuiderkwast committed Oct 6, 2022
2 parents e1b73b4 + ece7ceb commit 7d3e6e5
Show file tree
Hide file tree
Showing 479 changed files with 1,003 additions and 840 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ dep_cowlib = git https://github.com/Nordix/cowlib respect-remote-concurrency-lim

DOC_DEPS = asciideck

TEST_DEPS = $(if $(CI_ERLANG_MK),ci.erlang.mk) ct_helper cowboy ranch
dep_ct_helper = git https://github.com/extend/ct_helper.git master
TEST_DEPS = $(if $(CI_ERLANG_MK),ci.erlang.mk) ct_helper cowboy ranch jsx
dep_ct_helper = git https://github.com/ninenines/ct_helper.git master
dep_cowboy_commit = 2.9.0
dep_ranch_commit = 2.0.0

Expand Down
60 changes: 34 additions & 26 deletions src/gun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ flush(ServerPid) when is_pid(ServerPid) ->
flush(StreamRef) ->
flush_ref(StreamRef).

-spec flush_pid(pid()) -> ok.
flush_pid(ServerPid) ->
receive
{gun_up, ServerPid, _} ->
Expand Down Expand Up @@ -871,10 +872,11 @@ flush_pid(ServerPid) ->
ok
end.

-spec flush_ref(stream_ref()) -> ok.
flush_ref(StreamRef) ->
receive
{gun_inform, _, StreamRef, _, _} ->
flush_pid(StreamRef);
flush_ref(StreamRef);
{gun_response, _, StreamRef, _, _, _} ->
flush_ref(StreamRef);
{gun_data, _, StreamRef, _, _} ->
Expand Down Expand Up @@ -1102,10 +1104,15 @@ ensure_alpn_sni(Protocols0, TransOpts0, OriginHost) ->
%%
%% Normally only DNS hostnames are supported for SNI. However, the ssl
%% application itself allows any string through so we do the same.
if
is_list(OriginHost) -> [{server_name_indication, OriginHost}|TransOpts];
is_atom(OriginHost) -> [{server_name_indication, atom_to_list(OriginHost)}|TransOpts];
true -> TransOpts
%%
%% Only add SNI if not already present and OriginHost isn't an IP address.
case lists:keymember(server_name_indication, 1, TransOpts) of
false when is_list(OriginHost) ->
[{server_name_indication, OriginHost}|TransOpts];
false when is_atom(OriginHost) ->
[{server_name_indication, atom_to_list(OriginHost)}|TransOpts];
_ ->
TransOpts
end.

%% Normal TLS handshake.
Expand Down Expand Up @@ -1230,7 +1237,8 @@ connected_ws_only(Type, Event, State) ->
connected(internal, {connected, Socket, NewProtocol},
State0=#state{owner=Owner, opts=Opts, transport=Transport}) ->
{Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
{StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts),
%% @todo Handle error result from Protocol:init/4
{ok, StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts),
Owner ! {gun_up, self(), Protocol:name()},
case active(State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}) of
{ok, State} ->
Expand All @@ -1249,32 +1257,31 @@ connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
{Commands, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
Headers, InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
Expand All @@ -1289,11 +1296,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},
opts => WsOpts
}, EvHandlerState0),
%% @todo Can fail if HTTP/1.0.
{ProtoState2, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
%% @todo Maybe better standardize the protocol callbacks argument orders.
connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
Expand Down Expand Up @@ -1370,10 +1377,10 @@ closing(Type, Event, State) ->
handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
{Commands, EvHandlerState} = Protocol:data(ProtoState,
dereference_stream_ref(StreamRef, State),
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected(info, {timeout, TRef, Name}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:timeout(ProtoState, Name, TRef),
Expand Down Expand Up @@ -1419,19 +1426,19 @@ handle_common_connected_no_input(info, {handle_continue, StreamRef, Msg}, _,
handle_common_connected_no_input(info, keepalive, _,
State=#state{protocol=Protocol, protocol_state=ProtoState0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
{keep_state, keepalive_timeout(State#state{
protocol_state=ProtoState, event_handler_state=EvHandlerState})};
{Commands, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
commands(Commands, keepalive_timeout(State#state{
event_handler_state=EvHandlerState}));
handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _,
State0=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
maybe_active(commands(Commands, State0));
handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
{Commands, EvHandlerState} = Protocol:cancel(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
Intermediaries = [I || I=#{protocol := http} <- Intermediaries0],
Expand Down Expand Up @@ -1673,7 +1680,8 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{
#{tunnel_transport := _} -> ProtoOpts0;
_ -> ProtoOpts0#{tunnel_transport => tcp}
end,
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
%% @todo Handle error result from Protocol:init/4
{ok, StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
ProtocolChangedEvent = case ProtoOpts of
#{stream_ref := StreamRef} ->
#{stream_ref => StreamRef, protocol => Protocol:name()};
Expand Down
2 changes: 2 additions & 0 deletions src/gun_cookies.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ session_gc({Mod, State0}) ->
%% @todo The given URI must be normalized.
-spec set_cookie(Store, uri_string:uri_map(), binary(), binary(), cow_cookie:cookie_attrs())
-> {ok, Store} | {error, any()} when Store::store().
set_cookie(_, _, Name, Value, _) when byte_size(Name) + byte_size(Value) > 4096 ->
{error, larger_than_4096_bytes};
set_cookie(Store, URI=#{host := Host}, Name, Value, Attrs) ->
%% This is where we would add a feature to block cookies (like a blacklist).
CurrentTime = erlang:universaltime(),
Expand Down
76 changes: 40 additions & 36 deletions src/gun_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ default_keepalive() -> infinity.
init(_ReplyTo, Socket, Transport, Opts) ->
BaseStreamRef = maps:get(stream_ref, Opts, undefined),
Version = maps:get(version, Opts, 'HTTP/1.1'),
{connected, #http_state{socket=Socket, transport=Transport,
{ok, connected, #http_state{socket=Socket, transport=Transport,
opts=Opts, version=Version, base_stream_ref=BaseStreamRef}}.

switch_transport(Transport, Socket, State) ->
Expand Down Expand Up @@ -546,45 +546,45 @@ close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(State, Tail, Reason).

%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
{State, EvHandlerState};
keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
{[], EvHandlerState};
%% We can only keep-alive by sending an empty line in-between streams.
keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
Transport:send(Socket, <<"\r\n">>),
{State, EvHandlerState};
keepalive(State, _, EvHandlerState) ->
{State, EvHandlerState}.
{[], EvHandlerState};
keepalive(_State, _, EvHandlerState) ->
{[], EvHandlerState}.

headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow),
{{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.

request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow),
{{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.

initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
Expand Down Expand Up @@ -677,10 +677,12 @@ scheme(#http_state{transport=Transport}) ->

%% We are expecting a new stream.
data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _, _, EvHandlerState) ->
{error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
error_stream_closed(State, StreamRef, ReplyTo),
{[], EvHandlerState};
%% There are no active streams.
data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _, _, EvHandlerState) ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState};
error_stream_not_found(State, StreamRef, ReplyTo),
{[], EvHandlerState};
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data,
Expand All @@ -690,10 +692,10 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
DataLength = iolist_size(Data),
case Out of
body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
case Data of
<<>> ->
if
DataLength =:= 0 ->
Transport:send(Socket, cow_http_te:last_chunk());
_ ->
true ->
Transport:send(Socket, [
cow_http_te:chunk(Data),
cow_http_te:last_chunk()
Expand All @@ -704,10 +706,10 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{State#http_state{out=head}, EvHandlerState};
{{state, State#http_state{out=head}}, EvHandlerState};
body_chunked when Version =:= 'HTTP/1.1' ->
Transport:send(Socket, cow_http_te:chunk(Data)),
{State, EvHandlerState0};
{[], EvHandlerState0};
{body, Length} when DataLength =< Length ->
Transport:send(Socket, Data),
Length2 = Length - DataLength,
Expand All @@ -718,28 +720,29 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{State#http_state{out=head}, EvHandlerState};
{{state, State#http_state{out=head}}, EvHandlerState};
Length2 > 0, IsFin =:= nofin ->
{State#http_state{out={body, Length2}}, EvHandlerState0}
{{state, State#http_state{out={body, Length2}}}, EvHandlerState0}
end;
body_chunked -> %% HTTP/1.0
Transport:send(Socket, Data),
{State, EvHandlerState0}
{[], EvHandlerState0}
end;
_ ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
error_stream_not_found(State, StreamRef, ReplyTo),
{[], EvHandlerState0}
end.

connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, EvHandlerState};
{[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
{State, EvHandlerState};
{[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0) ->
Expand Down Expand Up @@ -786,8 +789,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
<<"CONNECT">>, Authority, <<>>, InitialFlow),
{{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
<<"CONNECT">>, Authority, <<>>, InitialFlow)},
EvHandlerState}.

%% We can't cancel anything, we can just stop forwarding messages to the owner.
Expand All @@ -801,9 +804,10 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
endpoint => local,
reason => cancel
}, EvHandlerState0),
{State, EvHandlerState};
{{state, State}, EvHandlerState};
false ->
{error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0}
error_stream_not_found(State0, StreamRef, ReplyTo),
{[], EvHandlerState0}
end.

stream_info(#http_state{streams=Streams}, StreamRef) ->
Expand Down Expand Up @@ -831,12 +835,12 @@ down(#http_state{streams=Streams}) ->
error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
ok.

error_stream_not_found(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
ok.

%% Headers information retrieval.

Expand Down Expand Up @@ -927,12 +931,12 @@ ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerSt
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
Expand Down Expand Up @@ -960,9 +964,9 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
{{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow),
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.

ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
Expand Down
Loading

0 comments on commit 7d3e6e5

Please sign in to comment.