Celery: Python任务调度利器
安装rabbitmq
先安装erlang
wget http://erlang.org/download/otp_src_20.0.tar.gz
tar xzf otp_src_20.0.tar.gz
./configure --prefix=/usr/local --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac && make -j10 && make install
然后下载rabbitmq,直接解压即可
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-generic-unix-3.6.10.tar.xz
启动rabbitmq-server
./rabbitmq-server
在另一个终端安装插件
./rabbitmq-plugins enable rabbitmq_management
然后重新启动rabbitmq-server。
登录http://localhost:15627 通过web界面进行管理。由于安全限制,默认用户guest及密码guest只能通过localhost访问,如果通过外网ip访问,需要修改ebin/rabbitmq.app
,将loopback_users那一行改为{loopback_users, []},
。如果机器有iptables,需要开放端口。
安装celery
pip install "celery[redis,msgpack]"
编写一个简单的任务脚本
#tasks.py
from __future__ import absolute_import
from celery import Celery
app = Celery('tasks')
app.conf.update(dict(
BROKER_URL='amqp://guest:guest@localhost:5672//',
CELERY_RESULT_BACKEND='redis://localhost:6543/0',
CELERY_TASK_SERIALIZER='msgpack',
CELERY_RESULT_SERIALIZER='json',
CELERY_TASK_RESULT_EXPIRES=86400,
CELERY_ACCEPT_CONTENT=['json', 'msgpack'],
CELERY_IGNORE_RESULT=True,
CELERY_TIMEZONE='UTC',
CELERY_IMPORTS=['tasks']
))
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def add(self, a, b):
return a + b
if __name__ == '__main__':
app.start()
Celery第一个参数是模块名称,config配置可以单独写到一个配置文件config.py中,然后使用app.config_from_object('config')
来加载配置。
bind=True
表示绑定任务为实例方法,可以获取到任务执行时的各种信息,此时第一个参数为self,如果不使用重试可以写成
@app.task
def add(a, b):
return a + b
启动worker使用命令
celery worker -E -A tasks --loglevel=info -c 20
或者
python tasks.py worker -E --loglevel=info -c 20
-c参数指定并发度。
触发任务使用
#trigger.py
import datetime
from tasks import add
res = add.delay(3, 4)
res = add.apply_async(args=[3,4], eta=datetime.datetime.now() + datetime.timedelta(seconds=30))
delay是apply_async的简写,apply_async可以有更多参数,如指定在什么时刻运行等。
安装flower
flower 是一个celery监控程序
pip install flower
启动
celery flower --port=5555
访问http://localhost:5555 即可。