本文最后更新于:June 30, 2023 pm
本文作者:[wangwenhai] # 概要:本文主要讲如何开发一个完整的数据转发插件,通过该插件可以把EMQX产生的数据发送到任一接口.
1.本文概述
EMQX的官方插件满足不了的时候,我们可以自定义开发插件.本文就以一个简单的插件入手,演示如何为EMQX开发可使用的插件.
2.环境准备
- Erlang环境;
- 一个编辑器,建议VScode;
- git;
- rebar3;
- rebar3的EMQX插件模板.
上面的环境确保安装成功以后再进行下一步,如果没有安装请查阅之前的文章进行安装.
3.开发实战
本插件是一个把EMQX的消息发布到WEB端的简单模块,接下来我们按照软件开发流程来走一遍.其中我们的插件名字叫:advisory_plugin.
1.创建项目
切换到自己的工作目录
执行命令:
rebar3 new emqx-plugin advisory_plugin
输出成功以后,会生成如下目录结构:
其中我们的源码在src下.
2.实现接口
此处我们拿当客户端连接成功这个事件来作为案例:当设备连接成功以后,给WEB端 一个上线通知,写入一些记录.
解析来找到上线的响应函数:on_client_connected
on_client_connected(ClientInfo = #{clientid := ClientId, username := Username, peerhost :=Host}, 0, _ConnInfo, _Env) ->
emqx_metrics:inc('advisory_plugin.client_connected'),
Params = [
{action, connected},
{clientid, ClientId},
{username, Username},
{ip, iolist_to_binary(ntoa(Host))}
],
?LOG(debug, "on_client_connected ~p~n", [ClientInfo]),
send_http_post("connected", Params),
ok;
on_client_connected(#{}, _ConnAck, _ConnInfo, _Env) ->
ok.
其中send_http_post
函数便是推送函数,具体代码如下:
send_http_post(Api, Params) ->
Json = jsx:encode(Params),
Url = application:get_env(?APP, destination, "http://127.0.0.1"),
case request(post, {string:concat(Url, Api), [], "application/json", Json}, [{timeout, 5000}], [], 0) of
{ok, _} ->
?LOG(debug, "Post to:[~p], Params:~s success~n", [Url, Json]),
ok;
{error, Reason} ->
?LOG(error, "HTTP request error: ~p", [Reason]),
error
end.
request(Method, Req, HTTPOpts, Opts, Times) ->
%% Resend request, when TCP closed by remotely
case httpc:request(Method, Req, HTTPOpts, Opts) of
{error, socket_closed_remotely} when Times < 3 ->
timer:sleep(trunc(math:pow(10, Times))),
request(Method, Req, HTTPOpts, Opts, Times + 1);
Other -> Other
end.
其中我们用到了jsx库,这是一个erlang的JSON解析器实现.
Url = application:get_env(?APP, destination, "http://127.0.0.1")
上述代码先从配置文件中获取推送的目标,如果不存在就是http://127.0.0.1
,然后把数据post上去.代码看起来比较简单,接下来我们增加配置项.
找到priv
目录下的schema文件,最后加入以下代码:
{mapping, "advisory_plugin.destination", "advisory_plugin.destination", [
{datatype, string}
]}.
这里表示的是从配置文件中映射配置项,下面是配置文件etc
目录下的conf文件:
advisory_plugin.destination = http://localhost:端口/data/
到这步,我们的插件就开发完成了,然后来实现WEB端的接口.
3.WEB推送接口实现
我拿SpringMVC为例:
@PostMapping("/connected")
public R connected(@RequestBody @Valid ConnectedMessage message) throws XException {
System.out.println("设备 Clientid is:" + message.getClientid() + " Username is:" + message.getUsername() + " 上线");
}
此时我们就能监听客户端的数据了.
4.插件安装
在这里我使用了编译的形式安装插件.首先我们打开emqx-rel项目,没有的话可以先git克隆下来:
git clone https://github.com/emqx/emqx-rel.git
找到rebar.config文件,加入插件,在deps节点下加入(这里我用了自己的仓库):
{ advisory_plugin, { git , "https://gitee.com/lagrangewang/advisory_plugin.git" , {branch, "master"} } }
然后配置插件加载,在relx节点下加入:
{advisory_plugin,load}
然后执行make.经过漫长的等待以后,不出问题,我们的项目可以编译成功,执行下面的命令:
cd _build/emqx/rel/emqx && ./bin/emqx console
会有如下输出:
此时表示已经成功运行起了EMQX,我们看看插件情况,打开http://localhost:18083/
,我们看到插件已经加载到了EMQX里面.
5.总结
本文我们主要讲了EMQX插件的开发流程和一个简单的Demo.WEB接口可自定义去实现功能.
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!