Celery: Python任务调度利器

2017-07-05 78278528 0

安装rabbitmq

先安装erlang

  1. wget http://erlang.org/download/otp_src_20.0.tar.gz
  2. tar xzf otp_src_20.0.tar.gz
  3. ./configure --prefix=/usr/local --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac && make -j10 && make install

然后下载rabbitmq,直接解压即可
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-generic-unix-3.6.10.tar.xz

启动rabbitmq-server

  1. ./rabbitmq-server

在另一个终端安装插件

  1. ./rabbitmq-plugins enable rabbitmq_management

然后重新启动rabbitmq-server。

登录http://localhost:15627 通过web界面进行管理。由于安全限制,默认用户guest及密码guest只能通过localhost访问,如果通过外网ip访问,需要修改ebin/rabbitmq.app,将loopback_users那一行改为{loopback_users, []},。如果机器有iptables,需要开放端口。

安装celery

  1. pip install "celery[redis,msgpack]"

编写一个简单的任务脚本

  1. #tasks.py
  2. from __future__ import absolute_import
  3. from celery import Celery
  4. app = Celery('tasks')
  5. app.conf.update(dict(
  6. BROKER_URL='amqp://guest:guest@localhost:5672//',
  7. CELERY_RESULT_BACKEND='redis://localhost:6543/0',
  8. CELERY_TASK_SERIALIZER='msgpack',
  9. CELERY_RESULT_SERIALIZER='json',
  10. CELERY_TASK_RESULT_EXPIRES=86400,
  11. CELERY_ACCEPT_CONTENT=['json', 'msgpack'],
  12. CELERY_IGNORE_RESULT=True,
  13. CELERY_TIMEZONE='UTC',
  14. CELERY_IMPORTS=['tasks']
  15. ))
  16. @app.task(bind=True, default_retry_delay=300, max_retries=5)
  17. def add(self, a, b):
  18. return a + b
  19. if __name__ == '__main__':
  20. app.start()

Celery第一个参数是模块名称,config配置可以单独写到一个配置文件config.py中,然后使用app.config_from_object('config')来加载配置。

bind=True表示绑定任务为实例方法,可以获取到任务执行时的各种信息,此时第一个参数为self,如果不使用重试可以写成

  1. @app.task
  2. def add(a, b):
  3. return a + b

启动worker使用命令

  1. celery worker -E -A tasks --loglevel=info -c 20

或者

  1. python tasks.py worker -E --loglevel=info -c 20

-c参数指定并发度。

触发任务使用

  1. #trigger.py
  2. import datetime
  3. from tasks import add
  4. res = add.delay(3, 4)
  5. res = add.apply_async(args=[3,4], eta=datetime.datetime.now() + datetime.timedelta(seconds=30))

delay是apply_async的简写,apply_async可以有更多参数,如指定在什么时刻运行等。

安装flower

flower 是一个celery监控程序

  1. pip install flower

启动

  1. celery flower --port=5555

访问http://localhost:5555 即可。