本文最后更新于:June 30, 2023 pm

本文作者:[wangwenhai] # 概要:本文主要讲解用Erlang Java Interface实现和Erlang节点通信.

1.本文概述

近期准备用EMQ来实现一个业务:吧数据保存到MongoDb 里面,但是发现一个问题:Erlang的MongoDb 驱动版本太老,甚至是10年前的版本,只支持到MongoDb 3.0阶段.但是我们的需求是在MongoDb 4.0上进行.于是我找遍github还有其他的代码平台,都没找到合适的驱动.工作陷入僵局.

这几天在看Erlang的官网文档,突然看到几个单词:Erlang Jinterface:

The Jinterface package provides a set of tools for communication with Erlang processes. It can also be used for communication with other Java processes using the same package, as well as C processes using the Erl_Interface library.

深入看了文档以后发现Jinterface是Erlang的Java接口,可以实现Java和Erlang节点直接通信.看到这里顿时来了希望,Java各种数据库驱动都有,我们可以用Java接口来实现这部分业务.

2.环境准备

在此之前,我们先准备一下开发环境:

  1. JDK:8以上
  2. Erlang:22以上
  3. EMQX:4.0以上

好了,准备好这些以后我们就开始写代码吧.

3.实现细节

1.Java项目准备

我们需要新建一个Maven项目,把OTP的库依赖进去:

<!-- https://mvnrepository.com/artifact/org.erlang.otp/jinterface -->
<dependency>
    <groupId>org.erlang.otp</groupId>
    <artifactId>jinterface</artifactId>
    <version>1.6.1</version>
</dependency>

然后新建一个Main类,按照正常的流程,在main方法里面加入下面的代码:


import com.ericsson.otp.erlang.OtpEpmd;
import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpNode;

/**
 * 测试Erlang Interface
 */
public class ErlangNodeTest1 {
    public static void main(String[] args) throws Exception {
        String nodeName = "ezlinker_core_node1@127.0.0.1";
        String receiveLoopProcessName = "ezlinker_core_node1_receive_loop";
        OtpNode otpNode = new OtpNode(nodeName, "emqxsecretcookie");
        if (otpNode.ping("emqx@127.0.0.1", 5)) {
            System.out.println("远程节点启动");
        } else {
            System.out.println("远程节点停止");
        }
        OtpMbox otpMbox = otpNode.createMbox();
        otpMbox.registerName(receiveLoopProcessName);
        System.out.println("start to listen.....");
        while (true) {
            System.out.println("收到Erlang节点的消息:" + otpMbox.receiveMsg().getMsg());
        }
    }
}

注意:这里仅仅是做个Demo,并不能生产,有经验可以把这段代码移植到比如Spring上.到这里你肯定很兴奋,直接把main运行了一下,结果很失望:抛出异常了:

Exception in thread "main" java.io.IOException: Nameserver not responding on 127.0.0.1 when publishing ezlinker_core_node1
	at com.ericsson.otp.erlang.OtpEpmd.r4_publish(OtpEpmd.java:344)
	at com.ericsson.otp.erlang.OtpEpmd.publishPort(OtpEpmd.java:141)
	at com.ericsson.otp.erlang.OtpNode$Acceptor.publishPort(OtpNode.java:784)
	at com.ericsson.otp.erlang.OtpNode$Acceptor.<init>(OtpNode.java:776)
	at com.ericsson.otp.erlang.OtpNode.init(OtpNode.java:232)
	at com.ericsson.otp.erlang.OtpNode.<init>(OtpNode.java:196)
	at com.ericsson.otp.erlang.OtpNode.<init>(OtpNode.java:149)
	at com.ezlinker.app.ErlangNodeTest1.main(ErlangNodeTest1.java:17)

稳住!!!其实到这里还没有准备完整,接下来还有很多要做。

2.EMQX插件准备

上面准备好Java方面的工作以后,我们来准备Erlang和EMQX,在这里我假设你的电脑上已经装好了Erlang环境,最低版本22,如果没有请上官网自己下载。

接下来我们新建一个EMQX的插件和Java写的Node通信。

  1. 安装rebar3(不做讲解,请网上自行下载安装对应版本);

  2. 安装EMQX官方给出的插件工具(注意:Linux环境下):

    mkdir -p ~/.config/rebar3/templates
    git clone https://github.com/emqx/rebar3_emqx_plugin ~/.config/rebar3/templates
  3. 然后新建一个插件:

    rebar3 new emqx-plugin <plugin-name>

    我新建的插件名字是:ezlinker_core_plugin:

    rebar3 new emqx-plugin `ezlinker_core_plugin`
  4. 增加配置参数:

    首先在etc目录下找到ezlinker_core_plugin.conf文件,里面的内容如下:

    
    ezlinker_core_plugin.hook.client.connected.1     = {"action": "on_client_connected"}
    ezlinker_core_plugin.hook.client.disconnected.1  = {"action": "on_client_disconnected"}
    ezlinker_core_plugin.hook.client.subscribe.1     = {"action": "on_client_subscribe", "topic": "#"}
    ezlinker_core_plugin.hook.client.unsubscribe.1   = {"action": "on_client_unsubscribe", "topic": "#"}
    ezlinker_core_plugin.hook.session.subscribed.1   = {"action": "on_session_subscribed", "topic": "#"}
    ezlinker_core_plugin.hook.session.unsubscribed.1 = {"action": "on_session_unsubscribed", "topic": "#"}
    ezlinker_core_plugin.hook.message.publish.1      = {"action": "on_message_publish", "topic": "#"}
    ezlinker_core_plugin.hook.message.delivered.1    = {"action": "on_message_delivered", "topic": "#"}
    ezlinker_core_plugin.hook.message.acked.1        = {"action": "on_message_acked", "topic": "#"}

    我们在最后加上自己的配置项:

    ## ezlinker core  node config
    ezlinker_core_plugin.node.name = ezlinker_core_node1@127.0.0.1

    找到插件目录下的priv目录,最后一行加入:

    {mapping, "ezlinker_core_plugin.node.name", "ezlinker_core_plugin.node_name", [
      {datatype, string}
    ]}.
    

    其中mapping的作用是把配置项的键映射到插件的值,具体格式如下:

    {mapping, "conf中的配置项名字", "插件名.插件中使用的项名", [
      {datatype, string}
    ]}.

    是不是不耐烦了?其实到这里才刚开始,上述步骤仅仅是我们生成了一个插件,并且注册进了EMQ,能不能发挥作用还不一定呢。接下来我们就开始对这个插件进行简单的开发。

3.插件开发

我们目前实现一个最简单的功能:把EMQX的所有消息全部丢给JavaNode去处理。

先打开:ezlinker_core_plugin.erl,找到publish相关的代码(大概在155行位置处):

%%--------------------------------------------------------------------
%% Message publish
%%--------------------------------------------------------------------
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
on_message_publish(Message = #message{topic = Topic, flags = #{retain := Retain}}, {Filter}) ->
    EZlinkerNodeName = application:get_env(?APP, node_name, 'ezlinker_core_node1@127.0.0.1'),
    with_filter(
      fun() ->
        emqx_metrics:inc('ezlinker_core_plugin.message_publish'),
          // 核心代码
        {ezlinker_core_node1_receive_loop,list_to_atom(EZlinkerNodeName)}!{self(),Message},

        {ok, Message}
      end, Message, Topic, Filter).

加入以下代码:

{ezlinker_core_node1_receive_loop,list_to_atom(EZlinkerNodeName)}!{self(),Message},

上面代码表示,EMQX内部产生的消息全部转发倒=到我们配置的那个节点上面。

到此为止,我们的插件就开发好了,接下来准备运行起来。

4.插件加载

我们还需要维护一个发布EMQX的项目:emqx-rel:https://github.com/emqx/emqx-rel.git。首先需要fork下来,然后做个简单配置,目的是把插件加载进EMQX里面。

在emqx-rel项目中找到rebar.config,在deps节点加入以下配置:

{ezlinker_core_plugin,{git,"你插件的git位置",{branch, "master"}}},

然后在relx节点下的release节点最后面加上:

{ezlinker_core_plugin, load}

到此为止插件就配置完整。

5.启动配置

启动过程也是比较麻烦,首先需要确保你的主机已经启动了epmd(不知道可百度一下)。然后更改EMQX下面的启动脚本,把_build\emqx\rel\emqx\bin\emqx脚本28行倒32行全部注释,改成以下:


#if [ -z "$WITH_EPMD" ]; then
#    EPMD_ARG="-start_epmd false -epmd_module ekka_epmd -proto_dist ekka"
#else
#    EPMD_ARG="-start_epmd true $PROTO_DIST_ARG"
#fi
EPMD_ARG="-start_epmd true $PROTO_DIST_ARG"

这么做的原因是emqx目前是用ekka集群的,没有自己启动epmd,所以当前需要手动指定启动epmd。

6.运行EMQX

​ 切换倒emqx-rel项目,执行脚本:

_build/emqx/rel/emqx/bin/emqx console

如果上面做的没问题,就会出现:

_build/emqx/rel/emqx/bin/emqx console
Exec: /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/erts-10.5/bin/erlexec -boot /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/releases/
v4.0-beta.4/emqx -mode embedded -boot_var ERTS_LIB_DIR /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/erts-10.5/../lib -mnesia dir "/mnt/c/Users/a
" -config /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/data/configs/app.2020.03.23.16.04.14.config -args_file /mnt/c/Users/admin/Github/emqx-rel
/_build/emqx/rel/emqx/data/configs/vm.2020.03.23.16.04.14.args -vm_args /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/data/configs/vm.2020.03.23.
16.04.14.args -start_epmd true -- console
Root: /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx
/mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:16:16] [ds:16:16:8] [async-threads:4] [hipe]

Starting emqx on node emqx@127.0.0.1
2020-03-23 16:04:19.815 [alert] [Plugins] Cannot find plugins: [emqx_recon,emqx_rule_engine]
Start http:management listener on 8081 successfully.
Start http:dashboard listener on 18083 successfully.
ezlinker_core_plugin loaded!
Start mqtt:tcp listener on 127.0.0.1:11883 successfully.
Start mqtt:tcp listener on 0.0.0.0:1883 successfully.
Start_trap_listenerStart mqtt:trap listener on 0.0.0.0:1884 successfully.
Start mqtt:ws listener on 0.0.0.0:8083 successfully.
Start mqtt:ssl listener on 0.0.0.0:8883 successfully.
Start mqtt:wss listener on 0.0.0.0:8084 successfully.
EMQ X Broker master is running now!
Eshell V10.5  (abort with ^G)

7.启动插件

打开http://localhost:18083,进入控制台,找到plugins:

image-20200323171539539

点击start按钮,启动插件。

8.测试通信

此时我们回过去到第一步,运行java的main方法,发现居然不报错了?如果没有报错,说明epmd启动成功了。

我们用Mqttfx发送一条消息看看效果:

image-20200323171732153

看java这边的打印:

image-20200323171821974

此时已经实现通信,到这里,我们就可以解析出消息来任意处理。

4.参考文档

http://erlang.org/doc/apps/jinterface/jinterface_users_guide.html