From 0e4071e21d04b432e846cc63992742a51fc1c5cf Mon Sep 17 00:00:00 2001 From: Giedrius <4882339+GiedriusM@users.noreply.github.com> Date: Mon, 4 Feb 2019 12:27:16 +0200 Subject: [PATCH] Fixes to Observe notifications (#201) * Fix Observe option encoding and decoding * Fix Observe notification filtering * Add more Observe tests * Add flag to disable Observe notification filtering (#200) --- lib/observe_read_stream.js | 16 +++- lib/option_converter.js | 20 ++--- test/request.js | 152 +++++++++++++++++++++++++++++++++++++ test/server.js | 18 +---- 4 files changed, 177 insertions(+), 29 deletions(-) diff --git a/lib/observe_read_stream.js b/lib/observe_read_stream.js index 531ba819..b93dc33c 100644 --- a/lib/observe_read_stream.js +++ b/lib/observe_read_stream.js @@ -18,7 +18,9 @@ function ObserveReadStream(packet, rsinfo, outSocket) { this.rsinfo = rsinfo this.outSocket = outSocket - this._lastId = 0 + this._lastId = undefined + this._lastTime = 0 + this._disableFiltering = false this.append(packet) } @@ -29,8 +31,18 @@ ObserveReadStream.prototype.append = function(packet) { return pktToMsg(this, packet) - if (this.headers['Observe'] > this._lastId) { + + // First notification + if (this._lastId === undefined) { + this._lastId = this.headers['Observe'] - 1 + } + + const dseq = (this.headers['Observe'] - this._lastId) & 0xffffff + const dtime = Date.now() - this._lastTime + + if (this._disableFiltering || (dseq > 0 && dseq < (1 << 23)) || dtime > 128*1000) { this._lastId = this.headers['Observe'] + this._lastTime = Date.now() this.push(packet.payload) } } diff --git a/lib/option_converter.js b/lib/option_converter.js index 9605eebb..a9515084 100644 --- a/lib/option_converter.js +++ b/lib/option_converter.js @@ -153,18 +153,13 @@ registerOption('Observe', function(sequence) { var buf if (!sequence) { - buf = new Buffer(0) - } else if (sequence < 256) { - buf = new Buffer(1) - buf.writeUInt8(sequence, 0) - } else if (sequence >= 256 & sequence < 65535) { - buf = new Buffer(2) - buf.writeUInt16BE(sequence, 0) + buf = Buffer.alloc(0) + } else if (sequence <= 0xff) { + buf = Buffer.from([sequence]) + } else if (sequence <= 0xffff) { + buf = Buffer.from([sequence >> 8, sequence]) } else { - // it is three bytes long - buf = new Buffer(3) - buf.writeUInt8(Math.floor(sequence / 65535), 0) - buf.writeUInt16BE(sequence % 65535, 1) + buf = Buffer.from([sequence >> 16, sequence >> 8, sequence]) } return buf @@ -176,8 +171,7 @@ registerOption('Observe', function(sequence) { } else if (buf.length === 2) { result = buf.readUInt16BE(0) } else if (buf.length === 3) { - result += buf.readUInt8(0) * 65353 - result += buf.readUInt16BE(1) + result = (buf.readUInt8(0) << 16) | buf.readUInt16BE(1) } return result diff --git a/test/request.js b/test/request.js index 67b8b941..fb6cfbe8 100644 --- a/test/request.js +++ b/test/request.js @@ -7,6 +7,7 @@ */ var coap = require('../') + , toBinary = require('../lib/option_converter').toBinary , parse = require('coap-packet').parse , generate = require('coap-packet').generate , dgram = require('dgram') @@ -997,6 +998,20 @@ describe('request', function() { server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address) } + function sendNotification(rsinfo, req, opts) { + ssend(rsinfo, { + messageId: req.messageId + , token: req.token + , payload: Buffer.from(opts.payload) + , ack: false + , options: [{ + name: 'Observe' + , value: toBinary('Observe', opts.num) + }] + , code: '2.05' + }) + } + it('should ack the update', function (done) { var req = doObserve() @@ -1070,6 +1085,143 @@ describe('request', function() { done() }) }) + + it('should allow multiple notifications', function (done) { + server.once('message', function(msg, rsinfo) { + const req = parse(msg) + + sendNotification(rsinfo, req, { num: 0, payload: 'zero' }) + sendNotification(rsinfo, req, { num: 1, payload: 'one' }) + }) + + const req = request({ + port: port + , observe: true + , confirmable: false + }).end() + + req.on('response', function (res) { + let ndata = 0 + + res.on('data', function (data) { + ndata++ + if (ndata === 1) { + expect(res.headers['Observe']).to.equal(0) + expect(data.toString()).to.equal('zero') + } else if (ndata === 2) { + expect(res.headers['Observe']).to.equal(1) + expect(data.toString()).to.equal('one') + done() + } else { + done(new Error('Unexpected data')) + } + }) + }) + }) + + it('should drop out of order notifications', function (done) { + server.once('message', function(msg, rsinfo) { + const req = parse(msg) + + sendNotification(rsinfo, req, { num: 1, payload: 'one' }) + sendNotification(rsinfo, req, { num: 0, payload: 'zero' }) + sendNotification(rsinfo, req, { num: 2, payload: 'two' }) + }) + + const req = request({ + port: port + , observe: true + , confirmable: false + }).end() + + req.on('response', function (res) { + let ndata = 0 + + res.on('data', function (data) { + ndata++ + if (ndata === 1) { + expect(res.headers['Observe']).to.equal(1) + expect(data.toString()).to.equal('one') + } else if (ndata === 2) { + expect(res.headers['Observe']).to.equal(2) + expect(data.toString()).to.equal('two') + done() + } else { + done(new Error('Unexpected data')) + } + }) + }) + }) + + it('should allow repeating order after 128 seconds', function (done) { + server.once('message', function(msg, rsinfo) { + const req = parse(msg) + + sendNotification(rsinfo, req, { num: 1, payload: 'one' }) + setTimeout(function () { + sendNotification(rsinfo, req, { num: 1, payload: 'two' }) + }, 128 * 1000 + 200) + }) + + const req = request({ + port: port + , observe: true + , confirmable: false + }).end() + + req.on('response', function (res) { + let ndata = 0 + + res.on('data', function (data) { + ndata++ + if (ndata === 1) { + expect(res.headers['Observe']).to.equal(1) + expect(data.toString()).to.equal('one') + } else if (ndata === 2) { + expect(res.headers['Observe']).to.equal(1) + expect(data.toString()).to.equal('two') + done() + } else { + done(new Error('Unexpected data')) + } + }) + }) + + fastForward(100, 129*1000) + }) + + it('should allow Observe option 24bit overflow', function (done) { + server.once('message', function(msg, rsinfo) { + const req = parse(msg) + + sendNotification(rsinfo, req, { num: 0xffffff, payload: 'max' }) + sendNotification(rsinfo, req, { num: 0, payload: 'zero' }) + }) + + const req = request({ + port: port + , observe: true + , confirmable: false + }).end() + + req.on('response', function (res) { + let ndata = 0 + + res.on('data', function (data) { + ndata++ + if (ndata === 1) { + expect(res.headers['Observe']).to.equal(0xffffff) + expect(data.toString()).to.equal('max') + } else if (ndata === 2) { + expect(res.headers['Observe']).to.equal(0) + expect(data.toString()).to.equal('zero') + done() + } else { + done(new Error('Unexpected data')) + } + }) + }) + }) }) describe('token', function () { diff --git a/test/server.js b/test/server.js index 708beff3..aff04a34 100644 --- a/test/server.js +++ b/test/server.js @@ -890,7 +890,6 @@ describe('server', function() { it('should correctly generate two-byte long sequence numbers', function(done) { var now = Date.now() - , buf = new Buffer(2) doObserve() server.on('request', function(req, res) { @@ -905,13 +904,10 @@ describe('server', function() { // the first one is an ack client.once('message', function(msg) { - buf.writeUInt16BE(4243, 0) - expect(parse(msg).options[0].value).to.eql(buf) + expect(parse(msg).options[0].value).to.eql(Buffer.from([0x10, 0x93])) client.once('message', function(msg) { - buf.writeUInt16BE(4244, 0) - expect(parse(msg).options[0].value).to.eql(buf) - + expect(parse(msg).options[0].value).to.eql(Buffer.from([0x10, 0x94])) done() }) }) @@ -919,9 +915,6 @@ describe('server', function() { it('should correctly generate three-byte long sequence numbers', function(done) { var now = Date.now() - , buf = new Buffer(3) - - buf.writeUInt8(1, 0) doObserve() @@ -937,13 +930,10 @@ describe('server', function() { // the first one is an ack client.once('message', function(msg) { - buf.writeUInt16BE(1, 1) - expect(parse(msg).options[0].value).to.eql(buf) + expect(parse(msg).options[0].value).to.eql(Buffer.from([1, 0, 0])) client.once('message', function(msg) { - buf.writeUInt16BE(2, 1) - expect(parse(msg).options[0].value).to.eql(buf) - + expect(parse(msg).options[0].value).to.eql(Buffer.from([1, 0, 1])) done() }) })