Flask Vue.js全栈开发|第17章:RQ实现后台任务
Synopsis: 通常对于处理时间较长的任务,我们应该放在后台进行异步处理,这样就不会阻塞当前请求。本文将通过 Redis Queue 来实现 [群发私信/邮件] 和 [导出文章] 后台任务(且动态显示任务进度),如果你有多个任务要并发执行,请开启多个 rq worker 即可;其中 [导出文章] 的代码在 http://www.madmalls.com/blog/post/latest-code 中
本系列的最新代码将持续更新到: http://www.madmalls.com/blog/post/latest-code/
1. Redis Queue
RQ (Redis Queue)
是 Python 中用来实现简单的 任务队列
,它比 Celery
更加易用,但它的 broker
只支持 Redis
,而 Celery 的 broker 支持 RabbitMQ
、Redis
、Amazon SQS
1.1 在 Linux 上启动后端 Flask API
由于 RQ
的 workers 只能在有 fork()
函数的操作系统上正常工作,所以 Windows 无法正常启动 rq worker
,请参考: http://python-rq.org/docs/#limitations
所以为了方便,我们准备将后端 Flask API 应用迁移到 Linux 中,关于在 Linux 中如何部署需要到第 20 章才讲,本章只是简单介绍在 CentOS 上如何重新运行起我们的后端应用
首先你需要安装 CentOS
,比如参考: http://www.madmalls.com/blog/post/customize-centos-7-3-autoinstall-iso/
然后安装 Python 3
,请参考: http://www.madmalls.com/blog/post/deploy-flask-gunicorn-nginx-supervisor-on-centos7/#3-python3
将后端代码拷贝到 CentOS 上,并执行:
$ cd back-end $ python -m venv venv $ source venv/bin/activate (venv)$ pip install -r requirements.txt # Flask-Migrate create database (venv)$ flask db upgrade # Pre deploy, eg. insert roles (venv)$ flask deploy # create back-end/.env file, like this FLASK_APP=madblog.py FLASK_DEBUG=1 (venv)$ flask run -h 0.0.0.0 -p 5000
假设你的 CentOS 的 IP 是 192.168.80.1
,则在你的笔记本上浏览器访问: http://192.168.80.1:5000/api/ping
,如果返回 "Pong!"
则说明正常
此时,你的笔记本上只需要运行前端应用,但需要修改 API 接口地址。修改 front-end/src/http.js
:
import Vue from 'vue' import axios from 'axios' import router from './router' import store from './store' // 基础配置 if (process.env.NODE_ENV === 'production') { axios.defaults.baseURL = 'http://www.madmalls.com:5000'; } else { axios.defaults.baseURL = 'http://192.168.80.1:5000'; }
然后重启前端应用
如果应用能够正常访问后端 API,则进行下一节
1.2 Linux 上安装 Redis
$ yum install -y epel-release
$ yum install -y redis
$ systemctl start redis
$ systemctl enable redis
验证 Redis
是否成功启动了:
$ redis-cli ping PONG $ ss -tunlp | grep redis tcp LISTEN 0 128 127.0.0.1:6379 *:* users:(("redis-server",pid=18833,fd=5)) tcp LISTEN 0 128 192.168.80.1:6379 *:* users:(("redis-server",pid=18833,fd=4))
1.3 安装 RQ
后端需要安装 rq
模块,它会自动把 redis
也安装上
然后重新启动 Flask:
1.4 RQ 连接 Redis
按照常例,我们将配置 Redis
的连接参数到 back-end/config.py
文件中:
意思是,如果没有设置环境变量 REDIS_URL
(可以通过 export REDIS_URL=xxx
命令或者写入 back-end/.env
文件中)的话,直接使用 redis://
(表示当前主机上的 Redis,端口是默认的 6379
)
然后,修改 back-end/app/__init__.py
:
from redis import Redis import rq ... def configure_app(app, config_class): app.config.from_object(config_class) # 不检查路由中最后是否有斜杠/ app.url_map.strict_slashes = False # 整合RQ任务队列 app.redis = Redis.from_url(app.config['REDIS_URL']) app.task_queue = rq.Queue('madblog-tasks', connection=app.redis, default_timeout=3600) # 设置任务队列中各任务的执行最大超时时间为 1 小时
我们通过 rq.Queue()
创建了一个 任务队列,名称为 madblog-tasks
,后续应用想将那些耗时较久的函数放到后台执行的话,就投放到这个任务队列中去。之后,启动一个或多个 rq worker madblog-tasks
,指示它们监视 madblog-tasks
这个任务队列,一旦里面有任务了,它们就会 立即
执行! 它们与 Flask 应用是不同的进程,所以在它们执行那些耗时较久的任务时,并不会阻塞 Flask 应用
1.5 任务进入队列(Enqueueing Jobs)
每一个 任务(job/task)
其实就是一个普通的 Python 函数,但是也有需要注意的地方,请参考: http://python-rq.org/docs/#considerations-for-jobs
我们准备把所有需要放到后台执行的任务函数,存放到 back-end/app/utils/tasks.py
这个文件中,新建此文件,增加一个测试函数:
import time def test_rq(num): print('Starting task') for i in range(num): print(i) time.sleep(1) print('Task completed') return 'Done'
将最新代码同步到 CentOS 上,再启动 Flask Shell
:
(venv) $ flask shell Python 3.6.4 (default, Apr 13 2019, 20:59:14) [GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux App: app [production] Instance: /root/flask-vuejs-madblog/back-end/instance >>> job = app.task_queue.enqueue('app.utils.tasks.test_rq', 60) # 将一个任务压入任务队列中 >>> job Job('96231bff-0680-4ebf-aa37-88372946e524', enqueued_at=datetime.datetime(2019, 4, 16, 1, 48, 54, 295339)) >>> job.get_id() # 获取任务的ID '96231bff-0680-4ebf-aa37-88372946e524' >>> job.status # 任务当前的状态,只是进入队列了,还没有被 rq worker 开始执行,因为我们还没有启动 worker 'queued' >>> job.func_name # 任务实际上将要执行的函数 'app.utils.tasks.test_rq' >>> job.args # equeue() 方法中第一个参数是要执行的函数名,后面的参数都将传递给要执行的函数中去 (60,) >>> job.kwargs {}
此时,我们再打开一个 Shell,切换到后端项目的根目录下,激活虚拟环境,最后启动 rq worker
:
$ cd /root/flask-vuejs-madblog/back-end $ source venv/bin/activate $ rq worker madblog-tasks # 表示启动了一个 worker,并让它监视名为 madblog-tasks 的任务队列
由于之前已经往 madblog-tasks
这个 任务队列 中投放了一个任务,所以 rq worker
会立即执行它:
(venv) $ rq worker madblog-tasks 10:00:01 RQ worker 'rq:worker:CentOS.33041' started, version 0.13.0 10:00:01 *** Listening on madblog-tasks... 10:00:01 Cleaning registries for queue: madblog-tasks 10:00:01 madblog-tasks: app.utils.tasks.test_rq(60) (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617) Starting task 0 1 2 3 ... 58 59 Task completed 10:01:01 madblog-tasks: Job OK (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617) 10:01:01 Result is kept for 500 seconds
当 rq worker
正在执行后台任务时,我们再次切换到之前的 Flask Shell
窗口:
>>> job.status # 表示任务已经在执行中了 'started' >>> job.result # 任务尚未结束,所以没有返回值 >>> >>> job.status # 1分钟后... 表示任务已经结束了 'finished' >>> job.result # 返回了任务的执行结果 'Done'
除了 job/task
有一些常用的方法外,任务队列(假设为变量 q)
也有如下常用方法,请读者自行测试:
len(q)
: 任务队列中当前的任务数q.jobs
: 任务队列中所有的任务列表q.job_ids
: 任务队列中所有的任务ID列表q.fetch_job('abc123')
: 返回任务ID为 'abc123' 的任务对象q.empty()
: 清空整个任务队列
2. 应用RQ实现后台任务
现在准备将 RQ
应用到我们的项目中,实现两个后台任务: 管理员群发私信和邮件给所有注册的用户、有写文章权限的用户可以导出她自己的所有文章并以邮件附件发送给她
2.1 增加数据模型Task
那么,我们需要增加一个数据模型 Task
,用来存储用户的每一个后台任务,它们是 一对多
关系(一个用户可以有多个任务)
修改 back-end/app/models.py
:
class User(PaginatedAPIMixin, db.Model): ... # 用户的RQ后台任务 tasks = db.relationship('Task', backref='user', lazy='dynamic') class Task(PaginatedAPIMixin, db.Model): __tablename__ = 'tasks' # 不使用默认的整数主键,而是用 RQ 为每个任务生成的字符串ID id = db.Column(db.String(36), primary_key=True) # 任务名 name = db.Column(db.String(128), index=True) # 任务描述 description = db.Column(db.String(128)) # 任务所属的用户 user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # 是否已执行完成 complete = db.Column(db.Boolean, default=False) def __repr__(self): return '<Task {}>'.format(self.id)
2.2 数据库迁移
2.3 后台任务函数
注意:
rq worker
运行的代码跟 Flask API 进程代码一样,但它们是不同的进程,所以需要再次创建 Flask APP
重新编辑 back-end/app/utils/tasks.py
:
import sys import time from rq import get_current_job from app import create_app from app.extensions import db from app.models import User, Message, Task from app.utils.email import send_email from config import Config # RQ worker 在我们的博客Flask应用之外运行,所以需要创建自己的应用实例 app = create_app(Config) # 后续会使用Flask-SQLAlchemy来查询数据库,所以需要推送一个上下文使应用成为 "当前" 的应用实例 app.app_context().push() def test_rq(num): print('Starting task') for i in range(num): print(i) time.sleep(1) print('Task completed') return 'Done' def send_messages(*args, **kwargs): '''群发私信''' try: # 由于 rq worker 运行在单独的进程中,当它出现意外错误时,我们需要捕获异常 # 发送方 sender = User.
1 条评论
评论者的用户名
评论时间shishijia
2020-05-28T13:42:20Z博主,我执行job = app.task_queue.enqueue('app.utils.tasks.test_rq', 60)正常,但执行rq worker madblog-tasks,一直报错Authentication required.