外观
Mqtt SpringStarter
约 1139 字大约 4 分钟
springmqtt
2022-06-07
⚠ 请注意,本文编写于 1122 天前,最后最后修改于 1122 天前,其中某些信息可能已经过时。
最近有个项目需要发送视频弹幕,实现的手段还是不少的,这里用Mqtt来实现,使用技术如下:
- Spring Cloud
- Spring Boot
- JDK 8
准备
首先是环境准备,这里使用 EMQX 5.0 社区版作为Mqtt
的消息服务器,使用Docker
进行搭建:
下附 docker-compose
:
version: '3.2'
services:
emqx:
image: emqx/emqx:v5.0.0
container_name: emqx
environment:
EMQX_NAME: xinwei-emqx
restart: always
ports:
- '1883:1883'
- '8081:8081'
- '8083:8083'
- '8883:8883'
- '8084:8084'
- '8099:8080'
- '18083:18083'
volumes:
- /service/env/emqx/data:/opt/emqx/data
- /service/env/emqx/etc:/opt/emqx/etc
- /service/env/emqx/log:/opt/emqx/log
封装
配置依赖
maven
加入相关依赖,具体版本可以由项目的spring-boot
版本决定。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置类
注
本文不对mqtt
进行鉴权,需要的可以自行拓展
基础属性配置
@Data @ConfigurationProperties(prefix = "spring.mqtt") @RefreshScope public class MqttProperties { private String url; private AuthTypeEnum authType = AuthTypeEnum.NONE; private String username; private String password; private String wsUrl; private String client; private Topic defaultSendTopic = new Topic("xx_mqtt_default", 1); private List<Topic> receiveTopics; private Integer keepAlive = 15; private Integer completionTimeout = 3000; /** * Mqtt 权限认证类型枚举 */ @NoArgsConstructor public enum AuthTypeEnum { /** * 无 */ NONE, /** * 客户端id */ CLIENT_ID, /** * 用户名 */ USERNAME } @Data @NoArgsConstructor @AllArgsConstructor public static class Topic { private String name; private Integer qos; } }
注入配置
@AutoConfiguration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfiguration { @Bean public MessageChannel mqttValueInputChannel() { return new DirectChannel(); } /** * 不缓存消息 * * @return 消息通道 */ @Bean public MessageChannel mqttOutputClearSessionChannel() { return new DirectChannel(); } /** * 缓存消息 * * @return 消息通道 */ @Bean public MessageChannel mqttOutputRetainSessionChannel() { return new DirectChannel(); } @Bean public MessageProducer valueInbound(MqttProperties mqttProperties) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClient() + StrPool.DASHED + "receiver", mqttClearSessionClientFactory(mqttProperties), mqttProperties.getReceiveTopics().stream() .map(MqttProperties.Topic::getName) .toArray(String[]::new)); adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttProperties.getReceiveTopics().stream() .mapToInt(MqttProperties.Topic::getQos) .toArray()); adapter.setOutputChannel(mqttValueInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttOutputClearSessionChannel") public MessageHandler clearSessionOutbound(MqttProperties mqttProperties) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "clear-session-sender", mqttClearSessionClientFactory(mqttProperties)); messageHandler.setAsync(true); messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos()); messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName()); return messageHandler; } @Bean public MqttPahoClientFactory mqttClearSessionClientFactory(MqttProperties mqttProperties) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttClearSessionConnectOptions(mqttProperties)); return factory; } @Bean public MqttConnectOptions getMqttClearSessionConnectOptions(MqttProperties mqttProperties) { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) { mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); } mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()}); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean @ServiceActivator(inputChannel = "mqttOutputRetainSessionChannel") public MessageHandler retainSessionOutbound(MqttProperties mqttProperties) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClient() + StrPool.DASHED + "retain-session-sender", mqttRetainSessionClientFactory(mqttProperties)); messageHandler.setAsync(true); messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos()); messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName()); return messageHandler; } @Bean public MqttPahoClientFactory mqttRetainSessionClientFactory(MqttProperties mqttProperties) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttRetainSessionConnectOptions(mqttProperties)); return factory; } @Bean public MqttConnectOptions getMqttRetainSessionConnectOptions(MqttProperties mqttProperties) { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) { mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); } mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()}); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } }
自动注入配置
/** * 消息接收 */ @Slf4j @AutoConfiguration(after = MqttConfiguration.class) public class MqttReceiveConfiguration { @Bean @ServiceActivator(inputChannel = "mqttValueInputChannel") public MessageHandler handlerValue() { return message -> { MessageHeaders headers = message.getHeaders(); //获取消息Topic String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); log.info("获取到的消息的topic :{} ", receivedTopic); //获取消息体 String payload = (String) message.getPayload(); log.info("获取到的消息的payload :{} ", payload); }; } }
imports
resource
META-INF
spring
org.springframework.boot.autoconfigure.AutoConfiguration.imports
配置自动装配路径
com.xxx.cloud.mqtt.config.MqttConfiguration com.xxx.cloud.mqtt.config.MqttReceiveConfiguration
消息发送设置
清除session
@MessagingGateway(defaultRequestChannel = "mqttOutputClearSessionChannel")
public interface MqttClearSessionSendHandler {
/**
* 使用 Default Topic & Default Qos 发送数据
*
* @param data string
*/
void sendToMqtt(String data);
/**
* 使用 Default Topic & 自定义 Qos 发送数据
*
* @param qos 自定义 Qos
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data);
/**
* 使用 自定义 Topic & Default Qos 发送数据
*
* @param topic 自定义 Topic
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
/**
* 使用 自定义 Topic & 自定义 Qos 发送数据
*
* @param topic 自定义 Topic
* @param qos 自定义 Qos
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
/**
* 使用 自定义 Topic & 自定义 Qos 发送数据
*
* @param topic 自定义 Topic
* @param qos 自定义 Qos
* @param payload 自定义负载
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
不清除session
@MessagingGateway(defaultRequestChannel = "mqttOutputRetainSessionChannel")
public interface MqttRetainSessionSendHandler {
/**
* 使用 Default Topic & Default Qos 发送数据
*
* @param data string
*/
void sendToMqtt(String data);
/**
* 使用 Default Topic & 自定义 Qos 发送数据
*
* @param qos 自定义 Qos
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos, String data);
/**
* 使用 自定义 Topic & Default Qos 发送数据
*
* @param topic 自定义 Topic
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
/**
* 使用 自定义 Topic & 自定义 Qos 发送数据
*
* @param topic 自定义 Topic
* @param qos 自定义 Qos
* @param data string
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
/**
* 使用 自定义 Topic & 自定义 Qos 发送数据
*
* @param topic 自定义 Topic
* @param qos 自定义 Qos
* @param payload 自定义负载
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
使用
- 引入上述封装的
starter
依赖 - 配置信息
spring: mqtt: url: tcp://xxx.xxx.com:1883 ws-url: ws://xxx.xxx.com:8083 auth-type: NONE username: admin password: public # ca-crt: classpath:/certs/broker.emqx.io-ca.crt client: ${spring.application.name} receive-topics: - qos: 1 name: driver/${spring.application.name}/device/+ - qos: 1 name: driver/${spring.application.name}/gateway/+ default-send-topic: qos: 1 name: default/${spring.application.name} keep-alive: 15 completion-timeout: 3000
- 注入后调用方法即可发送
@Resource private MqttClearSessionSendHandler sendHandler;