fec8de6a238c02d25782be29185bae0fd686d0dc8d4b4c9aa88a0598042d44b64ca43946c89c554ffa60ba2ae0dd3069c634cc88bd1f12742cd5fd96e1bd37 53 KB


  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. const EventEmitter = require('events').EventEmitter
  6. const Store = require('./store')
  7. const TopicAliasRecv = require('./topic-alias-recv')
  8. const TopicAliasSend = require('./topic-alias-send')
  9. const mqttPacket = require('mqtt-packet')
  10. const DefaultMessageIdProvider = require('./default-message-id-provider')
  11. const Writable = require('readable-stream').Writable
  12. const inherits = require('inherits')
  13. const reInterval = require('reinterval')
  14. const clone = require('rfdc/default')
  15. const validations = require('./validations')
  16. const xtend = require('xtend')
  17. const debug = require('debug')('mqttjs:client')
  // Schedules `callback(...args)` asynchronously. `process` is probed with
  // `typeof` because a bare reference throws a ReferenceError in runtimes that
  // do not define it; the timer fallback forwards extra arguments as well.
  const nextTick = (typeof process !== 'undefined' && process && typeof process.nextTick === 'function')
    ? process.nextTick
    : function (callback, ...args) { setTimeout(callback, 0, ...args) }
  // Use the host `setImmediate` when one exists, otherwise defer through
  // `nextTick`. Arguments are forwarded because callers in this file hand an
  // Error to the callback this way (`setImmediate(callback, new Error(...))`).
  const setImmediate = (typeof global !== 'undefined' && global.setImmediate) || function (callback, ...args) {
    nextTick(callback, ...args)
  }
  // Fallback connection settings. The MqttClient constructor copies each entry
  // onto the caller's options whenever the caller left that key undefined.
  const defaultConnectOptions = {
    keepalive: 60,
    reschedulePings: true,
    protocolId: 'MQTT',
    protocolVersion: 4,
    // subscribe() only records topics for resubscription when this is > 0
    reconnectPeriod: 1000,
    // milliseconds; used as the delay of the connack timer in _setupStream
    connectTimeout: 30000,
    clean: true,
    // when true, subscribe() remembers topics in `_resubscribeTopics`
    resubscribe: true
  }
  // Human-readable text keyed by numeric code. Keys are written in hexadecimal
  // (0x80 === 128); as object keys they are the same properties as their
  // decimal spelling, so `errors[149]` and `errors[0x95]` are identical.
  // NOTE(review): presumably CONNACK return codes / MQTT 5 reason codes - the
  // lookup site is outside this view, confirm against the caller.
  const errors = {
    // 0 - 5
    0x00: '',
    0x01: 'Unacceptable protocol version',
    0x02: 'Identifier rejected',
    0x03: 'Server unavailable',
    0x04: 'Bad username or password',
    0x05: 'Not authorized',
    // 16 - 17
    0x10: 'No matching subscribers',
    0x11: 'No subscription existed',
    // 128 - 162
    0x80: 'Unspecified error',
    0x81: 'Malformed Packet',
    0x82: 'Protocol Error',
    0x83: 'Implementation specific error',
    0x84: 'Unsupported Protocol Version',
    0x85: 'Client Identifier not valid',
    0x86: 'Bad User Name or Password',
    0x87: 'Not authorized',
    0x88: 'Server unavailable',
    0x89: 'Server busy',
    0x8A: 'Banned',
    0x8B: 'Server shutting down',
    0x8C: 'Bad authentication method',
    0x8D: 'Keep Alive timeout',
    0x8E: 'Session taken over',
    0x8F: 'Topic Filter invalid',
    0x90: 'Topic Name invalid',
    0x91: 'Packet identifier in use',
    0x92: 'Packet Identifier not found',
    0x93: 'Receive Maximum exceeded',
    0x94: 'Topic Alias invalid',
    0x95: 'Packet too large',
    0x96: 'Message rate too high',
    0x97: 'Quota exceeded',
    0x98: 'Administrative action',
    0x99: 'Payload format invalid',
    0x9A: 'Retain not supported',
    0x9B: 'QoS not supported',
    0x9C: 'Use another server',
    0x9D: 'Server moved',
    0x9E: 'Shared Subscriptions not supported',
    0x9F: 'Connection rate exceeded',
    0xA0: 'Maximum connect time',
    0xA1: 'Subscription Identifiers not supported',
    0xA2: 'Wildcard Subscriptions not supported'
  }
  /**
   * Build a pseudo-random fallback client identifier.
   *
   * @returns {String} 'mqttjs_' followed by up to eight hexadecimal digits
   */
  function defaultId () {
    const randomHex = Math.random().toString(16).slice(2, 10)
    return `mqttjs_${randomHex}`
  }
  /**
   * Apply outgoing Topic Alias handling to a packet. Only acts on `publish`
   * packets when the client uses protocol version 5.
   *
   * - Packet carries an alias and a topic: the pair is registered.
   * - Packet carries no alias and `autoAssignTopicAlias` is set: a known alias
   *   replaces the topic, otherwise the least-recently-used alias is assigned.
   * - Packet carries no alias and `autoUseTopicAlias` is set: a known alias
   *   replaces the topic.
   *
   * The packet is modified in place.
   *
   * @param {Object} client - reads `options` and `topicAliasSend`
   * @param {Object} packet - packet about to be sent
   * @returns {Error|undefined} an Error when the alias cannot be used
   */
  function applyTopicAlias (client, packet) {
    if (client.options.protocolVersion !== 5 || packet.cmd !== 'publish') {
      return
    }
    const requested = packet.properties ? packet.properties.topicAlias : undefined
    const topic = packet.topic.toString()
    const aliases = client.topicAliasSend

    // No send-side alias table: any explicit alias is unusable.
    if (!aliases) {
      if (requested) {
        debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, requested)
        return new Error('Sending Topic Alias out of range')
      }
      return
    }

    // Caller supplied an alias: register it when a topic accompanies it.
    if (requested) {
      if (topic.length !== 0) {
        // register topic alias
        debug('applyTopicAlias :: register topic: %s - alias: %d', topic, requested)
        if (!aliases.put(topic, requested)) {
          debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, requested)
          return new Error('Sending Topic Alias out of range')
        }
      }
      return
    }

    if (topic.length === 0) {
      return
    }

    if (client.options.autoAssignTopicAlias) {
      const known = aliases.getAliasByTopic(topic)
      if (known) {
        packet.topic = ''
        packet.properties = { ...(packet.properties), topicAlias: known }
        debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, known)
      } else {
        const recycled = aliases.getLruAlias()
        aliases.put(topic, recycled)
        packet.properties = { ...(packet.properties), topicAlias: recycled }
        debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, recycled)
      }
    } else if (client.options.autoUseTopicAlias) {
      const known = aliases.getAliasByTopic(topic)
      if (known) {
        packet.topic = ''
        packet.properties = { ...(packet.properties), topicAlias: known }
        debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, known)
      }
    }
  }
  /**
   * Make a packet independent of Topic Alias state: when its topic is empty,
   * restore the topic name from the client's send-side alias table, then strip
   * the `topicAlias` property. The packet is modified in place.
   *
   * @param {Object} client - `topicAliasSend.getTopicByAlias` is used for lookup
   * @param {Object} packet - packet to normalise
   * @returns {Error|undefined} Error('Unregistered Topic Alias') when the topic
   *   is empty and cannot be recovered
   */
  function removeTopicAliasAndRecoverTopicName (client, packet) {
    const alias = packet.properties ? packet.properties.topicAlias : undefined
    if (packet.topic.toString().length === 0) {
      // restore topic from alias; a client without an alias table cannot know
      // any alias, so report it the same way as an unknown one instead of
      // crashing on an undefined `topicAliasSend`
      if (typeof alias === 'undefined' || !client.topicAliasSend) {
        return new Error('Unregistered Topic Alias')
      }
      const recovered = client.topicAliasSend.getTopicByAlias(alias)
      if (typeof recovered === 'undefined') {
        return new Error('Unregistered Topic Alias')
      }
      packet.topic = recovered
    }
    if (alias) {
      delete packet.properties.topicAlias
    }
  }
  /**
   * Emit `packetsend` on the client and write the packet to the client stream.
   *
   * @param {Object} client - provides `emit`, `stream` and `options`
   * @param {Object} packet - packet handed to `mqttPacket.writeToStream`
   * @param {Function} [cb] - invoked right away, or on the stream's next
   *   `drain` event when the write result was falsy and `cb` is not `nop`
   */
  function sendPacket (client, packet, cb) {
    debug('sendPacket :: packet: %O', packet)
    debug('sendPacket :: emitting `packetsend`')
    client.emit('packetsend', packet)
    debug('sendPacket :: writing to stream')
    const written = mqttPacket.writeToStream(packet, client.stream, client.options)
    debug('sendPacket :: writeToStream result %s', written)
    if (!cb) {
      return
    }
    const waitForDrain = !written && cb !== nop
    if (waitForDrain) {
      debug('sendPacket :: handle events on `drain` once through callback.')
      client.stream.once('drain', cb)
      return
    }
    debug('sendPacket :: invoking cb')
    cb()
  }
  /**
   * Fail every queued entry that has a callback with Error('Connection closed')
   * and remove it from the queue.
   *
   * @param {Object} [queue] - map of messageId to `{ cb }`; ignored when falsy
   */
  function flush (queue) {
    if (!queue) {
      return
    }
    debug('flush: queue exists? %b', !!(queue))
    for (const messageId of Object.keys(queue)) {
      if (typeof queue[messageId].cb !== 'function') {
        // NOTE(review): entries without a callback stay in the queue. It is
        // unclear whether that is intended; if it is, storing `nop` as the
        // callback would make such an entry get deleted here.
        continue
      }
      queue[messageId].cb(new Error('Connection closed'))
      delete queue[messageId]
    }
  }
  /**
   * Fail and remove only the queue entries flagged `volatile` that have a
   * callback; each callback receives Error('Connection closed').
   *
   * @param {Object} [queue] - map of messageId to `{ volatile, cb }`
   */
  function flushVolatile (queue) {
    if (!queue) {
      return
    }
    debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function')
    for (const messageId of Object.keys(queue)) {
      const isFlushable = queue[messageId].volatile && typeof queue[messageId].cb === 'function'
      if (isFlushable) {
        queue[messageId].cb(new Error('Connection closed'))
        delete queue[messageId]
      }
    }
  }
  /**
   * Put a packet into the client's outgoing store and, once stored, send it.
   *
   * @param {Object} client - provides `outgoingStore`
   * @param {Object} packet - packet to send
   * @param {Function} [cb] - receives an error if alias recovery or the store
   *   fails; otherwise it is handed on to `sendPacket`
   * @param {Function} cbStorePut - called after the packet has been stored
   */
  function storeAndSend (client, packet, cb, cbStorePut) {
    debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
    let toStore = packet
    if (packet.cmd === 'publish') {
      // The original packet is what goes on the wire. The clone is what gets
      // stored for a resend after reconnect, and a Topic Alias must not be
      // used after a disconnect, so the clone gets its topic name back.
      toStore = clone(packet)
      const aliasError = removeTopicAliasAndRecoverTopicName(client, toStore)
      if (aliasError) {
        return cb && cb(aliasError)
      }
    }
    client.outgoingStore.put(toStore, function storedPacket (putError) {
      if (putError) {
        return cb && cb(putError)
      }
      cbStorePut()
      sendPacket(client, packet, cb)
    })
  }
  /**
   * Default callback used when the caller supplied none: it only logs its
   * argument. Other code in this file compares callbacks against `nop` by
   * identity.
   *
   * @param {*} [error] - value to log
   */
  function nop (error) {
    debug('nop ::', error)
  }
  /**
   * MqttClient constructor
   *
   * Usable with or without `new`. Fills in default options, initialises the
   * client state, installs the `connect` and `close` handlers and finally
   * calls `_setupStream()`.
   *
   * @param {Function} streamBuilder - called with the client to create the stream
   * @param {Object} [options] - connection options
   * (see Connection#connect)
   */
  function MqttClient (streamBuilder, options) {
    const that = this
    if (!(this instanceof MqttClient)) {
      return new MqttClient(streamBuilder, options)
    }
    // `options` is optional: normalise it once so that every read below sees
    // an object (the raw argument used to be dereferenced and threw when absent)
    options = options || {}
    this.options = options
    // Defaults: only fill in keys the caller left undefined
    for (const k in defaultConnectOptions) {
      if (typeof this.options[k] === 'undefined') {
        this.options[k] = defaultConnectOptions[k]
      }
    }
    debug('MqttClient :: options.protocol', options.protocol)
    debug('MqttClient :: options.protocolVersion', options.protocolVersion)
    debug('MqttClient :: options.username', options.username)
    debug('MqttClient :: options.keepalive', options.keepalive)
    debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
    debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
    debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)
    this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
    debug('MqttClient :: clientId', this.options.clientId)
    // Custom ack handling is only honoured for protocol version 5; the default
    // handler calls its fourth argument with 0.
    this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
    this.streamBuilder = streamBuilder
    this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
    // Inflight message storages
    this.outgoingStore = options.outgoingStore || new Store()
    this.incomingStore = options.incomingStore || new Store()
    // Should QoS zero messages be queued when the connection is broken?
    this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
    // map of subscribed topics to support reconnection
    this._resubscribeTopics = {}
    // map of a subscribe messageId and a topic
    this.messageIdToTopic = {}
    // Ping timer, setup in _setupPingTimer
    this.pingTimer = null
    // Is the client connected?
    this.connected = false
    // Are we disconnecting?
    this.disconnecting = false
    // Packet queue
    this.queue = []
    // connack timer
    this.connackTimer = null
    // Reconnect timer
    this.reconnectTimer = null
    // Is processing store?
    this._storeProcessing = false
    // Packet Ids are put into the store during store processing
    this._packetIdsDuringStoreProcessing = {}
    // Store processing queue
    this._storeProcessingQueue = []
    // Inflight callbacks
    this.outgoing = {}
    // True if connection is first time.
    this._firstConnection = true
    if (options.topicAliasMaximum > 0) {
      if (options.topicAliasMaximum > 0xffff) {
        debug('MqttClient :: options.topicAliasMaximum is out of range')
      } else {
        this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
      }
    }
    // Send queued packets
    this.on('connect', function () {
      const queue = this.queue
      // Sends the queued packets one at a time; once the queue is empty it
      // triggers the resubscription.
      function deliver () {
        const entry = queue.shift()
        debug('deliver :: entry %o', entry)
        if (!entry) {
          that._resubscribe()
          return
        }
        const packet = entry.packet
        debug('deliver :: call _sendPacket for %o', packet)
        // A packet whose non-zero messageId cannot be registered is dropped
        const send = !packet.messageId || that.messageIdProvider.register(packet.messageId)
        if (send) {
          that._sendPacket(
            packet,
            function (err) {
              if (entry.cb) {
                entry.cb(err)
              }
              deliver()
            }
          )
        } else {
          debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId)
          deliver()
        }
      }
      debug('connect :: sending queued packets')
      deliver()
    })
    this.on('close', function () {
      debug('close :: connected set to `false`')
      this.connected = false
      debug('close :: clearing connackTimer')
      clearTimeout(this.connackTimer)
      debug('close :: clearing ping timer')
      if (that.pingTimer !== null) {
        that.pingTimer.clear()
        that.pingTimer = null
      }
      if (this.topicAliasRecv) {
        this.topicAliasRecv.clear()
      }
      debug('close :: calling _setupReconnect')
      this._setupReconnect()
    })
    EventEmitter.call(this)
    debug('MqttClient :: setting up stream')
    this._setupStream()
  }
  inherits(MqttClient, EventEmitter)
  /**
   * setup the event handlers in the inner stream.
   *
   * Creates the stream through `streamBuilder`, pipes it into a Writable that
   * feeds the MQTT parser, sends the `connect` packet (and an optional `auth`
   * packet) and arms the connack timeout.
   *
   * @api private
   */
  MqttClient.prototype._setupStream = function () {
    const that = this
    const writable = new Writable()
    const parser = mqttPacket.parser(this.options)
    // Callback of the Writable chunk being processed; null when none is pending
    let completeParse = null
    const packets = []
    debug('_setupStream :: calling method to clear reconnect')
    this._clearReconnect()
    debug('_setupStream :: using streamBuilder provided to client to create stream')
    this.stream = this.streamBuilder(this)
    parser.on('packet', function (packet) {
      debug('parser :: on packet push to packets array.')
      packets.push(packet)
    })
    // Passed to _handlePacket as its `done`: continue with the next parsed
    // packet, or complete the current chunk once none are left.
    function nextTickWork () {
      if (packets.length) {
        nextTick(work)
      } else {
        const done = completeParse
        completeParse = null
        // guarded like in `work`: nothing may be pending any more
        if (done) done()
      }
    }
    function work () {
      debug('work :: getting next packet in queue')
      const packet = packets.shift()
      if (packet) {
        debug('work :: packet pulled from queue')
        that._handlePacket(packet, nextTickWork)
      } else {
        debug('work :: no packets in queue')
        const done = completeParse
        completeParse = null
        debug('work :: done flag is %s', !!(done))
        if (done) done()
      }
    }
    writable._write = function (buf, enc, done) {
      completeParse = done
      debug('writable stream :: parsing buffer')
      parser.parse(buf)
      work()
    }
    function streamErrorHandler (error) {
      debug('streamErrorHandler :: error', error.message)
      // error.code will only be set in a Node.js environment; browsers don't
      // allow detecting errors on sockets, and emitting errors in the browser
      // seems to create issues
      if (error.code) {
        // handle error
        debug('streamErrorHandler :: emitting error')
        that.emit('error', error)
      } else {
        nop(error)
      }
    }
    debug('_setupStream :: pipe stream to writable stream')
    this.stream.pipe(writable)
    // Suppress connection errors
    this.stream.on('error', streamErrorHandler)
    // Echo stream close
    this.stream.on('close', function () {
      debug('(%s)stream :: on close', that.options.clientId)
      flushVolatile(that.outgoing)
      debug('stream: emit close to MqttClient')
      that.emit('close')
    })
    // Send a connect packet. It inherits from the options object, so option
    // values are visible on it without being copied.
    debug('_setupStream: sending packet `connect`')
    const connectPacket = Object.create(this.options)
    connectPacket.cmd = 'connect'
    if (this.topicAliasRecv) {
      if (!connectPacket.properties) {
        connectPacket.properties = {}
      }
      connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
    }
    // avoid message queue
    sendPacket(this, connectPacket)
    // Echo connection errors
    parser.on('error', this.emit.bind(this, 'error'))
    // auth
    if (this.options.properties) {
      if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
        that.end(() =>
          this.emit('error', new Error('Packet has no Authentication Method')
          ))
        return this
      }
      if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
        const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket)
        sendPacket(this, authPacket)
      }
    }
    // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
    this.stream.setMaxListeners(1000)
    clearTimeout(this.connackTimer)
    this.connackTimer = setTimeout(function () {
      debug('!!connectTimeout hit!! Calling _cleanUp with force `true`')
      that._cleanUp(true)
    }, this.options.connectTimeout)
  }
  /**
   * Dispatch one parsed packet to its handler.
   *
   * @param {Object} packet - parsed packet; `cmd` selects the handler
   * @param {Function} done - signals that the packet has been processed;
   *   passed to the publish/pubrel handlers, otherwise called here
   * @api private
   */
  MqttClient.prototype._handlePacket = function (packet, done) {
    const options = this.options
    if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
      // `done` is deliberately not called: the client is being ended
      this.emit('error', new Error('exceeding packets size ' + packet.cmd))
      this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } })
      return this
    }
    debug('_handlePacket :: emitting packetreceive')
    this.emit('packetreceive', packet)
    switch (packet.cmd) {
      case 'publish':
        this._handlePublish(packet, done)
        break
      case 'puback':
      case 'pubrec':
      case 'pubcomp':
      case 'suback':
      case 'unsuback':
        this._handleAck(packet)
        done()
        break
      case 'pubrel':
        this._handlePubrel(packet, done)
        break
      case 'connack':
        this._handleConnack(packet)
        done()
        break
      case 'auth':
        this._handleAuth(packet)
        done()
        break
      case 'pingresp':
        this._handlePingresp(packet)
        done()
        break
      case 'disconnect':
        this._handleDisconnect(packet)
        done()
        break
      default:
        // Unknown command: nothing to handle, but `done` must still run or
        // the caller would wait forever and no further packet would be parsed.
        debug('_handlePacket :: unknown command')
        done()
        break
    }
  }
  /**
   * Report whether the client is disconnecting. When it is, an
   * Error('client disconnecting') goes to `callback`, or is emitted as an
   * `error` event if there is no callback or the callback is `nop`.
   *
   * @param {Function} [callback] - receives the error
   * @returns {Boolean} the value of `this.disconnecting`
   * @api private
   */
  MqttClient.prototype._checkDisconnecting = function (callback) {
    if (!this.disconnecting) {
      return this.disconnecting
    }
    const hasRealCallback = callback && callback !== nop
    if (hasRealCallback) {
      callback(new Error('client disconnecting'))
    } else {
      this.emit('error', new Error('client disconnecting'))
    }
    return this.disconnecting
  }
  /**
   * publish - publish <message> to <topic>
   *
   * When store processing is active, earlier operations are still queued, or
   * no message id is available, the publish is appended to
   * `_storeProcessingQueue` instead of being sent immediately.
   *
   * @param {String} topic - topic to publish to
   * @param {String, Buffer} message - message to publish
   * @param {Object} [opts] - publish options, includes:
   *    {Number} qos - qos level to publish on
   *    {Boolean} retain - whether or not to retain the message
   *    {Boolean} dup - whether or not mark a message as duplicate
   *    {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
   * @param {Function} [callback] - function(err){}
   *    called when publish succeeds or fails
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.publish('topic', 'message');
   * @example
   *     client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
   * @example client.publish('topic', 'message', console.log);
   */
  MqttClient.prototype.publish = function (topic, message, opts, callback) {
    debug('publish :: message `%s` to topic `%s`', message, topic)
    const options = this.options
    // .publish(topic, payload, cb);
    if (typeof opts === 'function') {
      callback = opts
      opts = null
    }
    // caller options layered over the defaults
    opts = xtend({ qos: 0, retain: false, dup: false }, opts)
    if (this._checkDisconnecting(callback)) {
      return this
    }
    const that = this
    // Builds and sends the packet; returns false when no message id is free.
    const publishProc = function () {
      const tracked = opts.qos === 1 || opts.qos === 2
      let messageId = 0
      if (tracked) {
        messageId = that._nextId()
        if (messageId === null) {
          debug('No messageId left')
          return false
        }
      }
      const packet = {
        cmd: 'publish',
        topic,
        payload: message,
        qos: opts.qos,
        retain: opts.retain,
        messageId,
        dup: opts.dup
      }
      if (options.protocolVersion === 5) {
        packet.properties = opts.properties
      }
      debug('publish :: qos', opts.qos)
      if (tracked) {
        // QoS 1/2: the callback is parked in `outgoing` under the message id
        // and no callback is handed to _sendPacket
        that.outgoing[packet.messageId] = {
          volatile: false,
          cb: callback || nop
        }
        debug('MqttClient:publish: packet cmd: %s', packet.cmd)
        that._sendPacket(packet, undefined, opts.cbStorePut)
      } else {
        debug('MqttClient:publish: packet cmd: %s', packet.cmd)
        that._sendPacket(packet, callback, opts.cbStorePut)
      }
      return true
    }
    const mustQueue = this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()
    if (mustQueue) {
      this._storeProcessingQueue.push({
        invoke: publishProc,
        cbStorePut: opts.cbStorePut,
        callback: callback
      })
    }
    return this
  }
  /**
   * subscribe - subscribe to <topic>
   *
   * Topics already recorded in `_resubscribeTopics` with an equal or higher
   * qos are skipped unless the first argument carries a truthy `resubscribe`
   * property (which is removed from it). When nothing is left to request,
   * the callback is invoked synchronously with an empty list.
   *
   * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
   * @param {Object} [opts] - optional subscription options, includes:
   *    {Number} qos - subscribe qos level
   * @param {Function} [callback] - function(err, granted){} where:
   *    {Error} err - subscription error (none at the moment!)
   *    {Array} granted - array of {topic: 't', qos: 0}
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.subscribe('topic');
   * @example client.subscribe('topic', {qos: 1});
   * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
   * @example client.subscribe('topic', console.log);
   */
  MqttClient.prototype.subscribe = function (...args) {
    const that = this
    const subs = []
    let obj = args.shift()
    const resubscribe = obj.resubscribe
    let callback = args.pop() || nop
    let opts = args.pop()
    const version = this.options.protocolVersion
    const isV5 = version === 5
    delete obj.resubscribe
    if (typeof obj === 'string') {
      obj = [obj]
    }
    // subscribe(topic, opts): the options sit where the callback would be
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }
    const invalidTopic = validations.validateTopics(obj)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }
    if (this._checkDisconnecting(callback)) {
      debug('subscribe: discconecting true')
      return this
    }
    const defaultOpts = isV5 ? { qos: 0, nl: false, rap: false, rh: 0 } : { qos: 0 }
    opts = xtend(defaultOpts, opts)

    // Is a request needed for `topic` at the given qos?
    const isRequired = (topic, qos) =>
      !Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) ||
      that._resubscribeTopics[topic].qos < qos ||
      resubscribe
    // Build one subscription entry; `source` supplies qos and the v5 flags.
    const describe = (topic, source) => {
      const entry = { topic: topic, qos: source.qos }
      if (isV5) {
        entry.nl = source.nl
        entry.rap = source.rap
        entry.rh = source.rh
        entry.properties = opts.properties
      }
      return entry
    }

    if (Array.isArray(obj)) {
      // list form: every topic uses the shared options
      obj.forEach((topic) => {
        debug('subscribe: array topic %s', topic)
        if (isRequired(topic, opts.qos)) {
          const currentOpts = describe(topic, opts)
          debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos)
          subs.push(currentOpts)
        }
      })
    } else {
      // map form: every topic brings its own options
      Object.keys(obj).forEach((k) => {
        debug('subscribe: object topic %s', k)
        const requested = obj[k]
        if (isRequired(k, requested.qos)) {
          const currentOpts = describe(k, requested)
          debug('subscribe: pushing `%s` to subs list', currentOpts)
          subs.push(currentOpts)
        }
      })
    }
    if (!subs.length) {
      callback(null, [])
      return this
    }
    // Builds and sends the packet; returns false when no message id is free.
    const subscribeProc = function () {
      const messageId = that._nextId()
      if (messageId === null) {
        debug('No messageId left')
        return false
      }
      const packet = {
        cmd: 'subscribe',
        subscriptions: subs,
        qos: 1,
        retain: false,
        dup: false,
        messageId
      }
      if (opts.properties) {
        packet.properties = opts.properties
      }
      // subscriptions to resubscribe to in case of disconnect
      if (that.options.resubscribe) {
        debug('subscribe :: resubscribe true')
        const topics = []
        if (that.options.reconnectPeriod > 0) {
          for (const sub of subs) {
            const remembered = { qos: sub.qos }
            if (isV5) {
              remembered.nl = sub.nl || false
              remembered.rap = sub.rap || false
              remembered.rh = sub.rh || 0
              remembered.properties = sub.properties
            }
            that._resubscribeTopics[sub.topic] = remembered
            topics.push(sub.topic)
          }
        }
        that.messageIdToTopic[packet.messageId] = topics
      }
      that.outgoing[packet.messageId] = {
        volatile: true,
        cb: function (err, packet) {
          if (!err) {
            // write the granted values back onto the requested entries
            packet.granted.forEach((grantedQos, index) => {
              subs[index].qos = grantedQos
            })
          }
          callback(err, subs)
        }
      }
      debug('subscribe :: call _sendPacket')
      that._sendPacket(packet)
      return true
    }
    const mustQueue = this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()
    if (mustQueue) {
      this._storeProcessingQueue.push({
        invoke: subscribeProc,
        callback: callback
      })
    }
    return this
  }
  /**
   * unsubscribe - unsubscribe from topic(s)
   *
   * When store processing is active, earlier operations are still queued, or
   * no message id is available, the request is appended to
   * `_storeProcessingQueue` instead of being sent immediately.
   *
   * @param {String, Array} topic - topics to unsubscribe from
   * @param {Object} [opts] - optional subscription options, includes:
   *    {Object} properties - properties of unsubscribe packet
   * @param {Function} [callback] - callback fired on unsuback
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.unsubscribe('topic');
   * @example client.unsubscribe('topic', console.log);
   */
  MqttClient.prototype.unsubscribe = function (...args) {
    const that = this
    let topic = args.shift()
    let callback = args.pop() || nop
    let opts = args.pop()
    if (typeof topic === 'string') {
      topic = [topic]
    }
    // unsubscribe(topic, opts): the options sit where the callback would be
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }
    const invalidTopic = validations.validateTopics(topic)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }
    if (that._checkDisconnecting(callback)) {
      return this
    }
    // Builds and sends the packet; returns false when no message id is free.
    const unsubscribeProc = function () {
      const messageId = that._nextId()
      if (messageId === null) {
        debug('No messageId left')
        return false
      }
      const packet = {
        cmd: 'unsubscribe',
        qos: 1,
        messageId: messageId
      }
      if (typeof topic === 'string') {
        packet.unsubscriptions = [topic]
      } else if (Array.isArray(topic)) {
        packet.unsubscriptions = topic
      }
      if (that.options.resubscribe) {
        // forget the topics so they are not resubscribed later
        packet.unsubscriptions.forEach(function (topic) {
          delete that._resubscribeTopics[topic]
        })
      }
      // `typeof null` is 'object' too, so rule out null before reading
      // `properties` (unsubscribe(topic, null, cb) used to throw here)
      if (opts && typeof opts === 'object' && opts.properties) {
        packet.properties = opts.properties
      }
      that.outgoing[packet.messageId] = {
        volatile: true,
        cb: callback
      }
      debug('unsubscribe: call _sendPacket')
      that._sendPacket(packet)
      return true
    }
    if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
      this._storeProcessingQueue.push(
        {
          invoke: unsubscribeProc,
          callback: callback
        }
      )
    }
    return this
  }
  /**
   * end - close connection
   *
   * Accepted call shapes: end(), end(cb), end(opts), end(opts, cb),
   * end(force), end(force, cb), end(force, opts), end(force, opts, cb).
   * A null/undefined `force` or `opts` is treated as "not given" without
   * shifting the arguments that follow it.
   *
   * @returns {MqttClient} this - for chaining
   * @param {Boolean} force - do not wait for all in-flight messages to be acked
   * @param {Object} opts - added to the disconnect packet
   * @param {Function} cb - called when the client has been closed
   *
   * @api public
   */
  MqttClient.prototype.end = function (force, opts, cb) {
    const that = this
    debug('end :: (%s)', this.options.clientId)
    if (force == null) {
      force = false
    } else if (typeof force !== 'boolean') {
      // end(opts, cb) / end(cb): everything is shifted one slot to the left.
      // An explicit third argument keeps priority as the callback.
      cb = cb || opts
      opts = force
      force = false
    }
    if (typeof opts !== 'object') {
      // end(force, cb) / end(cb): the callback sits in the `opts` slot
      cb = cb || opts
      opts = null
    }
    debug('end :: cb? %s', !!cb)
    if (typeof cb !== 'function') {
      cb = nop
    }
    function closeStores () {
      debug('end :: closeStores: closing incoming and outgoing stores')
      that.disconnected = true
      that.incomingStore.close(function (e1) {
        that.outgoingStore.close(function (e2) {
          debug('end :: closeStores: emitting end')
          that.emit('end')
          debug('end :: closeStores: invoking callback with args')
          cb(e1 || e2)
        })
      })
      // a reconnect() requested while ending was parked here
      if (that._deferredReconnect) {
        that._deferredReconnect()
      }
    }
    function finish () {
      // defer closeStores by one tick,
      // just to make sure things are
      // ok for websockets
      debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force)
      that._cleanUp(force, () => {
        debug('end :: finish :: calling process.nextTick on closeStores')
        nextTick(closeStores)
      }, opts)
    }
    if (this.disconnecting) {
      cb()
      return this
    }
    this._clearReconnect()
    this.disconnecting = true
    if (!force && Object.keys(this.outgoing).length > 0) {
      // wait 10ms, just to be sure we received all of it
      debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId)
      this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
    } else {
      debug('end :: (%s) :: immediately calling finish', that.options.clientId)
      finish()
    }
    return this
  }
  /**
   * removeOutgoingMessage - remove a message in outgoing store
   * the outgoing callback will be called with Error('Message removed') if the message is removed
   *
   * A messageId without an in-flight callback is still deleted from the
   * outgoing store; there is simply no callback to notify.
   *
   * @param {Number} messageId - messageId to remove message
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.removeOutgoingMessage(client.getLastAllocated());
   */
  MqttClient.prototype.removeOutgoingMessage = function (messageId) {
    const entry = this.outgoing[messageId]
    const cb = entry ? entry.cb : null
    delete this.outgoing[messageId]
    this.outgoingStore.del({ messageId: messageId }, function () {
      // `cb` is null for an unknown messageId; calling it used to throw
      if (typeof cb === 'function') {
        cb(new Error('Message removed'))
      }
    })
    return this
  }
  933. /**
  934. * reconnect - connect again using the same options as connect()
  935. *
  936. * @param {Object} [opts] - optional reconnect options, includes:
  937. * {Store} incomingStore - a store for the incoming packets
  938. * {Store} outgoingStore - a store for the outgoing packets
  939. * if opts is not given, current stores are used
  940. * @returns {MqttClient} this - for chaining
  941. *
  942. * @api public
  943. */
  944. MqttClient.prototype.reconnect = function (opts) {
  945. debug('client reconnect')
  946. const that = this
  947. const f = function () {
  948. if (opts) {
  949. that.options.incomingStore = opts.incomingStore
  950. that.options.outgoingStore = opts.outgoingStore
  951. } else {
  952. that.options.incomingStore = null
  953. that.options.outgoingStore = null
  954. }
  955. that.incomingStore = that.options.incomingStore || new Store()
  956. that.outgoingStore = that.options.outgoingStore || new Store()
  957. that.disconnecting = false
  958. that.disconnected = false
  959. that._deferredReconnect = null
  960. that._reconnect()
  961. }
  962. if (this.disconnecting && !this.disconnected) {
  963. this._deferredReconnect = f
  964. } else {
  965. f()
  966. }
  967. return this
  968. }
  969. /**
  970. * _reconnect - implement reconnection
  971. * @api privateish
  972. */
  973. MqttClient.prototype._reconnect = function () {
  974. debug('_reconnect: emitting reconnect to client')
  975. this.emit('reconnect')
  976. if (this.connected) {
  977. this.end(() => { this._setupStream() })
  978. debug('client already connected. disconnecting first.')
  979. } else {
  980. debug('_reconnect: calling _setupStream')
  981. this._setupStream()
  982. }
  983. }
  984. /**
  985. * _setupReconnect - setup reconnect timer
  986. */
  987. MqttClient.prototype._setupReconnect = function () {
  988. const that = this
  989. if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
  990. if (!this.reconnecting) {
  991. debug('_setupReconnect :: emit `offline` state')
  992. this.emit('offline')
  993. debug('_setupReconnect :: set `reconnecting` to `true`')
  994. this.reconnecting = true
  995. }
  996. debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod)
  997. that.reconnectTimer = setInterval(function () {
  998. debug('reconnectTimer :: reconnect triggered!')
  999. that._reconnect()
  1000. }, that.options.reconnectPeriod)
  1001. } else {
  1002. debug('_setupReconnect :: doing nothing...')
  1003. }
  1004. }
  1005. /**
  1006. * _clearReconnect - clear the reconnect timer
  1007. */
  1008. MqttClient.prototype._clearReconnect = function () {
  1009. debug('_clearReconnect : clearing reconnect timer')
  1010. if (this.reconnectTimer) {
  1011. clearInterval(this.reconnectTimer)
  1012. this.reconnectTimer = null
  1013. }
  1014. }
  1015. /**
  1016. * _cleanUp - clean up on connection end
  1017. * @api private
  1018. */
  1019. MqttClient.prototype._cleanUp = function (forced, done) {
  1020. const opts = arguments[2]
  1021. if (done) {
  1022. debug('_cleanUp :: done callback provided for on stream close')
  1023. this.stream.on('close', done)
  1024. }
  1025. debug('_cleanUp :: forced? %s', forced)
  1026. if (forced) {
  1027. if ((this.options.reconnectPeriod === 0) && this.options.clean) {
  1028. flush(this.outgoing)
  1029. }
  1030. debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId)
  1031. this.stream.destroy()
  1032. } else {
  1033. const packet = xtend({ cmd: 'disconnect' }, opts)
  1034. debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId)
  1035. this._sendPacket(
  1036. packet,
  1037. setImmediate.bind(
  1038. null,
  1039. this.stream.end.bind(this.stream)
  1040. )
  1041. )
  1042. }
  1043. if (!this.disconnecting) {
  1044. debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.')
  1045. this._clearReconnect()
  1046. this._setupReconnect()
  1047. }
  1048. if (this.pingTimer !== null) {
  1049. debug('_cleanUp :: clearing pingTimer')
  1050. this.pingTimer.clear()
  1051. this.pingTimer = null
  1052. }
  1053. if (done && !this.connected) {
  1054. debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId)
  1055. this.stream.removeListener('close', done)
  1056. done()
  1057. }
  1058. }
  1059. /**
  1060. * _sendPacket - send or queue a packet
  1061. * @param {Object} packet - packet options
  1062. * @param {Function} cb - callback when the packet is sent
  1063. * @param {Function} cbStorePut - called when message is put into outgoingStore
  1064. * @api private
  1065. */
  1066. MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
  1067. debug('_sendPacket :: (%s) :: start', this.options.clientId)
  1068. cbStorePut = cbStorePut || nop
  1069. cb = cb || nop
  1070. const err = applyTopicAlias(this, packet)
  1071. if (err) {
  1072. cb(err)
  1073. return
  1074. }
  1075. if (!this.connected) {
  1076. // allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth)
  1077. if (packet.cmd === 'auth') {
  1078. this._shiftPingInterval()
  1079. sendPacket(this, packet, cb)
  1080. return
  1081. }
  1082. debug('_sendPacket :: client not connected. Storing packet offline.')
  1083. this._storePacket(packet, cb, cbStorePut)
  1084. return
  1085. }
  1086. // When sending a packet, reschedule the ping timer
  1087. this._shiftPingInterval()
  1088. switch (packet.cmd) {
  1089. case 'publish':
  1090. break
  1091. case 'pubrel':
  1092. storeAndSend(this, packet, cb, cbStorePut)
  1093. return
  1094. default:
  1095. sendPacket(this, packet, cb)
  1096. return
  1097. }
  1098. switch (packet.qos) {
  1099. case 2:
  1100. case 1:
  1101. storeAndSend(this, packet, cb, cbStorePut)
  1102. break
  1103. /**
  1104. * no need of case here since it will be caught by default
  1105. * and jshint comply that before default it must be a break
  1106. * anyway it will result in -1 evaluation
  1107. */
  1108. case 0:
  1109. /* falls through */
  1110. default:
  1111. sendPacket(this, packet, cb)
  1112. break
  1113. }
  1114. debug('_sendPacket :: (%s) :: end', this.options.clientId)
  1115. }
  1116. /**
  1117. * _storePacket - queue a packet
  1118. * @param {Object} packet - packet options
  1119. * @param {Function} cb - callback when the packet is sent
  1120. * @param {Function} cbStorePut - called when message is put into outgoingStore
  1121. * @api private
  1122. */
  1123. MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
  1124. debug('_storePacket :: packet: %o', packet)
  1125. debug('_storePacket :: cb? %s', !!cb)
  1126. cbStorePut = cbStorePut || nop
  1127. let storePacket = packet
  1128. if (storePacket.cmd === 'publish') {
  1129. // The original packet is for sending.
  1130. // The cloned storePacket is for storing to resend on reconnect.
  1131. // Topic Alias must not be used after disconnected.
  1132. storePacket = clone(packet)
  1133. const err = removeTopicAliasAndRecoverTopicName(this, storePacket)
  1134. if (err) {
  1135. return cb && cb(err)
  1136. }
  1137. }
  1138. // check that the packet is not a qos of 0, or that the command is not a publish
  1139. if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
  1140. this.queue.push({ packet: storePacket, cb: cb })
  1141. } else if (storePacket.qos > 0) {
  1142. cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
  1143. this.outgoingStore.put(storePacket, function (err) {
  1144. if (err) {
  1145. return cb && cb(err)
  1146. }
  1147. cbStorePut()
  1148. })
  1149. } else if (cb) {
  1150. cb(new Error('No connection to broker'))
  1151. }
  1152. }
  1153. /**
  1154. * _setupPingTimer - setup the ping timer
  1155. *
  1156. * @api private
  1157. */
  1158. MqttClient.prototype._setupPingTimer = function () {
  1159. debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive)
  1160. const that = this
  1161. if (!this.pingTimer && this.options.keepalive) {
  1162. this.pingResp = true
  1163. this.pingTimer = reInterval(function () {
  1164. that._checkPing()
  1165. }, this.options.keepalive * 1000)
  1166. }
  1167. }
  1168. /**
  1169. * _shiftPingInterval - reschedule the ping interval
  1170. *
  1171. * @api private
  1172. */
  1173. MqttClient.prototype._shiftPingInterval = function () {
  1174. if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
  1175. this.pingTimer.reschedule(this.options.keepalive * 1000)
  1176. }
  1177. }
  1178. /**
  1179. * _checkPing - check if a pingresp has come back, and ping the server again
  1180. *
  1181. * @api private
  1182. */
  1183. MqttClient.prototype._checkPing = function () {
  1184. debug('_checkPing :: checking ping...')
  1185. if (this.pingResp) {
  1186. debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`')
  1187. this.pingResp = false
  1188. this._sendPacket({ cmd: 'pingreq' })
  1189. } else {
  1190. // do a forced cleanup since socket will be in bad shape
  1191. debug('_checkPing :: calling _cleanUp with force true')
  1192. this._cleanUp(true)
  1193. }
  1194. }
/**
 * _handlePingresp - handle a pingresp
 *
 * Sets the `pingResp` flag that _checkPing reads (and clears) on its next
 * run to decide between sending another `pingreq` and a forced clean up.
 *
 * @api private
 */
MqttClient.prototype._handlePingresp = function () {
  this.pingResp = true
}
  1203. /**
  1204. * _handleConnack
  1205. *
  1206. * @param {Object} packet
  1207. * @api private
  1208. */
  1209. MqttClient.prototype._handleConnack = function (packet) {
  1210. debug('_handleConnack')
  1211. const options = this.options
  1212. const version = options.protocolVersion
  1213. const rc = version === 5 ? packet.reasonCode : packet.returnCode
  1214. clearTimeout(this.connackTimer)
  1215. delete this.topicAliasSend
  1216. if (packet.properties) {
  1217. if (packet.properties.topicAliasMaximum) {
  1218. if (packet.properties.topicAliasMaximum > 0xffff) {
  1219. this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
  1220. return
  1221. }
  1222. if (packet.properties.topicAliasMaximum > 0) {
  1223. this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
  1224. }
  1225. }
  1226. if (packet.properties.serverKeepAlive && options.keepalive) {
  1227. options.keepalive = packet.properties.serverKeepAlive
  1228. this._shiftPingInterval()
  1229. }
  1230. if (packet.properties.maximumPacketSize) {
  1231. if (!options.properties) { options.properties = {} }
  1232. options.properties.maximumPacketSize = packet.properties.maximumPacketSize
  1233. }
  1234. }
  1235. if (rc === 0) {
  1236. this.reconnecting = false
  1237. this._onConnect(packet)
  1238. } else if (rc > 0) {
  1239. const err = new Error('Connection refused: ' + errors[rc])
  1240. err.code = rc
  1241. this.emit('error', err)
  1242. }
  1243. }
  1244. MqttClient.prototype._handleAuth = function (packet) {
  1245. const options = this.options
  1246. const version = options.protocolVersion
  1247. const rc = version === 5 ? packet.reasonCode : packet.returnCode
  1248. if (version !== 5) {
  1249. const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version)
  1250. err.code = rc
  1251. this.emit('error', err)
  1252. return
  1253. }
  1254. const that = this
  1255. this.handleAuth(packet, function (err, packet) {
  1256. if (err) {
  1257. that.emit('error', err)
  1258. return
  1259. }
  1260. if (rc === 24) {
  1261. that.reconnecting = false
  1262. that._sendPacket(packet)
  1263. } else {
  1264. const error = new Error('Connection refused: ' + errors[rc])
  1265. err.code = rc
  1266. that.emit('error', error)
  1267. }
  1268. })
  1269. }
/**
 * handleAuth - hook for answering `auth` packets; override at will.
 *
 * This default implementation ignores the packet and calls the callback
 * with no arguments. _handleAuth invokes the hook with a callback of the
 * form (err, packet): a truthy `err` is emitted as an error, otherwise
 * `packet` is what gets sent back when the reason code is 24.
 *
 * @param packet the auth packet received from the broker
 * @param {Function} callback - called as callback(err, packet); the result
 *    is delivered through this callback, not through the return value
 * @api public
 */
MqttClient.prototype.handleAuth = function (packet, callback) {
  callback()
}
  1278. /**
  1279. * _handlePublish
  1280. *
  1281. * @param {Object} packet
  1282. * @api private
  1283. */
  1284. /*
  1285. those late 2 case should be rewrite to comply with coding style:
  1286. case 1:
  1287. case 0:
  1288. // do not wait sending a puback
  1289. // no callback passed
  1290. if (1 === qos) {
  1291. this._sendPacket({
  1292. cmd: 'puback',
  1293. messageId: messageId
  1294. });
  1295. }
  1296. // emit the message event for both qos 1 and 0
  1297. this.emit('message', topic, message, packet);
  1298. this.handleMessage(packet, done);
  1299. break;
  1300. default:
  1301. // do nothing but every switch mus have a default
  1302. // log or throw an error about unknown qos
  1303. break;
  1304. for now i just suppressed the warnings
  1305. */
  1306. MqttClient.prototype._handlePublish = function (packet, done) {
  1307. debug('_handlePublish: packet %o', packet)
  1308. done = typeof done !== 'undefined' ? done : nop
  1309. let topic = packet.topic.toString()
  1310. const message = packet.payload
  1311. const qos = packet.qos
  1312. const messageId = packet.messageId
  1313. const that = this
  1314. const options = this.options
  1315. const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
  1316. if (this.options.protocolVersion === 5) {
  1317. let alias
  1318. if (packet.properties) {
  1319. alias = packet.properties.topicAlias
  1320. }
  1321. if (typeof alias !== 'undefined') {
  1322. if (topic.length === 0) {
  1323. if (alias > 0 && alias <= 0xffff) {
  1324. const gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
  1325. if (gotTopic) {
  1326. topic = gotTopic
  1327. debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
  1328. } else {
  1329. debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
  1330. this.emit('error', new Error('Received unregistered Topic Alias'))
  1331. return
  1332. }
  1333. } else {
  1334. debug('_handlePublish :: topic alias out of range. alias: %d', alias)
  1335. this.emit('error', new Error('Received Topic Alias is out of range'))
  1336. return
  1337. }
  1338. } else {
  1339. if (this.topicAliasRecv.put(topic, alias)) {
  1340. debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
  1341. } else {
  1342. debug('_handlePublish :: topic alias out of range. alias: %d', alias)
  1343. this.emit('error', new Error('Received Topic Alias is out of range'))
  1344. return
  1345. }
  1346. }
  1347. }
  1348. }
  1349. debug('_handlePublish: qos %d', qos)
  1350. switch (qos) {
  1351. case 2: {
  1352. options.customHandleAcks(topic, message, packet, function (error, code) {
  1353. if (!(error instanceof Error)) {
  1354. code = error
  1355. error = null
  1356. }
  1357. if (error) { return that.emit('error', error) }
  1358. if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
  1359. if (code) {
  1360. that._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: code }, done)
  1361. } else {
  1362. that.incomingStore.put(packet, function () {
  1363. that._sendPacket({ cmd: 'pubrec', messageId: messageId }, done)
  1364. })
  1365. }
  1366. })
  1367. break
  1368. }
  1369. case 1: {
  1370. // emit the message event
  1371. options.customHandleAcks(topic, message, packet, function (error, code) {
  1372. if (!(error instanceof Error)) {
  1373. code = error
  1374. error = null
  1375. }
  1376. if (error) { return that.emit('error', error) }
  1377. if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
  1378. if (!code) { that.emit('message', topic, message, packet) }
  1379. that.handleMessage(packet, function (err) {
  1380. if (err) {
  1381. return done && done(err)
  1382. }
  1383. that._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: code }, done)
  1384. })
  1385. })
  1386. break
  1387. }
  1388. case 0:
  1389. // emit the message event
  1390. this.emit('message', topic, message, packet)
  1391. this.handleMessage(packet, done)
  1392. break
  1393. default:
  1394. // do nothing
  1395. debug('_handlePublish: unknown QoS. Doing nothing.')
  1396. // log or throw an error about unknown qos
  1397. break
  1398. }
  1399. }
/**
 * Handle messages with backpressure support, one at a time.
 * Override at will.
 *
 * This default implementation ignores the packet and calls the callback
 * immediately with no arguments. Callers in this file treat a truthy first
 * callback argument as an error and skip the acknowledgement in that case.
 *
 * @param Packet packet the packet
 * @param Function callback call when finished
 * @api public
 */
MqttClient.prototype.handleMessage = function (packet, callback) {
  callback()
}
  1411. /**
  1412. * _handleAck
  1413. *
  1414. * @param {Object} packet
  1415. * @api private
  1416. */
  1417. MqttClient.prototype._handleAck = function (packet) {
  1418. /* eslint no-fallthrough: "off" */
  1419. const messageId = packet.messageId
  1420. const type = packet.cmd
  1421. let response = null
  1422. const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
  1423. const that = this
  1424. let err
  1425. // Checking `!cb` happens to work, but it's not technically "correct".
  1426. //
  1427. // Why? This code assumes that "no callback" is the same as that "we're not
  1428. // waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback).
  1429. //
  1430. // It would be better to check `if (!this.outgoing[messageId])` here, but
  1431. // there's no reason to change it and risk (another) regression.
  1432. //
  1433. // The only reason this code works is becaues code in MqttClient.publish,
  1434. // MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will
  1435. // have a callback even if the user doesn't pass one in.)
  1436. if (!cb) {
  1437. debug('_handleAck :: Server sent an ack in error. Ignoring.')
  1438. // Server sent an ack in error, ignore it.
  1439. return
  1440. }
  1441. // Process
  1442. debug('_handleAck :: packet type', type)
  1443. switch (type) {
  1444. case 'pubcomp':
  1445. // same thing as puback for QoS 2
  1446. case 'puback': {
  1447. const pubackRC = packet.reasonCode
  1448. // Callback - we're done
  1449. if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
  1450. err = new Error('Publish error: ' + errors[pubackRC])
  1451. err.code = pubackRC
  1452. cb(err, packet)
  1453. }
  1454. delete this.outgoing[messageId]
  1455. this.outgoingStore.del(packet, cb)
  1456. this.messageIdProvider.deallocate(messageId)
  1457. this._invokeStoreProcessingQueue()
  1458. break
  1459. }
  1460. case 'pubrec': {
  1461. response = {
  1462. cmd: 'pubrel',
  1463. qos: 2,
  1464. messageId: messageId
  1465. }
  1466. const pubrecRC = packet.reasonCode
  1467. if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
  1468. err = new Error('Publish error: ' + errors[pubrecRC])
  1469. err.code = pubrecRC
  1470. cb(err, packet)
  1471. } else {
  1472. this._sendPacket(response)
  1473. }
  1474. break
  1475. }
  1476. case 'suback': {
  1477. delete this.outgoing[messageId]
  1478. this.messageIdProvider.deallocate(messageId)
  1479. for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) {
  1480. if ((packet.granted[grantedI] & 0x80) !== 0) {
  1481. // suback with Failure status
  1482. const topics = this.messageIdToTopic[messageId]
  1483. if (topics) {
  1484. topics.forEach(function (topic) {
  1485. delete that._resubscribeTopics[topic]
  1486. })
  1487. }
  1488. }
  1489. }
  1490. this._invokeStoreProcessingQueue()
  1491. cb(null, packet)
  1492. break
  1493. }
  1494. case 'unsuback': {
  1495. delete this.outgoing[messageId]
  1496. this.messageIdProvider.deallocate(messageId)
  1497. this._invokeStoreProcessingQueue()
  1498. cb(null)
  1499. break
  1500. }
  1501. default:
  1502. that.emit('error', new Error('unrecognized packet type'))
  1503. }
  1504. if (this.disconnecting &&
  1505. Object.keys(this.outgoing).length === 0) {
  1506. this.emit('outgoingEmpty')
  1507. }
  1508. }
  1509. /**
  1510. * _handlePubrel
  1511. *
  1512. * @param {Object} packet
  1513. * @api private
  1514. */
  1515. MqttClient.prototype._handlePubrel = function (packet, callback) {
  1516. debug('handling pubrel packet')
  1517. callback = typeof callback !== 'undefined' ? callback : nop
  1518. const messageId = packet.messageId
  1519. const that = this
  1520. const comp = { cmd: 'pubcomp', messageId: messageId }
  1521. that.incomingStore.get(packet, function (err, pub) {
  1522. if (!err) {
  1523. that.emit('message', pub.topic, pub.payload, pub)
  1524. that.handleMessage(pub, function (err) {
  1525. if (err) {
  1526. return callback(err)
  1527. }
  1528. that.incomingStore.del(pub, nop)
  1529. that._sendPacket(comp, callback)
  1530. })
  1531. } else {
  1532. that._sendPacket(comp, callback)
  1533. }
  1534. })
  1535. }
/**
 * _handleDisconnect - re-emit a received `disconnect` packet as a
 * `disconnect` event with the packet as its argument
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleDisconnect = function (packet) {
  this.emit('disconnect', packet)
}
/**
 * _nextId - allocate a message id from the configured messageIdProvider
 * @return unsigned int
 */
MqttClient.prototype._nextId = function () {
  return this.messageIdProvider.allocate()
}
/**
 * getLastMessageId - return the id most recently allocated by the
 * messageIdProvider
 * @return unsigned int
 */
MqttClient.prototype.getLastMessageId = function () {
  return this.messageIdProvider.getLastAllocated()
}
  1559. /**
  1560. * _resubscribe
  1561. * @api private
  1562. */
  1563. MqttClient.prototype._resubscribe = function () {
  1564. debug('_resubscribe')
  1565. const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
  1566. if (!this._firstConnection &&
  1567. (this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
  1568. _resubscribeTopicsKeys.length > 0) {
  1569. if (this.options.resubscribe) {
  1570. if (this.options.protocolVersion === 5) {
  1571. debug('_resubscribe: protocolVersion 5')
  1572. for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
  1573. const resubscribeTopic = {}
  1574. resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
  1575. resubscribeTopic.resubscribe = true
  1576. this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties })
  1577. }
  1578. } else {
  1579. this._resubscribeTopics.resubscribe = true
  1580. this.subscribe(this._resubscribeTopics)
  1581. }
  1582. } else {
  1583. this._resubscribeTopics = {}
  1584. }
  1585. }
  1586. this._firstConnection = false
  1587. }
/**
 * _onConnect - finish connecting after a successful connack
 *
 * Records the connack packet, clears the message id provider, starts the
 * ping timer and marks the client connected. It then replays the packets
 * held in the outgoing store one at a time (the next packet is read when
 * the callback registered for the previous one fires) and emits `connect`
 * once the store stream has ended and all packets seen were processed.
 *
 * If the client is already flagged `disconnected`, only `connect` is
 * emitted and nothing else happens.
 *
 * @param {Object} packet - the connack packet; passed along with `connect`
 * @api private
 */
MqttClient.prototype._onConnect = function (packet) {
  if (this.disconnected) {
    this.emit('connect', packet)
    return
  }
  const that = this
  this.connackPacket = packet
  this.messageIdProvider.clear()
  this._setupPingTimer()
  this.connected = true

  // Opens a stream over the outgoing store and delivers its packets.
  // Calls itself again from the `end` handler if not everything was processed.
  function startStreamProcess () {
    let outStore = that.outgoingStore.createStream()

    // reset the bookkeeping used while the store is being replayed
    function clearStoreProcessing () {
      that._storeProcessing = false
      that._packetIdsDuringStoreProcessing = {}
    }

    // `remove` is a hoisted function declaration, defined further down
    that.once('close', remove)
    outStore.on('error', function (err) {
      clearStoreProcessing()
      that._flushStoreProcessingQueue()
      that.removeListener('close', remove)
      that.emit('error', err)
    })

    // client closed while replaying: drop the stream and fail queued work
    function remove () {
      outStore.destroy()
      outStore = null
      that._flushStoreProcessingQueue()
      clearStoreProcessing()
    }

    // read one packet from the store stream and send it
    function storeDeliver () {
      // edge case, we wrapped this twice
      if (!outStore) {
        return
      }
      that._storeProcessing = true

      const packet = outStore.read(1)

      let cb

      if (!packet) {
        // read when data is available in the future
        outStore.once('readable', storeDeliver)
        return
      }

      // Skip already processed store packets
      if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
        storeDeliver()
        return
      }

      // Avoid unnecessary stream read operations when disconnected
      if (!that.disconnecting && !that.reconnectTimer) {
        // keep any callback already registered for this id and wrap it so
        // that the next stored packet is delivered once this one completes
        cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
        that.outgoing[packet.messageId] = {
          volatile: false,
          cb: function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (cb) {
              cb(err, status)
            }

            storeDeliver()
          }
        }
        that._packetIdsDuringStoreProcessing[packet.messageId] = true
        // the packet is only sent if its id could be registered with the provider
        if (that.messageIdProvider.register(packet.messageId)) {
          that._sendPacket(packet)
        } else {
          debug('messageId: %d has already used.', packet.messageId)
        }
      } else if (outStore.destroy) {
        outStore.destroy()
      }
    }

    outStore.on('end', function () {
      let allProcessed = true
      for (const id in that._packetIdsDuringStoreProcessing) {
        if (!that._packetIdsDuringStoreProcessing[id]) {
          allProcessed = false
          break
        }
      }
      if (allProcessed) {
        clearStoreProcessing()
        that.removeListener('close', remove)
        that._invokeAllStoreProcessingQueue()
        that.emit('connect', packet)
      } else {
        // something is still marked unprocessed: replay the store again
        startStreamProcess()
      }
    })
    storeDeliver()
  }
  // start flowing
  startStreamProcess()
}
  1685. MqttClient.prototype._invokeStoreProcessingQueue = function () {
  1686. if (this._storeProcessingQueue.length > 0) {
  1687. const f = this._storeProcessingQueue[0]
  1688. if (f && f.invoke()) {
  1689. this._storeProcessingQueue.shift()
  1690. return true
  1691. }
  1692. }
  1693. return false
  1694. }
  1695. MqttClient.prototype._invokeAllStoreProcessingQueue = function () {
  1696. while (this._invokeStoreProcessingQueue()) { /* empty */ }
  1697. }
  1698. MqttClient.prototype._flushStoreProcessingQueue = function () {
  1699. for (const f of this._storeProcessingQueue) {
  1700. if (f.cbStorePut) f.cbStorePut(new Error('Connection closed'))
  1701. if (f.callback) f.callback(new Error('Connection closed'))
  1702. }
  1703. this._storeProcessingQueue.splice(0)
  1704. }
  1705. module.exports = MqttClient