Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: keepalive manager #1865

Merged
merged 16 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ npm test

This will run both `browser` and `node` tests.


### Running specific tests

For example, you can run `node -r esbuild-register --test test/pingTimer.ts`
For example, you can run `node -r esbuild-register --test test/keepaliveManager.ts`

### Browser

Expand Down
2 changes: 1 addition & 1 deletion example.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import mqtt from './src/index'

const client = mqtt.connect('mqtts://test.mosquitto.org', {
keepalive: 10,
// keepalive: 10,
port: 8883,
reconnectPeriod: 15000,
rejectUnauthorized: false,
Expand Down
89 changes: 89 additions & 0 deletions src/lib/KeepaliveManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import type MqttClient from './client'
import getTimer, { type Timer } from './get-timer'
import type { TimerVariant } from './shared'

export default class KeepaliveManager {
private _keepalive: number

private timerId: number

private timer: Timer

private destroyed = false

private counter: number

private client: MqttClient

private _keepaliveTimeoutTimestamp: number

/** Timestamp of next keepalive timeout */
get keepaliveTimeoutTimestamp() {
return this._keepaliveTimeoutTimestamp
}

set keepalive(value: number) {
if (
// eslint-disable-next-line no-restricted-globals
isNaN(value) ||
value < 0 ||
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
value * 1000 > 2147483647
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
) {
throw new Error(
`Keepalive value must be a number between 0 and 2147483647. Provided value is ${this._keepalive}`,
)
}

this._keepalive = value * 1000
}

get keepalive() {
return this._keepalive
}

constructor(client: MqttClient, variant: TimerVariant) {
this.keepalive = client.options.keepalive
this.client = client
this.timer = getTimer(variant)
this.reschedule()
}

private clear() {
if (this.timerId) {
this.timer.clear(this.timerId)
this.timerId = null
}
}

destroy() {
this.clear()
this.destroyed = true
}

reschedule() {
if (this.destroyed) {
return
}

this.clear()
this.counter = 0
// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.5_Keep
this._keepaliveTimeoutTimestamp = Date.now() + this._keepalive * 1.5
robertsLando marked this conversation as resolved.
Show resolved Hide resolved

this.timerId = this.timer.set(() => {
// this should never happen, but just in case
if (this.destroyed) {
return
}

this.counter += 1

// after keepalive seconds, send a pingreq
if (this.counter === 2) {
this.client.sendPing()
} else if (this.counter > 2) {
this.client.onKeepaliveTimeout()
}
}, this._keepalive / 2)
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
}
}
56 changes: 0 additions & 56 deletions src/lib/PingTimer.ts

This file was deleted.

84 changes: 29 additions & 55 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
} from './shared'
import TopicAliasSend from './topic-alias-send'
import { TypedEventEmitter } from './TypedEmitter'
import PingTimer from './PingTimer'
import KeepaliveManager from './KeepaliveManager'
import isBrowser, { isWebWorker } from './is-browser'

const setImmediate =
Expand Down Expand Up @@ -433,10 +433,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

/** Timestamp of last received control packet */
public pingResp: number

public pingTimer: PingTimer
public keepaliveManager: KeepaliveManager

/**
* The connection to the Broker. In browsers env this also have `socket` property
Expand Down Expand Up @@ -572,8 +569,8 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}

// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Keepalive manager, setup in _setupKeepaliveManager
this.keepaliveManager = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
Expand Down Expand Up @@ -660,7 +657,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
Expand Down Expand Up @@ -1780,7 +1777,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._setupReconnect()
}

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (done && !this.connected) {
this.log(
Expand Down Expand Up @@ -2064,45 +2061,36 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}

/**
* _setupPingTimer - setup the ping timer
*
* @api private
* _setupKeepaliveManager - setup the keepalive manager
*/
private _setupPingTimer() {
private _setupKeepaliveManager() {
this.log(
'_setupPingTimer :: keepalive %d (seconds)',
'_setupKeepaliveManager :: keepalive %d (seconds)',
this.options.keepalive,
)

if (!this.pingTimer && this.options.keepalive) {
this.pingTimer = new PingTimer(
this.options.keepalive,
() => {
this._checkPing()
},
if (!this.keepaliveManager && this.options.keepalive) {
this.keepaliveManager = new KeepaliveManager(
this,
this.options.timerVariant,
)
this.pingResp = Date.now()
}
}

private _destroyPingTimer() {
if (this.pingTimer) {
this.log('_destroyPingTimer :: destroying ping timer')
this.pingTimer.destroy()
this.pingTimer = null
private _destroyKeepaliveManager() {
if (this.keepaliveManager) {
this.log('_destroyKeepaliveManager :: destroying keepalive manager')
this.keepaliveManager.destroy()
this.keepaliveManager = null
}
}

/**

* _shiftPingInterval - reschedule the ping interval
*
* @api private
* reschedule the ping interval
*/
private _shiftPingInterval() {
public shiftPing() {
if (
this.pingTimer &&
this.keepaliveManager &&
this.options.keepalive &&
this.options.reschedulePings
) {
Expand All @@ -2115,34 +2103,20 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
*/
private _reschedulePing() {
this.log('_reschedulePing :: rescheduling ping')
this.pingTimer.reschedule()
this.keepaliveManager.reschedule()
}

/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
private _checkPing() {
this.log('_checkPing :: checking ping...')
// give 100ms offset to avoid ping timeout when receiving fast responses
const timeSincePing = Date.now() - this.pingResp - 100
if (timeSincePing <= this.options.keepalive * 1000) {
this.log('_checkPing :: ping response received in time')
this._sendPing()
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
this.log('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}

private _sendPing() {
public sendPing() {
this.log('_sendPing :: sending pingreq')
this._sendPacket({ cmd: 'pingreq' })
}

public onKeepaliveTimeout() {
this.emit('error', new Error('Keepalive timeout'))
this.log('onKeepaliveTimeout :: calling _cleanUp with force true')
this._cleanUp(true)
}

/**
* _resubscribe
* @api private
Expand Down Expand Up @@ -2205,7 +2179,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
this._setupKeepaliveManager()

this.connected = true

Expand Down
14 changes: 7 additions & 7 deletions src/lib/get-timer.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import isBrowser, { isWebWorker, isReactNativeBrowser } from './is-browser'
import { clearTimeout as clearT, setTimeout as setT } from 'worker-timers'
import { clearInterval as clearI, setInterval as setI } from 'worker-timers'
import type { TimerVariant } from './shared'

// dont directly assign globals to class props otherwise this throws in web workers: Uncaught TypeError: Illegal invocation
// See: https://stackoverflow.com/questions/9677985/uncaught-typeerror-illegal-invocation-in-chrome

export interface Timer {
set: typeof setT
clear: typeof clearT
set: typeof setI
clear: typeof clearI
}

const workerTimer: Timer = {
set: setT,
clear: clearT,
set: setI,
clear: clearI,
}

const nativeTimer: Timer = {
set: (func, time) => setTimeout(func, time),
clear: (timerId) => clearTimeout(timerId),
set: (func, time) => setInterval(func, time),
clear: (timerId) => clearInterval(timerId),
}

const getTimer = (variant: TimerVariant): Timer => {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/handlers/connack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ const handleConnack: PacketHandler = (client, packet: IConnackPacket) => {
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
client['_shiftPingInterval']()
client.keepaliveManager.keepalive = options.keepalive
client.shiftPing()
}

if (packet.properties.maximumPacketSize) {
if (!options.properties) {
options.properties = {}
Expand Down
Loading
Loading