本文最后更新于:June 30, 2023 pm

本文作者:[wangwenhai] # 概要:本文主要讲如何开发一个完整的数据转发插件,通过该插件可以把EMQX产生的数据发送到任一接口.

1.本文概述

EMQX的官方插件满足不了的时候,我们可以自定义开发插件.本文就以一个简单的插件入手,演示如何为EMQX开发可使用的插件.

2.环境准备

  1. Erlang环境;
  2. 一个编辑器,建议VScode;
  3. git;
  4. rebar3;
  5. rebar3的EMQX插件模板.

上面的环境确保安装成功以后再进行下一步,如果没有安装请查阅之前的文章进行安装.

3.开发实战

本插件是一个把EMQX的消息发布到WEB端的简单模块,接下来我们按照软件开发流程来走一遍.其中我们的插件名字叫:advisory_plugin.

1.创建项目

  1. 切换到自己的工作目录

  2. 执行命令:

    rebar3 new emqx-plugin advisory_plugin
  3. 输出成功以后,会生成如下目录结构:

image-20191225091715849

其中我们的源码在src下.

2.实现接口

此处我们拿当客户端连接成功这个事件来作为案例:当设备连接成功以后,给WEB端 一个上线通知,写入一些记录.

解析来找到上线的响应函数:on_client_connected

on_client_connected(ClientInfo = #{clientid := ClientId, username := Username, peerhost :=Host}, 0, _ConnInfo, _Env) ->
  emqx_metrics:inc('advisory_plugin.client_connected'),
  Params = [
    {action, connected},
    {clientid, ClientId},
    {username, Username},
    {ip, iolist_to_binary(ntoa(Host))}
  ],
  ?LOG(debug, "on_client_connected ~p~n", [ClientInfo]),
  send_http_post("connected", Params),
  ok;

on_client_connected(#{}, _ConnAck, _ConnInfo, _Env) ->
  ok.

其中send_http_post函数便是推送函数,具体代码如下:

send_http_post(Api, Params) ->
  Json = jsx:encode(Params),
  Url = application:get_env(?APP, destination, "http://127.0.0.1"),
  case request(post, {string:concat(Url, Api), [], "application/json", Json}, [{timeout, 5000}], [], 0) of
    {ok, _} ->
      ?LOG(debug, "Post to:[~p], Params:~s success~n", [Url, Json]),
      ok;
    {error, Reason} ->
      ?LOG(error, "HTTP request error: ~p", [Reason]),
      error
  end.

request(Method, Req, HTTPOpts, Opts, Times) ->
  %% Resend request, when TCP closed by remotely
  case httpc:request(Method, Req, HTTPOpts, Opts) of
    {error, socket_closed_remotely} when Times < 3 ->
      timer:sleep(trunc(math:pow(10, Times))),
      request(Method, Req, HTTPOpts, Opts, Times + 1);
    Other -> Other
  end.

其中我们用到了jsx库,这是一个erlang的JSON解析器实现.

Url = application:get_env(?APP, destination, "http://127.0.0.1")

上述代码先从配置文件中获取推送的目标,如果不存在就是http://127.0.0.1,然后把数据post上去.代码看起来比较简单,接下来我们增加配置项.

找到priv目录下的schema文件,最后加入以下代码:

{mapping, "advisory_plugin.destination", "advisory_plugin.destination", [
  {datatype, string}
]}.

这里表示的是从配置文件中映射配置项,下面是配置文件etc目录下的conf文件:

advisory_plugin.destination = http://localhost:端口/data/

到这步,我们的插件就开发完成了,然后来实现WEB端的接口.

3.WEB推送接口实现

我拿SpringMVC为例:

@PostMapping("/connected")
public R connected(@RequestBody @Valid ConnectedMessage message) throws XException {
System.out.println("设备 Clientid is:" + message.getClientid() + " Username is:" + message.getUsername() + " 上线");
}

此时我们就能监听客户端的数据了.

4.插件安装

在这里我使用了编译的形式安装插件.首先我们打开emqx-rel项目,没有的话可以先git克隆下来:

git clone https://github.com/emqx/emqx-rel.git

找到rebar.config文件,加入插件,在deps节点下加入(这里我用了自己的仓库):

{ advisory_plugin, { git , "https://gitee.com/lagrangewang/advisory_plugin.git" , {branch, "master"} } }

然后配置插件加载,在relx节点下加入:

{advisory_plugin,load}

然后执行make.经过漫长的等待以后,不出问题,我们的项目可以编译成功,执行下面的命令:

cd _build/emqx/rel/emqx && ./bin/emqx console

会有如下输出:

image-20191225093324490

此时表示已经成功运行起了EMQX,我们看看插件情况,打开http://localhost:18083/,我们看到插件已经加载到了EMQX里面.

image-20191225093425903

5.总结

本文我们主要讲了EMQX插件的开发流程和一个简单的Demo.WEB接口可自定义去实现功能.