本文最后更新于:June 30, 2023 pm
本文作者:[gerry] # 概要:先前搜了一些关于reactive websocket的示例都是单个session收到消息后又发送给自己,例如:session.send(session.receive()),这种形式和单次http的作用无差别,
@[TOC](Spring webflux使用websocket实现单播和广播)
- 写这个配置类的原因:
先前搜了一些关于reactive websocket的示例都是单个session收到消息后又发送给自己,例如:session.send(session.receive()),这种形式和单次http的作用无差别,并没有体现双向通讯。Spring boot又还未支持像1.*版本对stomp协议的支持,直接使用SimpMessagingTemplate就能点对点单播和广播。然后在GitHub上找到一个靠谱的例子,而且真的是只有一个,资料太少,参考后先实现了进行单播和广播,再加上对响应式编程渐渐理解,最终形成这个配置代码。
使用 Spring boot 2.1.2.release,该版本的Spring boot还未对reactive websocket进行自动装配,需要开发人员手动配置,代码地址供参考:
https://github.com/Gerry-Pan/Live/blob/master/src/main/java/cn/com/pan/live/config/WebSocketConfiguration.java
- 注册HandlerMapping和WebSocketHandlerAdapter
Spring boot启动时会扫码BeanFactory中的org.springframework.web.reactive.HandlerMapping类型的Bean,给websocket增加映射操作类:
@Bean
public HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {
Map<String, WebSocketHandler> map = new HashMap<String, WebSocketHandler>();
map.put(webSocketHandlerPath, webSocketHandler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(10);
return mapping;
}
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
- 注册Websocket握手时的处理类WebSocketHandler
该方法中定义了接收和发送消息:
@Bean
public WebSocketHandler webSocketHandler() {
UnicastProcessor<String> messageProcessor = this.messageProcessor();
Flux<String> messages = messageProcessor.replay(0).autoConnect();
Flux<String> outputMessages = Flux.from(messages);
return (session) -> {
session.receive().map(WebSocketMessage::getPayloadAsText).subscribe(messageProcessor::onNext,
messageProcessor::onError, messageProcessor::onComplete);
return session.getHandshakeInfo().getPrincipal().flatMap((p) -> {
session.getAttributes().put("username", p.getName());
return session.send(outputMessages.filter((payload) -> this.filterUser(session, payload))
.map((payload) -> this.generateMessage(session, payload)));
}).switchIfEmpty(Mono.defer(() -> {
return Mono.error(new BadCredentialsException("Bad Credentials."));
})).then();
};
}
UnicastProcessor订阅了org.springframework.web.reactive.socket.WebSocketSession.receive()的数据流,session接收消息后将消息传给UnicastProcessor,而session.send订阅UnicastProcessor,当UnicastProcessor中有数据进入,会触发session.send。而且同理用在reactor kafka上。除了UnicastProcessor,还有几个Processor,比如EmitterProcessor,TopicProcessor,ReplyProcessor,DirectProcessor等等等等,感兴趣的童鞋就去研究一下。
- 根据消息过滤用户
有多少用户开启websocket连接进来就存在多少session实例,session.send就会被触发多少次,但是消息只发给对应的用户,所以需要筛选,如何筛选是可以自定义,比如根据payload中的字段指定的用户名筛选。filterUser方法返回true则是表示该消息是发送给此session,会继续调用generateMessage方法,false则不是,generateMessage方法不被调用,例如:
protected boolean filterUser(WebSocketSession session, String payload) {
System.out.println(session.getAttributes());
System.out.println("filterUser-----" + Thread.currentThread().getName());
return true;
}
当然这种方式不够科学,本人会再研究如何实现对应的session.send被触发,而不是所有的被触发再进行过滤。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!