| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- 'use strict'
- const { Buffer } = require('buffer')
- const WS = require('ws')
- const debug = require('debug')('mqttjs:ws')
- const duplexify = require('duplexify')
- const Transform = require('readable-stream').Transform
// Names of the TLS-related connection options that setDefaultOpts copies
// from the top-level options into `wsOptions` for non-browser `wss` connections.
const WSS_OPTIONS = ['rejectUnauthorized', 'ca', 'cert', 'key', 'pfx', 'passphrase']

// True when webpack's runtime `require` is present or `process.title` is
// 'browser'; decides which stream builder this module exports.
const IS_BROWSER =
  // eslint-disable-next-line camelcase
  typeof __webpack_require__ === 'function' ||
  (typeof process !== 'undefined' && process.title === 'browser')
/**
 * Assemble the WebSocket URL from the connection options.
 *
 * @param {object} opts - Connection options; `protocol`, `hostname`, `port`
 *   and `path` are concatenated as `protocol://hostname:port` + `path`.
 *   If `opts.transformWsUrl` is a function it is called with
 *   `(url, opts, client)` and its return value is used instead.
 * @param {object} client - Passed through to `transformWsUrl` untouched.
 * @returns {*} The assembled URL, or whatever `transformWsUrl` returned.
 */
function buildUrl (opts, client) {
  const { protocol, hostname, port, path } = opts
  const baseUrl = `${protocol}://${hostname}:${port}${path}`

  // No user hook: the plain concatenation is the final URL.
  if (typeof opts.transformWsUrl !== 'function') {
    return baseUrl
  }
  return opts.transformWsUrl(baseUrl, opts, client)
}
/**
 * Fill in missing connection defaults directly on `opts` and return it.
 *
 * Defaults applied when the corresponding value is falsy:
 * `hostname` -> 'localhost', `port` -> 443 for `wss` else 80,
 * `path` -> '/', `wsOptions` -> `{}`.
 *
 * Outside the browser, for `wss` connections, every own property of `opts`
 * named in WSS_OPTIONS is copied into `opts.wsOptions` unless `wsOptions`
 * already has an own property of that name.
 *
 * @param {object} opts - Connection options; mutated in place.
 * @returns {object} The same `opts` object.
 */
function setDefaultOpts (opts) {
  if (!opts.hostname) opts.hostname = 'localhost'
  if (!opts.port) opts.port = opts.protocol === 'wss' ? 443 : 80
  if (!opts.path) opts.path = '/'
  if (!opts.wsOptions) opts.wsOptions = {}

  if (!IS_BROWSER && opts.protocol === 'wss') {
    const hasOwn = (target, key) => Object.prototype.hasOwnProperty.call(target, key)
    // Add cert/key/ca etc options, never overriding explicit wsOptions values
    for (const prop of WSS_OPTIONS) {
      if (hasOwn(opts, prop) && !hasOwn(opts.wsOptions, prop)) {
        opts.wsOptions[prop] = opts[prop]
      }
    }
  }

  return opts
}
/**
 * Apply the shared defaults via setDefaultOpts, then the browser-specific ones.
 *
 * NOTE(review): setDefaultOpts in this file assigns 'localhost' whenever
 * `hostname` is falsy, so the `host` / `document.URL` fallbacks below look
 * unreachable as long as that helper keeps doing so - confirm intent.
 *
 * @param {object} opts - Connection options; mutated in place.
 * @returns {object} The options object returned by setDefaultOpts.
 * @throws {Error} When no hostname can be determined and `document` is
 *   not defined (for example inside a Web Worker).
 */
function setDefaultBrowserOpts (opts) {
  const options = setDefaultOpts(opts)

  if (!options.hostname) {
    options.hostname = options.host
  }

  if (!options.hostname) {
    // Without a `document` (e.g. in a Web Worker) the hostname cannot be
    // derived automatically, so the caller has to supply it - also when
    // connecting to localhost.
    if (typeof document === 'undefined') {
      throw new Error('Could not determine host. Specify host manually.')
    }
    const pageUrl = new URL(document.URL)
    options.hostname = pageUrl.hostname
    if (!options.port) {
      options.port = pageUrl.port
    }
  }

  // objectMode should be defined for logic: it is true only when `binary`
  // was explicitly set to something other than `true`.
  if (options.objectMode === undefined) {
    options.objectMode = options.binary !== true && options.binary !== undefined
  }

  return options
}
/**
 * Open a `ws` WebSocket for the given URL.
 *
 * The sub-protocol is 'mqttv3.1' when `opts.protocolId` is 'MQIsdp' and
 * `opts.protocolVersion` is 3, and 'mqtt' otherwise.
 *
 * @param {object} client - Not used by this function.
 * @param {string} url - Target WebSocket URL.
 * @param {object} opts - Connection options; `opts.wsOptions` is handed to
 *   the `WS` constructor as its options argument.
 * @returns {object} The newly constructed `WS` instance.
 */
function createWebSocket (client, url, opts) {
  debug('createWebSocket')
  debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`)

  let subProtocol = 'mqtt'
  if (opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3) {
    subProtocol = 'mqttv3.1'
  }

  debug(`creating new Websocket for url: ${url} and protocol: ${subProtocol}`)
  return new WS(url, [subProtocol], opts.wsOptions)
}
/**
 * Open a native browser WebSocket for the URL derived from `opts`.
 *
 * The sub-protocol is 'mqttv3.1' when `opts.protocolId` is 'MQIsdp' and
 * `opts.protocolVersion` is 3, and 'mqtt' otherwise. The socket's
 * `binaryType` is set to 'arraybuffer'.
 *
 * @param {object} client - Forwarded to buildUrl.
 * @param {object} opts - Connection options used to build the URL.
 * @returns {WebSocket} The newly created socket.
 */
function createBrowserWebSocket (client, opts) {
  const isLegacyProtocol = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
  const subProtocols = [isLegacyProtocol ? 'mqttv3.1' : 'mqtt']

  /* global WebSocket */
  const socket = new WebSocket(buildUrl(opts, client), subProtocols)
  socket.binaryType = 'arraybuffer'
  return socket
}
/**
 * Build the stream used outside the browser.
 *
 * Applies defaults to `opts`, builds the URL, opens the socket through
 * createWebSocket and wraps it with `WS.createWebSocketStream`. The URL is
 * stored on the returned stream as `url`, and the stream is destroyed when
 * the socket emits 'close'.
 *
 * @param {object} client - Forwarded to buildUrl and createWebSocket.
 * @param {object} opts - Connection options (passed through setDefaultOpts).
 * @returns {object} The stream returned by `WS.createWebSocketStream`.
 */
function streamBuilder (client, opts) {
  debug('streamBuilder')

  const resolvedOpts = setDefaultOpts(opts)
  const targetUrl = buildUrl(resolvedOpts, client)
  const ws = createWebSocket(client, targetUrl, resolvedOpts)

  const duplex = WS.createWebSocketStream(ws, resolvedOpts.wsOptions)
  duplex.url = targetUrl

  // Tear the stream down as soon as the underlying socket closes.
  ws.on('close', function destroyStreamOnClose () {
    duplex.destroy()
  })

  return duplex
}
/**
 * Build the stream used in the browser, backed by a native WebSocket.
 *
 * Data written to the stream is sent over the socket; incoming socket
 * messages are pushed to the readable side as Buffers. If the socket is
 * not yet open, a duplexify stream is returned and wired to the internal
 * proxy (emitting 'connect') once the socket opens.
 *
 * Options read here (after setDefaultBrowserOpts has run):
 * - `browserBufferSize`: `socket.bufferedAmount` threshold above which
 *   writes are delayed (default 512 KiB).
 * - `browserBufferTimeout`: delay in ms before a throttled write is
 *   retried (default 1000).
 * - `objectMode`: when falsy, string chunks are converted to UTF-8
 *   Buffers and batched writes are concatenated via `_writev`.
 *
 * @param {object} client - Forwarded to createBrowserWebSocket.
 * @param {object} opts - Connection options.
 * @returns {object} The stream; the WebSocket is exposed as `stream.socket`.
 */
function browserStreamBuilder (client, opts) {
  debug('browserStreamBuilder')
  let stream
  const options = setDefaultBrowserOpts(opts)
  // sets the maximum socket buffer size before throttling
  const bufferSize = options.browserBufferSize || 1024 * 512
  const bufferTimeout = options.browserBufferTimeout || 1000
  const coerceToBuffer = !options.objectMode
  const socket = createBrowserWebSocket(client, options)
  const proxy = buildProxy(options, socketWriteBrowser, socketEndBrowser)

  if (!options.objectMode) {
    proxy._writev = writev
  }
  proxy.on('close', () => { socket.close() })

  const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')

  // was already open when passed in
  if (socket.readyState === socket.OPEN) {
    stream = proxy
  } else {
    // Not open yet: hand out a placeholder duplex and attach the proxy in onopen.
    stream = duplexify(undefined, undefined, options)
    if (!options.objectMode) {
      stream._writev = writev
    }
    if (eventListenerSupport) {
      socket.addEventListener('open', onopen)
    } else {
      socket.onopen = onopen
    }
  }

  stream.socket = socket

  if (eventListenerSupport) {
    socket.addEventListener('close', onclose)
    socket.addEventListener('error', onerror)
    socket.addEventListener('message', onmessage)
  } else {
    socket.onclose = onclose
    socket.onerror = onerror
    socket.onmessage = onmessage
  }

  // methods for browserStreamBuilder

  // Create the Transform whose write/flush hooks talk to the socket.
  function buildProxy (options, socketWrite, socketEnd) {
    const proxy = new Transform({
      // The option is `objectMode`; the previous misspelling was silently ignored.
      objectMode: options.objectMode
    })
    proxy._write = socketWrite
    proxy._flush = socketEnd
    return proxy
  }

  // Socket opened: connect both sides of the placeholder stream to the proxy.
  function onopen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  function onclose () {
    stream.end()
    stream.destroy()
  }

  function onerror (err) {
    stream.destroy(err)
  }

  // Normalise incoming payloads (ArrayBuffer or string) to a Buffer.
  function onmessage (event) {
    let data = event.data
    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  }

  // this is to be enabled only if objectMode is false
  function writev (chunks, cb) {
    const buffers = new Array(chunks.length)
    for (let i = 0; i < chunks.length; i++) {
      const chunk = chunks[i].chunk
      // Each entry wraps its payload in `.chunk`; convert the payload itself,
      // not the wrapper object.
      buffers[i] = typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk
    }
    this._write(Buffer.concat(buffers), 'binary', cb)
  }

  function socketWriteBrowser (chunk, enc, next) {
    if (socket.bufferedAmount > bufferSize) {
      // throttle data until buffered amount is reduced.
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
      // Stop here: the retry sends the chunk and calls `next` exactly once.
      return
    }
    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }
    try {
      socket.send(chunk)
    } catch (err) {
      return next(err)
    }
    next()
  }

  function socketEndBrowser (done) {
    socket.close()
    done()
  }
  // end methods for browserStreamBuilder

  return stream
}
// Export the builder matching the runtime detected by IS_BROWSER.
module.exports = IS_BROWSER ? browserStreamBuilder : streamBuilder
|