Ejecución de tareas "únicas" con apio


Uso apio para actualizar los canales RSS en mi sitio de agregación de noticias. Utilizo una tarea @para cada feed, y las cosas parecen funcionar muy bien.

Hay un detalle que no estoy seguro de manejar bien: todos los feeds se actualizan una vez cada minuto con un @periodic_task, pero ¿qué pasa si un feed sigue actualizándose desde la última tarea periódica cuando se inicia una nueva ? (por ejemplo, si el feed es realmente lento, o fuera de línea y la tarea se mantiene en un bucle de reintentos)

Actualmente almaceno los resultados de las tareas y compruebe su estado de esta manera:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error, exc:
        update_feed.retry(args=[feed], exc=exc)

Tal vez hay una forma más sofisticada/robusta de lograr el mismo resultado utilizando algún mecanismo de apio que me perdí ?

Author: Luper Rouch, 2010-11-04

5 answers

 26
Author: MattH,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-10-31 16:28:41

Basado en la respuesta de MattH, podrías usar un decorador como este:

def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

Entonces, úsalo así...

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles()
    yada yada...
 39
Author: SteveJ,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-02-26 18:27:38

Usando https://pypi.python.org/pypi/celery_once parece hacer el trabajo muy bien, incluyendo informes de errores y pruebas con algunos parámetros de singularidad.

Puedes hacer cosas como:

from celery_once import QueueOnce
from myapp.celery import app
from time import sleep

@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"

Que solo necesita los siguientes ajustes en su proyecto:

ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
 11
Author: vdboor,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-30 11:55:51

Si estás buscando un ejemplo que no use Django, entonces prueba este ejemplo (advertencia: usa Redis en su lugar, que ya estaba usando).

El código del decorador es el siguiente (crédito completo al autor del artículo, vaya a leerlo)

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
 8
Author: keithl8041,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-05 17:47:12

Esta solución para el apio que trabaja en un solo huésped con mayor concurrencia 1. Otros tipos (sin dependencias como redis) de bloqueos basados en archivos de diferencia no funcionan con concurrencia mayor 1.

class Lock(object):
    def __init__(self, filename):
        self.f = open(filename, 'w')

    def __enter__(self):
        try:
            flock(self.f.fileno(), LOCK_EX | LOCK_NB)
            return True
        except IOError:
            pass
        return False

    def __exit__(self, *args):
        self.f.close()


class SinglePeriodicTask(PeriodicTask):
    abstract = True
    run_every = timedelta(seconds=1)

    def __call__(self, *args, **kwargs):
        lock_filename = join('/tmp',
                             md5(self.name).hexdigest())
        with Lock(lock_filename) as is_locked:
            if is_locked:
                super(SinglePeriodicTask, self).__call__(*args, **kwargs)
            else:
                print 'already working'


class SearchTask(SinglePeriodicTask):
    restart_delay = timedelta(seconds=60)

    def run(self, *args, **kwargs):
        print self.name, 'start', datetime.now()
        sleep(5)
        print self.name, 'end', datetime.now()
 0
Author: user1239798,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-01-06 19:57:45