ws.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. 'use strict'
  2. const { Buffer } = require('buffer')
  3. const WS = require('ws')
  4. const debug = require('debug')('mqttjs:ws')
  5. const duplexify = require('duplexify')
  6. const Transform = require('readable-stream').Transform
  // Option names that setDefaultOpts copies from the top-level options into
  // `wsOptions` for `wss` connections when not running in a browser.
  const WSS_OPTIONS = ['rejectUnauthorized', 'ca', 'cert', 'key', 'pfx', 'passphrase']

  // True when `process.title` is 'browser' or when a `__webpack_require__`
  // function is in scope; selects which stream builder this module exports.
  const IS_BROWSER =
    (typeof process !== 'undefined' && process.title === 'browser') ||
    // eslint-disable-next-line camelcase
    typeof __webpack_require__ === 'function'
  /**
   * Assemble the WebSocket URL for a connection.
   *
   * @param {Object} opts - supplies `protocol`, `hostname`, `port` and `path`;
   *   may also carry a `transformWsUrl(url, opts, client)` hook.
   * @param {Object} client - forwarded unchanged to `transformWsUrl`.
   * @returns {*} `protocol://hostname:port` followed by `path`, or whatever
   *   `transformWsUrl` returns for that URL when the hook is a function.
   */
  function buildUrl (opts, client) {
    const baseUrl = `${opts.protocol}://${opts.hostname}:${opts.port}${opts.path}`
    return typeof opts.transformWsUrl === 'function'
      ? opts.transformWsUrl(baseUrl, opts, client)
      : baseUrl
  }
  /**
   * Fill in missing connection defaults on `opts`. The object is mutated in
   * place and the same reference is returned.
   *
   * Defaults: `hostname` 'localhost', `port` 443 for protocol 'wss' and 80
   * otherwise, `path` '/', `wsOptions` {}. For 'wss' outside the browser, each
   * WSS_OPTIONS property that `opts` owns is copied into `wsOptions` unless
   * `wsOptions` already owns it.
   *
   * @param {Object} opts - connection options to complete.
   * @returns {Object} the same `opts` object.
   */
  function setDefaultOpts (opts) {
    if (!opts.hostname) opts.hostname = 'localhost'
    if (!opts.port) opts.port = opts.protocol === 'wss' ? 443 : 80
    if (!opts.path) opts.path = '/'
    if (!opts.wsOptions) opts.wsOptions = {}

    const wantsTlsOptions = !IS_BROWSER && opts.protocol === 'wss'
    if (wantsTlsOptions) {
      const owns = (target, key) => Object.prototype.hasOwnProperty.call(target, key)
      // Add cert/key/ca etc options
      for (const prop of WSS_OPTIONS) {
        if (owns(opts, prop) && !owns(opts.wsOptions, prop)) {
          opts.wsOptions[prop] = opts[prop]
        }
      }
    }

    return opts
  }
  /**
   * Complete connection options for the browser stream builder. `opts` is
   * mutated in place.
   *
   * The hostname is resolved before the generic defaults are applied, because
   * setDefaultOpts always fills in `hostname` and `port`; running it first
   * would leave the fallbacks below unreachable.
   *
   * Hostname resolution order: `opts.hostname`, then `opts.host`, then the
   * hostname of `document.URL`. In the last case the page's port is also used
   * when no port was given.
   *
   * `objectMode`, when undefined, is set to false if `binary` is true or
   * undefined, and to true otherwise.
   *
   * @param {Object} opts - connection options to complete.
   * @returns {Object} the options object returned by setDefaultOpts.
   * @throws {Error} when no hostname/host is given and `document` is undefined.
   */
  function setDefaultBrowserOpts (opts) {
    if (!opts.hostname) {
      opts.hostname = opts.host
    }

    if (!opts.hostname) {
      // Throwing an error in a Web Worker if no `hostname` is given, because we
      // can not determine the `hostname` automatically. If connecting to
      // localhost, please supply the `hostname` as an argument.
      if (typeof document === 'undefined') {
        throw new Error('Could not determine host. Specify host manually.')
      }
      const parsed = new URL(document.URL)
      opts.hostname = parsed.hostname
      if (!opts.port) {
        // May be '' for a default port; setDefaultOpts then picks 80/443.
        opts.port = parsed.port
      }
    }

    const options = setDefaultOpts(opts)

    // objectMode should be defined for logic
    if (options.objectMode === undefined) {
      options.objectMode = !(options.binary === true || options.binary === undefined)
    }
    return options
  }
  /**
   * Create a `ws` socket for `url`.
   *
   * The WebSocket subprotocol is 'mqttv3.1' when `opts.protocolId` is 'MQIsdp'
   * and `opts.protocolVersion` is 3, and 'mqtt' otherwise.
   *
   * @param {Object} client - not used by this function.
   * @param {string} url - URL to connect to.
   * @param {Object} opts - reads `protocolId`, `protocolVersion` and `wsOptions`.
   * @returns {WS} the newly constructed socket.
   */
  function createWebSocket (client, url, opts) {
    debug('createWebSocket')
    debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`)

    const isLegacyProtocol = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
    const websocketSubProtocol = isLegacyProtocol ? 'mqttv3.1' : 'mqtt'

    debug(`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`)
    return new WS(url, [websocketSubProtocol], opts.wsOptions)
  }
  /**
   * Create a browser `WebSocket` for the URL built from `opts`, with
   * `binaryType` set to 'arraybuffer'.
   *
   * The subprotocol is 'mqttv3.1' when `opts.protocolId` is 'MQIsdp' and
   * `opts.protocolVersion` is 3, and 'mqtt' otherwise.
   *
   * @param {Object} client - forwarded to buildUrl.
   * @param {Object} opts - connection options; also forwarded to buildUrl.
   * @returns {WebSocket} the newly constructed socket.
   */
  function createBrowserWebSocket (client, opts) {
    const isLegacyProtocol = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
    const subProtocols = [isLegacyProtocol ? 'mqttv3.1' : 'mqtt']
    const target = buildUrl(opts, client)
    /* global WebSocket */
    const browserSocket = new WebSocket(target, subProtocols)
    browserSocket.binaryType = 'arraybuffer'
    return browserSocket
  }
  /**
   * Build the WebSocket-backed stream used outside the browser.
   *
   * Applies default options, builds the URL, opens a `ws` socket and wraps it
   * with `WS.createWebSocketStream`. The URL is stored on the stream as `url`,
   * and the stream is destroyed when the socket emits 'close'.
   *
   * @param {Object} client - forwarded to buildUrl and createWebSocket.
   * @param {Object} opts - connection options; completed by setDefaultOpts.
   * @returns {Object} the stream returned by `WS.createWebSocketStream`.
   */
  function streamBuilder (client, opts) {
    debug('streamBuilder')

    const resolved = setDefaultOpts(opts)
    const target = buildUrl(resolved, client)
    const ws = createWebSocket(client, target, resolved)
    const duplex = WS.createWebSocketStream(ws, resolved.wsOptions)

    duplex.url = target
    ws.on('close', function destroyStreamOnClose () {
      duplex.destroy()
    })

    return duplex
  }
  /**
   * Build a duplex stream on top of a browser `WebSocket`.
   *
   * A Transform ("proxy") forwards writes to `socket.send` and receives
   * incoming messages via `push`. If the socket is already open the proxy is
   * returned directly. Otherwise a `duplexify` stream is returned; it is wired
   * to the proxy and emits 'connect' once the socket opens.
   *
   * @param {Object} client - forwarded to createBrowserWebSocket.
   * @param {Object} opts - connection options, completed by
   *   setDefaultBrowserOpts. Options read here:
   *   - `browserBufferSize`: `socket.bufferedAmount` threshold above which
   *     writes are delayed (default 512 * 1024).
   *   - `browserBufferTimeout`: delay in ms before a throttled write is
   *     retried (default 1000).
   *   - `objectMode`: when falsy, string chunks are converted to Buffers and
   *     batched writes are enabled through `_writev`.
   * @returns {Object} the stream, with the underlying socket exposed as `socket`.
   */
  function browserStreamBuilder (client, opts) {
    debug('browserStreamBuilder')
    let stream
    const options = setDefaultBrowserOpts(opts)
    // sets the maximum socket buffer size before throttling
    const bufferSize = options.browserBufferSize || 1024 * 512
    const bufferTimeout = options.browserBufferTimeout || 1000
    const coerceToBuffer = !options.objectMode
    const socket = createBrowserWebSocket(client, options)
    const proxy = buildProxy(options, socketWriteBrowser, socketEndBrowser)

    if (!options.objectMode) {
      proxy._writev = writev
    }
    proxy.on('close', () => { socket.close() })

    const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')

    // was already open when passed in
    if (socket.readyState === socket.OPEN) {
      stream = proxy
    } else {
      stream = duplexify(undefined, undefined, options)
      if (!options.objectMode) {
        stream._writev = writev
      }
      if (eventListenerSupport) {
        socket.addEventListener('open', onopen)
      } else {
        socket.onopen = onopen
      }
    }

    stream.socket = socket

    if (eventListenerSupport) {
      socket.addEventListener('close', onclose)
      socket.addEventListener('error', onerror)
      socket.addEventListener('message', onmessage)
    } else {
      socket.onclose = onclose
      socket.onerror = onerror
      socket.onmessage = onmessage
    }

    // methods for browserStreamBuilder

    // Create the Transform whose write side feeds the socket.
    function buildProxy (options, socketWrite, socketEnd) {
      const proxy = new Transform({
        // Was misspelled `objectModeMode`, so the option never took effect.
        objectMode: options.objectMode
      })
      proxy._write = socketWrite
      proxy._flush = socketEnd
      return proxy
    }

    // Socket opened: attach the proxy to both sides and announce it.
    function onopen () {
      stream.setReadable(proxy)
      stream.setWritable(proxy)
      stream.emit('connect')
    }

    function onclose () {
      stream.end()
      stream.destroy()
    }

    function onerror (err) {
      stream.destroy(err)
    }

    // Push each incoming message into the proxy as a Buffer.
    function onmessage (event) {
      let data = event.data
      if (data instanceof ArrayBuffer) data = Buffer.from(data)
      else data = Buffer.from(data, 'utf8')
      proxy.push(data)
    }

    // this is to be enabled only if objectMode is false
    // Each entry of `chunks` wraps its payload in a `chunk` property.
    function writev (chunks, cb) {
      const buffers = new Array(chunks.length)
      for (let i = 0; i < chunks.length; i++) {
        const payload = chunks[i].chunk
        // Convert the payload, not the wrapper object (which makes
        // Buffer.from throw).
        buffers[i] = typeof payload === 'string' ? Buffer.from(payload, 'utf8') : payload
      }
      this._write(Buffer.concat(buffers), 'binary', cb)
    }

    function socketWriteBrowser (chunk, enc, next) {
      if (socket.bufferedAmount > bufferSize) {
        // throttle data until buffered amount is reduced.
        setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
        // Stop here: falling through would send the chunk now and again on
        // retry, and call `next` twice.
        return
      }

      if (coerceToBuffer && typeof chunk === 'string') {
        chunk = Buffer.from(chunk, 'utf8')
      }

      try {
        socket.send(chunk)
      } catch (err) {
        return next(err)
      }
      next()
    }

    function socketEndBrowser (done) {
      socket.close()
      done()
    }

    // end methods for browserStreamBuilder
    return stream
  }
  // Export the builder that matches the detected runtime.
  module.exports = IS_BROWSER ? browserStreamBuilder : streamBuilder