diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl index 1da3a9f..33b8bb2 100644 --- a/src/eredis_cluster.erl +++ b/src/eredis_cluster.erl @@ -11,6 +11,8 @@ % Generic redis call -export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]). +-export([get_key_slot/1]). + % Specific redis command implementation -export([flushdb/0]). diff --git a/src/eredis_cluster_monitor.erl b/src/eredis_cluster_monitor.erl index 5a74f45..e6b4f27 100644 --- a/src/eredis_cluster_monitor.erl +++ b/src/eredis_cluster_monitor.erl @@ -20,10 +20,10 @@ %% Type definition. -include("eredis_cluster.hrl"). -record(state, { - init_nodes :: [#node{}], - slots :: tuple(), %% whose elements are integer indexes into slots_maps - slots_maps :: tuple(), %% whose elements are #slots_map{} - version :: integer() + init_nodes = [] :: [#node{}], + slots = {} :: tuple(), %% whose elements are integer indexes into slots_maps + slots_maps = {} :: tuple(), %% whose elements are #slots_map{} + version = 0 :: integer() }). %% API. @@ -45,8 +45,12 @@ refresh_mapping(Version) -> -spec get_state() -> #state{}. get_state() -> - [{cluster_state, State}] = ets:lookup(?MODULE, cluster_state), - State. + case ets:lookup(?MODULE, cluster_state) of + [{cluster_state, S}] -> + S; + [] -> + #state{} + end. get_state_version(State) -> State#state.version. @@ -83,25 +87,93 @@ get_pool_by_slot(Slot) -> -spec reload_slots_map(State::#state{}) -> NewState::#state{}. reload_slots_map(State) -> - [close_connection(SlotsMap) - || SlotsMap <- tuple_to_list(State#state.slots_maps)], + OldMap = case State#state.slots_maps of + undefined -> + []; + Map -> + tuple_to_list(Map) + end, ClusterSlots = get_cluster_slots(State#state.init_nodes), - SlotsMaps = parse_cluster_slots(ClusterSlots), - ConnectedSlotsMaps = connect_all_slots(SlotsMaps), - Slots = create_slots_cache(ConnectedSlotsMaps), - - NewState = State#state{ - slots = list_to_tuple(Slots), - slots_maps = list_to_tuple(ConnectedSlotsMaps), - version = State#state.version + 1 - }, - - true = ets:insert(?MODULE, [{cluster_state, NewState}]), + ExistingMap = + lists:flatmap( + fun(#slots_map{start_slot = NSS, + end_slot = NES, + node = NNode} = NEl) -> + [{Elem, NEl} || Elem <- OldMap, + Elem#slots_map.start_slot == NSS, + Elem#slots_map.end_slot == NES, + Elem#slots_map.node == NNode] + end, SlotsMaps), + {ExistingOldMap, ExistingNewMap}=lists:unzip(ExistingMap), + + ChangedOldMap = minus(OldMap, ExistingOldMap), + ChangedNewMap = minus(SlotsMaps, ExistingNewMap), + + ToBeCl = get_nodes_from_map(ChangedOldMap), + ToBeOp = get_nodes_from_map(ChangedNewMap), + NewNodes = get_nodes_from_map(SlotsMaps), + + %% Find Pools that must be closed. Do not close Pool if + %% it needs to be connected again but with a new slots map: + ToBeClosed = minus(ToBeCl, NewNodes), + close_connection_with_nodes(OldMap, ToBeClosed), + + NewState = case ToBeOp of + [] -> + %% Connect if connections have been forcibly closed: + connect_all_slots(OldMap), + State; + _ -> + %% Connect to Pool nodes if are not connected before: + ConnectedSlotsMaps = connect_all_slots(SlotsMaps), + Slots = create_slots_cache(ConnectedSlotsMaps), + NS = State#state{ + slots = list_to_tuple(Slots), + slots_maps = list_to_tuple(ConnectedSlotsMaps), + version = State#state.version + 1 + }, + true = ets:insert(?MODULE, [{cluster_state, NS}]), + NS + end, NewState. +%%%------------------------------------------------------------ +-spec get_nodes_from_map(SlotsMaps::#slots_map{}) -> [atom()]. +%%% +%%% Gets all pool nodes from state +%%%------------------------------------------------------------ +get_nodes_from_map(SlotsMaps) -> + lists:usort(lists:foldl(fun(Map,Acc) -> + if Map#slots_map.node == undefined -> + Acc; + Map#slots_map.node#node.address == [] -> + Acc; + true -> + [Map#slots_map.node#node.pool|Acc] + end + end, [], SlotsMaps)). + +%%%------------------------------------------------------------ +-spec close_connection_with_nodes(SlotsMap::#slots_map{}, + Pools :: [atom()]) -> #slots_map{}. +%%% +%%% Closes the connection related to specified Pool node. +%%%------------------------------------------------------------ +close_connection_with_nodes(SlotsMaps, Pools) -> + lists:foldl(fun(Map, AccMap) -> + case lists:member(Map#slots_map.node#node.pool, + Pools) of + true -> + close_connection(Map), + AccMap; + false -> + [Map|AccMap] + end + end,[], SlotsMaps). + -spec get_cluster_slots([#node{}]) -> [[bitstring() | [bitstring()]]]. get_cluster_slots([]) -> throw({error,cannot_connect_to_cluster}); @@ -133,16 +205,18 @@ get_cluster_slots_from_single_node(Node) -> -spec parse_cluster_slots([[bitstring() | [bitstring()]]]) -> [#slots_map{}]. parse_cluster_slots(ClusterInfo) -> parse_cluster_slots(ClusterInfo, 1, []). - parse_cluster_slots([[StartSlot, EndSlot | [[Address, Port | _] | _]] | T], Index, Acc) -> + Addr = binary_to_list(Address), + P = binary_to_integer(Port), SlotsMap = #slots_map{ index = Index, start_slot = binary_to_integer(StartSlot), end_slot = binary_to_integer(EndSlot), node = #node{ - address = binary_to_list(Address), - port = binary_to_integer(Port) + address = Addr, + port = P, + pool = list_to_atom(Addr ++ "#" ++ integer_to_list(P)) } }, parse_cluster_slots(T, Index+1, [SlotsMap | Acc]); @@ -203,14 +277,22 @@ connect_all_slots(SlotsMapList) -> connect_([]) -> #state{}; connect_(InitNodes) -> - State = #state{ - slots = undefined, - slots_maps = {}, - init_nodes = [#node{address = A, port = P} || {A,P} <- InitNodes], - version = 0 - }, - - reload_slots_map(State). + OldState = case get_state() of + undefined -> + #state{init_nodes = [#node{address = A, port = P} || {A,P} <- InitNodes]}; + State -> + State#state{init_nodes = [#node{address = A, port = P} || {A,P} <- InitNodes]} + end, + reload_slots_map(OldState). + +%%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec minus(Xs :: [A], Ys :: [A]) -> [A]. +%%% Removes all elements (including duplicates) of Ys from Xs. +%%% Xs and Ys can be unordered and contain duplicates. +%%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +minus(Xs, Ys) -> + Set = gb_sets:from_list(Ys), + [E || E <- Xs, not gb_sets:is_element(E, Set)]. %% gen_server. diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 7b8a7ce..41bb153 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -163,6 +163,61 @@ basic_test_() -> eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]), ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])) end + }, + + { "reload slots map", + fun () -> + Key = "{111}:test", + + {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), + + ClusterNodesList = [CNEL || CNEL <- binary:split(NodesInfo,<<"\n">>, [global]), CNEL =/= <<>>], + NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> + ClusterNodeI = binary:split(ClusterNode,<<" ">>,[global]), + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = binary:split(lists:nth(2, ClusterNodeI), <<":">>,[global]), + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), + KeySlot = eredis_cluster:get_key_slot(Key), + Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), + + {NodeId, Pool} = lists:keyfind(Pool, 2, NodeIdsL), + [{NodeId2, _Pool2}, _] = [{NI, P} || {NI, P} <- NodeIdsL, {NI, P} =/= {NodeId, Pool}], + + %% Migrate Slot: + CmdImp = ["CLUSTER", "SETSLOT", KeySlot, "IMPORTING", NodeId], + eredis_cluster:qa(CmdImp), + + CmdMig = ["CLUSTER", "SETSLOT", KeySlot, "MIGRATING", NodeId2], + eredis_cluster:qa(CmdMig), + + CmdMig1 = ["CLUSTER", "SETSLOT", KeySlot, "NODE", NodeId2], + eredis_cluster:qa(CmdMig1), + + OldState = eredis_cluster_monitor:get_state(), + Version = eredis_cluster_monitor:get_state_version(OldState), + eredis_cluster_monitor:refresh_mapping(Version), + {state, _, _, OldSlotMap, Version} = OldState, + NewState = eredis_cluster_monitor:get_state(), + {state, _, _, NewSlotMap, _} = NewState, + + CheckMaps = lists:map(fun({slots_map, NSS, NES, _NI, NNode}) -> + % Check if slots maps are the same: + Fun = fun({slots_map, OSS, OES, _OI, ONode}) when NSS == OSS, + NES == OES, + NNode == ONode -> true; + (_) -> false % maps are different + end, + lists:any(Fun, tuple_to_list(OldSlotMap)) + end, tuple_to_list(NewSlotMap)), + ?assertEqual(true, lists:member(false, CheckMaps)) + end } ]