RQ (Redis Queue)是一个轻量级的Python任务队列,这里记录一下它的简单使用。
首先安装RQ(这里使用的Python版本是3.8.0)
pip install rq==1.10.1
随后创建如下的文件
.├── __init__.py├── jobs.py└── run.py
其中__init__.py
中通过连接redis-server创建了两个queue:default和queue_1
1 | from redis import Redis |
之后我们在jobs.py
中定义需要执行的任务
1 | import requests |
创建好了任务发送队列和任务本身之后,我们就可以启动worker了。我们让worker监听两个队列:default和queue_1
rq worker queue_1 default --with-scheduler
随后我们就可以在run.py
中发送任务给worker去执行了
1 | from datetime import timedelta |
执行结果如下
None330None[b'rq:workers:queue_1', b'rq:worker:93618c280dbb445d92380fc26a33bc93', b'rq:job:909c68e7-a517-469a-ad33-c7f350e4f1dd', b'rq:scheduler-lock:default', b'rq:failed:default', b'rq:wip:queue_1', b'rq:job:f098f9f7-82b2-4a4f-b482-6d4cffd9b09e', b'rq:job:e006b2b5-69cf-4240-a0f2-202d31d71ad9', b'rq:scheduler-lock:queue_1', b'rq:finished:default', b'rq:queues', b'rq:job:aaaaf0e1-3673-4652-8109-20c46ebb89e0', b'rq:job:75b3839e-b8bb-45d8-bfe0-64ce63a502ab', b'rq:job:3c013ebb-7e68-4af2-9291-2abc7f8f0ba3', b'rq:job:8a40b7af-6b10-46a7-9b20-c9af734c0e26', b'rq:clean_registries:default', b'rq:job:43db8266-c1d0-4256-b355-00135e6a67c6', b'rq:clean_registries:queue_1', b'rq:job:e0f68454-c599-45db-b572-a7364f911b4f', b'rq:job:f9adc85e-400a-4e33-95cc-6f68c4d8a0d2', b'rq:workers', b'rq:workers:default']
worker的输出如下
➜ rq_demo git:(master) ./rq.sh17:28:32 Worker rq:worker:93618c280dbb445d92380fc26a33bc93: started, version 1.10.117:28:32 Subscribing to channel rq:pubsub:93618c280dbb445d92380fc26a33bc9317:28:32 *** Listening on queue_1, default...17:28:32 Trying to acquire locks for default, queue_117:28:32 Cleaning registries for queue: queue_117:28:32 Cleaning registries for queue: default17:28:38 default: jobs.count_words_at_url('https://www.jd.com') (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)370017:28:39 default: Job OK (f098f9f7-82b2-4a4f-b482-6d4cffd9b09e)17:28:39 Result is kept for 500 seconds17:28:39 default: jobs.count_words_at_url('http://nvie.com') (43db8266-c1d0-4256-b355-00135e6a67c6)17:28:40 default: Job OK (43db8266-c1d0-4256-b355-00135e6a67c6)17:28:40 Result is kept for 500 seconds17:28:45 queue_1: jobs.get_redis_keys() (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)17:28:45 queue_1: Job OK (75b3839e-b8bb-45d8-bfe0-64ce63a502ab)17:28:45 Result is kept for 500 seconds
可以看到任务已经被执行了。当然,我们也可以起多个worker来同时接受任务去执行,这样可以提高任务的执行效率。
本文涉及到的代码都放在了我的GitHub上面。