本文最后更新于:June 30, 2023 pm

本文作者:[wangwenhai] # 概要:本文主要展示一个EMQX的点对点通信插件的开发流程。

1. 背景概述

近期在做一个小玩具,使用MQTT协议通信,有个功能:直接实现A客户端直接给B客户端发送消息。常规做法是B订阅一个全局唯一Topic,然后A给这个Topic发送消息,但是我觉得这样操作比较麻烦。于是写了个插件实现了直接发送消息。

2. 相关代码

关于Erlang的基础知识和EMQX的二次开发教程之前的文章已经写了,请没基础的可以先去翻翻历史文章,我这里直接贴关键代码。

1.核心插件

-module(ezlinker_p2p_plugin).

-include("ezlinker_p2p_plugin.hrl").

-include_lib("emqx/include/emqx.hrl").

-export([load/0, register_metrics/0, unload/0]).


-export([on_message_publish/2]).
-export([on_client_subscribe/4]).

-define(LOG(Level, Format, Args),
	emqx_logger:Level("ezlinker_p2p_plugin: " ++ Format,
			  Args)).

register_metrics() ->
    [emqx_metrics:new(MetricName) || MetricName <- [
				'ezlinker_p2p_plugin.message_publish',
				'ezlinker_p2p_plugin.client_subscribe'
			]
	].

load() ->
    lists:foreach(fun ({Hook, Fun, Filter}) ->
			  load_(Hook, binary_to_atom(Fun, utf8), {Filter})
		  end,
		  parse_rule(application:get_env(?APP, hooks, []))).


unload() ->
    lists:foreach(fun ({Hook, Fun, _Filter}) ->
			  unload_(Hook, binary_to_atom(Fun, utf8))
		  end,
		  parse_rule(application:get_env(?APP, hooks, []))).
load_(Hook, Fun, Params) ->
	case Hook of
		'message.publish'  -> 
			emqx:hook(Hook, fun ?MODULE:Fun/2, [Params]);
		'client.subscribe' -> 
			emqx:hook(Hook, fun ?MODULE:Fun/4, [Params])
	end.

unload_(Hook, Fun) ->
	case Hook of
		'message.publish'  -> 
			emqx:unhook(Hook, fun ?MODULE:Fun/2);
		'client.subscribe' -> 
			emqx:hook(Hook, fun ?MODULE:Fun/4)
	end.

%%--------------------------------------------------------------------

on_message_publish(Message = #message{topic =  <<"$SYS/", _/binary>>}, _Env) ->
	{ok, Message};
%%
on_message_publish(Message = #message{headers= Headers, topic =  <<"$p2p/", Path/binary>>,qos = QOS , payload = Payload ,from = From}, _Env) ->
    case Path of 
		<<>> ->
			io:format("P2P Message is empty,will be ignored ~n"),
			{stop, Message#message{headers = Headers#{allow_publish => false}} };
		PeerClientId ->
			io:format("P2P Message:~p to ~p QOS is:~p ~n",[ Payload , PeerClientId , QOS ] ),
			case  ets:lookup(emqx_channel, PeerClientId) of

				[{_,ChannelPid}] ->
						P2PMessage = emqx_message:make( From, QOS, <<"$p2p/", PeerClientId/binary >> , Payload),
			            ChannelPid ! {deliver, <<"$p2p/", PeerClientId/binary >>, P2PMessage},
						io:format("P2PMessage is :~p ~n", [P2PMessage]),
						{ok, Message};
				[]-> 
					io:format("PeerClientId mappinged channel pid :~p is not exist ~n",[PeerClientId]),
			{stop, Message#message{headers = Headers#{allow_publish => false}} }
		    end
	end;
			
on_message_publish(Message = #message{topic = Topic}, {Filter}) ->
		with_filter(
		  fun() ->
			emqx_metrics:inc('ezlinker_p2p_plugin.message_publish'),
			%% Begin
			%% End
			{ok, Message}
		  end, Message, Topic, Filter).
%%--------------------------------------------------------------------
%% Client subscribe
%%--------------------------------------------------------------------
on_client_subscribe(#{clientid := _C, username := _U}, _P, RawTopicFilters, {_F}) ->
  %% [{Topic,_OP}] = RawTopicFilters
  lists:foreach(fun({Topic, _OP}) ->
        emqx_metrics:inc('ezlinker_p2p_plugin.client_subscribe'),
        %% Code Start
		io:format("Client sub topic:~p~n",[Topic]),
		case  string_start_with(Topic,"$p2p/") of
			false ->
				io:format("Client nomatch p2p topic~n"),
				ok;
			true ->
				io:format("Client match p2p topic ,deny~n"),
				{stop, deny}
		end
	end, RawTopicFilters).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
parse_rule(Rules) -> parse_rule(Rules, []).

parse_rule([], Acc) -> lists:reverse(Acc);
parse_rule([{Rule, Conf} | Rules], Acc) ->
    Params = emqx_json:decode(iolist_to_binary(Conf)),
    Action = proplists:get_value(<<"action">>, Params),
    Filter = proplists:get_value(<<"topic">>, Params),
    parse_rule(Rules,
	       [{list_to_atom(Rule), Action, Filter} | Acc]).

with_filter(Fun, _, undefined) ->
Fun(), ok;
with_filter(Fun, Topic, Filter) ->
case emqx_topic:match(Topic, Filter) of
	true -> Fun(), ok;
	false -> ok
end.

with_filter(Fun, _, _, undefined) -> Fun();
with_filter(Fun, Msg, Topic, Filter) ->
    case emqx_topic:match(Topic, Filter) of
      true -> Fun();
      false -> {ok, Msg}
	end.
	

%% start with
string_start_with(String,SubString)->
	case  string:prefix(String,SubString) of
		nomatch ->
			false;
		_ ->
			true
	end.

2.配置文件

ezlinker_p2p_plugin.hook.message.publish.1      = {"action": "on_message_publish", "topic": "#"}
ezlinker_p2p_plugin.hook.client.subscribe.1     = {"action": "on_client_subscribe", "topic": "#"}

3. 使用方法

p2p不是搞传销的那个p2p,含义是:point to point.

本插件为EMQX新增一个内置topic:$p2p/{client_id},客户端需要订对点通信的时候,只需要 给topic下客户端id为topic的客户端单独发送一条数据

topic:$p2p/{client_id}
payload:数据内容

其中{client_id}就是要发送的对端的clientid.如果是python客户端,A客户端给B发送数据,最简单的代码描述应该是这样:

client.publish('$p2p/client1', json.dumps({a:1,b:2}, ensure_ascii=False))

注意:{clientId}为空的时候不发送任何数据,也不会返回任何数据

这里给出一个简单的客户端方便测试:

import paho.mqtt.client as mqtt
import json
# 连接成功以后打印
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

# 注意 我们没有给客户端订阅任何Topic,就可以收的到消息
def on_message(client, userdata, msg):
    print(msg.topic+" " + ":" + str(msg.payload))

client = mqtt.Client("client1")
client.username_pw_set("username", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect("10.168.1.159",1883, 60)
client.loop_forever()

3.注意事项

  • client_id 不可空;

  • 不需要订阅 $p2p/,不然会收到2条消息。

  • MQTT标准协议规定:如果两个客户端之间要通信,必须订阅相同的topic。但是这种做法不适合单个客户端之间的直接通信。为了解决类似点对点通信的需求,写了一个简单的插件来实现这个功能。本插件仅仅是为了增强EMQX的功能,不是Mqtt协议的规范,请多大家注意。

    4. 使用场景

    任何需要单点通信的地方,比如游戏开发,即时聊天,及时控制等等