// ali.js
  1. 'use strict'
  2. const { Buffer } = require('buffer')
  3. const Transform = require('readable-stream').Transform
  4. const duplexify = require('duplexify')
  5. /* global FileReader */
  // Module-level state shared by the functions in this file.

  // Mini-program socket API object; assigned from `opts.my` in buildStream.
  let my
  // Transform bridging the stream and the socket; replaced on every buildStream call.
  let proxy
  // Duplexify stream handed back to the caller of buildStream.
  let stream
  // Guards bindEventHandler so the socket event handlers are registered only once.
  let isInitialized = false
  /**
   * Builds the Transform that forwards written chunks to the socket API.
   *
   * Each written chunk is handed to `my.sendSocketMessage`; when the writable
   * side finishes, `my.closeSocket` is called.
   *
   * @returns {Transform} the transform with `_write` and `_flush` overridden
   */
  function buildProxy () {
    const transform = new Transform()

    transform._write = function (chunk, encoding, next) {
      // A Buffer can be a view onto a larger (pooled) ArrayBuffer, so send only
      // the bytes that belong to this chunk instead of the whole backing store.
      const payload = chunk.buffer.slice(
        chunk.byteOffset,
        chunk.byteOffset + chunk.byteLength
      )

      my.sendSocketMessage({
        data: payload,
        success: function () {
          next()
        },
        fail: function () {
          next(new Error('Failed to send socket message'))
        }
      })
    }

    transform._flush = function socketEnd (done) {
      // NOTE(review): there is no `fail` handler here, so `done` is never
      // called if closing the socket fails — confirm whether that is intended.
      my.closeSocket({
        success: function () {
          done()
        }
      })
    }

    return transform
  }
  /**
   * Fills in missing connection options in place.
   *
   * Falsy `hostname`, `path` and `wsOptions` are replaced with `'localhost'`,
   * `'/'` and `{}` respectively; values that are already set are left alone.
   *
   * @param {object} opts - connection options; mutated by this function
   */
  function setDefaultOpts (opts) {
    if (!opts.hostname) {
      opts.hostname = 'localhost'
    }
    if (!opts.path) {
      opts.path = '/'
    }
    if (!opts.wsOptions) {
      opts.wsOptions = {}
    }
  }
  /**
   * Builds the WebSocket URL for the given connection options.
   *
   * `opts.protocol === 'alis'` selects `wss`; anything else selects `ws`.
   * The port is left out only when it is unset or equals the default port of
   * the selected scheme (80 for `ws`, 443 for `wss`), so an explicit
   * non-default port such as `ws` on 443 is never silently dropped.
   *
   * @param {object} opts - connection options (`protocol`, `hostname`, `path`,
   *   optional `port`, optional `transformWsUrl`)
   * @param {object} client - passed through to `opts.transformWsUrl`
   * @returns {string} the URL, after `opts.transformWsUrl` if it is a function
   */
  function buildUrl (opts, client) {
    const protocol = opts.protocol === 'alis' ? 'wss' : 'ws'
    const defaultPort = protocol === 'wss' ? 443 : 80

    // Number() lets a port given as a string ('443') compare against the default.
    const needsPort = Boolean(opts.port) && Number(opts.port) !== defaultPort
    const portPart = needsPort ? `:${opts.port}` : ''

    let url = `${protocol}://${opts.hostname}${portPart}${opts.path}`
    if (typeof opts.transformWsUrl === 'function') {
      url = opts.transformWsUrl(url, opts, client)
    }
    return url
  }
  /**
   * Registers the socket event handlers on `my`, at most once.
   *
   * The handlers read the module-level `stream` and `proxy` when they fire, so
   * they act on whatever those variables hold at that moment.
   */
  function bindEventHandler () {
    if (isInitialized) return
    isInitialized = true

    my.onSocketOpen(function () {
      stream.setReadable(proxy)
      stream.setWritable(proxy)
      stream.emit('connect')
    })

    my.onSocketMessage(function (res) {
      // String payloads are decoded as base64.
      if (typeof res.data === 'string') {
        proxy.push(Buffer.from(res.data, 'base64'))
        return
      }

      // A raw ArrayBuffer can be wrapped directly; FileReader only accepts
      // Blob-like input and would throw on it.
      if (res.data instanceof ArrayBuffer) {
        proxy.push(Buffer.from(res.data))
        return
      }

      const reader = new FileReader()
      reader.addEventListener('load', function () {
        const result = reader.result
        const data = result instanceof ArrayBuffer
          ? Buffer.from(result)
          : Buffer.from(result, 'utf8')
        proxy.push(data)
      })
      reader.readAsArrayBuffer(res.data)
    })

    my.onSocketClose(function () {
      stream.end()
      stream.destroy()
    })

    my.onSocketError(function (res) {
      // The object from the socket API is forwarded unchanged as the destroy reason.
      stream.destroy(res)
    })
  }
  /**
   * Opens a socket through `opts.my` and returns a stream wrapping it.
   *
   * `opts` is mutated: `hostname` falls back to `host`, and the defaults from
   * setDefaultOpts are applied. The module-level `my`, `proxy` and `stream`
   * are replaced on every call.
   *
   * @param {object} client - passed through to buildUrl
   * @param {object} opts - connection options; must carry the socket API as
   *   `opts.my` and a `hostname` or `host`
   * @returns {object} the object-mode duplexify stream
   * @throws {Error} if no host can be determined or `opts.my` is missing
   */
  function buildStream (client, opts) {
    opts.hostname = opts.hostname || opts.host
    if (!opts.hostname) {
      throw new Error('Could not determine host. Specify host manually.')
    }
    // Fail with a clear message before touching module state, instead of a
    // TypeError from calling connectSocket on undefined.
    if (!opts.my) {
      throw new Error('Could not find the socket API. Pass it as `opts.my`.')
    }

    const isLegacyProtocol =
      opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
    const websocketSubProtocol = isLegacyProtocol ? 'mqttv3.1' : 'mqtt'

    setDefaultOpts(opts)
    const url = buildUrl(opts, client)

    my = opts.my
    my.connectSocket({
      url: url,
      protocols: websocketSubProtocol
    })

    proxy = buildProxy()
    stream = duplexify.obj()
    bindEventHandler()
    return stream
  }
  106. module.exports = buildStream