client.js 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. const EventEmitter = require('events').EventEmitter
  6. const Store = require('./store')
  7. const TopicAliasRecv = require('./topic-alias-recv')
  8. const TopicAliasSend = require('./topic-alias-send')
  9. const mqttPacket = require('mqtt-packet')
  10. const DefaultMessageIdProvider = require('./default-message-id-provider')
  11. const Writable = require('readable-stream').Writable
  12. const inherits = require('inherits')
  13. const reInterval = require('reinterval')
  14. const clone = require('rfdc/default')
  15. const validations = require('./validations')
  16. const xtend = require('xtend')
  17. const debug = require('debug')('mqttjs:client')
  // Schedules `callback` to run asynchronously.
  // `typeof` is required here: referencing an undeclared `process` identifier
  // throws a ReferenceError, which would make the setTimeout fallback unreachable.
  const nextTick = (typeof process !== 'undefined' && typeof process.nextTick === 'function')
    ? process.nextTick
    : function (callback) { setTimeout(callback, 0) }
  // Uses the platform `setImmediate` when one exists. The fallback forwards any
  // extra arguments to `callback`, the same way the native function does.
  const setImmediate = (typeof global !== 'undefined' && global.setImmediate) || function (callback, ...args) {
    // works in node v0.8
    nextTick(function () {
      callback(...args)
    })
  }
  // Default connection options. The MqttClient constructor copies each of these
  // onto the client's options when the caller left that key undefined.
  const defaultConnectOptions = {
    keepalive: 60,
    reschedulePings: true,
    protocolId: 'MQTT',
    protocolVersion: 4,
    reconnectPeriod: 1000,
    // used as the delay of the connack timer, in milliseconds
    connectTimeout: 30 * 1000,
    clean: true,
    resubscribe: true
  }
  // Stream error codes that are re-emitted as an `error` event on the client.
  // Stream errors with any other code are only passed to `nop` (debug logging).
  const socketErrors = [
    'ECONNREFUSED',
    'EADDRINUSE',
    'ECONNRESET',
    'ENOTFOUND'
  ]
  // Human-readable text keyed by numeric code.
  // NOTE(review): 0-5 look like MQTT 3.1.1 CONNACK return codes and 16+ like
  // MQTT 5 reason codes; the table is not referenced in this part of the file,
  // so confirm against its users.
  const errors = {
    0: '',
    1: 'Unacceptable protocol version',
    2: 'Identifier rejected',
    3: 'Server unavailable',
    4: 'Bad username or password',
    5: 'Not authorized',
    16: 'No matching subscribers',
    17: 'No subscription existed',
    128: 'Unspecified error',
    129: 'Malformed Packet',
    130: 'Protocol Error',
    131: 'Implementation specific error',
    132: 'Unsupported Protocol Version',
    133: 'Client Identifier not valid',
    134: 'Bad User Name or Password',
    135: 'Not authorized',
    136: 'Server unavailable',
    137: 'Server busy',
    138: 'Banned',
    139: 'Server shutting down',
    140: 'Bad authentication method',
    141: 'Keep Alive timeout',
    142: 'Session taken over',
    143: 'Topic Filter invalid',
    144: 'Topic Name invalid',
    145: 'Packet identifier in use',
    146: 'Packet Identifier not found',
    147: 'Receive Maximum exceeded',
    148: 'Topic Alias invalid',
    149: 'Packet too large',
    150: 'Message rate too high',
    151: 'Quota exceeded',
    152: 'Administrative action',
    153: 'Payload format invalid',
    154: 'Retain not supported',
    155: 'QoS not supported',
    156: 'Use another server',
    157: 'Server moved',
    158: 'Shared Subscriptions not supported',
    159: 'Connection rate exceeded',
    160: 'Maximum connect time',
    161: 'Subscription Identifiers not supported',
    162: 'Wildcard Subscriptions not supported'
  }
  /**
   * Builds a pseudo-random client identifier: the prefix `mqttjs_` followed by
   * up to eight hexadecimal digits taken from `Math.random()`.
   *
   * @returns {String} generated client id
   */
  function defaultId () {
    const randomHex = Math.random().toString(16).slice(2, 10)
    return `mqttjs_${randomHex}`
  }
  /**
   * Applies sending-side topic alias handling to an outgoing packet.
   * Only acts on `publish` packets when the client uses protocol version 5.
   *
   * - An explicit `properties.topicAlias` together with a non-empty topic
   *   registers the pair in `client.topicAliasSend`.
   * - Without an explicit alias, `autoAssignTopicAlias` / `autoUseTopicAlias`
   *   may rewrite the packet (`topic` emptied and/or `properties.topicAlias` set).
   *
   * @param {Object} client - client whose `options` and `topicAliasSend` are consulted
   * @param {Object} packet - packet to inspect; may be mutated
   * @returns {Error|undefined} an Error when the alias cannot be used, otherwise undefined
   */
  function applyTopicAlias (client, packet) {
    if (client.options.protocolVersion !== 5 || packet.cmd !== 'publish') {
      return
    }
    const requestedAlias = packet.properties ? packet.properties.topicAlias : undefined
    const topic = packet.topic.toString()
    const aliasTable = client.topicAliasSend

    if (!aliasTable) {
      // No sending alias table: an explicit alias can never be honoured.
      if (requestedAlias) {
        debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, requestedAlias)
        return new Error('Sending Topic Alias out of range')
      }
      return
    }

    if (requestedAlias) {
      if (topic.length === 0) {
        return
      }
      // register topic alias
      debug('applyTopicAlias :: register topic: %s - alias: %d', topic, requestedAlias)
      if (!aliasTable.put(topic, requestedAlias)) {
        debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, requestedAlias)
        return new Error('Sending Topic Alias out of range')
      }
      return
    }

    if (topic.length === 0) {
      return
    }

    if (client.options.autoAssignTopicAlias) {
      const knownAlias = aliasTable.getAliasByTopic(topic)
      if (knownAlias) {
        // Alias already registered: send the alias instead of the topic.
        packet.topic = ''
        packet.properties = { ...(packet.properties), topicAlias: knownAlias }
        debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, knownAlias)
      } else {
        // Not registered yet: take the LRU alias and announce it with the full topic.
        const freshAlias = aliasTable.getLruAlias()
        aliasTable.put(topic, freshAlias)
        packet.properties = { ...(packet.properties), topicAlias: freshAlias }
        debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, freshAlias)
      }
    } else if (client.options.autoUseTopicAlias) {
      const knownAlias = aliasTable.getAliasByTopic(topic)
      if (knownAlias) {
        packet.topic = ''
        packet.properties = { ...(packet.properties), topicAlias: knownAlias }
        debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, knownAlias)
      }
    }
  }
  /**
   * Makes a packet independent of topic aliases: when its topic is empty the
   * topic is looked up from `properties.topicAlias` in `client.topicAliasSend`,
   * and the `topicAlias` property is then removed from the packet.
   *
   * @param {Object} client - client holding the sending alias table (`topicAliasSend`)
   * @param {Object} packet - packet to fix up; mutated in place
   * @returns {Error|undefined} Error('Unregistered Topic Alias') when the topic
   *   cannot be recovered, otherwise undefined
   */
  function removeTopicAliasAndRecoverTopicName (client, packet) {
    const alias = packet.properties ? packet.properties.topicAlias : undefined
    if (packet.topic.toString().length === 0) {
      // restore topic from alias
      // A client without a sending alias table cannot resolve any alias; report
      // that as an unregistered alias rather than throwing a TypeError.
      const canResolve = typeof alias !== 'undefined' && Boolean(client.topicAliasSend)
      const recovered = canResolve ? client.topicAliasSend.getTopicByAlias(alias) : undefined
      if (typeof recovered === 'undefined') {
        return new Error('Unregistered Topic Alias')
      }
      packet.topic = recovered
    }
    if (alias) {
      delete packet.properties.topicAlias
    }
  }
  /**
   * Emits `packetsend` on the client and writes the packet to the client's stream.
   *
   * When the write reports back-pressure (falsy result) and a real callback was
   * given, the callback is deferred until the stream's next `drain` event;
   * otherwise the callback, if any, is invoked immediately.
   *
   * @param {Object} client - client providing `emit`, `stream` and `options`
   * @param {Object} packet - packet to serialise
   * @param {Function} [cb] - completion callback
   */
  function sendPacket (client, packet, cb) {
    debug('sendPacket :: packet: %O', packet)
    debug('sendPacket :: emitting `packetsend`')
    client.emit('packetsend', packet)

    debug('sendPacket :: writing to stream')
    const flushed = mqttPacket.writeToStream(packet, client.stream, client.options)
    debug('sendPacket :: writeToStream result %s', flushed)

    if (!cb) {
      return
    }
    const waitForDrain = !flushed && cb !== nop
    if (waitForDrain) {
      debug('sendPacket :: handle events on `drain` once through callback.')
      client.stream.once('drain', cb)
      return
    }
    debug('sendPacket :: invoking cb')
    cb()
  }
  /**
   * Fails every queued entry that has a function `cb` with
   * Error('Connection closed') and removes that entry from the queue.
   * Entries without a callback are left in place.
   *
   * @param {Object} [queue] - map of messageId to `{ cb }` entries; ignored when falsy
   */
  function flush (queue) {
    if (!queue) {
      return
    }
    debug('flush: queue exists? %b', !!(queue))
    Object.keys(queue).forEach((messageId) => {
      const entry = queue[messageId]
      if (typeof entry.cb !== 'function') {
        return
      }
      entry.cb(new Error('Connection closed'))
      // NOTE(review): entries without a callback are never deleted here; it is
      // unclear whether that is intentional.
      delete queue[messageId]
    })
  }
  /**
   * Fails and removes only the queue entries flagged `volatile` that also have
   * a function `cb`; each such callback receives Error('Connection closed').
   *
   * @param {Object} [queue] - map of messageId to `{ volatile, cb }` entries; ignored when falsy
   */
  function flushVolatile (queue) {
    if (!queue) {
      return
    }
    debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function')
    Object.keys(queue).forEach((messageId) => {
      const entry = queue[messageId]
      const shouldFail = entry.volatile && typeof entry.cb === 'function'
      if (!shouldFail) {
        return
      }
      entry.cb(new Error('Connection closed'))
      delete queue[messageId]
    })
  }
  /**
   * Puts a packet into the client's outgoing store and, once stored, sends it.
   *
   * For `publish` packets a clone is stored instead of the original: the
   * original is sent as-is, while the stored copy has its topic alias removed
   * and its topic name restored.
   *
   * @param {Object} client - client providing `outgoingStore`
   * @param {Object} packet - packet to send
   * @param {Function} [cb] - called with an error if alias recovery or storing
   *   fails; otherwise handed on to `sendPacket`
   * @param {Function} cbStorePut - called after the packet was stored, before sending
   */
  function storeAndSend (client, packet, cb, cbStorePut) {
    debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
    let packetToStore = packet
    if (packet.cmd === 'publish') {
      // The original packet is for sending.
      // The cloned copy is for storing to resend on reconnect.
      // Topic Alias must not be used after disconnected.
      packetToStore = clone(packet)
      const aliasError = removeTopicAliasAndRecoverTopicName(client, packetToStore)
      if (aliasError) {
        return cb && cb(aliasError)
      }
    }
    client.outgoingStore.put(packetToStore, function storedPacket (storeError) {
      if (storeError) {
        return cb && cb(storeError)
      }
      cbStorePut()
      sendPacket(client, packet, cb)
    })
  }
  /**
   * Placeholder callback: does nothing except write its argument to the debug
   * log. Other functions in this file compare callbacks against `nop` by identity.
   *
   * @param {*} [err] - value to log
   */
  function nop (err) {
    debug('nop ::', err)
  }
  /**
   * MqttClient constructor
   *
   * Can be called with or without `new`. Fills in default connect options,
   * initialises the client's bookkeeping state, registers the internal
   * `connect` / `close` handlers and finally opens the stream through
   * `_setupStream()`.
   *
   * @param {Function} streamBuilder - invoked with the client to create the
   *   underlying stream
   * @param {Object} [options] - connection options
   * (see Connection#connect); the object is used as-is and receives the defaults
   */
  function MqttClient (streamBuilder, options) {
    const that = this
    if (!(this instanceof MqttClient)) {
      return new MqttClient(streamBuilder, options)
    }
    // Normalise once so the constructor also works when no options object is
    // passed; every later read goes through the same object as `this.options`.
    options = options || {}
    this.options = options
    // Defaults
    for (const k in defaultConnectOptions) {
      if (typeof this.options[k] === 'undefined') {
        this.options[k] = defaultConnectOptions[k]
      }
    }
    debug('MqttClient :: options.protocol', options.protocol)
    debug('MqttClient :: options.protocolVersion', options.protocolVersion)
    debug('MqttClient :: options.username', options.username)
    debug('MqttClient :: options.keepalive', options.keepalive)
    debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
    debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
    debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)
    this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
    debug('MqttClient :: clientId', this.options.clientId)
    // A custom ack handler is only honoured for protocol version 5; the default
    // handler answers through its fourth argument with code 0.
    this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks)
      ? options.customHandleAcks
      : function (topic, message, packet, done) { done(0) }
    this.streamBuilder = streamBuilder
    this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
    // Inflight message storages
    this.outgoingStore = options.outgoingStore || new Store()
    this.incomingStore = options.incomingStore || new Store()
    // Should QoS zero messages be queued when the connection is broken?
    this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
    // map of subscribed topics to support reconnection
    this._resubscribeTopics = {}
    // map of a subscribe messageId and a topic
    this.messageIdToTopic = {}
    // Ping timer, setup in _setupPingTimer
    this.pingTimer = null
    // Is the client connected?
    this.connected = false
    // Are we disconnecting?
    this.disconnecting = false
    // Packet queue
    this.queue = []
    // connack timer
    this.connackTimer = null
    // Reconnect timer
    this.reconnectTimer = null
    // Is processing store?
    this._storeProcessing = false
    // Packet Ids are put into the store during store processing
    this._packetIdsDuringStoreProcessing = {}
    // Store processing queue
    this._storeProcessingQueue = []
    // Inflight callbacks
    this.outgoing = {}
    // True if connection is first time.
    this._firstConnection = true
    if (options.topicAliasMaximum > 0) {
      if (options.topicAliasMaximum > 0xffff) {
        debug('MqttClient :: options.topicAliasMaximum is out of range')
      } else {
        this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
      }
    }
    // Send queued packets
    this.on('connect', function () {
      const queue = this.queue
      // Sends the queued packets one at a time; once the queue is empty the
      // stored subscriptions are re-established.
      function deliver () {
        const entry = queue.shift()
        debug('deliver :: entry %o', entry)
        if (!entry) {
          that._resubscribe()
          return
        }
        const packet = entry.packet
        debug('deliver :: call _sendPacket for %o', packet)
        // A queued packet whose messageId cannot be registered is dropped.
        let send = true
        if (packet.messageId) {
          if (!that.messageIdProvider.register(packet.messageId)) {
            send = false
          }
        }
        if (send) {
          that._sendPacket(
            packet,
            function (err) {
              if (entry.cb) {
                entry.cb(err)
              }
              deliver()
            }
          )
        } else {
          debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId)
          deliver()
        }
      }
      debug('connect :: sending queued packets')
      deliver()
    })
    this.on('close', function () {
      debug('close :: connected set to `false`')
      this.connected = false
      debug('close :: clearing connackTimer')
      clearTimeout(this.connackTimer)
      debug('close :: clearing ping timer')
      if (that.pingTimer !== null) {
        that.pingTimer.clear()
        that.pingTimer = null
      }
      if (this.topicAliasRecv) {
        this.topicAliasRecv.clear()
      }
      debug('close :: calling _setupReconnect')
      this._setupReconnect()
    })
    EventEmitter.call(this)
    debug('MqttClient :: setting up stream')
    this._setupStream()
  }
  inherits(MqttClient, EventEmitter)
  /**
   * setup the event handlers in the inner stream.
   *
   * Creates the stream through `streamBuilder`, pipes it into a writable that
   * feeds the packet parser, hands parsed packets to `_handlePacket` one at a
   * time, sends the `connect` packet (and an optional `auth` packet) and arms
   * the connack timer.
   *
   * @api private
   */
  MqttClient.prototype._setupStream = function () {
    const that = this
    const writable = new Writable()
    const parser = mqttPacket.parser(this.options)
    // Completion callback of the chunk currently being parsed (null when idle).
    let completeParse = null
    // Packets parsed from the current chunk, waiting to be handled in order.
    const packets = []
    debug('_setupStream :: calling method to clear reconnect')
    this._clearReconnect()
    debug('_setupStream :: using streamBuilder provided to client to create stream')
    this.stream = this.streamBuilder(this)
    parser.on('packet', function (packet) {
      debug('parser :: on packet push to packets array.')
      packets.push(packet)
    })
    // Passed to `_handlePacket` as its `done`: schedules the next packet, or
    // completes the pending write once the packet list is drained.
    function nextTickWork () {
      if (packets.length) {
        nextTick(work)
      } else {
        const done = completeParse
        completeParse = null
        // Guarded like in `work`: the write may already have been completed.
        if (done) done()
      }
    }
    function work () {
      debug('work :: getting next packet in queue')
      const packet = packets.shift()
      if (packet) {
        debug('work :: packet pulled from queue')
        that._handlePacket(packet, nextTickWork)
      } else {
        debug('work :: no packets in queue')
        const done = completeParse
        completeParse = null
        debug('work :: done flag is %s', !!(done))
        if (done) done()
      }
    }
    writable._write = function (buf, enc, done) {
      completeParse = done
      debug('writable stream :: parsing buffer')
      parser.parse(buf)
      work()
    }
    // Only errors whose code is listed in `socketErrors` are surfaced as an
    // `error` event; everything else is just logged through `nop`.
    function streamErrorHandler (error) {
      debug('streamErrorHandler :: error', error.message)
      if (socketErrors.includes(error.code)) {
        // handle error
        debug('streamErrorHandler :: emitting error')
        that.emit('error', error)
      } else {
        nop(error)
      }
    }
    debug('_setupStream :: pipe stream to writable stream')
    this.stream.pipe(writable)
    // Suppress connection errors
    this.stream.on('error', streamErrorHandler)
    // Echo stream close
    this.stream.on('close', function () {
      debug('(%s)stream :: on close', that.options.clientId)
      flushVolatile(that.outgoing)
      debug('stream: emit close to MqttClient')
      that.emit('close')
    })
    // Send a connect packet
    debug('_setupStream: sending packet `connect`')
    // The connect packet inherits every option through its prototype.
    const connectPacket = Object.create(this.options)
    connectPacket.cmd = 'connect'
    if (this.topicAliasRecv) {
      if (!connectPacket.properties) {
        connectPacket.properties = {}
      }
      connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
    }
    // avoid message queue
    sendPacket(this, connectPacket)
    // Echo connection errors
    parser.on('error', this.emit.bind(this, 'error'))
    // auth
    if (this.options.properties) {
      if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
        that.end(() =>
          this.emit('error', new Error('Packet has no Authentication Method')
          ))
        return this
      }
      if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
        const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket)
        sendPacket(this, authPacket)
      }
    }
    // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
    this.stream.setMaxListeners(1000)
    clearTimeout(this.connackTimer)
    this.connackTimer = setTimeout(function () {
      debug('!!connectTimeout hit!! Calling _cleanUp with force `true`')
      that._cleanUp(true)
    }, this.options.connectTimeout)
  }
  /**
   * Dispatches one parsed incoming packet to the handler for its command.
   *
   * With protocol version 5 and a configured `properties.maximumPacketSize`,
   * a packet larger than that limit is rejected: an `error` is emitted and the
   * client is ended with reason code 149.
   *
   * @param {Object} packet - parsed packet (`cmd`, `length`, ...)
   * @param {Function} done - continuation that lets the caller process the next
   *   packet; handed to the handler for `publish` / `pubrel`, invoked directly
   *   for every other command
   * @api private
   */
  MqttClient.prototype._handlePacket = function (packet, done) {
    const options = this.options
    if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
      this.emit('error', new Error('exceeding packets size ' + packet.cmd))
      this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } })
      return this
    }
    debug('_handlePacket :: emitting packetreceive')
    this.emit('packetreceive', packet)
    switch (packet.cmd) {
      case 'publish':
        this._handlePublish(packet, done)
        break
      case 'puback':
      case 'pubrec':
      case 'pubcomp':
      case 'suback':
      case 'unsuback':
        this._handleAck(packet)
        done()
        break
      case 'pubrel':
        this._handlePubrel(packet, done)
        break
      case 'connack':
        this._handleConnack(packet)
        done()
        break
      case 'auth':
        this._handleAuth(packet)
        done()
        break
      case 'pingresp':
        this._handlePingresp(packet)
        done()
        break
      case 'disconnect':
        this._handleDisconnect(packet)
        done()
        break
      default:
        // Unknown command: nothing to handle, but the continuation must still
        // run, otherwise the packets queued behind this one are never processed.
        debug('_handlePacket :: unknown command %s', packet.cmd)
        done()
        break
    }
  }
  /**
   * Reports whether the client is currently disconnecting. When it is, the
   * condition is signalled as Error('client disconnecting'): to `callback` if a
   * real one was supplied, otherwise as an `error` event on the client.
   *
   * @param {Function} [callback] - receiver for the error; `nop` counts as "no callback"
   * @returns {Boolean} the client's `disconnecting` flag
   * @api private
   */
  MqttClient.prototype._checkDisconnecting = function (callback) {
    if (this.disconnecting) {
      const hasUserCallback = Boolean(callback) && callback !== nop
      const reason = new Error('client disconnecting')
      if (hasUserCallback) {
        callback(reason)
      } else {
        this.emit('error', reason)
      }
    }
    return this.disconnecting
  }
  /**
   * publish - publish <message> to <topic>
   *
   * For qos 1 and 2 a message id is reserved and the callback is parked in
   * `this.outgoing` until the exchange completes; for qos 0 the callback is
   * handed straight to `_sendPacket`. If the store is being processed, earlier
   * operations are still queued, or no message id is available, the publish is
   * appended to `_storeProcessingQueue` instead of being sent right away.
   *
   * @param {String} topic - topic to publish to
   * @param {String, Buffer} message - message to publish
   * @param {Object} [opts] - publish options, includes:
   * {Number} qos - qos level to publish on
   * {Boolean} retain - whether or not to retain the message
   * {Boolean} dup - whether or not mark a message as duplicate
   * {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
   * @param {Function} [callback] - function(err){}
   * called when publish succeeds or fails
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.publish('topic', 'message');
   * @example
   * client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
   * @example client.publish('topic', 'message', console.log);
   */
  MqttClient.prototype.publish = function (topic, message, opts, callback) {
    debug('publish :: message `%s` to topic `%s`', message, topic)
    const clientOptions = this.options

    // .publish(topic, payload, cb);
    if (typeof opts === 'function') {
      callback = opts
      opts = null
    }

    // default opts
    opts = xtend({ qos: 0, retain: false, dup: false }, opts)

    if (this._checkDisconnecting(callback)) {
      return this
    }

    const client = this
    // Builds and sends the packet; returns false when no message id is available.
    const publishProc = function () {
      const needsAck = opts.qos === 1 || opts.qos === 2
      let messageId = 0
      if (needsAck) {
        messageId = client._nextId()
        if (messageId === null) {
          debug('No messageId left')
          return false
        }
      }

      const packet = {
        cmd: 'publish',
        topic,
        payload: message,
        qos: opts.qos,
        retain: opts.retain,
        messageId,
        dup: opts.dup
      }
      if (clientOptions.protocolVersion === 5) {
        packet.properties = opts.properties
      }

      debug('publish :: qos', opts.qos)
      if (needsAck) {
        // Add to callbacks
        client.outgoing[packet.messageId] = {
          volatile: false,
          cb: callback || nop
        }
        debug('MqttClient:publish: packet cmd: %s', packet.cmd)
        client._sendPacket(packet, undefined, opts.cbStorePut)
      } else {
        debug('MqttClient:publish: packet cmd: %s', packet.cmd)
        client._sendPacket(packet, callback, opts.cbStorePut)
      }
      return true
    }

    const mustQueue = this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()
    if (mustQueue) {
      this._storeProcessingQueue.push({
        invoke: publishProc,
        cbStorePut: opts.cbStorePut,
        callback: callback
      })
    }
    return this
  }
  /**
   * subscribe - subscribe to <topic>
   *
   * Topics already recorded in `_resubscribeTopics` at an equal or higher qos
   * are skipped unless the first argument carries a truthy `resubscribe` flag
   * (the flag is removed from that argument). If nothing remains to subscribe,
   * the callback is invoked with an empty list. When the store is being
   * processed, earlier operations are still queued, or no message id is
   * available, the subscribe is appended to `_storeProcessingQueue`.
   *
   * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
   * @param {Object} [opts] - optional subscription options, includes:
   * {Number} qos - subscribe qos level
   * @param {Function} [callback] - function(err, granted){} where:
   * {Error} err - subscription error (none at the moment!)
   * {Array} granted - array of {topic: 't', qos: 0}
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.subscribe('topic');
   * @example client.subscribe('topic', {qos: 1});
   * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
   * @example client.subscribe('topic', console.log);
   */
  MqttClient.prototype.subscribe = function (...args) {
    const client = this
    const subs = []

    // Argument layout: (topic[, opts][, callback]).
    let obj = args.shift()
    const resubscribe = obj.resubscribe
    let callback = args.pop() || nop
    let opts = args.pop()
    const version = this.options.protocolVersion

    delete obj.resubscribe

    if (typeof obj === 'string') {
      obj = [obj]
    }

    // (topic, opts) form: the last argument was the options, not a callback.
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }

    const invalidTopic = validations.validateTopics(obj)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }

    if (this._checkDisconnecting(callback)) {
      debug('subscribe: discconecting true')
      return this
    }

    const defaultOpts = { qos: 0 }
    if (version === 5) {
      defaultOpts.nl = false
      defaultOpts.rap = false
      defaultOpts.rh = 0
    }
    opts = xtend(defaultOpts, opts)

    const isTracked = (topic) => Object.prototype.hasOwnProperty.call(client._resubscribeTopics, topic)

    if (Array.isArray(obj)) {
      obj.forEach((topic) => {
        debug('subscribe: array topic %s', topic)
        const alreadyCovered = isTracked(topic) &&
          !(client._resubscribeTopics[topic].qos < opts.qos) &&
          !resubscribe
        if (alreadyCovered) {
          return
        }
        const entry = { topic: topic, qos: opts.qos }
        if (version === 5) {
          entry.nl = opts.nl
          entry.rap = opts.rap
          entry.rh = opts.rh
          entry.properties = opts.properties
        }
        debug('subscribe: pushing topic `%s` and qos `%s` to subs list', entry.topic, entry.qos)
        subs.push(entry)
      })
    } else {
      Object.keys(obj).forEach((name) => {
        debug('subscribe: object topic %s', name)
        const alreadyCovered = isTracked(name) &&
          !(client._resubscribeTopics[name].qos < obj[name].qos) &&
          !resubscribe
        if (alreadyCovered) {
          return
        }
        const entry = { topic: name, qos: obj[name].qos }
        if (version === 5) {
          entry.nl = obj[name].nl
          entry.rap = obj[name].rap
          entry.rh = obj[name].rh
          entry.properties = opts.properties
        }
        debug('subscribe: pushing `%s` to subs list', entry)
        subs.push(entry)
      })
    }

    if (subs.length === 0) {
      callback(null, [])
      return this
    }

    // Builds and sends the subscribe packet; returns false when no message id is available.
    const subscribeProc = function () {
      const messageId = client._nextId()
      if (messageId === null) {
        debug('No messageId left')
        return false
      }

      const packet = {
        cmd: 'subscribe',
        subscriptions: subs,
        qos: 1,
        retain: false,
        dup: false,
        messageId: messageId
      }
      if (opts.properties) {
        packet.properties = opts.properties
      }

      // subscriptions to resubscribe to in case of disconnect
      if (client.options.resubscribe) {
        debug('subscribe :: resubscribe true')
        const rememberedTopics = []
        subs.forEach((sub) => {
          if (!(client.options.reconnectPeriod > 0)) {
            return
          }
          const remembered = { qos: sub.qos }
          if (version === 5) {
            remembered.nl = sub.nl || false
            remembered.rap = sub.rap || false
            remembered.rh = sub.rh || 0
            remembered.properties = sub.properties
          }
          client._resubscribeTopics[sub.topic] = remembered
          rememberedTopics.push(sub.topic)
        })
        client.messageIdToTopic[packet.messageId] = rememberedTopics
      }

      client.outgoing[packet.messageId] = {
        volatile: true,
        cb: function (err, suback) {
          if (!err) {
            // Report the qos levels taken from the acknowledgement's `granted` list.
            const granted = suback.granted
            for (let idx = 0; idx < granted.length; idx += 1) {
              subs[idx].qos = granted[idx]
            }
          }
          callback(err, subs)
        }
      }
      debug('subscribe :: call _sendPacket')
      client._sendPacket(packet)
      return true
    }

    const mustQueue = this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()
    if (mustQueue) {
      this._storeProcessingQueue.push({
        invoke: subscribeProc,
        callback: callback
      })
    }
    return this
  }
  /**
   * unsubscribe - unsubscribe from topic(s)
   *
   * Builds an `unsubscribe` packet and sends it through `_sendPacket`. When
   * `_storeProcessing` is set, other operations are already waiting in
   * `_storeProcessingQueue`, or no message id can be allocated, the send is
   * queued on `_storeProcessingQueue` instead and invoked later.
   *
   * @param {String|Array} topic - topic or list of topics to unsubscribe from
   * @param {Object} [opts] - optional unsubscribe options, includes:
   *    {Object} properties - properties of unsubscribe packet
   *    (`null`/`undefined` are accepted and treated as "no options")
   * @param {Function} [callback] - callback fired on unsuback
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.unsubscribe('topic');
   * @example client.unsubscribe('topic', console.log);
   */
  MqttClient.prototype.unsubscribe = function () {
    const that = this
    const args = Array.prototype.slice.call(arguments)

    let topic = args.shift()
    let callback = args.pop() || nop
    let opts = args.pop()

    if (typeof topic === 'string') {
      topic = [topic]
    }

    // Called as (topic, opts): what was popped as the callback is really opts.
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }

    const invalidTopic = validations.validateTopics(topic)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }

    if (that._checkDisconnecting(callback)) {
      return this
    }

    // Returns true once the packet was handed to _sendPacket, false when no
    // message id was available (the caller then queues this function).
    const unsubscribeProc = function () {
      const messageId = that._nextId()
      if (messageId === null) {
        debug('No messageId left')
        return false
      }

      const packet = {
        cmd: 'unsubscribe',
        qos: 1,
        messageId: messageId
      }

      if (typeof topic === 'string') {
        packet.unsubscriptions = [topic]
      } else if (Array.isArray(topic)) {
        packet.unsubscriptions = topic
      }

      // Forget the topics so they are not re-subscribed after a reconnect.
      if (that.options.resubscribe) {
        packet.unsubscriptions.forEach(function (unsubscribed) {
          delete that._resubscribeTopics[unsubscribed]
        })
      }

      // `typeof null === 'object'`, so guard against a null opts explicitly.
      if (opts && typeof opts === 'object' && opts.properties) {
        packet.properties = opts.properties
      }

      that.outgoing[packet.messageId] = {
        volatile: true,
        cb: callback
      }

      debug('unsubscribe: call _sendPacket')
      that._sendPacket(packet)
      return true
    }

    if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
      this._storeProcessingQueue.push({
        invoke: unsubscribeProc,
        callback: callback
      })
    }

    return this
  }
  /**
   * end - close connection
   *
   * Accepted call shapes: end(), end(cb), end(opts), end(opts, cb),
   * end(force), end(force, cb), end(force, opts) and end(force, opts, cb).
   * A callback is honoured in whichever slot it arrives, including
   * end(force, undefined, cb); anything that is not a function is ignored
   * as a callback.
   *
   * If the client is already disconnecting the callback is invoked
   * immediately. Otherwise the reconnect timer is cleared and, unless
   * `force` is set, the method waits for the `outgoingEmpty` event (plus
   * 10ms) while in-flight messages remain in `this.outgoing`.
   *
   * @returns {MqttClient} this - for chaining
   * @param {Boolean} [force] - do not wait for all in-flight messages to be acked
   * @param {Object} [opts] - added to the disconnect packet
   * @param {Function} [cb] - called when the client has been closed
   *
   * @api public
   */
  MqttClient.prototype.end = function (force, opts, cb) {
    const that = this

    debug('end :: (%s)', this.options.clientId)

    if (typeof force !== 'boolean') {
      // `force` was omitted, so the remaining arguments sit one slot left.
      if (typeof opts === 'function') {
        cb = opts
      }
      opts = force
      force = false
    }

    if (typeof opts === 'function') {
      cb = opts
      opts = null
    } else if (typeof opts !== 'object') {
      // Not a usable options object; keep whatever was passed as `cb`.
      opts = null
    }

    debug('end :: cb? %s', !!cb)
    if (typeof cb !== 'function') {
      cb = nop
    }

    function closeStores () {
      debug('end :: closeStores: closing incoming and outgoing stores')
      that.disconnected = true
      that.incomingStore.close(function (e1) {
        that.outgoingStore.close(function (e2) {
          debug('end :: closeStores: emitting end')
          that.emit('end')
          const err = e1 || e2
          debug('end :: closeStores: invoking callback with args')
          cb(err)
        })
      })
      // A reconnect() requested while disconnecting was parked; run it now.
      if (that._deferredReconnect) {
        that._deferredReconnect()
      }
    }

    function finish () {
      // defer closeStores by an I/O cycle,
      // just to make sure things are
      // ok for websockets
      debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force)
      that._cleanUp(force, () => {
        debug('end :: finish :: calling process.nextTick on closeStores')
        nextTick(closeStores.bind(that))
      }, opts)
    }

    if (this.disconnecting) {
      cb()
      return this
    }

    this._clearReconnect()

    this.disconnecting = true

    if (!force && Object.keys(this.outgoing).length > 0) {
      // wait 10ms, just to be sure we received all of it
      debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId)
      this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
    } else {
      debug('end :: (%s) :: immediately calling finish', that.options.clientId)
      finish()
    }

    return this
  }
  /**
   * removeOutgoingMessage - remove a message from the outgoing store
   *
   * The entry is dropped from `this.outgoing` and deleted from
   * `outgoingStore`. If the message had a callback registered, it is called
   * with Error('Message removed') once the store deletion has finished.
   * Unknown message ids are tolerated: the store deletion is still issued
   * but there is no callback to notify.
   *
   * @param {Number} messageId - messageId of the message to remove
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.removeOutgoingMessage(client.getLastMessageId());
   */
  MqttClient.prototype.removeOutgoingMessage = function (messageId) {
    const entry = this.outgoing[messageId]
    const cb = entry ? entry.cb : null
    delete this.outgoing[messageId]
    this.outgoingStore.del({ messageId: messageId }, function () {
      // No in-flight entry (or no callback on it) means nobody to notify.
      if (typeof cb === 'function') {
        cb(new Error('Message removed'))
      }
    })
    return this
  }
  /**
   * reconnect - connect again using the same options as connect()
   *
   * When the client is in the middle of disconnecting (disconnecting but not
   * yet disconnected) the restart is parked in `_deferredReconnect`;
   * otherwise it runs immediately.
   *
   * @param {Object} [opts] - optional reconnect options, includes:
   *    {Store} incomingStore - a store for the incoming packets
   *    {Store} outgoingStore - a store for the outgoing packets
   *    if opts is not given, fresh Store instances are created
   * @returns {MqttClient} this - for chaining
   *
   * @api public
   */
  MqttClient.prototype.reconnect = function (opts) {
    debug('client reconnect')
    const client = this

    const restart = function () {
      client.options.incomingStore = opts ? opts.incomingStore : null
      client.options.outgoingStore = opts ? opts.outgoingStore : null

      // Fall back to new stores when none were supplied.
      client.incomingStore = client.options.incomingStore || new Store()
      client.outgoingStore = client.options.outgoingStore || new Store()

      client.disconnecting = false
      client.disconnected = false
      client._deferredReconnect = null
      client._reconnect()
    }

    const stillClosing = this.disconnecting && !this.disconnected
    if (stillClosing) {
      this._deferredReconnect = restart
    } else {
      restart()
    }

    return this
  }
  /**
   * _reconnect - implement reconnection
   *
   * Emits `reconnect`, then sets the stream up again. A client that is still
   * connected is ended first and the stream is set up from the end callback.
   * @api privateish
   */
  MqttClient.prototype._reconnect = function () {
    debug('_reconnect: emitting reconnect to client')
    this.emit('reconnect')

    if (!this.connected) {
      debug('_reconnect: calling _setupStream')
      this._setupStream()
      return
    }

    this.end(() => { this._setupStream() })
    debug('client already connected. disconnecting first.')
  }
  /**
   * _setupReconnect - setup reconnect timer
   *
   * Does nothing while disconnecting, when a timer already exists, or when
   * `options.reconnectPeriod` is not positive. On the first attempt it also
   * emits `offline` and flags the client as reconnecting.
   */
  MqttClient.prototype._setupReconnect = function () {
    const client = this

    const shouldSchedule = !client.disconnecting &&
      !client.reconnectTimer &&
      (client.options.reconnectPeriod > 0)

    if (!shouldSchedule) {
      debug('_setupReconnect :: doing nothing...')
      return
    }

    if (!this.reconnecting) {
      debug('_setupReconnect :: emit `offline` state')
      this.emit('offline')
      debug('_setupReconnect :: set `reconnecting` to `true`')
      this.reconnecting = true
    }

    debug('_setupReconnect :: setting reconnectTimer for %d ms', client.options.reconnectPeriod)
    client.reconnectTimer = setInterval(() => {
      debug('reconnectTimer :: reconnect triggered!')
      client._reconnect()
    }, client.options.reconnectPeriod)
  }
  /**
   * _clearReconnect - clear the reconnect timer
   *
   * Stops the interval stored in `reconnectTimer`, if any, and resets the
   * field to null.
   */
  MqttClient.prototype._clearReconnect = function () {
    debug('_clearReconnect : clearing reconnect timer')
    const timer = this.reconnectTimer
    if (!timer) {
      return
    }
    clearInterval(timer)
    this.reconnectTimer = null
  }
  /**
   * _cleanUp - clean up on connection end
   *
   * A forced clean-up destroys the stream; otherwise a `disconnect` packet
   * is sent and the stream is ended afterwards. A third positional argument,
   * read through `arguments[2]`, is merged into the disconnect packet.
   *
   * @param {Boolean} forced - destroy the stream instead of disconnecting gracefully
   * @param {Function} [done] - invoked on stream `close`, or right away when
   *    the client is not connected
   * @api private
   */
  MqttClient.prototype._cleanUp = function (forced, done) {
    const disconnectOpts = arguments[2]

    if (done) {
      debug('_cleanUp :: done callback provided for on stream close')
      this.stream.on('close', done)
    }

    debug('_cleanUp :: forced? %s', forced)
    if (!forced) {
      const packet = xtend({ cmd: 'disconnect' }, disconnectOpts)
      debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId)
      // End the stream one tick after the disconnect packet went out.
      const endStream = this.stream.end.bind(this.stream)
      this._sendPacket(packet, setImmediate.bind(null, endStream))
    } else {
      if ((this.options.reconnectPeriod === 0) && this.options.clean) {
        flush(this.outgoing)
      }
      debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId)
      this.stream.destroy()
    }

    if (!this.disconnecting) {
      debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.')
      this._clearReconnect()
      this._setupReconnect()
    }

    if (this.pingTimer !== null) {
      debug('_cleanUp :: clearing pingTimer')
      this.pingTimer.clear()
      this.pingTimer = null
    }

    // Not connected: detach the close listener again and call done directly.
    if (done && !this.connected) {
      debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId)
      this.stream.removeListener('close', done)
      done()
    }
  }
  /**
   * _sendPacket - send or queue a packet
   *
   * While disconnected only `auth` packets are written directly; everything
   * else goes to `_storePacket`. While connected, `pubrel` packets and
   * `publish` packets with qos 1 or 2 go through `storeAndSend`, all other
   * packets through `sendPacket`.
   *
   * @param {Object} packet - packet options
   * @param {Function} cb - callback when the packet is sent
   * @param {Function} cbStorePut - called when message is put into outgoingStore
   * @api private
   */
  MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
    debug('_sendPacket :: (%s) :: start', this.options.clientId)
    cbStorePut = cbStorePut || nop
    cb = cb || nop

    const aliasError = applyTopicAlias(this, packet)
    if (aliasError) {
      cb(aliasError)
      return
    }

    if (!this.connected) {
      // allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth)
      if (packet.cmd === 'auth') {
        this._shiftPingInterval()
        sendPacket(this, packet, cb)
        return
      }

      debug('_sendPacket :: client not connected. Storing packet offline.')
      this._storePacket(packet, cb, cbStorePut)
      return
    }

    // When sending a packet, reschedule the ping timer
    this._shiftPingInterval()

    const cmd = packet.cmd
    if (cmd === 'pubrel') {
      storeAndSend(this, packet, cb, cbStorePut)
      return
    }
    if (cmd !== 'publish') {
      sendPacket(this, packet, cb)
      return
    }

    // Publish: only qos 1 and 2 need to be persisted before sending.
    const qos = packet.qos
    if (qos === 2 || qos === 1) {
      storeAndSend(this, packet, cb, cbStorePut)
    } else {
      sendPacket(this, packet, cb)
    }
    debug('_sendPacket :: (%s) :: end', this.options.clientId)
  }
  /**
   * _storePacket - queue a packet
   *
   * Non-publish packets, and qos 0 publishes when `queueQoSZero` is set, are
   * pushed onto `this.queue`. Publishes with qos > 0 are put into
   * `outgoingStore`. A qos 0 publish that may not be queued fails with
   * 'No connection to broker'.
   *
   * @param {Object} packet - packet options
   * @param {Function} cb - callback when the packet is sent
   * @param {Function} cbStorePut - called when message is put into outgoingStore
   * @api private
   */
  MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
    debug('_storePacket :: packet: %o', packet)
    debug('_storePacket :: cb? %s', !!cb)
    cbStorePut = cbStorePut || nop

    let storePacket = packet
    if (storePacket.cmd === 'publish') {
      // The original packet is for sending.
      // The cloned storePacket is for storing to resend on reconnect.
      // Topic Alias must not be used after disconnected.
      storePacket = clone(packet)
      const aliasError = removeTopicAliasAndRecoverTopicName(this, storePacket)
      if (aliasError) {
        return cb && cb(aliasError)
      }
    }

    const level = storePacket.qos || 0
    const queueable = storePacket.cmd !== 'publish' || (level === 0 && this.queueQoSZero)

    if (queueable) {
      this.queue.push({ packet: storePacket, cb: cb })
      return
    }

    if (level > 0) {
      // From here on errors are reported to the callback registered in
      // `outgoing` for this message id, not to the one passed in.
      const pending = this.outgoing[storePacket.messageId]
      cb = pending ? pending.cb : null
      this.outgoingStore.put(storePacket, function (err) {
        if (err) {
          return cb && cb(err)
        }
        cbStorePut()
      })
      return
    }

    if (cb) {
      cb(new Error('No connection to broker'))
    }
  }
  /**
   * _setupPingTimer - setup the ping timer
   *
   * Creates the timer only when none exists and `options.keepalive` is
   * truthy; the interval is the keepalive value (seconds) times 1000.
   *
   * @api private
   */
  MqttClient.prototype._setupPingTimer = function () {
    debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive)

    if (this.pingTimer || !this.options.keepalive) {
      return
    }

    // Start out as if the last ping had been answered.
    this.pingResp = true
    this.pingTimer = reInterval(() => {
      this._checkPing()
    }, this.options.keepalive * 1000)
  }
  /**
   * _shiftPingInterval - reschedule the ping interval
   *
   * Only acts when a ping timer exists and both `options.keepalive` and
   * `options.reschedulePings` are truthy.
   *
   * @api private
   */
  MqttClient.prototype._shiftPingInterval = function () {
    if (!this.pingTimer) {
      return
    }
    if (!this.options.keepalive || !this.options.reschedulePings) {
      return
    }
    this.pingTimer.reschedule(this.options.keepalive * 1000)
  }
  /**
   * _checkPing - check if a pingresp has come back, and ping the server again
   *
   * If the previous ping was never answered, a forced clean-up is done.
   *
   * @api private
   */
  MqttClient.prototype._checkPing = function () {
    debug('_checkPing :: checking ping...')

    if (!this.pingResp) {
      // do a forced cleanup since socket will be in bad shape
      debug('_checkPing :: calling _cleanUp with force true')
      this._cleanUp(true)
      return
    }

    debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`')
    this.pingResp = false
    this._sendPacket({ cmd: 'pingreq' })
  }
  /**
   * _handlePingresp - handle a pingresp
   *
   * Records that the outstanding ping was answered; `_checkPing` reads and
   * clears this flag.
   *
   * @api private
   */
  MqttClient.prototype._handlePingresp = function () {
    const answered = true
    this.pingResp = answered
  }
  /**
   * _handleConnack
   *
   * Clears the connack timer, applies the connack properties
   * (topicAliasMaximum, serverKeepAlive, maximumPacketSize) and then either
   * completes the connection (code 0) or emits a 'Connection refused' error
   * (code > 0). The code is `reasonCode` for protocol version 5 and
   * `returnCode` otherwise.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleConnack = function (packet) {
    debug('_handleConnack')
    const options = this.options
    const isV5 = options.protocolVersion === 5
    const rc = isV5 ? packet.reasonCode : packet.returnCode

    clearTimeout(this.connackTimer)
    delete this.topicAliasSend

    const props = packet.properties
    if (props) {
      const aliasMax = props.topicAliasMaximum
      if (aliasMax) {
        if (aliasMax > 0xffff) {
          this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
          return
        }
        if (aliasMax > 0) {
          this.topicAliasSend = new TopicAliasSend(aliasMax)
        }
      }

      // The keepalive announced by the broker replaces the configured one.
      if (props.serverKeepAlive && options.keepalive) {
        options.keepalive = props.serverKeepAlive
        this._shiftPingInterval()
      }

      if (props.maximumPacketSize) {
        if (!options.properties) {
          options.properties = {}
        }
        options.properties.maximumPacketSize = props.maximumPacketSize
      }
    }

    if (rc === 0) {
      this.reconnecting = false
      this._onConnect(packet)
      return
    }

    if (rc > 0) {
      const refused = new Error('Connection refused: ' + errors[rc])
      refused.code = rc
      this.emit('error', refused)
    }
  }
  /**
   * _handleAuth - handle an incoming auth packet
   *
   * Auth packets are rejected with a protocol error unless the protocol
   * version is 5. Otherwise the packet is passed to the overridable
   * `handleAuth`; on reason code 24 the packet it produces is sent back,
   * any other reason code is reported as a 'Connection refused' error.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleAuth = function (packet) {
    const options = this.options
    const version = options.protocolVersion
    const rc = version === 5 ? packet.reasonCode : packet.returnCode

    if (version !== 5) {
      const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version)
      err.code = rc
      this.emit('error', err)
      return
    }

    const that = this
    this.handleAuth(packet, function (err, authPacket) {
      if (err) {
        that.emit('error', err)
        return
      }

      if (rc === 24) {
        that.reconnecting = false
        that._sendPacket(authPacket)
      } else {
        const error = new Error('Connection refused: ' + errors[rc])
        // Tag the error that is emitted; `err` is falsy on this path.
        error.code = rc
        that.emit('error', error)
      }
    })
  }
  /**
   * handleAuth - hook for MQTT 5 enhanced authentication. Override at will.
   *
   * The default implementation calls back without arguments.
   *
   * @param packet the auth packet received from the broker
   * @param callback called as callback(err, packet); `_handleAuth` sends the
   *        packet passed here back to the broker
   * @api public
   */
  MqttClient.prototype.handleAuth = function (packet, callback) {
    const done = callback
    done()
  }
  /**
   * _handlePublish
   *
   * Resolves or registers an MQTT 5 topic alias, then acknowledges the
   * publish according to its qos:
   *   - qos 2: asks `customHandleAcks` for a reason code, stores the packet
   *     in `incomingStore` when the code is 0 and answers with `pubrec`
   *     (no `message` event is emitted here).
   *   - qos 1: asks `customHandleAcks`, emits `message` when the code is 0,
   *     runs `handleMessage` and answers with `puback`.
   *   - qos 0: emits `message` and runs `handleMessage`.
   *   - anything else is ignored.
   *
   * NOTE(review): the qos 0 and qos 1 branches overlap; an earlier note in
   * this file suggested merging them into one case.
   *
   * @param {Object} packet
   * @param {Function} [done] - called when handling has finished
   * @api private
   */
  MqttClient.prototype._handlePublish = function (packet, done) {
    debug('_handlePublish: packet %o', packet)
    done = typeof done !== 'undefined' ? done : nop

    let topic = packet.topic.toString()
    const message = packet.payload
    const qos = packet.qos
    const messageId = packet.messageId
    const options = this.options
    const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]

    // customHandleAcks may call back as (error, code) or just (code).
    const splitAckArgs = (first, second) => (
      first instanceof Error
        ? { error: first, code: second }
        : { error: null, code: first }
    )

    if (this.options.protocolVersion === 5) {
      const alias = packet.properties ? packet.properties.topicAlias : undefined

      if (typeof alias !== 'undefined') {
        if (topic.length !== 0) {
          // Topic and alias both present: remember the mapping.
          if (!this.topicAliasRecv.put(topic, alias)) {
            debug('_handlePublish :: topic alias out of range. alias: %d', alias)
            this.emit('error', new Error('Received Topic Alias is out of range'))
            return
          }
          debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
        } else if (alias > 0 && alias <= 0xffff) {
          // Empty topic: look the topic up by its alias.
          const gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
          if (!gotTopic) {
            debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
            this.emit('error', new Error('Received unregistered Topic Alias'))
            return
          }
          topic = gotTopic
          debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
        } else {
          debug('_handlePublish :: topic alias out of range. alias: %d', alias)
          this.emit('error', new Error('Received Topic Alias is out of range'))
          return
        }
      }
    }

    debug('_handlePublish: qos %d', qos)
    switch (qos) {
      case 2: {
        options.customHandleAcks(topic, message, packet, (ackError, ackCode) => {
          const ack = splitAckArgs(ackError, ackCode)
          if (ack.error) {
            return this.emit('error', ack.error)
          }
          if (!validReasonCodes.includes(ack.code)) {
            return this.emit('error', new Error('Wrong reason code for pubrec'))
          }
          if (ack.code) {
            this._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: ack.code }, done)
          } else {
            this.incomingStore.put(packet, () => {
              this._sendPacket({ cmd: 'pubrec', messageId: messageId }, done)
            })
          }
        })
        break
      }
      case 1: {
        options.customHandleAcks(topic, message, packet, (ackError, ackCode) => {
          const ack = splitAckArgs(ackError, ackCode)
          if (ack.error) {
            return this.emit('error', ack.error)
          }
          if (!validReasonCodes.includes(ack.code)) {
            return this.emit('error', new Error('Wrong reason code for puback'))
          }
          // emit the message event only when the message is accepted
          if (!ack.code) {
            this.emit('message', topic, message, packet)
          }
          this.handleMessage(packet, (err) => {
            if (err) {
              return done && done(err)
            }
            this._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: ack.code }, done)
          })
        })
        break
      }
      case 0:
        // emit the message event
        this.emit('message', topic, message, packet)
        this.handleMessage(packet, done)
        break
      default:
        // unknown qos: nothing is emitted or acknowledged
        debug('_handlePublish: unknown QoS. Doing nothing.')
        break
    }
  }
  /**
   * Handle messages with backpressure support, one at a time.
   * Override at will. The default implementation calls back immediately
   * without arguments.
   *
   * @param Packet packet the packet
   * @param Function callback call when finished
   * @api public
   */
  MqttClient.prototype.handleMessage = function (packet, callback) {
    const finished = callback
    finished()
  }
  /**
   * _handleAck
   *
   * Handles puback, pubcomp, pubrec, suback and unsuback packets for a
   * message recorded in `this.outgoing`. Acks for unknown message ids are
   * ignored. Each finished exchange releases its message id and gives the
   * next queued operation a chance to run; the user callback is invoked at
   * most once per ack.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleAck = function (packet) {
    /* eslint no-fallthrough: "off" */
    const messageId = packet.messageId
    const type = packet.cmd
    let response = null
    const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
    const that = this
    let err

    // Checking `!cb` happens to work, but it's not technically "correct".
    //
    // Why? This code assumes that "no callback" is the same as "we're not
    // waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback).
    //
    // It would be better to check `if (!this.outgoing[messageId])` here, but
    // there's no reason to change it and risk (another) regression.
    //
    // The only reason this code works is because code in MqttClient.publish,
    // MqttClient.subscribe, and MqttClient.unsubscribe ensures that we will
    // have a callback even if the user doesn't pass one in.
    if (!cb) {
      debug('_handleAck :: Server sent an ack in error. Ignoring.')
      // Server sent an ack in error, ignore it.
      return
    }

    // Drops the in-flight entry, deletes the stored message, releases the
    // message id and lets the next queued operation run.
    const release = function (onDeleted) {
      delete that.outgoing[messageId]
      that.outgoingStore.del(packet, onDeleted)
      that.messageIdProvider.deallocate(messageId)
      that._invokeStoreProcessingQueue()
    }

    // Process
    debug('_handleAck :: packet type', type)
    switch (type) {
      case 'pubcomp':
        // same thing as puback for QoS 2
      case 'puback': {
        const pubackRC = packet.reasonCode
        if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
          err = new Error('Publish error: ' + errors[pubackRC])
          err.code = pubackRC
          // Report the failure once, after the message left the store.
          release(function () { cb(err, packet) })
        } else {
          // Callback - we're done
          release(cb)
        }
        break
      }
      case 'pubrec': {
        response = {
          cmd: 'pubrel',
          qos: 2,
          messageId: messageId
        }
        const pubrecRC = packet.reasonCode

        if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
          err = new Error('Publish error: ' + errors[pubrecRC])
          err.code = pubrecRC
          // The exchange ends here (no pubrel is sent), so the in-flight
          // entry and the message id must be released as well.
          release(function () { cb(err, packet) })
        } else {
          this._sendPacket(response)
        }
        break
      }
      case 'suback': {
        delete this.outgoing[messageId]
        this.messageIdProvider.deallocate(messageId)
        for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) {
          if ((packet.granted[grantedI] & 0x80) !== 0) {
            // suback with Failure status
            const topics = this.messageIdToTopic[messageId]
            if (topics) {
              topics.forEach(function (topic) {
                delete that._resubscribeTopics[topic]
              })
            }
          }
        }
        // The id is free for reuse; drop the topic list recorded for it.
        if (this.messageIdToTopic) {
          delete this.messageIdToTopic[messageId]
        }
        this._invokeStoreProcessingQueue()
        cb(null, packet)
        break
      }
      case 'unsuback': {
        delete this.outgoing[messageId]
        this.messageIdProvider.deallocate(messageId)
        this._invokeStoreProcessingQueue()
        cb(null)
        break
      }
      default:
        that.emit('error', new Error('unrecognized packet type'))
    }

    // end() waits for this event when it was called without force.
    if (this.disconnecting &&
        Object.keys(this.outgoing).length === 0) {
      this.emit('outgoingEmpty')
    }
  }
  /**
   * _handlePubrel
   *
   * Looks the released publish up in `incomingStore`. When found it is
   * emitted as `message`, passed to `handleMessage`, deleted from the store
   * and answered with `pubcomp`. When the lookup fails, `pubcomp` is sent
   * right away.
   *
   * @param {Object} packet
   * @param {Function} [callback] - called after pubcomp was handed to
   *    _sendPacket, or with the error from handleMessage
   * @api private
   */
  MqttClient.prototype._handlePubrel = function (packet, callback) {
    debug('handling pubrel packet')
    callback = typeof callback !== 'undefined' ? callback : nop

    const comp = { cmd: 'pubcomp', messageId: packet.messageId }

    this.incomingStore.get(packet, (lookupErr, pub) => {
      if (lookupErr) {
        this._sendPacket(comp, callback)
        return
      }

      this.emit('message', pub.topic, pub.payload, pub)
      this.handleMessage(pub, (handleErr) => {
        if (handleErr) {
          return callback(handleErr)
        }
        this.incomingStore.del(pub, nop)
        this._sendPacket(comp, callback)
      })
    })
  }
  /**
   * _handleDisconnect
   *
   * Forwards a received disconnect packet to listeners as a `disconnect`
   * event.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleDisconnect = function (packet) {
    const eventName = 'disconnect'
    this.emit(eventName, packet)
  }
  /**
   * _nextId - allocate a message id from the message id provider
   * @return unsigned int, or null when no id is left (callers check for null)
   */
  MqttClient.prototype._nextId = function () {
    const provider = this.messageIdProvider
    return provider.allocate()
  }
  /**
   * getLastMessageId - the message id most recently allocated by the provider
   * @return unsigned int
   */
  MqttClient.prototype.getLastMessageId = function () {
    const provider = this.messageIdProvider
    return provider.getLastAllocated()
  }
  /**
   * _resubscribe
   *
   * Runs on every connect but only acts on reconnects (not the first
   * connection) when the session is clean, or, for protocol version 5, when
   * the connack reports no session present. With `options.resubscribe` the
   * remembered topics are subscribed again (one subscribe call per topic for
   * version 5, a single call otherwise); without it they are forgotten.
   * @api private
   */
  MqttClient.prototype._resubscribe = function () {
    debug('_resubscribe')
    const topicNames = Object.keys(this._resubscribeTopics)

    // Evaluated lazily so connackPacket is only read when actually needed.
    const sessionLost = () => this.options.clean ||
      (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)

    if (!this._firstConnection && sessionLost() && topicNames.length > 0) {
      if (!this.options.resubscribe) {
        this._resubscribeTopics = {}
      } else if (this.options.protocolVersion === 5) {
        debug('_resubscribe: protocolVersion 5')
        for (const name of topicNames) {
          const single = {}
          single[name] = this._resubscribeTopics[name]
          // Marker flag passed along with the topic map to subscribe().
          single.resubscribe = true
          this.subscribe(single, { properties: single[name].properties })
        }
      } else {
        this._resubscribeTopics.resubscribe = true
        this.subscribe(this._resubscribeTopics)
      }
    }

    this._firstConnection = false
  }
  /**
   * _onConnect
   *
   * Called with the connack packet once the broker accepted the connection.
   * If the client was already ended it only emits `connect`. Otherwise it
   * records the connack, resets the message id provider, starts the ping
   * timer and replays the packets held in `outgoingStore` one at a time.
   * `connect` is emitted after the store stream ended and every replayed
   * packet id is marked as processed.
   *
   * @param {Object} packet - the connack packet
   * @api private
   */
  MqttClient.prototype._onConnect = function (packet) {
    if (this.disconnected) {
      this.emit('connect', packet)
      return
    }

    const client = this

    this.connackPacket = packet
    this.messageIdProvider.clear()
    this._setupPingTimer()

    this.connected = true

    function startStreamProcess () {
      let outStore = client.outgoingStore.createStream()

      function clearStoreProcessing () {
        client._storeProcessing = false
        client._packetIdsDuringStoreProcessing = {}
      }

      // Connection closed mid-replay: drop the stream and fail queued work.
      function remove () {
        outStore.destroy()
        outStore = null
        client._flushStoreProcessingQueue()
        clearStoreProcessing()
      }

      // Reads one stored packet and re-sends it; the ack callback installed
      // below pulls the next one.
      function storeDeliver () {
        // edge case, we wrapped this twice
        if (!outStore) {
          return
        }
        client._storeProcessing = true

        const stored = outStore.read(1)
        if (!stored) {
          // read when data is available in the future
          outStore.once('readable', storeDeliver)
          return
        }

        // Skip already processed store packets
        if (client._packetIdsDuringStoreProcessing[stored.messageId]) {
          storeDeliver()
          return
        }

        // Avoid unnecessary stream read operations when disconnected
        if (client.disconnecting || client.reconnectTimer) {
          if (outStore.destroy) {
            outStore.destroy()
          }
          return
        }

        const pending = client.outgoing[stored.messageId]
        const originalCb = pending ? pending.cb : null
        client.outgoing[stored.messageId] = {
          volatile: false,
          cb: function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (originalCb) {
              originalCb(err, status)
            }
            storeDeliver()
          }
        }
        client._packetIdsDuringStoreProcessing[stored.messageId] = true

        if (client.messageIdProvider.register(stored.messageId)) {
          client._sendPacket(stored)
        } else {
          debug('messageId: %d has already used.', stored.messageId)
        }
      }

      client.once('close', remove)

      outStore.on('error', function (err) {
        clearStoreProcessing()
        client._flushStoreProcessingQueue()
        client.removeListener('close', remove)
        client.emit('error', err)
      })

      outStore.on('end', function () {
        let allProcessed = true
        for (const id in client._packetIdsDuringStoreProcessing) {
          if (!client._packetIdsDuringStoreProcessing[id]) {
            allProcessed = false
            break
          }
        }

        if (!allProcessed) {
          // Something is still unprocessed: run another pass over the store.
          startStreamProcess()
          return
        }

        clearStoreProcessing()
        client.removeListener('close', remove)
        client._invokeAllStoreProcessingQueue()
        client.emit('connect', packet)
      })

      storeDeliver()
    }

    // start flowing
    startStreamProcess()
  }
  /**
   * _invokeStoreProcessingQueue - run the operation at the head of
   * `_storeProcessingQueue`.
   *
   * The entry is removed only when its `invoke()` reports success.
   *
   * @return {Boolean} true when an entry was invoked and removed
   * @api private
   */
  MqttClient.prototype._invokeStoreProcessingQueue = function () {
    if (!(this._storeProcessingQueue.length > 0)) {
      return false
    }

    const head = this._storeProcessingQueue[0]
    if (!head || !head.invoke()) {
      return false
    }

    this._storeProcessingQueue.shift()
    return true
  }
  /**
   * _invokeAllStoreProcessingQueue - keep running queued operations until the
   * queue is empty or the head entry cannot be invoked.
   *
   * @api private
   */
  MqttClient.prototype._invokeAllStoreProcessingQueue = function () {
    let progressed = this._invokeStoreProcessingQueue()
    while (progressed) {
      progressed = this._invokeStoreProcessingQueue()
    }
  }
  /**
   * _flushStoreProcessingQueue - fail every queued operation with
   * 'Connection closed' and empty the queue.
   *
   * Each `cbStorePut` / `callback` present on an entry receives its own
   * Error instance.
   *
   * @api private
   */
  MqttClient.prototype._flushStoreProcessingQueue = function () {
    const queue = this._storeProcessingQueue
    for (let i = 0; i < queue.length; i++) {
      const entry = queue[i]
      if (entry.cbStorePut) {
        entry.cbStorePut(new Error('Connection closed'))
      }
      if (entry.callback) {
        entry.callback(new Error('Connection closed'))
      }
    }
    this._storeProcessingQueue.splice(0)
  }
  1710. module.exports = MqttClient