本文的内容
显示
1.
队列延迟的具体实现
2.
关于队列延迟实现的思考
3.
使用延迟队列
3.1.
推荐阅读
在后端服务中,经常会出现数据库写操作在一个异步队列中执行的场景,这个异步队列由多个进程运行。此时,如果将同一个资源写入数据库,很可能会造成数据被覆盖等问题,因此业务层需要在更新数据库之前将其锁定,以保证在更改同一个资源时不会受到其他更新操作的干扰,保证数据的一致性。
但是如果数据库更新在更新前被锁定,那么更新数据库的新请求又来了,但是这个更新操作不能丢弃,需要延迟,所以需要加入延迟队列,延迟。
那么如何实现延迟队列呢?利用Redis的SortedSet和String两种结构可以很容易地实现。
队列延迟的具体实现
#编码:utf8
'''延迟队列' ' '
导入json
导入时间
导入uuid
导入redis
类延迟队列(对象):
''延迟队列'''
QUEUE_KEY=\ '延迟队列\ '
数据前缀=\ '队列数据\ '
def __init__(self,conf):
主机,端口,db=conf[\ '主机\'],conf[\ '端口\'],conf[\ '数据库\']
self.client=redis。Redis(主机=主机,端口=端口,数据库=数据库)
定义推送(自身,数据):
''推送
:参数数据:数据
'''
#唯一ID
task_id=str(uuid.uuid4())
data_key=\'{}_{}\ '格式(自我。数据前缀,任务标识)
#保存字符串
self.client.set(data_key,json.dumps(data))
# add zset(queue_key=data_key,ts)
self.client.zadd(self。QUEUE_KEY,data_key,int(time.time()))
定义弹出(自身,数量=5,先前=3):
''弹出多个数据
:param num: pop多少?
:param previous:多少秒前获取推送的数据?
'''
#取出前几秒的数据推送。
until_ts=int(time.time()) -上一个
task _ ids=self . client . zrangebyscore(
自我。QUEUE_KEY,0,until_ts,start=0,num=num)
如果不是任务标识号:
return []
#使用删除的原子性来防止重复数据的并发获取。
管道=self.client.pipeline()
对于任务标识中的任务标识:
pipe.zrem(self。队列关键字,任务标识)
data_keys=[
数据键
对于data_key,zip中的标志(task_ids,pipe.execute())
if标志
]
如果不是data_keys:
return []
#加载数据
数据=[
json.loads(项目)
对于self.client.mget中的项目(data_keys)
]
#删除字符串键
self.client.delete(*data_keys)
返回数据
关于队列延迟实现的思考
推
当推送数据时,执行以下步骤:
生成唯一密钥,这里用uuid4 (uuid4是根据随机数生成的,重复概率很小)。
序列化数据并将其存储在这个唯一键的字符串结构中。
将这个惟一的键添加到SortedSet中,分数就是当前的时间戳。
这里用SortedSet记录添加数据的时间,方便在获取时根据时间获取之前的数据,达到延时的效果。
真实数据存储在字符串结构中,在获取真实数据之前获取数据的密钥。
这里可能有人会疑惑,为什么不把真实的数据放在SortedSet的名下呢?
将数据放入名称中可能会导致立即写入相同的数据,从而导致多段数据变成一段。
将数据序列化成SortedSet的名字有点太大,不符合使用习惯。
流行音乐
这个pop可以获取多条数据,上面的代码默认在延迟队列中3秒前获取5条数据。具体思路如下:
计算前几秒之前的时间戳,并使用SortedSet的zrangebysocre方法获取前几秒之前添加的唯一键。
如果SortedSet中有数据,利用Redis删除的原子性,使用zrem依次删除SortedSet的元素。如果删除成功,请使用来防止多个进程同时执行此方法并获取相同的数据。
然后获取唯一可用的键,并从String中获取实际数据。
这里最重要的是第二步。在取出SortedSet的数据后,我们必须防止其他进程并发获取相同的数据,所以我们在这里使用zrem依次删除元素,以保证只有删除成功的进程才能使用这些数据。
使用延迟队列
#编码:utf8
导入时间
从延迟导入延迟队列
redis _ conf={ \ ' host \ '\ ' 127 . 0 . 0 . 1 \ '\'port\' 6379,\'db\' 0}
#构造一个延迟队列对象
queue=延迟队列(redis_conf)
#推送20条数据
对于范围内的I(20):
item={\'user\' \'user-{}\ '格式(一)}
queue.push(项目)
#一次从延迟队列中获取10条数据
data=queue.pop(num=10)
#您无法立即获得刚刚添加的内容。
断言len(data)==0
#睡眠10秒钟
时间.睡眠(10)
#从延迟队列中获取10条数据
data=queue.pop(num=10)
断言len(data)==10
#从延迟队列中获取5秒前添加的10条数据。
data=queue.pop(num=10,previous=5)
断言len(data)==10
使用起来相对简单。在实际使用过程中,每次处理正常队列时,延时队列的数据都是通过上述方法得到的。如果延迟队列中有数据,可以根据业务正常处理,达到数据延迟处理的效果。
来源| http://kaito-kidd.com/2016/12/26/delay-queue-based-on-redis/
关于西部数码代理
成都西维数码科技有限公司成立于2002年,注册资金1000万元。其总部位于天府之国成都——,品牌为西部数码代理(www.chenqinet.cn)。深耕IDC行业十余年,在中国拥有北京、广东、郑州、成都、绵阳、香港等多家IDC云计算安全数据中心,在美国拥有海外数据中心。自主研发了虚拟主机、灵活度云服务器、西部数据企业云邮箱等产品,广受用户欢迎。我们始终坚持用户体验至上的价值导向,深度挖掘用户需求。目前,超过100万用户通过我们注册和管理了超过1000万个域名,共有超过50万个网站在我们自研的云主机平台上运行。我们服务的用户有:宝贝回家找子网、四川大学、链家网(北京)科技有限公司、四川省互联网协会、沱牌集团、谭木匠、中铁二局、四川省中国青年旅行社。
我们始终坚持“以人为本、以客为尊、持续创新”的核心价值观,抓住各种发展机遇,不断创新发展理念,不断转变发展方式,不断解决发展难题。随着企业的发展,我们的业务也不断发展为基于云计算的云托管业务、域名注册、域名交易等相关业务。公司从最初的几个员工发展到近200人的精英团队,并在IDC和中国建立了业务。成为拥有多项自主知识产权的国家高新技术企业、获得ICANN和CNNIC双重认证的国际顶级域名注册服务机构、首批获得工信部颁发的国家云服务牌照的企业之一。