bbeceb559b024ca3c7187daf28b010c5c552d44dc8a4100b2bc652be9e719f0dd644c3ca6991d6d385f2241bd341eda8033c59b9c94e94a099dec12d840e3e 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. const xtend = require('xtend')
  6. const Readable = require('readable-stream').Readable
  7. const streamsOpts = { objectMode: true }
  8. const defaultStoreOptions = {
  9. clean: true
  10. }
  11. /**
  12. * In-memory implementation of the message store
  13. * This can actually be saved into files.
  14. *
  15. * @param {Object} [options] - store options
  16. */
  17. function Store (options) {
  18. if (!(this instanceof Store)) {
  19. return new Store(options)
  20. }
  21. this.options = options || {}
  22. // Defaults
  23. this.options = xtend(defaultStoreOptions, options)
  24. this._inflights = new Map()
  25. }
  26. /**
  27. * Adds a packet to the store, a packet is
  28. * anything that has a messageId property.
  29. *
  30. */
  31. Store.prototype.put = function (packet, cb) {
  32. this._inflights.set(packet.messageId, packet)
  33. if (cb) {
  34. cb()
  35. }
  36. return this
  37. }
  38. /**
  39. * Creates a stream with all the packets in the store
  40. *
  41. */
  42. Store.prototype.createStream = function () {
  43. const stream = new Readable(streamsOpts)
  44. const values = []
  45. let destroyed = false
  46. let i = 0
  47. this._inflights.forEach(function (value, key) {
  48. values.push(value)
  49. })
  50. stream._read = function () {
  51. if (!destroyed && i < values.length) {
  52. this.push(values[i++])
  53. } else {
  54. this.push(null)
  55. }
  56. }
  57. stream.destroy = function () {
  58. if (destroyed) {
  59. return
  60. }
  61. const self = this
  62. destroyed = true
  63. setTimeout(function () {
  64. self.emit('close')
  65. }, 0)
  66. }
  67. return stream
  68. }
  69. /**
  70. * deletes a packet from the store.
  71. */
  72. Store.prototype.del = function (packet, cb) {
  73. packet = this._inflights.get(packet.messageId)
  74. if (packet) {
  75. this._inflights.delete(packet.messageId)
  76. cb(null, packet)
  77. } else if (cb) {
  78. cb(new Error('missing packet'))
  79. }
  80. return this
  81. }
  82. /**
  83. * get a packet from the store.
  84. */
  85. Store.prototype.get = function (packet, cb) {
  86. packet = this._inflights.get(packet.messageId)
  87. if (packet) {
  88. cb(null, packet)
  89. } else if (cb) {
  90. cb(new Error('missing packet'))
  91. }
  92. return this
  93. }
  94. /**
  95. * Close the store
  96. */
  97. Store.prototype.close = function (cb) {
  98. if (this.options.clean) {
  99. this._inflights = null
  100. }
  101. if (cb) {
  102. cb()
  103. }
  104. }
  105. module.exports = Store