详细信息: Mqtt server IMqttMessageListener Bean not found
解决方案:IMqttMessageListener 为业务处理,必须要实现的接口。实现该接口并注册成 Spring Bean 即可。
-Dtio.default.read.buffer.size=1048576
(1M,也就是最大支持 10M 消息体,请按需设置)。useQueueDecode(true)
1.3.7 会默认成 true。不过业务一直处理不赢还是会照成更严重的问题。最后队列占满导致 jvm 内存溢出。建议集群并对接 kafka、rocketmq 等。存在此问题的版本:1.0.0
、1.0.0-RC
、1.0.3
、1.0.4
该问题主要是 jar 编译问题,由于 JDK9+ 改了 ByteBuffer 部分返回值的类型,导致 java9+ 下编译的 jar 在 java8 下运行会有问题。如果遇到此问题,请立刻反馈。
IMqttServerUniqueIdService
(1.1.4开始支持) 接口,返回的 uniqueId
会替代 clientId,后续的场景也是需要使用这个 uniqueId
来处理。nginx tcp 负载均衡
即可:stream {
upstream stream_backend {
zone tcp_servers 64k;
hash $remote_addr;
server 192.168.0.2:1883 max_fails=2 fail_timeout=30s;
server 192.168.0.3:1883 max_fails=2 fail_timeout=30s;
}
server {
listen 8883 ssl;
status_zone tcp_server;
proxy_pass stream_backend;
proxy_buffer_size 4k;
proxy_protocol on; # 转发源ip信息, mica-mqtt 开源版不支持,私服版已经支持,可捐助获取
ssl_handshake_timeout 15s;
ssl_certificate /etc/emqx/certs/cert.pem;
ssl_certificate_key /etc/emqx/certs/key.pem;
}
}
mica-mqtt 1.1.2 版本开始添加了 mica-mqtt-broker
模块,采用 redis pub/sub 实现集群,有需求的朋友可以参考。
snapshots 版本会及时响应修复最新的 bug 和需求。
SNAPSHOT 版本使用参考这里:https://www.dreamlu.net/mica2x/#%E4%BD%BF%E7%94%A8-snapshots
腾讯云、阿里云等提供有 jks 证书,直接申请下载,记住申请时的密码:
代码中 .useSsl("classpath:xxx.jks", "classpath:xxx.jks", "密码")
即可
server-cert.pem
、server-key.pem
在线转换成 jks 证书(注意,第一步1生成的时候它是没有设置私钥密码,这里不用设置,新文件密码就是 mqtt server 中要用的密码):https://myssl.com/cert_convert.html
.useSsl("classpath:xxx.jks", "classpath:xxx.jks", "密码")
开启 ssl。更多教程:openssl自签名证书教程(单域名证书/泛域名证书/多域名证书)详见:https://www.orcy.net.cn/340.html
详见: Linux 操作系统参数和TCP 协议栈网络参数章节
/**
* 客户端连接状态监听
*
* @author L.cm
*/
@Service
public class MqttClientConnectListener implements IMqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Autowired
private ApplicationContext applicationContext;
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
if (isReconnect) {
logger.info("重连 mqtt 服务器重连成功...");
} else {
logger.info("连接 mqtt 服务器成功...");
}
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.error("mqtt 链接断开 remark:{} isRemove:{}", remark, isRemove, throwable);
// 在断线时更新 clientId、username、password
MqttClientCreator mqttClientCreator = applicationContext.getBean(MqttClientCreator.class);
mqttClientCreator
.clientId("newClient" + System.currentTimeMillis())
.username("newUserName")
.password("newPassword");
}
}
科普:浏览器只能走 websocket mqtt 子协议,对应 mica-mqtt 8083 端口。
连错端口会报异常,如下:
org.tio.core.exception.TioDecodeException: java.lang.IllegalArgumentException: invalid QoS: 3
at net.dreamlu.iot.mqtt.codec.MqttDecoder.doDecode(MqttDecoder.java:67)
mqtt.js websocket 示例:
const clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8)
const host = 'ws://mqtt.dreamlu.net:8083/mqtt'
const options = {
keepalive: 60,
clientId: clientId,
username: 'mqtt登录用户名',
password: 'mqtt登录密码',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
will: {
topic: 'WillMsg',
payload: 'Connection Closed abnormally..!',
qos: 0,
retain: false
},
}
console.log('Connecting mqtt client')
const client = mqtt.connect(host, options)
client.on('error', (err) => {
console.log('Connection error: ', err)
client.end()
})
client.on('reconnect', () => {
console.log('Reconnecting...')
})
拔网线等非正常断开需要一个心跳检测周期才会触发断开。
Failed to instantiate [net.dreamlu.iot.mqtt.core.server.MqttServer]: Factory method 'mqttServer' threw
exception; nested exception is java.lang.NoClassDefFoundError:
com/github/benmanes/caffeine/cache/RemovalListener
解决方案: pom 中将 mqtt server 依赖放 mqtt client 前面。
登录 后才可以发表评论