本文最后更新于:June 30, 2023 pm
本文作者:[wangwenhai] # 概要:本文主要展示一个EMQX的点对点通信插件的开发流程。
1. 背景概述
近期在做一个小玩具,使用MQTT协议通信,有个功能:直接实现A客户端直接给B客户端发送消息。常规做法是B订阅一个全局唯一Topic,然后A给这个Topic发送消息,但是我觉得这样操作比较麻烦。于是写了个插件实现了直接发送消息。
2. 相关代码
关于Erlang的基础知识和EMQX的二次开发教程之前的文章已经写了,请没基础的可以先去翻翻历史文章,我这里直接贴关键代码。
1.核心插件
-module(ezlinker_p2p_plugin).
-include("ezlinker_p2p_plugin.hrl").
-include_lib("emqx/include/emqx.hrl").
-export([load/0, register_metrics/0, unload/0]).
-export([on_message_publish/2]).
-export([on_client_subscribe/4]).
-define(LOG(Level, Format, Args),
emqx_logger:Level("ezlinker_p2p_plugin: " ++ Format,
Args)).
register_metrics() ->
[emqx_metrics:new(MetricName) || MetricName <- [
'ezlinker_p2p_plugin.message_publish',
'ezlinker_p2p_plugin.client_subscribe'
]
].
load() ->
lists:foreach(fun ({Hook, Fun, Filter}) ->
load_(Hook, binary_to_atom(Fun, utf8), {Filter})
end,
parse_rule(application:get_env(?APP, hooks, []))).
unload() ->
lists:foreach(fun ({Hook, Fun, _Filter}) ->
unload_(Hook, binary_to_atom(Fun, utf8))
end,
parse_rule(application:get_env(?APP, hooks, []))).
load_(Hook, Fun, Params) ->
case Hook of
'message.publish' ->
emqx:hook(Hook, fun ?MODULE:Fun/2, [Params]);
'client.subscribe' ->
emqx:hook(Hook, fun ?MODULE:Fun/4, [Params])
end.
unload_(Hook, Fun) ->
case Hook of
'message.publish' ->
emqx:unhook(Hook, fun ?MODULE:Fun/2);
'client.subscribe' ->
emqx:hook(Hook, fun ?MODULE:Fun/4)
end.
%%--------------------------------------------------------------------
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
%%
on_message_publish(Message = #message{headers= Headers, topic = <<"$p2p/", Path/binary>>,qos = QOS , payload = Payload ,from = From}, _Env) ->
case Path of
<<>> ->
io:format("P2P Message is empty,will be ignored ~n"),
{stop, Message#message{headers = Headers#{allow_publish => false}} };
PeerClientId ->
io:format("P2P Message:~p to ~p QOS is:~p ~n",[ Payload , PeerClientId , QOS ] ),
case ets:lookup(emqx_channel, PeerClientId) of
[{_,ChannelPid}] ->
P2PMessage = emqx_message:make( From, QOS, <<"$p2p/", PeerClientId/binary >> , Payload),
ChannelPid ! {deliver, <<"$p2p/", PeerClientId/binary >>, P2PMessage},
io:format("P2PMessage is :~p ~n", [P2PMessage]),
{ok, Message};
[]->
io:format("PeerClientId mappinged channel pid :~p is not exist ~n",[PeerClientId]),
{stop, Message#message{headers = Headers#{allow_publish => false}} }
end
end;
on_message_publish(Message = #message{topic = Topic}, {Filter}) ->
with_filter(
fun() ->
emqx_metrics:inc('ezlinker_p2p_plugin.message_publish'),
%% Begin
%% End
{ok, Message}
end, Message, Topic, Filter).
%%--------------------------------------------------------------------
%% Client subscribe
%%--------------------------------------------------------------------
on_client_subscribe(#{clientid := _C, username := _U}, _P, RawTopicFilters, {_F}) ->
%% [{Topic,_OP}] = RawTopicFilters
lists:foreach(fun({Topic, _OP}) ->
emqx_metrics:inc('ezlinker_p2p_plugin.client_subscribe'),
%% Code Start
io:format("Client sub topic:~p~n",[Topic]),
case string_start_with(Topic,"$p2p/") of
false ->
io:format("Client nomatch p2p topic~n"),
ok;
true ->
io:format("Client match p2p topic ,deny~n"),
{stop, deny}
end
end, RawTopicFilters).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
parse_rule(Rules) -> parse_rule(Rules, []).
parse_rule([], Acc) -> lists:reverse(Acc);
parse_rule([{Rule, Conf} | Rules], Acc) ->
Params = emqx_json:decode(iolist_to_binary(Conf)),
Action = proplists:get_value(<<"action">>, Params),
Filter = proplists:get_value(<<"topic">>, Params),
parse_rule(Rules,
[{list_to_atom(Rule), Action, Filter} | Acc]).
with_filter(Fun, _, undefined) ->
Fun(), ok;
with_filter(Fun, Topic, Filter) ->
case emqx_topic:match(Topic, Filter) of
true -> Fun(), ok;
false -> ok
end.
with_filter(Fun, _, _, undefined) -> Fun();
with_filter(Fun, Msg, Topic, Filter) ->
case emqx_topic:match(Topic, Filter) of
true -> Fun();
false -> {ok, Msg}
end.
%% start with
string_start_with(String,SubString)->
case string:prefix(String,SubString) of
nomatch ->
false;
_ ->
true
end.
2.配置文件
ezlinker_p2p_plugin.hook.message.publish.1 = {"action": "on_message_publish", "topic": "#"}
ezlinker_p2p_plugin.hook.client.subscribe.1 = {"action": "on_client_subscribe", "topic": "#"}
3. 使用方法
p2p不是搞传销的那个p2p,含义是:point to point.
本插件为EMQX新增一个内置topic:$p2p/{client_id}
,客户端需要订对点通信的时候,只需要 给topic下客户端id为topic的客户端单独发送一条数据
topic:$p2p/{client_id}
payload:数据内容
其中{client_id}
就是要发送的对端的clientid.如果是python客户端,A客户端给B发送数据,最简单的代码描述应该是这样:
client.publish('$p2p/client1', json.dumps({a:1,b:2}, ensure_ascii=False))
注意:
{clientId}
为空的时候不发送任何数据,也不会返回任何数据
这里给出一个简单的客户端方便测试:
import paho.mqtt.client as mqtt
import json
# 连接成功以后打印
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# 注意 我们没有给客户端订阅任何Topic,就可以收的到消息
def on_message(client, userdata, msg):
print(msg.topic+" " + ":" + str(msg.payload))
client = mqtt.Client("client1")
client.username_pw_set("username", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect("10.168.1.159",1883, 60)
client.loop_forever()
3.注意事项
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!