Skip to content

Commit

Permalink
feat: option to disable writeCache and fix leak in subscriptions (#…
Browse files Browse the repository at this point in the history
…1622)

@vishnureddy17 Fix typo in README 00bf657
@bverhoeven Fix memory leak in subscription topic mapping (#1535) 8c77eec
@mwohlert @robertsLando @ynagasaki feat: allow user to disable pre-generated write cache (#1151) 0d11888

Co-authored-by: Daniel Lando <[email protected]>
Co-authored-by: Yoshi Nagasaki <[email protected]>
Co-authored-by: Dmitry Kurmanov <[email protected]>
Co-authored-by: Vishnu Reddy <[email protected]>
Co-authored-by: Bas Verhoeven <[email protected]>
Co-authored-by: Michel Wohlert <[email protected]>
  • Loading branch information
6 people authored Jul 3, 2023
1 parent dcb24de commit c8aa654
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ to use MQTT.js in the browser see the [browserify](#browserify) section
### CommonJS (Require)

```js
const mqtt = require("mqtt"); // require mqtt
const client = mqtt.connect("est.mosquitto.org"); // create a client
const mqtt = require("mqtt") // require mqtt
const client = mqtt.connect("test.mosquitto.org") // create a client
```

### ES6 Modules (Import)
Expand Down
9 changes: 8 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const defaultConnectOptions = {
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true
resubscribe: true,
writeCache: true
}

const socketErrors = [
Expand Down Expand Up @@ -278,6 +279,11 @@ function MqttClient (streamBuilder, options) {

this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }

// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance
if (!this.options.writeCache) {
mqttPacket.writeToStream.cacheNumbers = false
}

this.streamBuilder = streamBuilder

this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
Expand Down Expand Up @@ -1662,6 +1668,7 @@ MqttClient.prototype._handleAck = function (packet) {
}
}
}
delete this.messageIdToTopic[messageId]
this._invokeStoreProcessingQueue()
cb(null, packet)
break
Expand Down
39 changes: 39 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3274,4 +3274,43 @@ module.exports = function (server, config) {
})
})
})

describe('message id to subscription topic mapping', () => {
it('should not create a mapping if resubscribe is disabled', function (done) {
const client = connect({ resubscribe: false })
client.subscribe('test1')
client.subscribe('test2')
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)
client.end(true, done)
})

it('should create a mapping for each subscribe call', function (done) {
const client = connect()
client.subscribe('test1')
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 1)
client.subscribe('test2')
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 2)

client.subscribe(['test3', 'test4'])
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 3)
client.subscribe(['test5', 'test6'])
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 4)

client.end(true, done)
})

it('should remove the mapping after suback', function (done) {
const client = connect()
client.once('connect', function () {
client.subscribe('test1', { qos: 2 }, function () {
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)

client.subscribe(['test2', 'test3'], { qos: 2 }, function () {
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)
client.end(done)
})
})
})
})
})
}
19 changes: 16 additions & 3 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ describe('MqttClient', function () {
done()
}
})

it('should disable number cache if specified in options', function (done) {
try {
assert.isTrue(mqttPacket.writeToStream.cacheNumbers)
client = mqtt.MqttClient(function () {
throw Error('break')
}, { writeCache: false })
client.end()
} catch (err) {
assert.isFalse(mqttPacket.writeToStream.cacheNumbers)
done()
}
})
})

describe('message ids', function () {
Expand Down Expand Up @@ -83,7 +96,7 @@ describe('MqttClient', function () {
const max = 1000
let count = 0
const duplex = new Duplex({
read: function (n) {},
read: function (n) { },
write: function (chunk, enc, cb) {
parser.parse(chunk)
cb() // nothing to do
Expand Down Expand Up @@ -300,7 +313,7 @@ describe('MqttClient', function () {
})

const server2 = new MqttServer(function (serverClient) {
serverClient.on('error', function () {})
serverClient.on('error', function () { })
debug('setting serverClient connect callback')
serverClient.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
Expand Down Expand Up @@ -397,7 +410,7 @@ describe('MqttClient', function () {

const server2 = net.createServer(function (stream) {
const serverClient = new Connection(stream)
serverClient.on('error', function () {})
serverClient.on('error', function () { })
serverClient.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
serverClient.connack({ returnCode: 2 })
Expand Down

0 comments on commit c8aa654

Please sign in to comment.