一、引言
背景
我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈。但有时也会遇到非实时的任务,比如确定的时间点发布重要公告。或者需要在用户做了一件事情的X分钟/Y小时后,EG:
“PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励”
对其特定动作,比如通知、发券等等。一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延时队列服务。
名词解释
topic_list队列:每一个来的延时请求都应该又一个延时主题参考kafka,在逻辑上划分出一个队列出来每个业务分开处理;
topic_info队列:每一个队列topic都存在一个新的队列里,每次扫描topic信息检测新的topic建立与销毁管理服务协程数量;
offset:当前消费的进度;
new_offset:新消费的进度,预备更迭offset;
topic_offset_lock:分布式锁。
二、设计目标
功能清单
1、延时信息添加接口基于http调用
2、拥有存储队列特性,可保存近3天内的队列消费数据
3、提供消费功能
4、延时通知
性能指标
预计接口的调用量:单秒单类任务数3500,多秒单类任务数1300
压测结果:
简单压测
wrk写入qps:259.3s 写入9000条记录 单线程 无并发
触发性能/准确率:单秒1000,在测试机无延长。单秒3000时,偶尔出现1-2秒延迟。受内存和cpu影响。
三、系统设计
交互流程
时序图
目前使用全缓存模式
key设计:
topic管理list key: XX:DELAY_TOPIC_LIST type:list
topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s(根据topic分key) type:zset
topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s(根据topic分key) type:hash
topic_offset key: XX:DELAY_TOPIC_OFFSET-%s(根据topic分key) type:string
topic_lock key: xx:DELAY_TOPIC_RELOAD_LOCK-%s(根据topic分key) type:string
六、接口设计
delay.task.addv1 (延时队列添加v1)
请求示例
curl -d '{ "topic": "xxx", // 业务topic "timing_moment": , // 单位秒,要定时时刻 "content": "{}" // 消息体,json串 }' 'http://127.0.0.1:xxxx/delay/task/add'
返回示例
{ "dm_error": 0, "error_msg": "操作成功", "task_id":112345465765 }
pull回调方式返回(v2不再支持)
请求示例
curl -d '{ "topic": "xxxx", // 业务topic "task_id":1324568798765 // taskid,选填,有则返回特定消息 }' 'http://127.0.0.1:xxxx/delay/task/pull'
返回示例
{ "dm_error": 0, "error_msg": "操作成功" "content":"{"\xxx"\}" }
delay.task.addv2 (延时队列添加v2)
请求示例
curl -d '{ "topic": "xxx", // 业务topic "timing_moment": , // 单位秒,要定时时刻 "content": "{ // 消息内容(json string) "sn":"message.call", // 服务发现名字(或为配置服务名) "url":"/ev/tp/xxxx", // 回调url "xxx":"xxx" // 其他字段 }" }' 'http://127.0.0.1:xxxx/delay/task/add'
示例
curl -d '{ "topic":"xxxx_push", "content":"{ "uid":"111111", "sn":"other.server", "url":"/xxxx/callback", "msg_type":"gift", }", "timing_moment":1565700615 }' http://127.0.0.1:xxxx/delay/task/add
返回示例
{ "dm_error": 0, "error_msg": "操作成功", "task_id":112345465765 }
七、MQ设计(v2不再支持)
关于kafka消费方式返回:
topic: delay_base_push 固定返回格式 { "topic": "xxxx", // 业务topic "content": "{}" // 单条生产消息content }
八、其他设计
唯一号设计
调用存储模块,利用redis的自增结合逻辑生成唯一号具体逻辑如下:
func (c *CacheManager) OperGenTaskid() (uint64, error) { now := time.Now().Unix() key := c.getDelayTaskIdKey() reply, err := c.DelayRds.Do("INCR", key) if err != nil { log.Errorf("genTaskid INCR key:%s, error:%s", key, err) return 0, err } version := reply.(int64) if version == 1 { //默认认为1秒能创建100个任务 c.DelayRds.Expire(key, time.Duration(100)*time.Second) } incrNum := version % 10000 taskId := (uint64(now)*10000 + uint64(incrNum)) log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId) return taskId, nil }
分布式锁设计
func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) { key := c.getDelayTopicReloadLockKey(topic) reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2) if err != nil { log.Errorf("SetDelayTopicLock SETNX key:%s, cal:%v, error:%s", key, "lock", err) return false, err } if reply == nil { return false, nil } log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%d", topic, false) return true, nil }
九、设计考虑
健壮性
熔断策略:
for { time.Sleep(time.Second) fmt.Println("test") }
2、time.Tick函数:
t1:=time.Tick(3*time.Second) for { select { case <-t1: fmt.Println("test") } }
3、其中Tick定时任务,也可以先使用time.Ticker函数获取Ticker结构体,然后进行阻塞监听信息,这种方式可以手动选择停止定时任务,在停止任务时,减少对内存的浪费。
t:=time.NewTicker(time.Second) for { select { case <-t.C: fmt.Println("test") t.Stop() } }
在最开始以为sleep是单独处理直接停掉了这个协程,所以第一版用的也是sleep,但是在收集资料后发现这几种方式都创建了timer,并加入了定时任务处理协程。实际上这两个函数产生的timer都放入了同一个timer堆(golang时间轮),都在定时任务处理协程中等待被处理。Tick,Sleep,time.After函数都使用的timer结构体,都会被放在同一个协程中统一处理,这样看起来使用Tick,Sleep并没有什么区别。实际上是有区别的,本文不是讨论golang定时执行任务time.sleep和time.tick的优劣,以后会在后续文章进行探讨。使用channel阻塞协程完成定时任务比较灵活,可以结合select设置超时时间以及默认执行方法,而且可以设置timer的主动关闭,所以,建议使用time.Tick完成定时任务。
2、存储模块问题
目前是全缓存,没有DB参与,首先redis(codis)的高可用是个问题,在熔断之后采取“不作为”的判断也是有问题的,所以对未来展望,首先是:
1·单机的数据结构使用多时间轮。为了减少数据的路程,将load数据的过程异步加载到机器,减少网络io所造成的时间损耗。同时也是减少对redis的依赖
2·引入ZooKeeper或者添加集群备份,leader。保证集群中至少有两台机器load一个topic的数据,leader可以协调消费保证高可用
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。