陈奇网络工作室

利用Redis的SortedSet和String结构快速实现延迟队列

本文的内容

显示

1.

队列延迟的具体实现

2.

关于队列延迟实现的思考

3.

使用延迟队列

3.1.

推荐阅读

在后端服务中,经常会出现数据库写操作在一个异步队列中执行的场景,这个异步队列由多个进程运行。此时,如果将同一个资源写入数据库,很可能会造成数据被覆盖等问题,因此业务层需要在更新数据库之前将其锁定,以保证在更改同一个资源时不会受到其他更新操作的干扰,保证数据的一致性。

但是如果数据库更新在更新前被锁定,那么更新数据库的新请求又来了,但是这个更新操作不能丢弃,需要延迟,所以需要加入延迟队列,延迟。

那么如何实现延迟队列呢?利用Redis的SortedSet和String两种结构可以很容易地实现。

队列延迟的具体实现

#编码:utf8

'''延迟队列' ' '

导入json

导入时间

导入uuid

导入redis

类延迟队列(对象):

''延迟队列'''

QUEUE_KEY=\ '延迟队列\ '

数据前缀=\ '队列数据\ '

def __init__(self,conf):

主机,端口,db=conf[\ '主机\'],conf[\ '端口\'],conf[\ '数据库\']

self.client=redis。Redis(主机=主机,端口=端口,数据库=数据库)

定义推送(自身,数据):

''推送

:参数数据:数据

'''

#唯一ID

task_id=str(uuid.uuid4())

data_key=\'{}_{}\ '格式(自我。数据前缀,任务标识)

#保存字符串

self.client.set(data_key,json.dumps(data))

# add zset(queue_key=data_key,ts)

self.client.zadd(self。QUEUE_KEY,data_key,int(time.time()))

定义弹出(自身,数量=5,先前=3):

''弹出多个数据

:param num: pop多少?

:param previous:多少秒前获取推送的数据?

'''

#取出前几秒的数据推送。

until_ts=int(time.time()) -上一个

task _ ids=self . client . zrangebyscore(

自我。QUEUE_KEY,0,until_ts,start=0,num=num)

如果不是任务标识号:

return []

#使用删除的原子性来防止重复数据的并发获取。

管道=self.client.pipeline()

对于任务标识中的任务标识:

pipe.zrem(self。队列关键字,任务标识)

data_keys=[

数据键

对于data_key,zip中的标志(task_ids,pipe.execute())

if标志

]

如果不是data_keys:

return []

#加载数据

数据=[

json.loads(项目)

对于self.client.mget中的项目(data_keys)

]

#删除字符串键

self.client.delete(*data_keys)

返回数据

关于队列延迟实现的思考

当推送数据时,执行以下步骤:

生成唯一密钥,这里用uuid4 (uuid4是根据随机数生成的,重复概率很小)。

序列化数据并将其存储在这个唯一键的字符串结构中。

将这个惟一的键添加到SortedSet中,分数就是当前的时间戳。

这里用SortedSet记录添加数据的时间,方便在获取时根据时间获取之前的数据,达到延时的效果。

真实数据存储在字符串结构中,在获取真实数据之前获取数据的密钥。

这里可能有人会疑惑,为什么不把真实的数据放在SortedSet的名下呢?

将数据放入名称中可能会导致立即写入相同的数据,从而导致多段数据变成一段。

将数据序列化成SortedSet的名字有点太大,不符合使用习惯。

流行音乐

这个pop可以获取多条数据,上面的代码默认在延迟队列中3秒前获取5条数据。具体思路如下:

计算前几秒之前的时间戳,并使用SortedSet的zrangebysocre方法获取前几秒之前添加的唯一键。

如果SortedSet中有数据,利用Redis删除的原子性,使用zrem依次删除SortedSet的元素。如果删除成功,请使用来防止多个进程同时执行此方法并获取相同的数据。

然后获取唯一可用的键,并从String中获取实际数据。

这里最重要的是第二步。在取出SortedSet的数据后,我们必须防止其他进程并发获取相同的数据,所以我们在这里使用zrem依次删除元素,以保证只有删除成功的进程才能使用这些数据。

使用延迟队列

#编码:utf8

导入时间

从延迟导入延迟队列

redis _ conf={ \ ' host \ '\ ' 127 . 0 . 0 . 1 \ '\'port\' 6379,\'db\' 0}

#构造一个延迟队列对象

queue=延迟队列(redis_conf)

#推送20条数据

对于范围内的I(20):

item={\'user\' \'user-{}\ '格式(一)}

queue.push(项目)

#一次从延迟队列中获取10条数据

data=queue.pop(num=10)

#您无法立即获得刚刚添加的内容。

断言len(data)==0

#睡眠10秒钟

时间.睡眠(10)

#从延迟队列中获取10条数据

data=queue.pop(num=10)

断言len(data)==10

#从延迟队列中获取5秒前添加的10条数据。

data=queue.pop(num=10,previous=5)

断言len(data)==10

使用起来相对简单。在实际使用过程中,每次处理正常队列时,延时队列的数据都是通过上述方法得到的。如果延迟队列中有数据,可以根据业务正常处理,达到数据延迟处理的效果。

来源| http://kaito-kidd.com/2016/12/26/delay-queue-based-on-redis/

关于西部数码代理

成都西维数码科技有限公司成立于2002年,注册资金1000万元。其总部位于天府之国成都——,品牌为西部数码代理(www.chenqinet.cn)。深耕IDC行业十余年,在中国拥有北京、广东、郑州、成都、绵阳、香港等多家IDC云计算安全数据中心,在美国拥有海外数据中心。自主研发了虚拟主机、灵活度云服务器、西部数据企业云邮箱等产品,广受用户欢迎。我们始终坚持用户体验至上的价值导向,深度挖掘用户需求。目前,超过100万用户通过我们注册和管理了超过1000万个域名,共有超过50万个网站在我们自研的云主机平台上运行。我们服务的用户有:宝贝回家找子网、四川大学、链家网(北京)科技有限公司、四川省互联网协会、沱牌集团、谭木匠、中铁二局、四川省中国青年旅行社。

我们始终坚持“以人为本、以客为尊、持续创新”的核心价值观,抓住各种发展机遇,不断创新发展理念,不断转变发展方式,不断解决发展难题。随着企业的发展,我们的业务也不断发展为基于云计算的云托管业务、域名注册、域名交易等相关业务。公司从最初的几个员工发展到近200人的精英团队,并在IDC和中国建立了业务。成为拥有多项自主知识产权的国家高新技术企业、获得ICANN和CNNIC双重认证的国际顶级域名注册服务机构、首批获得工信部颁发的国家云服务牌照的企业之一。

后台-系统设置-扩展变量-手机广告位-内容页底部广告位3