Skip to content

Commit

Permalink
Fixes to Observe notifications (#201)
Browse files Browse the repository at this point in the history
* Fix Observe option encoding and decoding
* Fix Observe notification filtering
* Add more Observe tests
* Add flag to disable Observe notification filtering (#200)
  • Loading branch information
GiedriusM authored Feb 4, 2019
1 parent c759838 commit 0e4071e
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 29 deletions.
16 changes: 14 additions & 2 deletions lib/observe_read_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ function ObserveReadStream(packet, rsinfo, outSocket) {
this.rsinfo = rsinfo
this.outSocket = outSocket

this._lastId = 0
this._lastId = undefined
this._lastTime = 0
this._disableFiltering = false
this.append(packet)
}

Expand All @@ -29,8 +31,18 @@ ObserveReadStream.prototype.append = function(packet) {
return

pktToMsg(this, packet)
if (this.headers['Observe'] > this._lastId) {

// First notification
if (this._lastId === undefined) {
this._lastId = this.headers['Observe'] - 1
}

const dseq = (this.headers['Observe'] - this._lastId) & 0xffffff
const dtime = Date.now() - this._lastTime

if (this._disableFiltering || (dseq > 0 && dseq < (1 << 23)) || dtime > 128*1000) {
this._lastId = this.headers['Observe']
this._lastTime = Date.now()
this.push(packet.payload)
}
}
Expand Down
20 changes: 7 additions & 13 deletions lib/option_converter.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,13 @@ registerOption('Observe', function(sequence) {
var buf

if (!sequence) {
buf = new Buffer(0)
} else if (sequence < 256) {
buf = new Buffer(1)
buf.writeUInt8(sequence, 0)
} else if (sequence >= 256 & sequence < 65535) {
buf = new Buffer(2)
buf.writeUInt16BE(sequence, 0)
buf = Buffer.alloc(0)
} else if (sequence <= 0xff) {
buf = Buffer.from([sequence])
} else if (sequence <= 0xffff) {
buf = Buffer.from([sequence >> 8, sequence])
} else {
// it is three bytes long
buf = new Buffer(3)
buf.writeUInt8(Math.floor(sequence / 65535), 0)
buf.writeUInt16BE(sequence % 65535, 1)
buf = Buffer.from([sequence >> 16, sequence >> 8, sequence])
}

return buf
Expand All @@ -176,8 +171,7 @@ registerOption('Observe', function(sequence) {
} else if (buf.length === 2) {
result = buf.readUInt16BE(0)
} else if (buf.length === 3) {
result += buf.readUInt8(0) * 65353
result += buf.readUInt16BE(1)
result = (buf.readUInt8(0) << 16) | buf.readUInt16BE(1)
}

return result
Expand Down
152 changes: 152 additions & 0 deletions test/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

var coap = require('../')
, toBinary = require('../lib/option_converter').toBinary
, parse = require('coap-packet').parse
, generate = require('coap-packet').generate
, dgram = require('dgram')
Expand Down Expand Up @@ -997,6 +998,20 @@ describe('request', function() {
server.send(toSend, 0, toSend.length, rsinfo.port, rsinfo.address)
}

function sendNotification(rsinfo, req, opts) {
ssend(rsinfo, {
messageId: req.messageId
, token: req.token
, payload: Buffer.from(opts.payload)
, ack: false
, options: [{
name: 'Observe'
, value: toBinary('Observe', opts.num)
}]
, code: '2.05'
})
}

it('should ack the update', function (done) {

var req = doObserve()
Expand Down Expand Up @@ -1070,6 +1085,143 @@ describe('request', function() {
done()
})
})

it('should allow multiple notifications', function (done) {
server.once('message', function(msg, rsinfo) {
const req = parse(msg)

sendNotification(rsinfo, req, { num: 0, payload: 'zero' })
sendNotification(rsinfo, req, { num: 1, payload: 'one' })
})

const req = request({
port: port
, observe: true
, confirmable: false
}).end()

req.on('response', function (res) {
let ndata = 0

res.on('data', function (data) {
ndata++
if (ndata === 1) {
expect(res.headers['Observe']).to.equal(0)
expect(data.toString()).to.equal('zero')
} else if (ndata === 2) {
expect(res.headers['Observe']).to.equal(1)
expect(data.toString()).to.equal('one')
done()
} else {
done(new Error('Unexpected data'))
}
})
})
})

it('should drop out of order notifications', function (done) {
server.once('message', function(msg, rsinfo) {
const req = parse(msg)

sendNotification(rsinfo, req, { num: 1, payload: 'one' })
sendNotification(rsinfo, req, { num: 0, payload: 'zero' })
sendNotification(rsinfo, req, { num: 2, payload: 'two' })
})

const req = request({
port: port
, observe: true
, confirmable: false
}).end()

req.on('response', function (res) {
let ndata = 0

res.on('data', function (data) {
ndata++
if (ndata === 1) {
expect(res.headers['Observe']).to.equal(1)
expect(data.toString()).to.equal('one')
} else if (ndata === 2) {
expect(res.headers['Observe']).to.equal(2)
expect(data.toString()).to.equal('two')
done()
} else {
done(new Error('Unexpected data'))
}
})
})
})

it('should allow repeating order after 128 seconds', function (done) {
server.once('message', function(msg, rsinfo) {
const req = parse(msg)

sendNotification(rsinfo, req, { num: 1, payload: 'one' })
setTimeout(function () {
sendNotification(rsinfo, req, { num: 1, payload: 'two' })
}, 128 * 1000 + 200)
})

const req = request({
port: port
, observe: true
, confirmable: false
}).end()

req.on('response', function (res) {
let ndata = 0

res.on('data', function (data) {
ndata++
if (ndata === 1) {
expect(res.headers['Observe']).to.equal(1)
expect(data.toString()).to.equal('one')
} else if (ndata === 2) {
expect(res.headers['Observe']).to.equal(1)
expect(data.toString()).to.equal('two')
done()
} else {
done(new Error('Unexpected data'))
}
})
})

fastForward(100, 129*1000)
})

it('should allow Observe option 24bit overflow', function (done) {
server.once('message', function(msg, rsinfo) {
const req = parse(msg)

sendNotification(rsinfo, req, { num: 0xffffff, payload: 'max' })
sendNotification(rsinfo, req, { num: 0, payload: 'zero' })
})

const req = request({
port: port
, observe: true
, confirmable: false
}).end()

req.on('response', function (res) {
let ndata = 0

res.on('data', function (data) {
ndata++
if (ndata === 1) {
expect(res.headers['Observe']).to.equal(0xffffff)
expect(data.toString()).to.equal('max')
} else if (ndata === 2) {
expect(res.headers['Observe']).to.equal(0)
expect(data.toString()).to.equal('zero')
done()
} else {
done(new Error('Unexpected data'))
}
})
})
})
})

describe('token', function () {
Expand Down
18 changes: 4 additions & 14 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,6 @@ describe('server', function() {

it('should correctly generate two-byte long sequence numbers', function(done) {
var now = Date.now()
, buf = new Buffer(2)
doObserve()

server.on('request', function(req, res) {
Expand All @@ -905,23 +904,17 @@ describe('server', function() {

// the first one is an ack
client.once('message', function(msg) {
buf.writeUInt16BE(4243, 0)
expect(parse(msg).options[0].value).to.eql(buf)
expect(parse(msg).options[0].value).to.eql(Buffer.from([0x10, 0x93]))

client.once('message', function(msg) {
buf.writeUInt16BE(4244, 0)
expect(parse(msg).options[0].value).to.eql(buf)

expect(parse(msg).options[0].value).to.eql(Buffer.from([0x10, 0x94]))
done()
})
})
})

it('should correctly generate three-byte long sequence numbers', function(done) {
var now = Date.now()
, buf = new Buffer(3)

buf.writeUInt8(1, 0)

doObserve()

Expand All @@ -937,13 +930,10 @@ describe('server', function() {

// the first one is an ack
client.once('message', function(msg) {
buf.writeUInt16BE(1, 1)
expect(parse(msg).options[0].value).to.eql(buf)
expect(parse(msg).options[0].value).to.eql(Buffer.from([1, 0, 0]))

client.once('message', function(msg) {
buf.writeUInt16BE(2, 1)
expect(parse(msg).options[0].value).to.eql(buf)

expect(parse(msg).options[0].value).to.eql(Buffer.from([1, 0, 1]))
done()
})
})
Expand Down

0 comments on commit 0e4071e

Please sign in to comment.