From 79b23a8f76abaceec67f063b6da0ee57a2c60697 Mon Sep 17 00:00:00 2001 From: Shoji Yamazaki <36025036+ogis-yamazaki@users.noreply.github.com> Date: Fri, 14 Jul 2023 16:18:01 +0900 Subject: [PATCH] fix: problem with publish callback invoked twice (#1635) --- lib/client.js | 46 ++++++++++++----- test/abstract_client.js | 106 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 12 deletions(-) diff --git a/lib/client.js b/lib/client.js index b39ab63db..94c3e1f0f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1047,11 +1047,12 @@ class MqttClient extends EventEmitter { * @example client.removeOutgoingMessage(client.getLastAllocated()); */ removeOutgoingMessage (messageId) { - const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null - delete this.outgoing[messageId] - this.outgoingStore.del({ messageId }, function () { - cb(new Error('Message removed')) - }) + if (this.outgoing[messageId]) { + const cb = this.outgoing[messageId].cb + this._removeOutgoingAndStoreMessage(messageId, function () { + cb(new Error('Message removed')) + }) + } return this } @@ -1625,12 +1626,13 @@ class MqttClient extends EventEmitter { if (pubackRC && pubackRC > 0 && pubackRC !== 16) { err = new Error('Publish error: ' + errors[pubackRC]) err.code = pubackRC - cb(err, packet) + this._removeOutgoingAndStoreMessage(messageId, function () { + cb(err, packet) + }) + } else { + this._removeOutgoingAndStoreMessage(messageId, cb) } - delete this.outgoing[messageId] - this.outgoingStore.del(packet, cb) - this.messageIdProvider.deallocate(messageId) - this._invokeStoreProcessingQueue() + break } case 'pubrec': { @@ -1644,7 +1646,9 @@ class MqttClient extends EventEmitter { if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { err = new Error('Publish error: ' + errors[pubrecRC]) err.code = pubrecRC - cb(err, packet) + this._removeOutgoingAndStoreMessage(messageId, function () { + cb(err, packet) + }) } else { this._sendPacket(response) } @@ -1888,7 +1892,9 @@ class MqttClient extends EventEmitter { } _invokeStoreProcessingQueue () { - if (this._storeProcessingQueue.length > 0) { + // If _storeProcessing is true, the message is resending. + // During resend, processing is skipped to prevent new messages from interrupting. #1635 + if (!this._storeProcessing && this._storeProcessingQueue.length > 0) { const f = this._storeProcessingQueue[0] if (f && f.invoke()) { this._storeProcessingQueue.shift() @@ -1909,6 +1915,22 @@ class MqttClient extends EventEmitter { } this._storeProcessingQueue.splice(0) } + + /** + * _removeOutgoingAndStoreMessage + * @param {Number} messageId - messageId to remove message + * @param {Function} cb - called when the message removed + * @api private + */ + _removeOutgoingAndStoreMessage (messageId, cb) { + const self = this + delete this.outgoing[messageId] + self.outgoingStore.del({ messageId }, function (err, packet) { + cb(err, packet) + self.messageIdProvider.deallocate(messageId) + self._invokeStoreProcessingQueue() + }) + } } module.exports = MqttClient diff --git a/test/abstract_client.js b/test/abstract_client.js index 985651192..50b18c70f 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -1014,6 +1014,57 @@ module.exports = function (server, config) { }) }) + it('should fire a callback (qos 1) on error', function (done) { + // 145 = Packet Identifier in use + const pubackReasonCode = 145 + const pubOpts = { qos: 1 } + let client = null + + const server2 = serverBuilder(config.protocol, function (serverClient) { + serverClient.on('connect', function () { + const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } + serverClient.connack(connack) + }) + serverClient.on('publish', function (packet) { + if (packet.qos === 1) { + if (version === 5) { + serverClient.puback({ + messageId: packet.messageId, + reasonCode: pubackReasonCode + }) + } else { + serverClient.puback({ messageId: packet.messageId }) + } + } + }) + }) + + server2.listen(ports.PORTAND72, function () { + client = connect({ + port: ports.PORTAND72, + host: 'localhost', + clean: true, + clientId: 'cid1', + reconnectPeriod: 0 + }) + + client.once('connect', function () { + client.publish('a', 'b', pubOpts, function (err) { + if (version === 5) { + assert.strictEqual(err.code, pubackReasonCode) + } else { + assert.ifError(err) + } + setImmediate(function () { + client.end(() => { + server2.close(done()) + }) + }) + }) + }) + }) + }) + it('should fire a callback (qos 2)', function (done) { const client = connect() const opts = { qos: 2 } @@ -1025,6 +1076,61 @@ module.exports = function (server, config) { }) }) + it('should fire a callback (qos 2) on error', function (done) { + // 145 = Packet Identifier in use + const pubrecReasonCode = 145 + const pubOpts = { qos: 2 } + let client = null + + const server2 = serverBuilder(config.protocol, function (serverClient) { + serverClient.on('connect', function () { + const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } + serverClient.connack(connack) + }) + serverClient.on('publish', function (packet) { + if (packet.qos === 2) { + if (version === 5) { + serverClient.pubrec({ + messageId: packet.messageId, + reasonCode: pubrecReasonCode + }) + } else { + serverClient.pubrec({ messageId: packet.messageId }) + } + } + }) + serverClient.on('pubrel', function (packet) { + if (!serverClient.writable) return false + serverClient.pubcomp(packet) + }) + }) + + server2.listen(ports.PORTAND103, function () { + client = connect({ + port: ports.PORTAND103, + host: 'localhost', + clean: true, + clientId: 'cid1', + reconnectPeriod: 0 + }) + + client.once('connect', function () { + client.publish('a', 'b', pubOpts, function (err) { + if (version === 5) { + assert.strictEqual(err.code, pubrecReasonCode) + } else { + assert.ifError(err) + } + setImmediate(function () { + client.end(true, () => { + server2.close(done()) + }) + }) + }) + }) + }) + }) + it('should support UTF-8 characters in topic', function (done) { const client = connect()