Skip to content

Commit

Permalink
perf: clear sync.Pool containers when putting them back
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Aug 21, 2024
1 parent ea0d7eb commit 653339c
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 276 deletions.
107 changes: 0 additions & 107 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,6 @@ import (
var ErrNoSlot = errors.New("the slot has no redis node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")

type retry struct {
cIndexes []int
commands []Completed
aIndexes []int
cAskings []Completed
}

func (r *retry) Capacity() int {
return cap(r.commands)
}

func (r *retry) ResetLen(n int) {
r.cIndexes = r.cIndexes[:n]
r.commands = r.commands[:n]
r.aIndexes = r.aIndexes[:0]
r.cAskings = r.cAskings[:0]
}

var retryp = util.NewPool(func(capacity int) *retry {
return &retry{
cIndexes: make([]int, 0, capacity),
commands: make([]Completed, 0, capacity),
}
})

type retrycache struct {
cIndexes []int
commands []CacheableTTL
aIndexes []int
cAskings []CacheableTTL
}

func (r *retrycache) Capacity() int {
return cap(r.commands)
}

func (r *retrycache) ResetLen(n int) {
r.cIndexes = r.cIndexes[:n]
r.commands = r.commands[:n]
r.aIndexes = r.aIndexes[:0]
r.cAskings = r.cAskings[:0]
}

var retrycachep = util.NewPool(func(capacity int) *retrycache {
return &retrycache{
cIndexes: make([]int, 0, capacity),
commands: make([]CacheableTTL, 0, capacity),
}
})

type clusterClient struct {
pslots [16384]conn
rslots []conn
Expand Down Expand Up @@ -1285,60 +1235,3 @@ const (
panicMsgCxSlot = "cross slot command in Dedicated is prohibited"
panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited"
)

type conncount struct {
m map[conn]int
n int
}

func (r *conncount) Capacity() int {
return r.n
}

func (r *conncount) ResetLen(n int) {
for k := range r.m {
delete(r.m, k)
}
}

var conncountp = util.NewPool(func(capacity int) *conncount {
return &conncount{m: make(map[conn]int, capacity), n: capacity}
})

type connretry struct {
m map[conn]*retry
n int
}

func (r *connretry) Capacity() int {
return r.n
}

func (r *connretry) ResetLen(n int) {
for k := range r.m {
delete(r.m, k)
}
}

var connretryp = util.NewPool(func(capacity int) *connretry {
return &connretry{m: make(map[conn]*retry, capacity), n: capacity}
})

type connretrycache struct {
m map[conn]*retrycache
n int
}

func (r *connretrycache) Capacity() int {
return r.n
}

func (r *connretrycache) ResetLen(n int) {
for k := range r.m {
delete(r.m, k)
}
}

var connretrycachep = util.NewPool(func(capacity int) *connretrycache {
return &connretrycache{m: make(map[conn]*retrycache, capacity), n: capacity}
})
33 changes: 0 additions & 33 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,8 @@ import (
"time"

intl "github.com/redis/rueidis/internal/cmds"
"github.com/redis/rueidis/internal/util"
)

type mgetcachecmds struct {
s []CacheableTTL
}

func (r *mgetcachecmds) Capacity() int {
return cap(r.s)
}

func (r *mgetcachecmds) ResetLen(n int) {
r.s = r.s[:n]
}

var mgetcachecmdsp = util.NewPool(func(capacity int) *mgetcachecmds {
return &mgetcachecmds{s: make([]CacheableTTL, 0, capacity)}
})

type mgetcmds struct {
s []Completed
}

func (r *mgetcmds) Capacity() int {
return cap(r.s)
}

func (r *mgetcmds) ResetLen(n int) {
r.s = r.s[:n]
}

var mgetcmdsp = util.NewPool(func(capacity int) *mgetcmds {
return &mgetcmds{s: make([]Completed, 0, capacity)}
})

// MGetCache is a helper that consults the client-side caches with multiple keys by grouping keys within same slot into multiple GETs
func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []string) (ret map[string]RedisMessage, err error) {
if len(keys) == 0 {
Expand Down
6 changes: 1 addition & 5 deletions internal/cmds/builder_put.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
//go:build !go1.21

package cmds

func Put(cs *CommandSlice) {
for i := range cs.s {
cs.s[i] = ""
}
clear(cs.s)
cs.s = cs.s[:0]
cs.l = -1
cs.r = 0
Expand Down
11 changes: 0 additions & 11 deletions internal/cmds/builder_put_121.go

This file was deleted.

16 changes: 6 additions & 10 deletions internal/util/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package util

import (
"sync"
"sync/atomic"
)

type Container interface {
Expand All @@ -11,23 +10,19 @@ type Container interface {
}

func NewPool[T Container](fn func(capacity int) T) *Pool[T] {
p := &Pool[T]{fn: fn}
p.sp.New = func() any {
return fn(int(atomic.LoadUint32(&p.ca)))
}
return p
return &Pool[T]{fn: fn}
}

type Pool[T Container] struct {
sp sync.Pool
fn func(capacity int) T
ca uint32
}

func (p *Pool[T]) Get(length, capacity int) T {
atomic.StoreUint32(&p.ca, uint32(capacity))
s := p.sp.Get().(T)
if s.Capacity() < capacity {
s, ok := p.sp.Get().(T)
if !ok {
s = p.fn(capacity)
} else if s.Capacity() < capacity {
p.sp.Put(s)
s = p.fn(capacity)
}
Expand All @@ -36,5 +31,6 @@ func (p *Pool[T]) Get(length, capacity int) T {
}

func (p *Pool[T]) Put(s T) {
s.ResetLen(0)
p.sp.Put(s)
}
71 changes: 0 additions & 71 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,6 @@ type singleconnect struct {
g sync.WaitGroup
}

type batchcache struct {
cIndexes []int
commands []CacheableTTL
}

func (r *batchcache) Capacity() int {
return cap(r.commands)
}

func (r *batchcache) ResetLen(n int) {
r.cIndexes = r.cIndexes[:n]
r.commands = r.commands[:n]
}

var batchcachep = util.NewPool(func(capacity int) *batchcache {
return &batchcache{
cIndexes: make([]int, 0, capacity),
commands: make([]CacheableTTL, 0, capacity),
}
})

type conn interface {
Do(ctx context.Context, cmd Completed) RedisResult
DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult
Expand Down Expand Up @@ -399,53 +378,3 @@ func slotfn(n int, ks uint16, noreply bool) uint16 {
}
return uint16(util.FastRand(n))
}

type muxslots struct {
s []int
}

func (r *muxslots) Capacity() int {
return cap(r.s)
}

func (r *muxslots) ResetLen(n int) {
r.s = r.s[:n]
for i := 0; i < n; i++ {
r.s[i] = 0
}
}

func (r *muxslots) LessThen(n int) bool {
count := 0
for _, value := range r.s {
if value > 0 {
if count++; count == n {
return false
}
}
}
return true
}

var muxslotsp = util.NewPool(func(capacity int) *muxslots {
return &muxslots{s: make([]int, 0, capacity)}
})

type batchcachemap struct {
m map[uint16]*batchcache
n int
}

func (r *batchcachemap) Capacity() int {
return r.n
}

func (r *batchcachemap) ResetLen(n int) {
for k := range r.m {
delete(r.m, k)
}
}

var batchcachemaps = util.NewPool(func(capacity int) *batchcachemap {
return &batchcachemap{m: make(map[uint16]*batchcache, capacity), n: capacity}
})
39 changes: 0 additions & 39 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/redis/rueidis/internal/cmds"
"github.com/redis/rueidis/internal/util"
)

const LibName = "rueidis"
Expand All @@ -43,44 +42,6 @@ type wire interface {
SetOnCloseHook(fn func(error))
}

type redisresults struct {
s []RedisResult
}

func (r *redisresults) Capacity() int {
return cap(r.s)
}

func (r *redisresults) ResetLen(n int) {
r.s = r.s[:n]
for i := 0; i < n; i++ {
r.s[i] = RedisResult{}
}
}

var resultsp = util.NewPool(func(capacity int) *redisresults {
return &redisresults{s: make([]RedisResult, 0, capacity)}
})

type cacheentries struct {
e map[int]CacheEntry
c int
}

func (c *cacheentries) Capacity() int {
return c.c
}

func (c *cacheentries) ResetLen(n int) {
for k := range c.e {
delete(c.e, k)
}
}

var entriesp = util.NewPool(func(capacity int) *cacheentries {
return &cacheentries{e: make(map[int]CacheEntry, capacity), c: capacity}
})

var _ wire = (*pipe)(nil)

type pipe struct {
Expand Down
1 change: 1 addition & 0 deletions resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ next:
lr.R = i
lr.N = n
n, err = io.Copy(w, lr)
lr.R = nil
lrs.Put(lr)
} else if typ == typeChunk {
return n, err, true
Expand Down
Loading

0 comments on commit 653339c

Please sign in to comment.