Ejecución de tareas "únicas" con apio
Uso apio para actualizar los canales RSS en mi sitio de agregación de noticias. Utilizo una tarea @para cada feed, y las cosas parecen funcionar muy bien.
Hay un detalle que no estoy seguro de manejar bien: todos los feeds se actualizan una vez cada minuto con un @periodic_task, pero ¿qué pasa si un feed sigue actualizándose desde la última tarea periódica cuando se inicia una nueva ? (por ejemplo, si el feed es realmente lento, o fuera de línea y la tarea se mantiene en un bucle de reintentos)
Actualmente almaceno los resultados de las tareas y compruebe su estado de esta manera:
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed
_results = {}
@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
for feed in Feed.objects.all():
if feed.pk in _results:
if not _results[feed.pk].ready():
# The task is not finished yet
continue
_results[feed.pk] = update_feed.delay(feed)
@task()
def update_feed(feed):
try:
feed.fetch_articles()
except socket.error, exc:
update_feed.retry(args=[feed], exc=exc)
Tal vez hay una forma más sofisticada/robusta de lograr el mismo resultado utilizando algún mecanismo de apio que me perdí ?
5 answers
De la documentación oficial: Asegurarse de que una tarea solo se ejecuta una a la vez.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-10-31 16:28:41
Basado en la respuesta de MattH, podrías usar un decorador como este:
def single_instance_task(timeout):
def task_exc(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock_id = "celery-single-instance-" + func.__name__
acquire_lock = lambda: cache.add(lock_id, "true", timeout)
release_lock = lambda: cache.delete(lock_id)
if acquire_lock():
try:
func(*args, **kwargs)
finally:
release_lock()
return wrapper
return task_exc
Entonces, úsalo así...
@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles()
yada yada...
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-02-26 18:27:38
Usando https://pypi.python.org/pypi/celery_once parece hacer el trabajo muy bien, incluyendo informes de errores y pruebas con algunos parámetros de singularidad.
Puedes hacer cosas como:
from celery_once import QueueOnce
from myapp.celery import app
from time import sleep
@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
sleep(30)
return "Done!"
Que solo necesita los siguientes ajustes en su proyecto:
ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-30 11:55:51
Si estás buscando un ejemplo que no use Django, entonces prueba este ejemplo (advertencia: usa Redis en su lugar, que ya estaba usando).
El código del decorador es el siguiente (crédito completo al autor del artículo, vaya a leerlo)
import redis
REDIS_CLIENT = redis.Redis()
def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time."""
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=False)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return _caller
return _dec(function) if function is not None else _dec
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-05 17:47:12
Esta solución para el apio que trabaja en un solo huésped con mayor concurrencia 1. Otros tipos (sin dependencias como redis) de bloqueos basados en archivos de diferencia no funcionan con concurrencia mayor 1.
class Lock(object):
def __init__(self, filename):
self.f = open(filename, 'w')
def __enter__(self):
try:
flock(self.f.fileno(), LOCK_EX | LOCK_NB)
return True
except IOError:
pass
return False
def __exit__(self, *args):
self.f.close()
class SinglePeriodicTask(PeriodicTask):
abstract = True
run_every = timedelta(seconds=1)
def __call__(self, *args, **kwargs):
lock_filename = join('/tmp',
md5(self.name).hexdigest())
with Lock(lock_filename) as is_locked:
if is_locked:
super(SinglePeriodicTask, self).__call__(*args, **kwargs)
else:
print 'already working'
class SearchTask(SinglePeriodicTask):
restart_delay = timedelta(seconds=60)
def run(self, *args, **kwargs):
print self.name, 'start', datetime.now()
sleep(5)
print self.name, 'end', datetime.now()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-01-06 19:57:45