Loading... ## 前言回顾 <div class="preview"> <div class="post-inser post box-shadow-wrap-normal"> <a href="https://www.epicmo.cn/index.php/archives/22/" target="_blank" class="post_inser_a no-external-link no-underline-link"> <div class="inner-image bg" style="background-image: url(https://www.epicmo.cn/usr/themes/handsome/assets/img/sj/2.jpg);background-size: cover;"></div> <div class="inner-content" > <p class="inser-title">我应该怎么用我的Redis(二)?</p> <div class="inster-summary text-muted"> 前言回顾实际应用计数应用场景对于我们业务中的计数场景,我们可能会对某个用户有多个关联的数据,例如对于一个 B站 用... </div> </div> </a> <!-- .inner-content #####--> </div> <!-- .post-inser ####--> </div> ## 使用场景 ### 限流 #### 业务需求 我们需要对业务进行限流,比如我们要求1秒内只能让某个服务访问 N 次,如果超过 N 那么就紧张访问,或者让他们等待。 #### 业务代码 基本的思路是使用一个 Key + Incr 操作来完成: 例如: - Key: comment_freq_limit_xxx (timestamp) 对这个Key调用Incr,超过限制N则禁止访问。 #### 数据结构 是我们(一)中所讲到的 String 结构 ### 分布式锁 #### 业务需求 并发场景,要求一次只能由一个协程执行,执行完成后其他在等待的协程才能够执行。 #### 业务代码 可以使用 Redis 的 SetNX 实现,利用的特性有: - Redis 单线程执行操作 - SetNX 只有没有设置过才能够执行成功 <div class="panel panel-default collapse-panel box-shadow-wrap-lg"><div class="panel-heading panel-collapse" data-toggle="collapse" data-target="#collapse-672ef9500c89033d2dc1e718baf88ff327" aria-expanded="true"><div class="accordion-toggle"><span style="">代码</span> <i class="pull-right fontello icon-fw fontello-angle-right"></i> </div> </div> <div class="panel-body collapse-panel-body"> <div id="collapse-672ef9500c89033d2dc1e718baf88ff327" class="collapse collapse-content"><p></p> ```go package example // setnx 分布式锁 import ( "context" "fmt" "strconv" "time" "gitee.com/wedone/redis_course/example/common" ) const resourceKey = "syncKey" // 分布式锁的key const exp = 800 * time.Millisecond // 锁的过期时间,避免死锁 // EventLog 搜集日志的结构 type EventLog struct { eventTime time.Time log string } // Ex02Params Ex02的自定义函数 type Ex02Params struct { } // Ex02 只是体验SetNX的特性,不是高可用的分布式锁实现 // 该实现存在的问题: // (1) 业务超时解锁,导致并发问题。业务执行时间超过锁超时时间 // (2) redis主备切换临界点问题。主备切换后,A持有的锁还未同步到新的主节点时,B可在新主节点获取锁,导致并发问题。 // (3) redis集群脑裂,导致出现多个主节点 func Ex02(ctx context.Context) { eventLogger := &common.ConcurrentEventLogger{} // new一个并发执行器 cInst := common.NewConcurrentRoutine(10, eventLogger) // 并发执行用户自定义函数work cInst.Run(ctx, Ex02Params{}, ex02Work) // 按日志时间正序打印日志 eventLogger.PrintLogs() } func ex02Work(ctx context.Context, cInstParam common.CInstParams) { routine := cInstParam.Routine eventLogger := cInstParam.ConcurrentEventLogger defer ex02ReleaseLock(ctx, routine, eventLogger) for { // 1. 尝试获取锁 // exp - 锁过期设置,避免异常死锁 acquired, err := RedisClient.SetNX(ctx, resourceKey, routine, exp).Result() // 尝试获取锁 if err != nil { eventLogger.Append(common.EventLog{ EventTime: time.Now(), Log: fmt.Sprintf("[%s] error routine[%d], %v", time.Now().Format(time.RFC3339Nano), routine, err), }) panic(err) } if acquired { // 2. 成功获取锁 eventLogger.Append(common.EventLog{ EventTime: time.Now(), Log: fmt.Sprintf("[%s] routine[%d] 获取锁", time.Now().Format(time.RFC3339Nano), routine), }) // 3. sleep 模拟业务逻辑耗时 time.Sleep(10 * time.Millisecond) eventLogger.Append(common.EventLog{ EventTime: time.Now(), Log: fmt.Sprintf("[%s] routine[%d] 完成业务逻辑", time.Now().Format(time.RFC3339Nano), routine), }) return } else { // 没有获得锁,等待后重试 time.Sleep(100 * time.Millisecond) } } } func ex02ReleaseLock(ctx context.Context, routine int, eventLogger *common.ConcurrentEventLogger) { routineMark, _ := RedisClient.Get(ctx, resourceKey).Result() if strconv.FormatInt(int64(routine), 10) != routineMark { // 其它协程误删lock panic(fmt.Sprintf("del err lock[%s] can not del by [%d]", routineMark, routine)) } set, err := RedisClient.Del(ctx, resourceKey).Result() if set == 1 { eventLogger.Append(common.EventLog{ EventTime: time.Now(), Log: fmt.Sprintf("[%s] routine[%d] 释放锁", time.Now().Format(time.RFC3339Nano), routine), }) } else { eventLogger.Append(common.EventLog{ EventTime: time.Now(), Log: fmt.Sprintf("[%s] routine[%d] no lock to del", time.Now().Format(time.RFC3339Nano), routine), }) } if err != nil { fmt.Errorf("[%s] error routine=%d, %v", time.Now().Format(time.RFC3339Nano), routine, err) panic(err) } } ``` <p></p></div></div></div> 但是上面使用的 SetNX 也不能够做成真正的高可用的锁,有以下实现上的问题: - 业务超时解锁,导致并发问题。业务执行时间超过锁超时时间 - redis主备切换临界点问题。主备切换后,A持有的锁还未同步到新的主节点时,B可在新主节点获取锁,导致并发问题。 - redis集群脑裂,导致出现多个主节点 最后修改:2023 年 08 月 08 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 0 如果你觉得文章帮到你了,请以沫喝杯奶茶吧~