package redis import ( "context" "errors" "fmt" "math/rand" "strconv" "sync" "sync/atomic" "time" "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/consistenthash" "github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/pool" ) // Hash is type of hash function used in consistent hash. type Hash consistenthash.Hash var errRingShardsDown = errors.New("redis: all ring shards are down") // RingOptions are used to configure a ring client and should be // passed to NewRing. type RingOptions struct { // Map of name => host:port addresses of ring shards. Addrs map[string]string // Frequency of PING commands sent to check shards availability. // Shard is considered down after 3 subsequent failed checks. HeartbeatFrequency time.Duration // Hash function used in consistent hash. // Default is crc32.ChecksumIEEE. Hash Hash // Number of replicas in consistent hash. // Default is 100 replicas. // // Higher number of replicas will provide less deviation, that is keys will be // distributed to nodes more evenly. // // Following is deviation for common nreplicas: // -------------------------------------------------------- // | nreplicas | standard error | 99% confidence interval | // | 10 | 0.3152 | (0.37, 1.98) | // | 100 | 0.0997 | (0.76, 1.28) | // | 1000 | 0.0316 | (0.92, 1.09) | // -------------------------------------------------------- // // See https://arxiv.org/abs/1406.2294 for reference HashReplicas int // Following options are copied from Options struct. OnConnect func(*Conn) error DB int Password string MaxRetries int MinRetryBackoff time.Duration MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int MinIdleConns int MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration } func (opt *RingOptions) init() { if opt.HeartbeatFrequency == 0 { opt.HeartbeatFrequency = 500 * time.Millisecond } if opt.HashReplicas == 0 { opt.HashReplicas = 100 } switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 case 0: opt.MinRetryBackoff = 8 * time.Millisecond } switch opt.MaxRetryBackoff { case -1: opt.MaxRetryBackoff = 0 case 0: opt.MaxRetryBackoff = 512 * time.Millisecond } } func (opt *RingOptions) clientOptions() *Options { return &Options{ OnConnect: opt.OnConnect, DB: opt.DB, Password: opt.Password, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, } } //------------------------------------------------------------------------------ type ringShard struct { Client *Client down int32 } func (shard *ringShard) String() string { var state string if shard.IsUp() { state = "up" } else { state = "down" } return fmt.Sprintf("%s is %s", shard.Client, state) } func (shard *ringShard) IsDown() bool { const threshold = 3 return atomic.LoadInt32(&shard.down) >= threshold } func (shard *ringShard) IsUp() bool { return !shard.IsDown() } // Vote votes to set shard state and returns true if state was changed. func (shard *ringShard) Vote(up bool) bool { if up { changed := shard.IsDown() atomic.StoreInt32(&shard.down, 0) return changed } if shard.IsDown() { return false } atomic.AddInt32(&shard.down, 1) return shard.IsDown() } //------------------------------------------------------------------------------ type ringShards struct { opt *RingOptions mu sync.RWMutex hash *consistenthash.Map shards map[string]*ringShard // read only list []*ringShard // read only len int closed bool } func newRingShards(opt *RingOptions) *ringShards { return &ringShards{ opt: opt, hash: newConsistentHash(opt), shards: make(map[string]*ringShard), } } func (c *ringShards) Add(name string, cl *Client) { shard := &ringShard{Client: cl} c.hash.Add(name) c.shards[name] = shard c.list = append(c.list, shard) } func (c *ringShards) List() []*ringShard { c.mu.RLock() list := c.list c.mu.RUnlock() return list } func (c *ringShards) Hash(key string) string { c.mu.RLock() hash := c.hash.Get(key) c.mu.RUnlock() return hash } func (c *ringShards) GetByKey(key string) (*ringShard, error) { key = hashtag.Key(key) c.mu.RLock() if c.closed { c.mu.RUnlock() return nil, pool.ErrClosed } hash := c.hash.Get(key) if hash == "" { c.mu.RUnlock() return nil, errRingShardsDown } shard := c.shards[hash] c.mu.RUnlock() return shard, nil } func (c *ringShards) GetByHash(name string) (*ringShard, error) { if name == "" { return c.Random() } c.mu.RLock() shard := c.shards[name] c.mu.RUnlock() return shard, nil } func (c *ringShards) Random() (*ringShard, error) { return c.GetByKey(strconv.Itoa(rand.Int())) } // heartbeat monitors state of each shard in the ring. func (c *ringShards) Heartbeat(frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop() for range ticker.C { var rebalance bool c.mu.RLock() if c.closed { c.mu.RUnlock() break } shards := c.list c.mu.RUnlock() for _, shard := range shards { err := shard.Client.Ping().Err() if shard.Vote(err == nil || err == pool.ErrPoolTimeout) { internal.Logf("ring shard state changed: %s", shard) rebalance = true } } if rebalance { c.rebalance() } } } // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { hash := newConsistentHash(c.opt) var shardsNum int for name, shard := range c.shards { if shard.IsUp() { hash.Add(name) shardsNum++ } } c.mu.Lock() c.hash = hash c.len = shardsNum c.mu.Unlock() } func (c *ringShards) Len() int { c.mu.RLock() l := c.len c.mu.RUnlock() return l } func (c *ringShards) Close() error { c.mu.Lock() defer c.mu.Unlock() if c.closed { return nil } c.closed = true var firstErr error for _, shard := range c.shards { if err := shard.Client.Close(); err != nil && firstErr == nil { firstErr = err } } c.hash = nil c.shards = nil c.list = nil return firstErr } //------------------------------------------------------------------------------ // Ring is a Redis client that uses consistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // // Ring monitors the state of each shard and removes dead shards from // the ring. When a shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client // uses shards that are available to the client and does not do any // coordination when shard state is changed. // // Ring should be used when you need multiple Redis servers for caching // and can tolerate losing data when one of the servers dies. // Otherwise you should use Redis Cluster. type Ring struct { cmdable ctx context.Context opt *RingOptions shards *ringShards cmdsInfoCache *cmdsInfoCache process func(Cmder) error processPipeline func([]Cmder) error } func NewRing(opt *RingOptions) *Ring { opt.init() ring := &Ring{ opt: opt, shards: newRingShards(opt), } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) for name, addr := range opt.Addrs { clopt := opt.clientOptions() clopt.Addr = addr ring.shards.Add(name, NewClient(clopt)) } go ring.shards.Heartbeat(opt.HeartbeatFrequency) return ring } func (c *Ring) Context() context.Context { if c.ctx != nil { return c.ctx } return context.Background() } func (c *Ring) WithContext(ctx context.Context) *Ring { if ctx == nil { panic("nil context") } c2 := c.copy() c2.ctx = ctx return c2 } func (c *Ring) copy() *Ring { cp := *c return &cp } // Options returns read-only Options that were used to create the client. func (c *Ring) Options() *RingOptions { return c.opt } func (c *Ring) retryBackoff(attempt int) time.Duration { return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) } // PoolStats returns accumulated connection pool stats. func (c *Ring) PoolStats() *PoolStats { shards := c.shards.List() var acc PoolStats for _, shard := range shards { s := shard.Client.connPool.Stats() acc.Hits += s.Hits acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns acc.IdleConns += s.IdleConns } return &acc } // Len returns the current number of shards in the ring. func (c *Ring) Len() int { return c.shards.Len() } // Subscribe subscribes the client to the specified channels. func (c *Ring) Subscribe(channels ...string) *PubSub { if len(channels) == 0 { panic("at least one channel is required") } shard, err := c.shards.GetByKey(channels[0]) if err != nil { // TODO: return PubSub with sticky error panic(err) } return shard.Client.Subscribe(channels...) } // PSubscribe subscribes the client to the given patterns. func (c *Ring) PSubscribe(channels ...string) *PubSub { if len(channels) == 0 { panic("at least one channel is required") } shard, err := c.shards.GetByKey(channels[0]) if err != nil { // TODO: return PubSub with sticky error panic(err) } return shard.Client.PSubscribe(channels...) } // ForEachShard concurrently calls the fn on each live shard in the ring. // It returns the first error if any. func (c *Ring) ForEachShard(fn func(client *Client) error) error { shards := c.shards.List() var wg sync.WaitGroup errCh := make(chan error, 1) for _, shard := range shards { if shard.IsDown() { continue } wg.Add(1) go func(shard *ringShard) { defer wg.Done() err := fn(shard.Client) if err != nil { select { case errCh <- err: default: } } }(shard) } wg.Wait() select { case err := <-errCh: return err default: return nil } } func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) { shards := c.shards.List() firstErr := errRingShardsDown for _, shard := range shards { cmdsInfo, err := shard.Client.Command().Result() if err == nil { return cmdsInfo, nil } if firstErr == nil { firstErr = err } } return nil, firstErr } func (c *Ring) cmdInfo(name string) *CommandInfo { cmdsInfo, err := c.cmdsInfoCache.Get() if err != nil { return nil } info := cmdsInfo[name] if info == nil { internal.Logf("info for cmd=%s not found", name) } return info } func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { cmdInfo := c.cmdInfo(cmd.Name()) pos := cmdFirstKeyPos(cmd, cmdInfo) if pos == 0 { return c.shards.Random() } firstKey := cmd.stringArg(pos) return c.shards.GetByKey(firstKey) } // Do creates a Cmd from the args and processes the cmd. func (c *Ring) Do(args ...interface{}) *Cmd { cmd := NewCmd(args...) c.Process(cmd) return cmd } func (c *Ring) WrapProcess( fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, ) { c.process = fn(c.process) } func (c *Ring) Process(cmd Cmder) error { return c.process(cmd) } func (c *Ring) defaultProcess(cmd Cmder) error { for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { time.Sleep(c.retryBackoff(attempt)) } shard, err := c.cmdShard(cmd) if err != nil { cmd.setErr(err) return err } err = shard.Client.Process(cmd) if err == nil { return nil } if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { return err } } return cmd.Err() } func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ exec: c.processPipeline, } pipe.cmdable.setProcessor(pipe.Process) return &pipe } func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(fn) } func (c *Ring) WrapProcessPipeline( fn func(oldProcess func([]Cmder) error) func([]Cmder) error, ) { c.processPipeline = fn(c.processPipeline) } func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.Name()) hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) if hash != "" { hash = c.shards.Hash(hashtag.Key(hash)) } cmdsMap[hash] = append(cmdsMap[hash], cmd) } for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { time.Sleep(c.retryBackoff(attempt)) } var mu sync.Mutex var failedCmdsMap map[string][]Cmder var wg sync.WaitGroup for hash, cmds := range cmdsMap { wg.Add(1) go func(hash string, cmds []Cmder) { defer wg.Done() shard, err := c.shards.GetByHash(hash) if err != nil { setCmdsErr(cmds, err) return } cn, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) return } canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) shard.Client.releaseConnStrict(cn, err) if canRetry && internal.IsRetryableError(err, true) { mu.Lock() if failedCmdsMap == nil { failedCmdsMap = make(map[string][]Cmder) } failedCmdsMap[hash] = cmds mu.Unlock() } }(hash, cmds) } wg.Wait() if len(failedCmdsMap) == 0 { break } cmdsMap = failedCmdsMap } return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { panic("not implemented") } func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { panic("not implemented") } // Close closes the ring client, releasing any open resources. // // It is rare to Close a Ring, as the Ring is meant to be long-lived // and shared between many goroutines. func (c *Ring) Close() error { return c.shards.Close() } func newConsistentHash(opt *RingOptions) *consistenthash.Map { return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) }