本文最后更新于:June 30, 2023 pm

Ekka是emqx自动集群和自动愈合的一个组件。

1.节点发现与自动集群

ekka_cluster_strategy 模块定义了一些行为。它们分别是:

-type(options() :: list(proplists:property())).

%% 发现
-callback(discover(options()) -> {ok, list(node())} | {error, term()}).

%% 锁
-callback(lock(options()) -> ok | ignore | {error, term()}).

%% 解锁
-callback(unlock(options()) -> ok | ignore | {error, term()}).

%% 注册
-callback(register(options()) -> ok | ignore | {error, term()}).

%% 移除注册
-callback(unregister(options()) -> ok | ignore | {error, term()}).

ekka_cluster模块提供一些集群的API调用和集群管理的RPC调用。它们分别是

%% Cluster API
-export([
	join/1,					%% 加入
	leave/0, 				%% 离开
	force_leave/1, 			%% 强制离开
	status/0 				%% 状态
	]).


%% RPC Call for Cluster Management
-export([
	prepare/1,
	heal/1,
	reboot/0
	]).


%% @doc Join the cluster
%% 加入集群
-spec(join(node()) -> ok | ignore | {error, any()}).

%% 如果节点是当前节点,则忽略
join(Node) when Node =:= node() ->
    ignore;
join(Node) when is_atom(Node) ->
    case {ekka_mnesia:is_node_in_cluster(Node), ekka_node:is_running(Node, ekka)} of
        {false, true} -> 
        	%% 如果节点没有在集群里而且节点正在运行, 则加入这个集群
            prepare(join), ok = ekka_mnesia:join_cluster(Node), reboot();
        {false, false} ->
        	%% 如果节点没有在集群里而且节点也没有运行  返回 错误
            {error, {node_down, Node}};
        {true, _} ->
        	%% 如果节点已经在集群里运行
            {error, {already_in_cluster, Node}}
    end.

%% @doc Leave from the cluster.
%% 离开集群
-spec(leave() -> ok | {error, any()}).
leave() ->
    case ekka_mnesia:running_nodes() -- [node()] of
        [_|_] ->
            %% 如果该节点在运行的节点列表里  离开该集群
            prepare(leave), ok = ekka_mnesia:leave_cluster(), reboot();
        [] ->
            {error, node_not_in_cluster}
    end.


%% @doc Force a node leave from cluster.
%% 强制一个节点离开集群
-spec(force_leave(node()) -> ok | ignore | {error, term()}).

%% 如果是当前节点,忽略
force_leave(Node) when Node =:= node() ->
    ignore;
force_leave(Node) ->
    %% 如果Node节点在集群里,接着rpcdia
    case ekka_mnesia:is_node_in_cluster(Node)
         andalso rpc:call(Node, ?MODULE, prepare, [leave]) of
        ok ->
            case ekka_mnesia:remove_from_cluster(Node) of
                ok    -> rpc:call(Node, ?MODULE, reboot, []);
                Error -> Error
            end;
        false ->
            {error, node_not_in_cluster};
        {badrpc, nodedown} ->
            ekka_membership:announce({force_leave, Node}),
            ekka_mnesia:remove_from_cluster(Node);
        {badrpc, Reason} ->
            {error, Reason}
    end.

%% @doc Cluster status.
%% 集群状态
status() -> ekka_mnesia:cluster_status().

2.ekka_autocluster 自动集群模块

-spec(run(atom()) -> any()).
run(App) ->
    %% 获得锁
    case acquire_lock(App) of
        ok ->
            spawn(fun() ->
                     %% 把当前进程的组长设置为init
                      group_leader(whereis(init), self()),
                      %% 等待应用准备就绪
                      wait_application_ready(App, 10),
                      try
                          %% 发现且加入
                          discover_and_join()
                      catch
                          _:Error:Stacktrace ->
                              ?LOG(error, "Discover error: ~p~n~p", [Error, Stacktrace])
                      after
                          %% 释放锁
                          release_lock(App)
                      end,
                      %% 可能需要再次运行
                      maybe_run_again(App)
                  end);
        failed -> ignore
    end.

%% 等待节点运行应用
wait_application_ready(_App, 0) ->
    timeout;
wait_application_ready(App, Retries) ->
    case ekka_node:is_running(App) of
        true  -> ok;
        false -> timer:sleep(1000),
                 wait_application_ready(App, Retries - 1)
    end.

%% 可能需要在尝试一次
maybe_run_again(App) ->
    %% Check if the node joined cluster?
    %% 检查节点是否在集群里
    case ekka_mnesia:is_node_in_cluster() of
        true  -> ok;
        false ->
                 %% 如果节点没有加入集群, 5秒后再次重试
                 timer:sleep(5000),
                 run(App)
    end.

-spec(discover_and_join() -> any()).
discover_and_join() ->
    with_strategy(
      fun(Mod, Options) ->
        case Mod:lock(Options) of
            ok ->
                discover_and_join(Mod, Options),
                log_error("Unlock", Mod:unlock(Options));
            ignore ->
                timer:sleep(rand:uniform(3000)),
                discover_and_join(Mod, Options);
            {error, Reason} ->
                ?LOG(error, "AutoCluster stopped for lock error: ~p", [Reason])
        end
      end).


-spec(acquire_lock(atom()) -> ok | failed).
%% 获取锁
acquire_lock(App) ->
    %% 如果应用程序APP的配置参数 autocluster_lock没有被设置值,则设置为true,表示获得锁成功,否则获取锁失败
    case application:get_env(App, autocluster_lock) of
        undefined ->
            application:set_env(App, autocluster_lock, true);
        {ok, _} -> failed
    end.

-spec(release_lock(atom()) -> ok).
%% 释放锁
release_lock(App) ->
    %% 清除应用程序APP的配置参数 autocluster_lock
    application:unset_env(App, autocluster_lock).

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

日常学习随笔 Previous
运行时错误 Next