Skip to content

Commit

Permalink
de-functorize 'State_flow'
Browse files Browse the repository at this point in the history
  • Loading branch information
ulugbekna committed Feb 8, 2021
1 parent d2830dc commit 285dce3
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 117 deletions.
17 changes: 9 additions & 8 deletions src/not-so-smart/fetch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ struct
List.fold_left fold [] have |> List.split

module V1 = struct
module Smart_flow = State_flow.Make (Smart)

let fetch ?(uses_git_transport = false) ?(push_stdout = ignore)
?(push_stderr = ignore) ~capabilities ?deepen ?want:(refs = `None) ~host
path flow store access fetch_cfg push_pack =
Expand Down Expand Up @@ -106,7 +104,7 @@ struct

let ctx = Smart.Context.make ~client_caps in

Smart_flow.run sched io_raise io flow (prelude ctx) |> prj
State_flow.run sched io_raise Smart.pp_error io flow (prelude ctx) |> prj
>>= fun (uids, refs) ->
let hex =
{ Neg.to_hex = Uid.to_hex; of_hex = Uid.of_hex; compare = Uid.compare }
Expand All @@ -131,7 +129,9 @@ struct
if res < 0 then Log.warn (fun m -> m "No common commits");
let rec read_pack () =
Log.debug (fun m -> m "Reading PACK file...");
Smart_flow.run sched io_raise io flow (recv_pack_state ctx) |> prj
State_flow.run sched io_raise Smart.pp_error io flow
(recv_pack_state ctx)
|> prj
>>= fun should_continue ->
if should_continue then read_pack () else return ()
in
Expand All @@ -140,8 +140,6 @@ struct
end

module V2 = struct
module State_flow = State_flow.Make (Wire_proto_v2)

let connect ?(uses_git_transport = false) ~host ~path ctx =
let open Wire_proto_v2.Syntax in
let return = Wire_proto_v2.return in
Expand All @@ -162,7 +160,9 @@ struct
let* () = Wire_proto_v2.send ctx Flush () in
Wire_proto_v2.return caps
in
State_flow.run sched io_raise io flow (get_caps ctx) |> prj
State_flow.run sched io_raise Wire_proto_v2.pp_error io flow
(get_caps ctx)
|> prj

let ls_refs_request ?(uses_git_transport = false) ~host ~path ctx flow req =
let ls_refs_resp =
Expand All @@ -172,6 +172,7 @@ struct
let* () = Wire_proto_v2.send ctx Ls_refs_req (`Client_caps caps, req) in
Wire_proto_v2.recv ctx Ls_refs_res
in
State_flow.run sched io_raise io flow ls_refs_resp |> prj
State_flow.run sched io_raise Wire_proto_v2.pp_error io flow ls_refs_resp
|> prj
end
end
27 changes: 16 additions & 11 deletions src/not-so-smart/find_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ let io_monad (type t) { bind; return } =
with Smart.v1 and implement a state of the art synchronisation algorithm, I
translated as is [fetch-pack.c:find_common] in OCaml. *)

module Smart_flow = State_flow.Make (Smart)

let tips (type t) scheduler access store negotiator =
let open (val io_monad scheduler : Io_monad with type s = t) in
access.locals store >>= fun ref_lst ->
Expand All @@ -76,13 +74,15 @@ let consume_shallow_list (type t) scheduler io flow cfg deepen { of_hex; _ } ctx
=
let open (val io_monad scheduler : Io_monad with type s = t) in
if cfg.stateless && Option.is_some deepen then
Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows)
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(recv ctx shallows)
>>| fun shallows -> List.map (Smart.Shallow.map ~f:of_hex) shallows
else return []

let handle_shallow (type t) scheduler io flow { of_hex; _ } access store ctx =
let open (val io_monad scheduler : Io_monad with type s = t) in
Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows)
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(recv ctx shallows)
>>= fun shallows ->
let shallows = List.map (Smart.Shallow.map ~f:of_hex) shallows in
fold_left_s shallows ~init:() ~f:(fun () -> function
Expand Down Expand Up @@ -115,13 +115,14 @@ let find_common (type t) scheduler io flow cfg
>>= function
| [] ->
Log.debug (fun m -> m "Nothing to download.");
Smart_flow.run scheduler raise io flow Smart.(send ctx flush ())
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(send ctx flush ())
>>= fun () -> return `Close
| (uid, _) :: others as refs ->
Log.debug (fun m -> m "We want %d commit(s)." (List.length refs));
access.shallowed store >>= fun shallowed ->
let shallowed = List.map to_hex shallowed in
Smart_flow.run scheduler raise io flow
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(
let uid = to_hex uid in
let others = List.map (fun (uid, _) -> to_hex uid) others in
Expand Down Expand Up @@ -164,7 +165,8 @@ let find_common (type t) scheduler io flow cfg
m "count: %d, in-vain: %d, flush-at: %d.\n%!" !count !in_vain
!flush_at);
if !flush_at <= !count then (
Smart_flow.run scheduler raise io flow Smart.(send ctx flush ())
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(send ctx flush ())
>>= fun () ->
incr flushes;
flush_at := next_flush stateless !count;
Expand All @@ -173,7 +175,8 @@ let find_common (type t) scheduler io flow cfg
consume_shallow_list scheduler io flow cfg None hex ctx
>>= fun _shallows ->
let rec loop () =
Smart_flow.run scheduler raise io flow Smart.(recv ctx ack)
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(recv ctx ack)
>>| Smart.Negotiation.map ~f:of_hex
>>= fun ack ->
match ack with
Expand Down Expand Up @@ -238,7 +241,7 @@ let find_common (type t) scheduler io flow cfg
Log.debug (fun m ->
m "Negotiation (got ready: %b, no-done: %b)." !got_ready no_done);
(if (not !got_ready) || not no_done then
Smart_flow.run scheduler raise io flow
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(send ctx negotiation_done ())
else return ())
>>= fun () ->
Expand All @@ -247,14 +250,16 @@ let find_common (type t) scheduler io flow cfg
incr flushes);
(if (not !got_ready) || not no_done then (
Log.debug (fun m -> m "Negotiation is done!");
Smart_flow.run scheduler raise io flow Smart.(recv ctx shallows)
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(recv ctx shallows)
>>= fun _shallows -> return ())
else return ())
>>= fun () ->
let rec go () =
if !flushes > 0 || cfg.multi_ack = `Some || cfg.multi_ack = `Detailed
then (
Smart_flow.run scheduler raise io flow Smart.(recv ctx ack)
State_flow.run scheduler raise Smart.pp_error io flow
Smart.(recv ctx ack)
>>| Smart.Negotiation.map ~f:of_hex
>>= fun ack ->
match ack with
Expand Down
16 changes: 9 additions & 7 deletions src/not-so-smart/push.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ struct
pp_error = Flow.pp_error;
}

module Smart_flow = State_flow.Make (Smart)

let push ?(uses_git_transport = true) ~capabilities:client_caps cmds ~host
path flow store access push_cfg pack =
let fiber ctx =
Expand All @@ -57,7 +55,7 @@ struct
return (Smart.Advertised_refs.map ~fuid:Uid.of_hex ~fref:Ref.v v)
in
let ctx = Smart.Context.make ~client_caps in
Smart_flow.run sched fail io flow (fiber ctx) |> prj
State_flow.run sched fail Smart.pp_error io flow (fiber ctx) |> prj
>>= fun advertised_refs ->
Pck.commands sched
~capabilities:(Smart.Advertised_refs.capabilities advertised_refs)
Expand All @@ -66,10 +64,12 @@ struct
|> prj
>>= function
| None ->
Smart_flow.run sched fail io flow Smart.(send ctx flush ()) |> prj
State_flow.run sched fail Smart.pp_error io flow
Smart.(send ctx flush ())
|> prj
>>= fun () -> return ()
| Some cmds -> (
Smart_flow.run sched fail io flow
State_flow.run sched fail Smart.pp_error io flow
Smart.(
send ctx commands
(Commands.map ~fuid:Uid.to_hex ~fref:Ref.to_string cmds))
Expand Down Expand Up @@ -101,14 +101,16 @@ struct
Log.debug (fun m ->
m "report-status capability: %b." report_status);
if report_status then
Smart_flow.run sched fail io flow Smart.(recv ctx status)
State_flow.run sched fail Smart.pp_error io flow
Smart.(recv ctx status)
|> prj
>>| Smart.Status.map ~f:Ref.v
else
let cmds = List.map R.ok (Smart.Commands.commands cmds) in
return (Smart.Status.v cmds)
| Some payload ->
Smart_flow.run sched fail io flow Smart.(send ctx pack payload)
State_flow.run sched fail Smart.pp_error io flow
Smart.(send ctx pack payload)
|> prj
>>= fun () -> go ()
in
Expand Down
8 changes: 4 additions & 4 deletions src/not-so-smart/smart.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ module Value = struct

type error = [ Protocol.Encoder.error | Protocol.Decoder.error ]

let pp_error ppf = function
| #Protocol.Encoder.error as err -> Protocol.Encoder.pp_error ppf err
| #Protocol.Decoder.error as err -> Protocol.Decoder.pp_error ppf err

let encode :
type a. encoder -> a send -> a -> (unit, [> Encoder.error ]) State.t =
fun encoder w v ->
Expand Down Expand Up @@ -137,10 +141,6 @@ let send_advertised_refs : _ send = Advertised_refs

include State.Scheduler (Value)

let pp_error ppf = function
| #Protocol.Encoder.error as err -> Protocol.Encoder.pp_error ppf err
| #Protocol.Decoder.error as err -> Protocol.Decoder.pp_error ppf err

module Unsafe = struct
let write context packet =
let encoder = Context.encoder context in
Expand Down
3 changes: 3 additions & 0 deletions src/not-so-smart/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module type VALUE = sig
type encoder
type decoder

val pp_error : error Fmt.t
val encode : encoder -> 'a send -> 'a -> (unit, error) t
val decode : decoder -> 'a recv -> ('a, error) t
end
Expand Down Expand Up @@ -77,6 +78,8 @@ module Scheduler
struct
type error = Value.error

let pp_error = Value.pp_error

let bind : ('a, 'err) t -> f:('a -> ('b, 'err) t) -> ('b, 'err) t =
let rec bind' m ~f =
match m with
Expand Down
2 changes: 2 additions & 0 deletions src/not-so-smart/state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module type VALUE = sig
type encoder
type decoder

val pp_error : error Fmt.t
val encode : encoder -> 'a send -> 'a -> (unit, error) t
val decode : decoder -> 'a recv -> ('a, error) t
end
Expand Down Expand Up @@ -57,6 +58,7 @@ module Scheduler
and type decoder = Context.decoder) : sig
type error = Value.error

val pp_error : error Fmt.t
val return : 'v -> ('v, 'err) t
val bind : ('a, 'err) t -> f:('a -> ('b, 'err) t) -> ('b, 'err) t
val ( >>= ) : ('a, 'err) t -> ('a -> ('b, 'err) t) -> ('b, 'err) t
Expand Down
116 changes: 53 additions & 63 deletions src/not-so-smart/state_flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,60 @@ let io_buffer_size = 65536

type ('a, 's) raise = exn -> ('a, 's) io

module Make (Read_write : sig
type ('a, 'err) t = ('a, 'err) State.t
type error
let run :
type fl s err.
s scheduler ->
('a, s) raise ->
err Fmt.t ->
(fl, 'error, s) flow ->
fl ->
('res, [ `Protocol of err ]) State.t ->
('res, s) io =
fun scheduler io_raise pp_error flow_ops flow state ->
let { bind; return } = scheduler in
let ( >>= ) = bind in

val pp_error : error Fmt.t
end) =
struct
type nonrec error = Read_write.error
let failwithf fmt =
Format.kasprintf (fun err -> io_raise (Failure err)) fmt
in

let run :
type fl s.
s scheduler ->
('a, s) raise ->
(fl, 'error, s) flow ->
fl ->
('res, [ `Protocol of error ]) Read_write.t ->
('res, s) io =
fun scheduler io_raise flow_ops flow state ->
let { bind; return } = scheduler in
let ( >>= ) = bind in
let cbuff = Cstruct.create io_buffer_size in

let failwithf fmt =
Format.kasprintf (fun err -> io_raise (Failure err)) fmt
in
let rec unwrap = function
| State.Return v ->
Log.debug (fun m -> m "got return ");
return v
| Error (`Protocol err) ->
Log.err (fun m -> m "Got a protocol error: %a." pp_error err);
failwithf "%a" pp_error err
| Read { k; buffer; off; len; eof } -> (
let rd_n_bytes = min (Cstruct.len cbuff) len in
Log.debug (fun m -> m "Start to read %d byte(s)." rd_n_bytes);
flow_ops.recv flow (Cstruct.sub cbuff 0 rd_n_bytes) >>= function
| Ok `End_of_flow ->
Log.debug (fun m -> m "Got end of input.");
unwrap (eof ())
| Ok (`Input len) ->
Log.debug (fun m -> m "Got %d/%d byte(s)." len rd_n_bytes);
Cstruct.blit_to_bytes cbuff 0 buffer off len;
unwrap (k len)
| Error err ->
Log.err (fun m -> m "Got an error: %a." flow_ops.pp_error err);
failwithf "%a" flow_ops.pp_error err)
| Write { k; buffer; off; len } ->
(* TODO: almost always we can write in one go instead of calling a loop,
so we should try writing and call loop if we aren't done *)
let rec loop tmp =
if Cstruct.is_empty tmp then unwrap (k len)
else
flow_ops.send flow tmp >>= function
| Ok shift ->
Log.debug (fun m ->
m "Wrote %d byte(s). %s" shift (Cstruct.to_string tmp));
loop (Cstruct.shift tmp shift)
| Error err -> failwithf "%a" flow_ops.pp_error err
in
Cstruct.of_string buffer ~off ~len |> loop
in

let cbuff = Cstruct.create io_buffer_size in

let rec unwrap = function
| State.Return v ->
Log.debug (fun m -> m "got return ");
return v
| Error (`Protocol err) ->
Log.err (fun m ->
m "Got a protocol error: %a." Read_write.pp_error err);
failwithf "%a" Read_write.pp_error err
| Read { k; buffer; off; len; eof } -> (
let rd_n_bytes = min (Cstruct.len cbuff) len in
Log.debug (fun m -> m "Start to read %d byte(s)." rd_n_bytes);
flow_ops.recv flow (Cstruct.sub cbuff 0 rd_n_bytes) >>= function
| Ok `End_of_flow ->
Log.debug (fun m -> m "Got end of input.");
unwrap (eof ())
| Ok (`Input len) ->
Log.debug (fun m -> m "Got %d/%d byte(s)." len rd_n_bytes);
Cstruct.blit_to_bytes cbuff 0 buffer off len;
unwrap (k len)
| Error err ->
Log.err (fun m -> m "Got an error: %a." flow_ops.pp_error err);
failwithf "%a" flow_ops.pp_error err)
| Write { k; buffer; off; len } ->
(* TODO: almost always we can write in one go instead of calling a loop,
so we should try writing and call loop if we aren't done *)
let rec loop tmp =
if Cstruct.is_empty tmp then unwrap (k len)
else
flow_ops.send flow tmp >>= function
| Ok shift ->
Log.debug (fun m ->
m "Wrote %d byte(s). %s" shift (Cstruct.to_string tmp));
loop (Cstruct.shift tmp shift)
| Error err -> failwithf "%a" flow_ops.pp_error err
in
Cstruct.of_string buffer ~off ~len |> loop
in

unwrap state
end
unwrap state
Loading

0 comments on commit 285dce3

Please sign in to comment.