diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl index 43321f26fe..cb441fea71 100644 --- a/src/couch_replicator/src/couch_replicator_fabric.erl +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "replicator docs" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "replicator docs"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl index dd312f0289..6312741c23 100644 --- a/src/fabric/include/fabric.hrl +++ b/src/fabric/include/fabric.hrl @@ -32,14 +32,6 @@ update_seq }). --record(stream_acc, { - workers, - ready, - start_fun, - replacements, - ring_opts -}). - -record(view_row, {key, id, value, doc, worker}). -type row_property_key() :: id | key | value | doc | worker. diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 3f14cde454..85a4978c4e 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -23,9 +23,16 @@ add_worker_to_cleaner/2 ]). --include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). +-record(stream_acc, { + workers, + ready, + start_fun, + replacements, + ring_opts +}). + -define(WORKER_CLEANER, fabric_worker_cleaner). % This is the streams equivalent of fabric_util:submit_jobs/4. Besides @@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> Workers ), {ok, AckedWorkers}; + {timeout, #stream_acc{workers = Defunct}} -> + cleanup(Workers0), + DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting), + {timeout, DefunctWorkers}; Else -> + cleanup(Workers0), Else end. @@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> {stop, St#stream_acc{workers = [], ready = Ready1}} end end; -handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage -> - WaitingWorkers = [W || {W, _} <- St#stream_acc.workers], - ReadyWorkers = [W || {W, _} <- St#stream_acc.ready], - cleanup(WaitingWorkers ++ ReadyWorkers), +handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error == insufficient_storage -> {stop, Error}; handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). @@ -236,7 +245,9 @@ worker_cleaner_test_() -> ?TDEF_FE(should_clean_additional_worker_too), ?TDEF_FE(coordinator_is_killed_if_client_disconnects), ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected), - ?TDEF_FE(submit_jobs_sets_up_cleaner) + ?TDEF_FE(submit_jobs_sets_up_cleaner), + ?TDEF_FE(cleanup_called_on_timeout), + ?TDEF_FE(cleanup_called_on_error) ] } }. @@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) -> ?assert(is_process_alive(Cleaner)) end. +cleanup_called_on_timeout(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + W1 = #shard{node = 'n1', ref = Ref1}, + W2 = #shard{node = 'n2', ref = Ref2}, + Workers = [W1, W2], + meck:expect(rexi_utils, recv, fun(_, _, _, Acc, _, _) -> + {timeout, Acc#stream_acc{workers = [{W2, waiting}]}} + end), + meck:reset(fabric_util), + Res = start(Workers, #shard.ref, undefined, undefined, []), + ?assertEqual({timeout, [W2]}, Res), + ?assert(meck:called(fabric_util, cleanup, 1)). + +cleanup_called_on_error(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + W1 = #shard{node = 'n1', ref = Ref1}, + W2 = #shard{node = 'n2', ref = Ref2}, + Workers = [W1, W2], + meck:expect(rexi_utils, recv, fun(_, _, _, _, _, _) -> + {error, foo} + end), + meck:reset(fabric_util), + Res = start(Workers, #shard.ref, undefined, undefined, []), + ?assertEqual({error, foo}, Res), + ?assert(meck:called(fabric_util, cleanup, 1)). + setup() -> + ok = meck:new(rexi_utils, [passthrough]), + ok = meck:new(config, [passthrough]), + ok = meck:new(fabric_util, [passthrough]), + meck:expect(config, get, fun(_, _, Default) -> Default end), ok = meck:expect(rexi, kill_all, fun(_) -> ok end), % Speed up disconnect socket timeout for the test to 200 msec ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200). diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 3a03357c24..2d0133acb5 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "all_docs" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "all_docs"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index 85bc7370cc..410c057c27 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "changes" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "changes"), throw({error, timeout}); {error, Reason} -> throw({error, Reason}); diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 6f13270a98..cc8ed6cf1c 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -16,7 +16,6 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when @@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "map_view" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "map_view"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 04d73bd943..3206d01a44 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -16,7 +16,6 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -> @@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "reduce_view" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "reduce_view"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc)