Skip to content

Commit

Permalink
fix: remove unnecessary aws configuration endpoint specialization. Th…
Browse files Browse the repository at this point in the history
…e endpoint is just a DNS alias to any nodes

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Jul 21, 2024
1 parent 5ed697d commit 0a75d48
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 42 deletions.
11 changes: 2 additions & 9 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -82,7 +81,6 @@ type clusterClient struct {
stop uint32
cmd Builder
retry bool
aws bool
}

// NOTE: connrole and conn must be initialized at the same time
Expand All @@ -98,7 +96,6 @@ func newClusterClient(opt *ClientOption, connFn connFn) (*clusterClient, error)
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
}

if opt.ReplicaOnly && opt.SendToReplicas != nil {
Expand Down Expand Up @@ -198,12 +195,8 @@ func (c *clusterClient) _refresh() (err error) {
c.mu.RLock()
results := make(chan clusterslots, len(c.conns))
pending := make([]conn, 0, len(c.conns))
if c.aws {
pending = append(pending, c.conns[c.opt.InitAddress[0]].conn)
} else {
for _, cc := range c.conns {
pending = append(pending, cc.conn)
}
for _, cc := range c.conns {
pending = append(pending, cc.conn)
}
c.mu.RUnlock()

Expand Down
33 changes: 0 additions & 33 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,39 +776,6 @@ func TestClusterClientInit(t *testing.T) {
}
})

t.Run("Refresh aws cluster", func(t *testing.T) {
getClient := func(version int) (client *clusterClient, err error) {
return newClusterClient(&ClientOption{InitAddress: []string{"xxxxx.amazonaws.com:1"}}, func(dst string, opt *ClientOption) conn {
return &mockConn{
DoFn: func(cmd Completed) RedisResult {
if dst == "xxxxx.amazonaws.com:1" && strings.Join(cmd.Commands(), " ") == "CLUSTER SHARDS" {
return shardsResp
}
return newErrResult(errors.New("unexpected call"))
},
AddrFn: func() string { return "xxxxx.amazonaws.com:1" },
VersionFn: func() int { return version },
}
})
}

t.Run("shards", func(t *testing.T) {
client, err := getClient(7)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
nodes := client.nodes()
sort.Strings(nodes)
if len(nodes) != 3 ||
nodes[0] != "127.0.0.1:0" ||
nodes[1] != "127.0.1.1:1" ||
nodes[2] != "xxxxx.amazonaws.com:1" {
t.Fatalf("unexpected nodes %v", nodes)
}
client.Close()
})
})

t.Run("Refresh cluster which has only primary node per shard with SendToReplica option", func(t *testing.T) {
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
Expand Down

0 comments on commit 0a75d48

Please sign in to comment.