// wx.js 2.7 KB
  1. 'use strict'
  2. const { Buffer } = require('buffer')
  3. const Transform = require('readable-stream').Transform
  4. const duplexify = require('duplexify')
  5. /* global wx */
  6. let socketTask, proxy, stream
  /**
   * Create the Transform that forwards written chunks to the module-level
   * `socketTask` and closes the socket when the stream is flushed.
   *
   * Incoming socket data is not handled here; this function only installs
   * the `_write` and `_flush` hooks.
   *
   * @returns {Transform} the proxy stream
   */
  function buildProxy () {
    const proxy = new Transform()

    proxy._write = function (chunk, encoding, next) {
      // A Buffer / typed array can be a view onto a larger ArrayBuffer (pooled
      // allocation or a slice). Sending `chunk.buffer` as-is would transmit the
      // whole backing store, so copy out exactly the bytes the view covers.
      const data = ArrayBuffer.isView(chunk)
        ? chunk.buffer.slice(chunk.byteOffset, chunk.byteOffset + chunk.byteLength)
        : chunk

      socketTask.send({
        data: data,
        success: function () {
          next()
        },
        fail: function (res) {
          // The failure payload is read as `res.errMsg` (as the onError handler
          // in this file does); a plain string payload is still accepted.
          const message = res && res.errMsg ? res.errMsg : res
          next(new Error(message))
        }
      })
    }

    proxy._flush = function socketEnd (done) {
      socketTask.close({
        success: function () {
          done()
        },
        // Without this the flush callback would never run when close fails,
        // leaving the stream unable to finish.
        fail: function () {
          done()
        }
      })
    }

    return proxy
  }
  /**
   * Fill in missing connection options in place.
   *
   * Any falsy `hostname`, `path` or `wsOptions` on `opts` is replaced with
   * 'localhost', '/' and an empty object respectively. Truthy values are
   * left untouched.
   *
   * @param {object} opts - options object, mutated in place
   */
  function setDefaultOpts (opts) {
    const fallbacks = [
      ['hostname', 'localhost'],
      ['path', '/'],
      ['wsOptions', {}]
    ]
    for (const [key, fallback] of fallbacks) {
      if (!opts[key]) {
        opts[key] = fallback
      }
    }
  }
  /**
   * Build the WebSocket URL for a connection.
   *
   * `opts.protocol === 'wxs'` selects the `wss` scheme; anything else selects
   * `ws`. The port is left out of the URL only when it equals the default
   * port of the selected scheme (80 for ws, 443 for wss). Omitting 443 from a
   * `ws` URL, or 80 from a `wss` URL, would make the socket connect to a
   * different port than the one requested.
   *
   * @param {object} opts - connection options (`protocol`, `hostname`, `port`,
   *   `path`, optional `transformWsUrl`)
   * @param {*} client - passed through to `opts.transformWsUrl`
   * @returns {string} the URL, after `opts.transformWsUrl(url, opts, client)`
   *   when that option is a function
   */
  function buildUrl (opts, client) {
    const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
    const defaultPort = protocol === 'wss' ? 443 : 80

    let url = `${protocol}://${opts.hostname}`
    if (opts.port && opts.port !== defaultPort) {
      url += `:${opts.port}`
    }
    url += opts.path

    if (typeof opts.transformWsUrl === 'function') {
      url = opts.transformWsUrl(url, opts, client)
    }
    return url
  }
  /**
   * Wire the module-level `socketTask` events to the module-level `stream`
   * and `proxy`.
   *
   * - open: attaches `proxy` as both sides of `stream` and emits 'connect'
   * - message: converts the payload to a Buffer and pushes it into `proxy`
   * - close: ends, then destroys, `stream`
   * - error: destroys `stream` with an Error built from `res.errMsg`
   */
  function bindEventHandler () {
    socketTask.onOpen(() => {
      stream.setReadable(proxy)
      stream.setWritable(proxy)
      stream.emit('connect')
    })

    socketTask.onMessage((res) => {
      const raw = res.data
      // Binary frames arrive as ArrayBuffer; everything else is decoded as UTF-8 text.
      const payload = raw instanceof ArrayBuffer
        ? Buffer.from(raw)
        : Buffer.from(raw, 'utf8')
      proxy.push(payload)
    })

    socketTask.onClose(() => {
      stream.end()
      stream.destroy()
    })

    socketTask.onError((res) => {
      const failure = new Error(res.errMsg)
      stream.destroy(failure)
    })
  }
  /**
   * Open a socket through `wx.connectSocket` and return a duplex stream for it.
   *
   * Side effects visible in this function: `opts.hostname` is set from
   * `opts.host` when missing, `setDefaultOpts(opts)` fills further defaults,
   * and the module-level `socketTask`, `proxy` and `stream` are reassigned.
   *
   * @param {*} client - forwarded to `buildUrl`
   * @param {object} opts - connection options; mutated in place
   * @returns {object} the stream created by `duplexify.obj()`
   * @throws {Error} when neither `opts.hostname` nor `opts.host` is set
   */
  function buildStream (client, opts) {
    opts.hostname = opts.hostname || opts.host
    if (!opts.hostname) {
      throw new Error('Could not determine host. Specify host manually.')
    }

    // 'MQIsdp' with protocol version 3 selects the 'mqttv3.1' sub-protocol;
    // every other combination uses 'mqtt'.
    let websocketSubProtocol = 'mqtt'
    if (opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3) {
      websocketSubProtocol = 'mqttv3.1'
    }

    setDefaultOpts(opts)
    const endpoint = buildUrl(opts, client)

    socketTask = wx.connectSocket({
      url: endpoint,
      protocols: [websocketSubProtocol]
    })
    proxy = buildProxy()
    stream = duplexify.obj()

    // Closing the socket is the teardown step; the callback (if any) runs
    // once the close has succeeded.
    stream._destroy = (err, cb) => {
      socketTask.close({
        success: () => {
          if (cb) {
            cb(err)
          }
        }
      })
    }

    // One-shot destroy: the first call puts the original destroy back and asks
    // the socket to close on the next tick; only a failed close falls through
    // to _destroy. The stream created by this call is captured here so the
    // fallback always targets it.
    const originalDestroy = stream.destroy
    const ownStream = stream
    stream.destroy = function () {
      stream.destroy = originalDestroy
      setTimeout(() => {
        socketTask.close({
          fail: () => {
            ownStream._destroy(new Error())
          }
        })
      }, 0)
    }

    bindEventHandler()
    return stream
  }
  110. module.exports = buildStream