celery介绍与使用

一.celery介绍

celery作用

1.celery可以实现异步任务来提高项目的并发量,完成延迟任务、定时任务

2.celery是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具

celery架构

1.消息中间件:broker 提交的任务(函数)都放到这里,celery本身不提供中间件,需要借助于第三方:redis,rabbitmq

2.任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数

3.结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq

celery特点

celery是独立的服务

1.可以不依赖任何服务器,通过自身命令,启动

2.celery服务为其他项目服务提供异步解决任务需求的

注意:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

二.celery快速使用

1.安装模块

        官方介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(不支持windows,请不要打开与该平台相关的任何问题)

pip install celery

pip install eventlet  # Windows系统需安装

2.使用步骤

新建包:celery_task
    -在包先新建一个 celery.py
    -在里面写app的初始化
    -在包里新建app_task.py 编写相关任务 
    -其它程序,提交任务
    -启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
        celery -A celery_task worker -l info -P eventlet
    -查看任务执行的结果了

'''celery_task/celery.py'''

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
'''celery_task/app_task.py'''
from .celery import app
@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b
'''add_task.py'''
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('132xxxxxxxx','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker

# 查看任务执行的结果

3.启动celery工作服务器

celery -A tasks worker -l info -P eventlet 
或
celery -A tasks worker --loglevel=INFO -P eventlet

4.backend中查看任务执行结果

from tasks import app

from celery.result import AsyncResult

task_id = '672237ce-c941-415e-9145-f31f90b94627'

if __name__ == '__main__':
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    # 等同上面代码
    # if res.state == 'SUCCESS':
    #     result = res.get()
    #     print(result)
    elif res.failed():
        print('任务失败')
    # elif res.state == 'FAILURE':
    #     print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

AsyncResult下的方法

def failed(self):
    """Return :const:`True` if the task failed."""
    return self.state == states.FAILURE

def successful(self):
    """Return :const:`True` if the task executed successfully."""
    return self.state == states.SUCCESS

3.celery开启定时、延迟任务、异步任务

异步任务

task.delay(*args, **kwargs)

定时任务


app.conf.beat_schedule = {
    'send_sms_task': {
        'task': 'celery_task.add_task.send_sms', # 路径
        'schedule': timedelta(seconds=5), # 每五秒执行一次
        # 'schedule': crontab(hour=12, day_of_week=1),  # 每周一12点发送验证码
        'args': ('132xxxxxxxx', '7777'),
    },
}

延迟任务

task.apply_async(args=[参数,参数],eta=时间对象(utc时间))

from datetime import timedelta, datetime

res = add.apply_async(args=(1, 2), eta=(datetime.utcnow() + timedelta(seconds=20)))

print(res.task_id)  # c78505e2-614d-4bb2-930c-c73c325af519

三.在django中使用

在包内的celery.py中添加代码

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django
django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

四.双写一致性

为了提高并发量和访问速度我们把数据存放到redis中

class SlideShowView(GenericViewSet, ListMixinView):
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.SLIDE_SHOW_COUNT]
    serializer_class = SlideShowSer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:
            print('走了缓存')
            return APIResponse(code=1001, result=result)
        res = super().list(request, *args, **kwargs)
        result = res.data.get('result')
        cache.set('banner_list', result)
        print('走了数据库')
        return res

celery定时任务实现双写一致性

当把数据存放到redis中时我们修改数据库但是redis中的数据不会改变就会造成数据不一致的情况

- 解决方式一:

  1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
  2. 修改数据库,修改缓存 【缓存的修改是在后】
  3. 定时更新缓存,针对于实时性不是很高的接口适合定时更新.

 - 解决方式二:

开启crlery定时每30分钟朝数据库获取一次数据存放到redis中

#home_tasks.py 首页相关任务
import time

from .celery import app
from home.models import SlideShow
from django.conf import settings
from home.serializer import SlideShowSer
from django.core.cache import cache


@app.task
def update_banner():
    # 更新缓存
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]
    ser = SlideShowSer(instance=queryset, many=True)
    # print(ser.data)
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True
# celery.py
import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django

django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

app.conf.beat_schedule = {
    # 定时任务
    'update_banner': {
        'task': 'celery_tasks.home_tasks.update_banner',
        'schedule': timedelta(minutes=30),
        # 'schedule': crontab(hour=8, day_of_week=1),
        'args': (),
    },
}