index.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. 'use strict'
  2. const MqttClient = require('../client')
  3. const Store = require('../store')
  4. const url = require('url')
  5. const xtend = require('xtend')
  6. const debug = require('debug')('mqttjs')
  // Registry mapping a protocol name (as used in broker urls / opts.protocol)
  // to the stream-builder module that implements it.
  const protocols = {}

  // eslint-disable-next-line camelcase
  const runningUnderWebpack = typeof __webpack_require__ === 'function'
  const looksLikeBrowser = typeof process === 'undefined' || process.title === 'browser'

  if (looksLikeBrowser && runningUnderWebpack) {
    // Browser-like webpack bundle: register the wx / ali transports.
    const wxBuilder = require('./wx')
    const aliBuilder = require('./ali')
    protocols.wx = wxBuilder
    protocols.wxs = wxBuilder
    protocols.ali = aliBuilder
    protocols.alis = aliBuilder
  } else {
    // Everything else: register the tcp and tls transports.
    const tcpBuilder = require('./tcp')
    const tlsBuilder = require('./tls')
    protocols.mqtt = tcpBuilder
    protocols.tcp = tcpBuilder
    protocols.ssl = tlsBuilder
    protocols.tls = tlsBuilder
    protocols.mqtts = tlsBuilder
  }

  // The websocket transport is registered in every environment.
  protocols.ws = require('./ws')
  protocols.wss = protocols.ws
  /**
   * Parse the auth attribute and merge username and password into the options object.
   *
   * `opts.auth` is expected in the form "username:password". The string is split
   * at the FIRST colon that leaves both sides non-empty, so a password may itself
   * contain colons ("user:pa:ss" -> username "user", password "pa:ss").
   * If no such split exists (e.g. there is no colon at all), the whole string
   * is used as the username and `opts.password` is left untouched.
   *
   * Does nothing when `opts` is missing or `opts.auth` is falsy.
   *
   * @param {Object} [opts] option object; mutated in place
   */
  function parseAuthOptions (opts) {
    if (!opts || !opts.auth) {
      return
    }

    // The first group is lazy on purpose: a greedy `(.+)` would swallow every
    // colon but the last one and push part of the password into the username.
    const matches = opts.auth.match(/^(.+?):(.+)$/)
    if (matches) {
      opts.username = matches[1]
      opts.password = matches[2]
    } else {
      opts.username = opts.auth
    }
  }
  /**
   * connect - connect to an MQTT broker.
   *
   * May be called as connect(brokerUrl, opts), connect(brokerUrl) or connect(opts).
   * When a url is given it is parsed and the explicit options are merged on top
   * of the parsed url fields.
   *
   * @param {String} [brokerUrl] - url of the broker, optional
   * @param {Object} opts - see MqttClient#constructor
   * @returns {MqttClient} the newly created client, with a no-op 'error' listener attached
   * @throws {Error} if the url carries no protocol, if cert and key are given
   *   without a protocol or with a protocol that has no secure counterpart, or
   *   if `clean` is false and no clientId is supplied
   */
  function connect (brokerUrl, opts) {
    debug('connecting to an MQTT broker...')

    // connect(opts): a lone object argument is the options bag, not a url.
    const onlyOptionsGiven = (typeof brokerUrl === 'object') && !opts
    const target = onlyOptionsGiven ? null : brokerUrl
    let settings = (onlyOptionsGiven ? brokerUrl : opts) || {}

    if (target) {
      // eslint-disable-next-line
      const parsed = url.parse(target, true)
      if (parsed.port != null) {
        parsed.port = Number(parsed.port)
      }
      // Explicit options take precedence over whatever the url provided.
      settings = xtend(parsed, settings)
      if (settings.protocol === null) {
        throw new Error('Missing protocol')
      }
      // Drop the trailing ':' of a parsed url protocol ("mqtt:" -> "mqtt").
      settings.protocol = settings.protocol.replace(/:$/, '')
    }

    // merge in the auth options if supplied
    parseAuthOptions(settings)

    // support clientId passed in the query string of the url
    const query = settings.query
    if (query && typeof query.clientId === 'string') {
      settings.clientId = query.clientId
    }

    // A cert/key pair forces the secure flavour of the chosen protocol.
    if (settings.cert && settings.key) {
      if (!settings.protocol) {
        // A cert and key was provided, however no protocol was specified, so we will throw an error.
        throw new Error('Missing secure protocol key')
      }
      switch (settings.protocol) {
        case 'mqtts':
        case 'wss':
        case 'wxs':
        case 'alis':
          // already secure, nothing to upgrade
          break
        case 'mqtt':
          settings.protocol = 'mqtts'
          break
        case 'ws':
          settings.protocol = 'wss'
          break
        case 'wx':
          settings.protocol = 'wxs'
          break
        case 'ali':
          settings.protocol = 'alis'
          break
        default:
          throw new Error('Unknown protocol for secure connection: "' + settings.protocol + '"!')
      }
    }

    // No stream builder registered for the protocol: fall back to the first
    // registered one from the list below (insecure/secure names alternate).
    if (!protocols[settings.protocol]) {
      const wantsSecure = settings.protocol === 'mqtts' || settings.protocol === 'wss'
      const candidates = ['mqtt', 'mqtts', 'ws', 'wss', 'wx', 'wxs', 'ali', 'alis']
      settings.protocol = candidates.find(function (name, position) {
        // Skip insecure protocols (even positions) when requesting a secure one.
        const isInsecureSlot = position % 2 === 0
        if (wantsSecure && isInsecureSlot) {
          return false
        }
        return typeof protocols[name] === 'function'
      })
    }

    if (settings.clean === false && !settings.clientId) {
      throw new Error('Missing clientId for unclean clients')
    }

    // Remember the resolved protocol for `servers` entries that omit their own.
    if (settings.protocol) {
      settings.defaultProtocol = settings.protocol
    }

    // Stream factory handed to MqttClient. When `servers` is set, each call
    // moves on to the next server entry, wrapping around at the end of the list.
    function wrapper (mqttClient) {
      if (settings.servers) {
        const exhausted = mqttClient._reconnectCount === settings.servers.length
        if (!mqttClient._reconnectCount || exhausted) {
          mqttClient._reconnectCount = 0
        }

        const server = settings.servers[mqttClient._reconnectCount]
        settings.host = server.host
        settings.port = server.port
        settings.protocol = server.protocol || settings.defaultProtocol
        settings.hostname = settings.host

        mqttClient._reconnectCount++
      }

      debug('calling streambuilder for', settings.protocol)
      return protocols[settings.protocol](mqttClient, settings)
    }

    const client = new MqttClient(wrapper, settings)
    client.on('error', function () { /* Automatically set up client error handling */ })
    return client
  }
  // The module itself is the `connect` function; `connect`, `MqttClient` and
  // `Store` are additionally exposed as properties on it.
  Object.assign(connect, {
    connect: connect,
    MqttClient: MqttClient,
    Store: Store
  })
  module.exports = connect