Flask Vue.js全栈开发|第17章:RQ实现后台任务

  • 原创
  • Madman
  • /
  • /
  • 0
  • 2871 次阅读

flask vuejs 全栈开发-min.png

Synopsis: 通常对于处理时间较长的任务,我们应该放在后台进行异步处理,这样就不会阻塞当前请求。本文将通过 Redis Queue 来实现 [群发私信/邮件] 和 [导出文章] 后台任务(且动态显示任务进度),如果你有多个任务要并发执行,请开启多个 rq worker 即可;其中 [导出文章] 的代码在 http://www.madmalls.com/blog/post/latest-code 中

本系列的最新代码将持续更新到: http://www.madmalls.com/blog/post/latest-code/

RQ实现后台任务

1. Redis Queue

RQ (Redis Queue) 是 Python 中用来实现简单的 任务队列,它比 Celery 更加易用,但它的 broker 只支持 Redis,而 Celery 的 broker 支持 RabbitMQRedisAmazon SQS

1 rq架构

1.1 在 Linux 上启动后端 Flask API

由于 RQ 的 workers 只能在有 fork() 函数的操作系统上正常工作,所以 Windows 无法正常启动 rq worker,请参考: http://python-rq.org/docs/#limitations

所以为了方便,我们准备将后端 Flask API 应用迁移到 Linux 中,关于在 Linux 中如何部署需要到第 20 章才讲,本章只是简单介绍在 CentOS 上如何重新运行起我们的后端应用

首先你需要安装 CentOS,比如参考: http://www.madmalls.com/blog/post/customize-centos-7-3-autoinstall-iso/

然后安装 Python 3,请参考: http://www.madmalls.com/blog/post/deploy-flask-gunicorn-nginx-supervisor-on-centos7/#3-python3

将后端代码拷贝到 CentOS 上,并执行:

$ cd back-end
$ python -m venv venv
$ source venv/bin/activate
(venv)$ pip install -r requirements.txt

# Flask-Migrate create database
(venv)$ flask db upgrade

# Pre deploy, eg. insert roles
(venv)$ flask deploy

# create back-end/.env file, like this
FLASK_APP=madblog.py
FLASK_DEBUG=1

(venv)$ flask run -h 0.0.0.0 -p 5000

假设你的 CentOS 的 IP 是 192.168.80.1,则在你的笔记本上浏览器访问: http://192.168.80.1:5000/api/ping,如果返回 "Pong!" 则说明正常

此时,你的笔记本上只需要运行前端应用,但需要修改 API 接口地址。修改 front-end/src/http.js

import Vue from 'vue'
import axios from 'axios'
import router from './router'
import store from './store'


// 基础配置
if (process.env.NODE_ENV === 'production') {
  axios.defaults.baseURL = 'http://www.madmalls.com:5000';
} else {
  axios.defaults.baseURL = 'http://192.168.80.1:5000';
}

然后重启前端应用

D:\python-code\flask-vuejs-madblog\front-end>npm run dev

如果应用能够正常访问后端 API,则进行下一节

1.2 Linux 上安装 Redis

$ yum install -y epel-release
$ yum install -y redis
$ systemctl start redis
$ systemctl enable redis

验证 Redis 是否成功启动了:

$ redis-cli ping
PONG

$ ss -tunlp | grep redis
tcp    LISTEN     0      128    127.0.0.1:6379                  *:*                   users:(("redis-server",pid=18833,fd=5))
tcp    LISTEN     0      128    192.168.80.1:6379                  *:*                   users:(("redis-server",pid=18833,fd=4))

1.3 安装 RQ

后端需要安装 rq 模块,它会自动把 redis 也安装上

(venv)$ pip install rq
(venv)$ pip freeze > requirements.txt

然后重新启动 Flask:

(venv)$ flask run -h 0.0.0.0 -p 5000 

1.4 RQ 连接 Redis

按照常例,我们将配置 Redis 的连接参数到 back-end/config.py 文件中:

REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'

意思是,如果没有设置环境变量 REDIS_URL(可以通过 export REDIS_URL=xxx 命令或者写入 back-end/.env 文件中)的话,直接使用 redis://(表示当前主机上的 Redis,端口是默认的 6379

然后,修改 back-end/app/__init__.py

from redis import Redis
import rq
...

def configure_app(app, config_class):
    app.config.from_object(config_class)
    # 不检查路由中最后是否有斜杠/
    app.url_map.strict_slashes = False
    # 整合RQ任务队列
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('madblog-tasks', connection=app.redis, default_timeout=3600)  # 设置任务队列中各任务的执行最大超时时间为 1 小时

我们通过 rq.Queue() 创建了一个 任务队列,名称为 madblog-tasks,后续应用想将那些耗时较久的函数放到后台执行的话,就投放到这个任务队列中去。之后,启动一个或多个 rq worker madblog-tasks,指示它们监视 madblog-tasks 这个任务队列,一旦里面有任务了,它们就会 立即 执行! 它们与 Flask 应用是不同的进程,所以在它们执行那些耗时较久的任务时,并不会阻塞 Flask 应用

1.5 任务进入队列(Enqueueing Jobs)

每一个 任务(job/task) 其实就是一个普通的 Python 函数,但是也有需要注意的地方,请参考: http://python-rq.org/docs/#considerations-for-jobs

我们准备把所有需要放到后台执行的任务函数,存放到 back-end/app/utils/tasks.py 这个文件中,新建此文件,增加一个测试函数:

import time


def test_rq(num):
    print('Starting task')
    for i in range(num):
        print(i)
        time.sleep(1)
    print('Task completed')
    return 'Done'

将最新代码同步到 CentOS 上,再启动 Flask Shell

(venv) $ flask shell
Python 3.6.4 (default, Apr 13 2019, 20:59:14) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
App: app [production]
Instance: /root/flask-vuejs-madblog/back-end/instance
>>> job = app.task_queue.enqueue('app.utils.tasks.test_rq', 60)  # 将一个任务压入任务队列中
>>> job
Job('96231bff-0680-4ebf-aa37-88372946e524', enqueued_at=datetime.datetime(2019, 4, 16, 1, 48, 54, 295339))
>>> job.get_id()  # 获取任务的ID
'96231bff-0680-4ebf-aa37-88372946e524'
>>> job.status  # 任务当前的状态,只是进入队列了,还没有被 rq worker 开始执行,因为我们还没有启动 worker
'queued'
>>> job.func_name  # 任务实际上将要执行的函数
'app.utils.tasks.test_rq'
>>> job.args  # equeue() 方法中第一个参数是要执行的函数名,后面的参数都将传递给要执行的函数中去
(60,)
>>> job.kwargs
{}

此时,我们再打开一个 Shell,切换到后端项目的根目录下,激活虚拟环境,最后启动 rq worker

$ cd /root/flask-vuejs-madblog/back-end
$ source venv/bin/activate
$ rq worker madblog-tasks  # 表示启动了一个 worker,并让它监视名为 madblog-tasks 的任务队列

由于之前已经往 madblog-tasks 这个 任务队列 中投放了一个任务,所以 rq worker 会立即执行它:

(venv) $ rq worker madblog-tasks
10:00:01 RQ worker 'rq:worker:CentOS.33041' started, version 0.13.0
10:00:01 *** Listening on madblog-tasks...
10:00:01 Cleaning registries for queue: madblog-tasks
10:00:01 madblog-tasks: app.utils.tasks.test_rq(60) (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617)
Starting task
0
1
2
3
...
58
59
Task completed
10:01:01 madblog-tasks: Job OK (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617)
10:01:01 Result is kept for 500 seconds

rq worker 正在执行后台任务时,我们再次切换到之前的 Flask Shell 窗口:

>>> job.status  # 表示任务已经在执行中了
'started'
>>> job.result  # 任务尚未结束,所以没有返回值
>>> 
>>> job.status  # 1分钟后...  表示任务已经结束了
'finished'
>>> job.result  # 返回了任务的执行结果
'Done'

除了 job/task 有一些常用的方法外,任务队列(假设为变量 q) 也有如下常用方法,请读者自行测试:

  • len(q): 任务队列中当前的任务数
  • q.jobs: 任务队列中所有的任务列表
  • q.job_ids: 任务队列中所有的任务ID列表
  • q.fetch_job('abc123'): 返回任务ID为 'abc123' 的任务对象
  • q.empty(): 清空整个任务队列

2. 应用RQ实现后台任务

现在准备将 RQ 应用到我们的项目中,实现两个后台任务: 管理员群发私信和邮件给所有注册的用户、有写文章权限的用户可以导出她自己的所有文章并以邮件附件发送给她

2.1 增加数据模型Task

那么,我们需要增加一个数据模型 Task,用来存储用户的每一个后台任务,它们是 一对多 关系(一个用户可以有多个任务)

修改 back-end/app/models.py

class User(PaginatedAPIMixin, db.Model):
    ...
    # 用户的RQ后台任务
    tasks = db.relationship('Task', backref='user', lazy='dynamic')


class Task(PaginatedAPIMixin, db.Model):
    __tablename__ = 'tasks'
    # 不使用默认的整数主键,而是用 RQ 为每个任务生成的字符串ID
    id = db.Column(db.String(36), primary_key=True)
    # 任务名
    name = db.Column(db.String(128), index=True)
    # 任务描述
    description = db.Column(db.String(128))
    # 任务所属的用户
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # 是否已执行完成
    complete = db.Column(db.Boolean, default=False)

    def __repr__(self):
        return '<Task {}>'.format(self.id)

2.2 数据库迁移

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

2.3 后台任务函数

注意: rq worker 运行的代码跟 Flask API 进程代码一样,但它们是不同的进程,所以需要再次创建 Flask APP

重新编辑 back-end/app/utils/tasks.py

import sys
import time
from rq import get_current_job
from app import create_app
from app.extensions import db
from app.models import User, Message, Task
from app.utils.email import send_email
from config import Config


# RQ worker 在我们的博客Flask应用之外运行,所以需要创建自己的应用实例
app = create_app(Config)
# 后续会使用Flask-SQLAlchemy来查询数据库,所以需要推送一个上下文使应用成为 "当前" 的应用实例
app.app_context().push()


def test_rq(num):
    print('Starting task')
    for i in range(num):
        print(i)
        time.sleep(1)
    print('Task completed')
    return 'Done'


def send_messages(*args, **kwargs):
    '''群发私信'''
    try:  # 由于 rq worker 运行在单独的进程中,当它出现意外错误时,我们需要捕获异常
        # 发送方
        sender = User.query.get(kwargs.get('user_id'))
        # 接收方
        recipients = User.query.filter(User.id != kwargs.get('user_id'))

        for user in recipients:
            message = Message()
            message.body = kwargs.get('body')
            message.sender = sender
            message.recipient = user
            db.session.add
                                
                            
未经允许不得转载: LIFE & SHARE - 王颜公子 » Flask Vue.js全栈开发|第17章:RQ实现后台任务

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

0 条评论

暂时还没有评论.

专题系列