Skip to content

Commit

Permalink
Fix a bug that obscures the byte queue when it is full and is extended (
Browse files Browse the repository at this point in the history
#251)

* Fix a bug that obscures the byte queue when it is full and need to re allocate

* Fix a bug that add artificial entry that is to short to have a valid header

* Fix size check for larger minimum header size

Co-authored-by: Fabian Gärtner <[email protected]>
  • Loading branch information
Fabianexe and Fabianexe authored Nov 4, 2020
1 parent 9949a06 commit 92a824f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
13 changes: 8 additions & 5 deletions queue/bytes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
// Number of bytes to encode 0 in uvarint format
minimumHeaderSize = 1
minimumHeaderSize = 17 // 1 byte blobsize + timestampSizeInBytes + hashSizeInBytes
// Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
leftMarginIndex = 1
)
Expand Down Expand Up @@ -118,10 +118,13 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
if leftMarginIndex != q.rightMargin {
copy(q.array, oldArray[:q.rightMargin])

if q.tail < q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
if q.tail <= q.head {
if q.tail != q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
}

q.head = leftMarginIndex
q.tail = q.rightMargin
}
Expand Down
26 changes: 24 additions & 2 deletions queue/bytes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,6 @@ func TestPushEntryAfterAllocateAdditionMemory(t *testing.T) {
queue.Push([]byte("aaa"))
queue.Push([]byte("bb"))
queue.Pop()
queue.Push([]byte("c"))
queue.Push([]byte("d"))

// allocate more memory
assertEqual(t, 9, queue.Capacity())
Expand All @@ -402,6 +400,30 @@ func TestPushEntryAfterAllocateAdditionMemory(t *testing.T) {
noError(t, err)
}

func TestPushEntryAfterAllocateAdditionMemoryInFull(t *testing.T) {
t.Parallel()

// given
queue := NewBytesQueue(9, 40, true)

// when
queue.Push([]byte("aaa"))
queue.Push([]byte("bb"))
_, err := queue.Pop()
noError(t, err)

queue.Push([]byte("c"))
queue.Push([]byte("d"))
queue.Push([]byte("e"))
_, err = queue.Pop()
noError(t, err)
_, err = queue.Pop()
noError(t, err)
queue.Push([]byte("fff"))
_, err = queue.Pop()
noError(t, err)
}

func pop(queue *BytesQueue) []byte {
entry, err := queue.Pop()
if err != nil {
Expand Down

0 comments on commit 92a824f

Please sign in to comment.