Skip to content

Commit

Permalink
Optimze append (#249)
Browse files Browse the repository at this point in the history
* Add Benchmark for append

* Optimize Append and halve byte copies

* Optimize Append by reducing allocs

* Optimize Append by reducing allocs

* Reduces allocs from test construct

Co-authored-by: Fabian Gärtner <[email protected]>
  • Loading branch information
Fabianexe and Fabianexe authored Oct 27, 2020
1 parent 4b1f942 commit 9949a06
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 31 deletions.
31 changes: 31 additions & 0 deletions bigcache_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func BenchmarkWriteToCache(b *testing.B) {
})
}
}
func BenchmarkAppendToCache(b *testing.B) {
for _, shards := range []int{1, 512, 1024, 8192} {
b.Run(fmt.Sprintf("%d-shards", shards), func(b *testing.B) {
appendToCache(b, shards, 100*time.Second, b.N)
})
}
}

func BenchmarkReadFromCache(b *testing.B) {
for _, shards := range []int{1, 512, 1024, 8192} {
Expand Down Expand Up @@ -134,6 +141,30 @@ func writeToCache(b *testing.B, shards int, lifeWindow time.Duration, requestsIn
})
}

func appendToCache(b *testing.B, shards int, lifeWindow time.Duration, requestsInLifeWindow int) {
cache, _ := NewBigCache(Config{
Shards: shards,
LifeWindow: lifeWindow,
MaxEntriesInWindow: max(requestsInLifeWindow, 100),
MaxEntrySize: 2000,
})
rand.Seed(time.Now().Unix())

b.RunParallel(func(pb *testing.PB) {
id := rand.Int()
counter := 0

b.ReportAllocs()
for pb.Next() {
key := fmt.Sprintf("key-%d-%d", id, counter)
for j := 0; j < 7; j++ {
cache.Append(key, message)
}
counter = counter + 1
}
})
}

func readFromCache(b *testing.B, shards int, info bool) {
cache, _ := NewBigCache(Config{
Shards: shards,
Expand Down
21 changes: 21 additions & 0 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ func wrapEntry(timestamp uint64, hash uint64, key string, entry []byte, buffer *
return blob[:blobLength]
}

func appendToWrappedEntry(timestamp uint64, wrappedEntry []byte, entry []byte, buffer *[]byte) []byte {
blobLength := len(wrappedEntry) + len(entry)
if blobLength > len(*buffer) {
*buffer = make([]byte, blobLength)
}

blob := *buffer

binary.LittleEndian.PutUint64(blob, timestamp)
copy(blob[timestampSizeInBytes:], wrappedEntry[timestampSizeInBytes:])
copy(blob[len(wrappedEntry):], entry)

return blob[:blobLength]
}

func readEntry(data []byte) []byte {
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])

Expand All @@ -53,6 +68,12 @@ func readKeyFromEntry(data []byte) string {
return bytesToString(dst)
}

func compareKeyFromEntry(data []byte, key string) bool {
length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])

return bytesToString(data[headersSizeInBytes:headersSizeInBytes+length]) == key
}

func readHashFromEntry(data []byte) uint64 {
return binary.LittleEndian.Uint64(data[timestampSizeInBytes:])
}
Expand Down
84 changes: 53 additions & 31 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,6 @@ func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
return entry, nil
}

func (s *cacheShard) getWithoutLock(key string, hashedKey uint64) ([]byte, error) {
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
return nil, err
}
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
return nil, ErrEntryNotFound
}
entry := readEntry(wrappedEntry)
s.hitWithoutLock(hashedKey)

return entry, nil
}

func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
itemIndex := s.hashmap[hashedKey]

Expand All @@ -116,6 +98,25 @@ func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
return wrappedEntry, err
}

func (s *cacheShard) getValidWrapEntry(key string, hashedKey uint64) ([]byte, error) {
wrappedEntry, err := s.getWrappedEntry(hashedKey)
if err != nil {
return nil, err
}

if !compareKeyFromEntry(wrappedEntry, key) {
s.collision()
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, readKeyFromEntry(wrappedEntry), hashedKey)
}

return nil, ErrEntryNotFound
}
s.hitWithoutLock(hashedKey)

return wrappedEntry, nil
}

func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.Epoch())

Expand Down Expand Up @@ -146,9 +147,27 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
}
}

func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte) error {
func (s *cacheShard) addNewWithoutLock(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.Epoch())

if oldestEntry, err := s.entries.Peek(); err == nil {
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}

w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)

for {
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
return nil
}
if s.removeOldestEntry(NoSpace) != nil {
return fmt.Errorf("entry is bigger than max shard size")
}
}
}

func (s *cacheShard) setWrappedEntryWithoutLock(currentTimestamp uint64, w []byte, hashedKey uint64) error {
if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
Expand All @@ -159,8 +178,6 @@ func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte)
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}

w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)

for {
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
Expand All @@ -174,20 +191,25 @@ func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte)

func (s *cacheShard) append(key string, hashedKey uint64, entry []byte) error {
s.lock.Lock()
var newEntry []byte
oldEntry, err := s.getWithoutLock(key, hashedKey)
wrappedEntry, err := s.getValidWrapEntry(key, hashedKey)

if err == ErrEntryNotFound {
err = s.addNewWithoutLock(key, hashedKey, entry)
s.lock.Unlock()
return err
}
if err != nil {
if err != ErrEntryNotFound {
s.lock.Unlock()
return err
}
} else {
newEntry = oldEntry
s.lock.Unlock()
return err
}

newEntry = append(newEntry, entry...)
err = s.setWithoutLock(key, hashedKey, newEntry)
currentTimestamp := uint64(s.clock.Epoch())

w := appendToWrappedEntry(currentTimestamp, wrappedEntry, entry, &s.entryBuffer)

err = s.setWrappedEntryWithoutLock(currentTimestamp, w, hashedKey)
s.lock.Unlock()

return err
}

Expand Down

0 comments on commit 9949a06

Please sign in to comment.