From c8aa6540dbf68ffb0d88c287e2c862b28d3fb6e6 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 3 Jul 2023 11:07:27 +0200 Subject: [PATCH] feat: option to disable `writeCache` and fix leak in subscriptions (#1622) @vishnureddy17 Fix typo in README 00bf657 @bverhoeven Fix memory leak in subscription topic mapping (#1535) 8c77eec @mwohlert @robertsLando @ynagasaki feat: allow user to disable pre-generated write cache (#1151) 0d11888 Co-authored-by: Daniel Lando Co-authored-by: Yoshi Nagasaki Co-authored-by: Dmitry Kurmanov Co-authored-by: Vishnu Reddy Co-authored-by: Bas Verhoeven Co-authored-by: Michel Wohlert --- README.md | 4 ++-- lib/client.js | 9 ++++++++- test/abstract_client.js | 39 +++++++++++++++++++++++++++++++++++++++ test/client.js | 19 ++++++++++++++++--- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index eec44db27..8417c0a40 100644 --- a/README.md +++ b/README.md @@ -135,8 +135,8 @@ to use MQTT.js in the browser see the [browserify](#browserify) section ### CommonJS (Require) ```js -const mqtt = require("mqtt"); // require mqtt -const client = mqtt.connect("est.mosquitto.org"); // create a client +const mqtt = require("mqtt") // require mqtt +const client = mqtt.connect("test.mosquitto.org") // create a client ``` ### ES6 Modules (Import) diff --git a/lib/client.js b/lib/client.js index d02c16f47..c1757aaf6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -29,7 +29,8 @@ const defaultConnectOptions = { reconnectPeriod: 1000, connectTimeout: 30 * 1000, clean: true, - resubscribe: true + resubscribe: true, + writeCache: true } const socketErrors = [ @@ -278,6 +279,11 @@ function MqttClient (streamBuilder, options) { this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } + // Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance + if (!this.options.writeCache) { + mqttPacket.writeToStream.cacheNumbers = false + } + this.streamBuilder = streamBuilder this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider @@ -1662,6 +1668,7 @@ MqttClient.prototype._handleAck = function (packet) { } } } + delete this.messageIdToTopic[messageId] this._invokeStoreProcessingQueue() cb(null, packet) break diff --git a/test/abstract_client.js b/test/abstract_client.js index b711d2899..ee19ee1d6 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -3274,4 +3274,43 @@ module.exports = function (server, config) { }) }) }) + + describe('message id to subscription topic mapping', () => { + it('should not create a mapping if resubscribe is disabled', function (done) { + const client = connect({ resubscribe: false }) + client.subscribe('test1') + client.subscribe('test2') + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0) + client.end(true, done) + }) + + it('should create a mapping for each subscribe call', function (done) { + const client = connect() + client.subscribe('test1') + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 1) + client.subscribe('test2') + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 2) + + client.subscribe(['test3', 'test4']) + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 3) + client.subscribe(['test5', 'test6']) + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 4) + + client.end(true, done) + }) + + it('should remove the mapping after suback', function (done) { + const client = connect() + client.once('connect', function () { + client.subscribe('test1', { qos: 2 }, function () { + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0) + + client.subscribe(['test2', 'test3'], { qos: 2 }, function () { + assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0) + client.end(done) + }) + }) + }) + }) + }) } diff --git a/test/client.js b/test/client.js index bac6434da..f1eaa3daf 100644 --- a/test/client.js +++ b/test/client.js @@ -43,6 +43,19 @@ describe('MqttClient', function () { done() } }) + + it('should disable number cache if specified in options', function (done) { + try { + assert.isTrue(mqttPacket.writeToStream.cacheNumbers) + client = mqtt.MqttClient(function () { + throw Error('break') + }, { writeCache: false }) + client.end() + } catch (err) { + assert.isFalse(mqttPacket.writeToStream.cacheNumbers) + done() + } + }) }) describe('message ids', function () { @@ -83,7 +96,7 @@ describe('MqttClient', function () { const max = 1000 let count = 0 const duplex = new Duplex({ - read: function (n) {}, + read: function (n) { }, write: function (chunk, enc, cb) { parser.parse(chunk) cb() // nothing to do @@ -300,7 +313,7 @@ describe('MqttClient', function () { }) const server2 = new MqttServer(function (serverClient) { - serverClient.on('error', function () {}) + serverClient.on('error', function () { }) debug('setting serverClient connect callback') serverClient.on('connect', function (packet) { if (packet.clientId === 'invalid') { @@ -397,7 +410,7 @@ describe('MqttClient', function () { const server2 = net.createServer(function (stream) { const serverClient = new Connection(stream) - serverClient.on('error', function () {}) + serverClient.on('error', function () { }) serverClient.on('connect', function (packet) { if (packet.clientId === 'invalid') { serverClient.connack({ returnCode: 2 })