本文最后更新于:June 30, 2023 pm
本文作者:[wangwenhai] # 概要:本文主要讲解用Erlang Java Interface实现和Erlang节点通信.
1.本文概述
近期准备用EMQ来实现一个业务:吧数据保存到MongoDb 里面,但是发现一个问题:Erlang的MongoDb 驱动版本太老,甚至是10年前的版本,只支持到MongoDb 3.0阶段.但是我们的需求是在MongoDb 4.0上进行.于是我找遍github还有其他的代码平台,都没找到合适的驱动.工作陷入僵局.
这几天在看Erlang的官网文档,突然看到几个单词:Erlang Jinterface:
The Jinterface package provides a set of tools for communication with Erlang processes. It can also be used for communication with other Java processes using the same package, as well as C processes using the Erl_Interface library.
深入看了文档以后发现Jinterface是Erlang的Java接口,可以实现Java和Erlang节点直接通信.看到这里顿时来了希望,Java各种数据库驱动都有,我们可以用Java接口来实现这部分业务.
2.环境准备
在此之前,我们先准备一下开发环境:
- JDK:8以上
- Erlang:22以上
- EMQX:4.0以上
好了,准备好这些以后我们就开始写代码吧.
3.实现细节
1.Java项目准备
我们需要新建一个Maven项目,把OTP的库依赖进去:
<!-- https://mvnrepository.com/artifact/org.erlang.otp/jinterface -->
<dependency>
<groupId>org.erlang.otp</groupId>
<artifactId>jinterface</artifactId>
<version>1.6.1</version>
</dependency>
然后新建一个Main类,按照正常的流程,在main方法里面加入下面的代码:
import com.ericsson.otp.erlang.OtpEpmd;
import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpNode;
/**
* 测试Erlang Interface
*/
public class ErlangNodeTest1 {
public static void main(String[] args) throws Exception {
String nodeName = "ezlinker_core_node1@127.0.0.1";
String receiveLoopProcessName = "ezlinker_core_node1_receive_loop";
OtpNode otpNode = new OtpNode(nodeName, "emqxsecretcookie");
if (otpNode.ping("emqx@127.0.0.1", 5)) {
System.out.println("远程节点启动");
} else {
System.out.println("远程节点停止");
}
OtpMbox otpMbox = otpNode.createMbox();
otpMbox.registerName(receiveLoopProcessName);
System.out.println("start to listen.....");
while (true) {
System.out.println("收到Erlang节点的消息:" + otpMbox.receiveMsg().getMsg());
}
}
}
注意:这里仅仅是做个Demo,并不能生产,有经验可以把这段代码移植到比如Spring上.到这里你肯定很兴奋,直接把main运行了一下,结果很失望:抛出异常了:
Exception in thread "main" java.io.IOException: Nameserver not responding on 127.0.0.1 when publishing ezlinker_core_node1
at com.ericsson.otp.erlang.OtpEpmd.r4_publish(OtpEpmd.java:344)
at com.ericsson.otp.erlang.OtpEpmd.publishPort(OtpEpmd.java:141)
at com.ericsson.otp.erlang.OtpNode$Acceptor.publishPort(OtpNode.java:784)
at com.ericsson.otp.erlang.OtpNode$Acceptor.<init>(OtpNode.java:776)
at com.ericsson.otp.erlang.OtpNode.init(OtpNode.java:232)
at com.ericsson.otp.erlang.OtpNode.<init>(OtpNode.java:196)
at com.ericsson.otp.erlang.OtpNode.<init>(OtpNode.java:149)
at com.ezlinker.app.ErlangNodeTest1.main(ErlangNodeTest1.java:17)
稳住!!!其实到这里还没有准备完整,接下来还有很多要做。
2.EMQX插件准备
上面准备好Java方面的工作以后,我们来准备Erlang和EMQX,在这里我假设你的电脑上已经装好了Erlang环境,最低版本22,如果没有请上官网自己下载。
接下来我们新建一个EMQX的插件和Java写的Node通信。
安装rebar3(不做讲解,请网上自行下载安装对应版本);
安装EMQX官方给出的插件工具(注意:Linux环境下):
mkdir -p ~/.config/rebar3/templates git clone https://github.com/emqx/rebar3_emqx_plugin ~/.config/rebar3/templates
然后新建一个插件:
rebar3 new emqx-plugin <plugin-name>
我新建的插件名字是:
ezlinker_core_plugin
:rebar3 new emqx-plugin `ezlinker_core_plugin`
增加配置参数:
首先在
etc
目录下找到ezlinker_core_plugin.conf
文件,里面的内容如下:ezlinker_core_plugin.hook.client.connected.1 = {"action": "on_client_connected"} ezlinker_core_plugin.hook.client.disconnected.1 = {"action": "on_client_disconnected"} ezlinker_core_plugin.hook.client.subscribe.1 = {"action": "on_client_subscribe", "topic": "#"} ezlinker_core_plugin.hook.client.unsubscribe.1 = {"action": "on_client_unsubscribe", "topic": "#"} ezlinker_core_plugin.hook.session.subscribed.1 = {"action": "on_session_subscribed", "topic": "#"} ezlinker_core_plugin.hook.session.unsubscribed.1 = {"action": "on_session_unsubscribed", "topic": "#"} ezlinker_core_plugin.hook.message.publish.1 = {"action": "on_message_publish", "topic": "#"} ezlinker_core_plugin.hook.message.delivered.1 = {"action": "on_message_delivered", "topic": "#"} ezlinker_core_plugin.hook.message.acked.1 = {"action": "on_message_acked", "topic": "#"}
我们在最后加上自己的配置项:
## ezlinker core node config ezlinker_core_plugin.node.name = ezlinker_core_node1@127.0.0.1
找到插件目录下的
priv
目录,最后一行加入:{mapping, "ezlinker_core_plugin.node.name", "ezlinker_core_plugin.node_name", [ {datatype, string} ]}.
其中mapping的作用是把配置项的键映射到插件的值,具体格式如下:
{mapping, "conf中的配置项名字", "插件名.插件中使用的项名", [ {datatype, string} ]}.
是不是不耐烦了?其实到这里才刚开始,上述步骤仅仅是我们生成了一个插件,并且注册进了EMQ,能不能发挥作用还不一定呢。接下来我们就开始对这个插件进行简单的开发。
3.插件开发
我们目前实现一个最简单的功能:把EMQX的所有消息全部丢给JavaNode去处理。
先打开:ezlinker_core_plugin.erl
,找到publish相关的代码(大概在155行位置处):
%%--------------------------------------------------------------------
%% Message publish
%%--------------------------------------------------------------------
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message = #message{topic = Topic, flags = #{retain := Retain}}, {Filter}) ->
EZlinkerNodeName = application:get_env(?APP, node_name, 'ezlinker_core_node1@127.0.0.1'),
with_filter(
fun() ->
emqx_metrics:inc('ezlinker_core_plugin.message_publish'),
// 核心代码
{ezlinker_core_node1_receive_loop,list_to_atom(EZlinkerNodeName)}!{self(),Message},
{ok, Message}
end, Message, Topic, Filter).
加入以下代码:
{ezlinker_core_node1_receive_loop,list_to_atom(EZlinkerNodeName)}!{self(),Message},
上面代码表示,EMQX内部产生的消息全部转发倒=到我们配置的那个节点上面。
到此为止,我们的插件就开发好了,接下来准备运行起来。
4.插件加载
我们还需要维护一个发布EMQX的项目:emqx-rel:https://github.com/emqx/emqx-rel.git
。首先需要fork下来,然后做个简单配置,目的是把插件加载进EMQX里面。
在emqx-rel项目中找到rebar.config,在deps节点加入以下配置:
{ezlinker_core_plugin,{git,"你插件的git位置",{branch, "master"}}},
然后在relx节点下的release节点最后面加上:
{ezlinker_core_plugin, load}
到此为止插件就配置完整。
5.启动配置
启动过程也是比较麻烦,首先需要确保你的主机已经启动了epmd(不知道可百度一下)。然后更改EMQX下面的启动脚本,把_build\emqx\rel\emqx\bin\emqx
脚本28行倒32行全部注释,改成以下:
#if [ -z "$WITH_EPMD" ]; then
# EPMD_ARG="-start_epmd false -epmd_module ekka_epmd -proto_dist ekka"
#else
# EPMD_ARG="-start_epmd true $PROTO_DIST_ARG"
#fi
EPMD_ARG="-start_epmd true $PROTO_DIST_ARG"
这么做的原因是emqx目前是用ekka集群的,没有自己启动epmd,所以当前需要手动指定启动epmd。
6.运行EMQX
切换倒emqx-rel项目,执行脚本:
_build/emqx/rel/emqx/bin/emqx console
如果上面做的没问题,就会出现:
_build/emqx/rel/emqx/bin/emqx console
Exec: /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/erts-10.5/bin/erlexec -boot /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/releases/
v4.0-beta.4/emqx -mode embedded -boot_var ERTS_LIB_DIR /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/erts-10.5/../lib -mnesia dir "/mnt/c/Users/a
" -config /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/data/configs/app.2020.03.23.16.04.14.config -args_file /mnt/c/Users/admin/Github/emqx-rel
/_build/emqx/rel/emqx/data/configs/vm.2020.03.23.16.04.14.args -vm_args /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx/data/configs/vm.2020.03.23.
16.04.14.args -start_epmd true -- console
Root: /mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx
/mnt/c/Users/admin/Github/emqx-rel/_build/emqx/rel/emqx
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:16:16] [ds:16:16:8] [async-threads:4] [hipe]
Starting emqx on node emqx@127.0.0.1
2020-03-23 16:04:19.815 [alert] [Plugins] Cannot find plugins: [emqx_recon,emqx_rule_engine]
Start http:management listener on 8081 successfully.
Start http:dashboard listener on 18083 successfully.
ezlinker_core_plugin loaded!
Start mqtt:tcp listener on 127.0.0.1:11883 successfully.
Start mqtt:tcp listener on 0.0.0.0:1883 successfully.
Start_trap_listenerStart mqtt:trap listener on 0.0.0.0:1884 successfully.
Start mqtt:ws listener on 0.0.0.0:8083 successfully.
Start mqtt:ssl listener on 0.0.0.0:8883 successfully.
Start mqtt:wss listener on 0.0.0.0:8084 successfully.
EMQ X Broker master is running now!
Eshell V10.5 (abort with ^G)
7.启动插件
打开http://localhost:18083
,进入控制台,找到plugins:
点击start按钮,启动插件。
8.测试通信
此时我们回过去到第一步,运行java的main
方法,发现居然不报错了?如果没有报错,说明epmd启动成功了。
我们用Mqttfx发送一条消息看看效果:
看java这边的打印:
此时已经实现通信,到这里,我们就可以解析出消息来任意处理。
4.参考文档
http://erlang.org/doc/apps/jinterface/jinterface_users_guide.html
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!