Skip to content

Commit

Permalink
Handle of HTTP/2 tunnel errors
Browse files Browse the repository at this point in the history
  • Loading branch information
essen committed Sep 19, 2022
1 parent 9e6d0f6 commit ece7ceb
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 71 deletions.
72 changes: 37 additions & 35 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,13 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan
{maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),
CookieStore, EvHandlerState};
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data,
CookieStore, EvHandler, EvHandlerState);
{StateRet, CookieStoreRet, EvHandlerStateRet} = data_frame(
State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data,
CookieStore, EvHandler, EvHandlerState),
case StateRet of
{state, State1} -> {State1, CookieStoreRet, EvHandlerStateRet};
Error -> {Error, CookieStoreRet, EvHandlerStateRet}
end;
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, IsFin, Headers, PseudoHeaders, BodyLen,
Expand Down Expand Up @@ -359,15 +364,17 @@ data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerStat
% %% @todo What about IsFin?
{Commands, CookieStore, EvHandlerState1} = Proto:handle(Data,
ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
{State, CookieStore, EvHandlerState}
%% The frame/parse functions only handle state or error commands.
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State0, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}
end.

tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
when not is_list(Command) ->
tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
{store_stream(State, Stream), EvHandlerState};
{{state, store_stream(State, Stream)}, EvHandlerState};
tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
State0, EvHandler, EvHandlerState0) ->
{State, EvHandlerState} = maybe_send_data(State0, StreamID,
Expand All @@ -377,9 +384,11 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
State, EvHandler, EvHandlerState);
tunnel_commands([{error, _Reason}|_], #stream{id=StreamID},
tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
State, _EvHandler, EvHandlerState) ->
{delete_stream(State, StreamID), EvHandlerState};
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{stream_error, Reason, 'Tunnel closed unexpectedly.'}},
{{state, delete_stream(State, StreamID)}, EvHandlerState};
%% @todo Set a timeout for closing the Websocket stream.
tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
Expand Down Expand Up @@ -729,12 +738,14 @@ handle_continue(ContinueStreamRef, Msg, State0, CookieStore0, EvHandler, EvHandl
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
{Commands, CookieStore, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef,
Msg, ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState}
%% The stream may have ended while TLS was being decoded. @todo What should we do?
% error ->
% error_stream_not_found(State, StreamRef, ReplyTo),
% {[], EvHandlerState0}
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State0, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
%% The stream may have ended while TLS was being decoded.
%% We do not trigger an error because this is an internal event.
%% The stream_error, if any, was already sent from tunnel_commands.
error ->
{[], CookieStore0, EvHandlerState0}
end.

update_flow(State, _ReplyTo, StreamRef, Inc) ->
Expand Down Expand Up @@ -895,9 +906,9 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
{Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands, Stream,
{ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream,
State, EvHandler, EvHandlerState1),
{{state, State1}, CookieStore, EvHandlerState};
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
Expand Down Expand Up @@ -963,9 +974,9 @@ request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands,
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, CookieStore, EvHandlerState};
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
Expand Down Expand Up @@ -1033,9 +1044,7 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
#tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel,
{Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState),
{State1, EvHandlerStateRet} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerStateRet}
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1)
end;
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
Expand All @@ -1047,9 +1056,7 @@ data(State, RealStreamRef=[StreamRef|_], ReplyTo, IsFin, Data, EvHandler, EvHand
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
{Commands, EvHandlerState1} = Proto:data(ProtoState0, RealStreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState};
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
Expand Down Expand Up @@ -1188,9 +1195,7 @@ connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, He
{Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef,
ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState};
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
Expand Down Expand Up @@ -1225,9 +1230,7 @@ cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0)
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
{Commands, EvHandlerState1} = Proto:cancel(ProtoState0,
RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState};
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
Expand Down Expand Up @@ -1376,23 +1379,22 @@ ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
ProtoState0, RealStreamRef, ReplyTo,
Host, Port, Path, Headers, WsOpts,
CookieStore0, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = tunnel_commands(Commands,
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState1),
{{state, State1}, CookieStore, EvHandlerState}
{ResCommands, CookieStore, EvHandlerState}
%% @todo Error conditions?
end.

ws_send(Frames, State0, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
ws_send(Frames, State, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
StreamRef = case RealStreamRef of
[SR|_] -> SR;
_ -> RealStreamRef
end,
case get_stream_by_ref(State0, StreamRef) of
case get_stream_by_ref(State, StreamRef) of
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->
{Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState,
RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
{State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
{{state, State}, EvHandlerState}
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1)
%% @todo Error conditions?
end.

Expand Down
81 changes: 45 additions & 36 deletions src/gun_tunnel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun

%% When we receive data we pass it forward directly for TCP;
%% or we decrypt it and pass it via handle_continue for TLS.
handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy,
handle(Data, State=#tunnel_state{transport=gun_tcp_proxy,
protocol=Proto, protocol_state=ProtoState0},
CookieStore0, EvHandler, EvHandlerState0) ->
{Commands, CookieStore, EvHandlerState1} = Proto:handle(
Data, ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState};
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
socket=ProxyPid, tls_origin_socket=OriginSocket},
CookieStore, _EvHandler, EvHandlerState) ->
Expand Down Expand Up @@ -226,13 +226,13 @@ handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data},
when is_reference(ContinueStreamRef) ->
{{send, IsFin, Data}, CookieStore, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data},
State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
CookieStore0, EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
{Commands, CookieStore, EvHandlerState1} = Proto:handle(
Data, ProtoState, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState};
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid},
#tunnel_state{socket=ProxyPid}, CookieStore, _EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
Expand All @@ -248,24 +248,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason},
%%
%% @todo Assert StreamRef to be our reference().
handle_continue([_StreamRef|ContinueStreamRef0], Msg,
State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
CookieStore0, EvHandler, EvHandlerState0) ->
ContinueStreamRef = case ContinueStreamRef0 of
[CSR] -> CSR;
_ -> ContinueStreamRef0
end,
{Commands, CookieStore, EvHandlerState1} = Proto:handle_continue(
ContinueStreamRef, Msg, ProtoState, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

%% @todo This function will need EvHandler/EvHandlerState?
update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
ReplyTo, StreamRef0, Inc) ->
StreamRef = maybe_dereference(State0, StreamRef0),
StreamRef = maybe_dereference(State, StreamRef0),
Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc),
{State, undefined} = commands(Commands, State0, undefined, undefined),
{state, State}.
{ResCommands, undefined} = commands(Commands, State, undefined, undefined),
ResCommands.

closing(_Reason, _State, _EvHandler, EvHandlerState) ->
%% @todo Graceful shutdown must be propagated to tunnels.
Expand All @@ -280,27 +280,27 @@ keepalive(_State, _EvHandler, EvHandlerState) ->
{[], EvHandlerState}.

%% We pass the headers forward and optionally dereference StreamRef.
headers(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State0, StreamRef0),
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, StreamRef,
ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

%% We pass the request forward and optionally dereference StreamRef.
request(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
info=#{origin_host := OriginHost, origin_port := OriginPort}},
StreamRef0, ReplyTo, Method, _Host, _Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State0, StreamRef0),
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, StreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, CookieStore, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

%% When the next tunnel is SOCKS we pass the data forward directly.
%% This is needed because SOCKS has no StreamRef and the data cannot
Expand All @@ -310,8 +310,8 @@ data(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
{Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState};
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, EvHandlerState};
%% CONNECT tunnels pass the data forward and dereference StreamRef
%% unless they are the recipient of the callback, in which case the
%% data is sent to the socket.
Expand All @@ -327,9 +327,9 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = commands(Commands, State,
{ResCommands, EvHandlerState} = commands(Commands, State,
EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState}
{ResCommands, EvHandlerState}
end.

%% We pass the CONNECT request forward and optionally dereference StreamRef.
Expand All @@ -341,16 +341,16 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
{Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef,
ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
EvHandler, EvHandlerState0),
{State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, EvHandlerState}.

cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, EvHandlerState1} = Proto:cancel(ProtoState0, StreamRef,
ReplyTo, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{{state, State1}, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, EvHandlerState}.

timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->
case Proto:timeout(ProtoState0, Msg, TRef) of
Expand Down Expand Up @@ -436,23 +436,32 @@ ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=P
{Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
Host, Port, Path, Headers, WsOpts,
CookieStore0, EvHandler, EvHandlerState0),
{State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{{state, State1}, CookieStore, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}.

ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
ws_send(Frames, State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State0, StreamRef0),
StreamRef = maybe_dereference(State, StreamRef0),
{Commands, EvHandlerState1} = Proto:ws_send(Frames,
ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
{{state, State}, EvHandlerState}.
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
{ResCommands, EvHandlerState}.

%% Internal.

commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) ->
commands([Command], State, EvHandler, EvHandlerState);
commands([], State, _, EvHandlerState) ->
{State, EvHandlerState};
{{state, State}, EvHandlerState};
commands([Error = {error, _}|_],
State=#tunnel_state{socket=Socket, transport=Transport},
_, EvHandlerState) ->
%% We must terminate the TLS proxy pid if any.
case Transport of
gun_tls_proxy -> gun_tls_proxy:close(Socket);
_ -> ok
end,
{[{state, State}, Error], EvHandlerState};
commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) ->
commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState);
%% @todo What to do about IsFin?
Expand Down

0 comments on commit ece7ceb

Please sign in to comment.