mqtt.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. var mqtt = require("../utils/mqtt.min.js");
  2. var client = null;
  3. var timeout = null;//定时器
  4. var interval = 100;//定时间隔
  5. var publish_flag = false;
  6. var publish_timeout=0;
  7. var subscribe_flag = false;
  8. var subscribe_timeout=0;
  9. var subscribe_multiple_flag = false
  10. var subscribe_multiple_timeout=0;
  11. var client_state = false;//false:offline;true:online
  12. var onMessageArrivedCallBack;//其它函数注册接收MQTT消息回调
  13. var onConnectionSuccessCallBack;//连接MQTT成功回调
  14. var onConnectionLostCallBack;//连接MQTT失败/掉线回调
  15. var host = 'wxs://mnifdv.cn/mqtt'; //地址
  16. const options = {
  17. keepalive: 30, //30s
  18. ssl: true,
  19. clean: true, //cleanSession不保持持久会话
  20. protocolVersion: 4,//MQTT v3.1.1
  21. username: 'yang', //用户名
  22. password: '11223344', //密码
  23. clientId: (+new Date()) + '' + Math.ceil(Math.random() * 1000000000),
  24. reconnectPeriod: 4 * 1000,
  25. connectTimeout: 3 * 1000,
  26. /**遗嘱 */
  27. /*
  28. will: {
  29. topic: 'test',
  30. payload: 'test',
  31. qos: 0,
  32. retain: false
  33. },*/
  34. }
  35. /**
  36. * @brief //连接MQTT成功回调
  37. * @param
  38. * @param None
  39. * @param None
  40. * @retval None
  41. * @example
  42. **/
  43. var SetonConnectionSuccessCallBack = function SetonConnectionSuccessCallBack(fun) {
  44. onConnectionSuccessCallBack = fun;
  45. }
  46. /**
  47. * @brief //连接MQTT失败/掉线回调
  48. * @param
  49. * @param None
  50. * @param None
  51. * @retval None
  52. * @example
  53. **/
  54. var SetonConnectionLostCallBack = function SetonConnectionLostCallBack(fun) {
  55. onConnectionLostCallBack = fun;
  56. }
  57. /**
  58. * @brief //MQTT发送消息,方式1
  59. * @param topic 发布的主题
  60. * @param payload 发布的消息
  61. * @param qos 消息等级
  62. * @param retained 是否需要服务器保留
  63. * @param SuccessFun 发送消息成功回调函数 SuccessFun(topic, payload, qos, retained)
  64. * @retval none
  65. * @example publishTopic("topic","ssssssssss",0,0,SuccessFun); 发布的消息为 "ssssssssss"
  66. **/
  67. var publishTopic = function publishTopic(topic, payload, qos, retained, SuccessFun) {
  68. if (client_state){
  69. let opts = {};
  70. opts.qos = qos;
  71. opts.retain = retained;
  72. publish_flag = true;
  73. // console.log("启动publish_timeout");
  74. // publish_timeout = setTimeout(function () {
  75. // if (client_state) {
  76. // try { client.reconnect(); } catch (e) { }
  77. // client_state = false;
  78. // }
  79. // console.log("publish_timeout");
  80. // }, 3000);
  81. client.publish(topic, payload, opts, function (err,err1) {
  82. console.log("发布消息:" + topic +" "+ payload);
  83. publish_flag = false;
  84. console.log("清除publish_timeout" + err + " " + err1);
  85. if (SuccessFun !=null){
  86. SuccessFun(topic, payload, qos, retained);
  87. }
  88. });
  89. }
  90. }
  91. /**
  92. * @brief //MQTT接收的数据函数回调
  93. * @param
  94. * @param None
  95. * @param None
  96. * @retval None
  97. * @example
  98. **/
  99. var SetonMessageArrivedCallBack = function SetonMessageArrivedCallBack(fun) {
  100. onMessageArrivedCallBack = fun;
  101. }
  102. /**
  103. * @brief //订阅主题
  104. * @param topic 订阅的主题
  105. * @param q 消息等级
  106. * @param SuccessFun 订阅成功回调函数 SuccessFun(e)
  107. * @param FailureFun 订阅失败回调函数 FailureFun(e)
  108. * @retval none
  109. * @example subscribeTopic("1111",0,SuccessFun,FailureFun);
  110. **/
  111. var subscribeTopic = function subscribeTopic(topic, q, SuccessFun, FailureFun) {
  112. if (client_state) {
  113. subscribe_flag = true;
  114. // subscribe_timeout = setTimeout(function () {
  115. // if (client_state){
  116. // try { client.reconnect(); } catch (e) { }
  117. // client_state = false;
  118. // }
  119. // if (FailureFun != null) FailureFun({ topic: topic, qos: q });
  120. // console.log("subscribe_timeout");
  121. // }, 3000);
  122. //解决重复订阅返回订阅失败
  123. client.unsubscribe(topic, function (err, granted) {
  124. //console.log("取消订阅:" + err, granted);
  125. });
  126. client.subscribe(topic, { qos: q }, function (err, granted) {
  127. try {
  128. console.log("订阅:" + err, granted, granted.length);
  129. subscribe_flag = false;
  130. // clearTimeout(subscribe_timeout);
  131. if (granted.length != 0) {
  132. if (SuccessFun != null) SuccessFun(granted[0]);
  133. } else {
  134. if (FailureFun != null) FailureFun({ topic: topic, qos: q });
  135. }
  136. } catch (e) {
  137. }
  138. });
  139. }
  140. }
  141. /**
  142. * @brief //订阅主题
  143. * @param filter { 'topic': { qos: 0 }, 'topic2': { qos: 1 } }
  144. * @param none
  145. * @param SuccessFun 订阅成功回调函数 SuccessFun(e)
  146. * @param FailureFun 订阅失败回调函数 FailureFun(e)
  147. * @retval none
  148. * @example
  149. **/
  150. var subscribeTopicMultiple = function subscribeTopicMultiple(filter, SuccessFun, FailureFun) {
  151. if (client_state) {
  152. // subscribe_multiple_timeout = setTimeout(function () {
  153. // if (client_state) {
  154. // try { client.reconnect(); } catch (e) { }
  155. // client_state = false;
  156. // }
  157. // if (FailureFun != null) FailureFun(filter);
  158. // console.log("subscribe_multiple_timeout");
  159. // }, 200);
  160. let i = 0;
  161. let topic = [];
  162. Object.keys(filter).forEach(function (key) {
  163. //console.log(filter, filter[key]);
  164. topic[i] = filter[key];
  165. i++;
  166. });
  167. //解决重复订阅返回订阅失败
  168. client.unsubscribe(topic, function (err) {
  169. //console.log("取消多重订阅:" + err);
  170. });
  171. subscribe_multiple_flag = true;
  172. client.subscribe(filter, function (err, granted) {
  173. try {
  174. console.log("多重订阅:" + err, granted, granted.length);
  175. subscribe_multiple_flag = false;
  176. // clearTimeout(subscribe_multiple_timeout);
  177. if (granted.length != 0) {
  178. if (SuccessFun != null) SuccessFun(filter);
  179. } else {
  180. if (FailureFun != null) FailureFun(filter);
  181. }
  182. } catch (e) { }
  183. });
  184. }
  185. }
  186. /**
  187. * @brief //取消订阅主题
  188. * @param topic string or ['topic1','topic2',]
  189. * @param
  190. * @param SuccessFun 订阅成功回调函数 SuccessFun(e)
  191. * @retval none
  192. * @example unSubscribeTopic("1111",SuccessFun);
  193. **/
  194. var unSubscribeTopic = function unSubscribeTopic(topic, SuccessFun) {
  195. if (client_state) {
  196. client.unsubscribe(topic, function (err) {
  197. if (SuccessFun != null){
  198. SuccessFun();
  199. }
  200. console.log("取消订阅:" + topic);
  201. });
  202. }
  203. }
  204. /**
  205. * @brief //定时器回调函数,轮训查询是否通信超时
  206. * @param none
  207. * @param none
  208. * @param none
  209. * @retval none
  210. * @example
  211. **/
  212. function timeout_function(param) {
  213. if (publish_flag){
  214. publish_timeout = publish_timeout+1;
  215. if (publish_timeout >= 30){
  216. publish_timeout = 0;
  217. publish_flag = false;
  218. subscribe_flag = false;
  219. subscribe_multiple_flag = false;
  220. if (client_state) {
  221. try { client.reconnect(); } catch (e) { }
  222. client_state = false;
  223. }
  224. }
  225. }
  226. else {
  227. publish_timeout = 0;
  228. }
  229. if (subscribe_flag) {
  230. subscribe_timeout = subscribe_timeout+1;
  231. if (subscribe_timeout>30){
  232. subscribe_timeout=0;
  233. publish_flag = false;
  234. subscribe_flag = false;
  235. subscribe_multiple_flag = false;
  236. if (client_state) {
  237. try { client.reconnect(); } catch (e) { }
  238. client_state = false;
  239. }
  240. }
  241. }
  242. else{
  243. subscribe_timeout=0;
  244. }
  245. if (subscribe_multiple_flag){
  246. subscribe_multiple_timeout = subscribe_multiple_timeout+1;
  247. if (subscribe_multiple_timeout>30){
  248. subscribe_multiple_timeout = 0;
  249. publish_flag = false;
  250. subscribe_flag = false;
  251. subscribe_multiple_flag = false;
  252. if (client_state) {
  253. try { client.reconnect(); } catch (e) { }
  254. client_state = false;
  255. }
  256. }
  257. }
  258. else{
  259. subscribe_multiple_timeout=0;
  260. }
  261. }
  262. /**
  263. * @brief 控制连接MQTT函数
  264. * @param
  265. * @param None
  266. * @param None
  267. * @retval None
  268. * @example
  269. **/
  270. var ConnectMqtt = function ConnectMqtt() {//链接MQTT
  271. console.log(options);
  272. try { client.end(); } catch (e) { }
  273. client = mqtt.connect(host, options)
  274. client.on('connect', function () {
  275. client_state = true;
  276. console.log("connect");
  277. if (timeout != null)
  278. {
  279. clearInterval(timeout);
  280. }
  281. timeout = setInterval(timeout_function, interval, null);
  282. if (onConnectionSuccessCallBack != null) {//如果回调函数不是空
  283. onConnectionSuccessCallBack();//执行回调函数
  284. }
  285. });
  286. client.on('message', function (topic, message) {
  287. let args = {};
  288. args.destinationName = topic;
  289. args.payloadString = message;
  290. console.log(args);
  291. if (onMessageArrivedCallBack != null)//如果回调函数不是空
  292. {
  293. onMessageArrivedCallBack(args);//执行回调函数
  294. }
  295. })
  296. client.on('close', function () {
  297. console.log("close");
  298. if (onConnectionLostCallBack != null) //如果回调函数不是空
  299. {
  300. onConnectionLostCallBack("close");//执行回调函数
  301. }
  302. });
  303. client.on('disconnect', function () {
  304. console.log("disconnect");
  305. if (onConnectionLostCallBack != null) //如果回调函数不是空
  306. {
  307. onConnectionLostCallBack("disconnect");//执行回调函数
  308. }
  309. });
  310. client.on('reconnect', function () {
  311. console.log("reconnect");
  312. if (onConnectionLostCallBack != null) //如果回调函数不是空
  313. {
  314. onConnectionLostCallBack("reconnect");//执行回调函数
  315. }
  316. });
  317. client.on('offline', function () {
  318. console.log("offline");
  319. if (onConnectionLostCallBack != null) //如果回调函数不是空
  320. {
  321. onConnectionLostCallBack("offline");//执行回调函数
  322. }
  323. });
  324. client.on('error', function () {
  325. console.log("error");
  326. if (onConnectionLostCallBack != null) //如果回调函数不是空
  327. {
  328. onConnectionLostCallBack("error");//执行回调函数
  329. }
  330. });
  331. }
  332. /**
  333. * @brief 启动网络状态监听(网络改变控制重连)
  334. * @param
  335. * @param None
  336. * @param None
  337. * @retval None
  338. * @example
  339. **/
  340. wx.onNetworkStatusChange(function (res) {
  341. ConnectMqtt();
  342. if (res.networkType == "none") console.log("无网络");
  343. else console.log("网络类型:" + res.networkType);
  344. })
  345. module.exports = {
  346. ConnectMqtt: ConnectMqtt,//控制连接MQTT
  347. SetonConnectionSuccessCallBack: SetonConnectionSuccessCallBack,//连接上回调
  348. SetonConnectionLostCallBack: SetonConnectionLostCallBack,//连接失败/掉线回调
  349. SetonMessageArrivedCallBack: SetonMessageArrivedCallBack,//接收到消息回调
  350. publishTopic: publishTopic,//发布消息方式1
  351. subscribeTopic: subscribeTopic,//订阅主题
  352. unSubscribeTopic: unSubscribeTopic,
  353. subscribeTopicMultiple: subscribeTopicMultiple,//多重订阅主题
  354. }