¿Cómo puedo evitar continuaciones síncronas en una Tarea?


Tengo un código de biblioteca (socket networking) que proporciona una API basada en Task para las respuestas pendientes a las solicitudes, basada en TaskCompletionSource<T>. Sin embargo, hay una molestia en el TPL en que parece ser imposible evitar continuaciones sincrónicas. Lo que me gustaría ser capaz de hacer es:

  • dile a TaskCompletionSource<T> que no debe permitir que las personas que llaman se conecten con TaskContinuationOptions.ExecuteSynchronously, o
  • establecer el resultado (SetResult / TrySetResult) de una manera que especifique que TaskContinuationOptions.ExecuteSynchronously debe ser ignorado, usando el pool en su lugar

Específicamente, el problema que tengo es que los datos entrantes están siendo procesados por un lector dedicado, y si una persona que llama puede adjuntar con TaskContinuationOptions.ExecuteSynchronously pueden detener el lector (lo que afecta a más que solo a ellos). Anteriormente, he trabajado alrededor de esto por algún hacker que detecta si cualquier continuaciones están presentes, y si lo están empuja la finalización en el ThreadPool, sin embargo, esto tiene un impacto significativo si la persona que llama ha saturado su trabajo cola, ya que la finalización no se procesará de manera oportuna. Si están utilizando Task.Wait() (o similar), entonces esencialmente se bloquearán a sí mismos. Del mismo modo, esta es la razón por la que el lector está en un hilo dedicado en lugar de usar workers.

Entonces; antes de intentar molestar al equipo de TPL: ¿me falta una opción?

Puntos clave:

  • No quiero que las llamadas externas sean capaces de secuestrar mi hilo
  • No puedo usar el ThreadPool como una implementación, ya que necesita funciona cuando la piscina está saturada

El siguiente ejemplo produce salida (el orden puede variar según el tiempo):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

El problema es el hecho de que una persona que llama al azar logró obtener una continuación en "Hilo principal". En el código real, esto sería interrumpir el lector principal; cosas malas!

Código:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
Author: Martin Prikryl, 2014-03-22

6 answers

Nuevo en. NET 4.6:

. NET 4.6 contiene un nuevo TaskCreationOptions: RunContinuationsAsynchronously.


Ya que estás dispuesto a usar la Reflexión para acceder a campos privados...

Puede marcar la tarea del TCS con el flag TASK_STATE_THREAD_WAS_ABORTED, lo que provocaría que todas las continuaciones no estuvieran en línea.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Editar:

En lugar de usar Reflection emit, sugiero que use expresiones. Esto es mucho más legible y tiene la ventaja de ser compatible con PCL:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Sin usando Reflexión:

Si alguien está interesado, he descubierto una manera de hacer esto sin reflexión, pero también es un poco "sucio", y por supuesto conlleva una penalización perf no despreciable:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}
 46
Author: Eli Arbel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-07-20 17:07:57

No creo que haya nada en TPL que proporcione explícito control API sobre TaskCompletionSource.SetResult continuaciones. Decidí mantener mi respuesta inicial para controlar este comportamiento para async/await escenarios.

Aquí hay otra solución que impone asíncrono a ContinueWith, si la continuación activada por tcs.SetResult tiene lugar en el mismo hilo en el que se invocó a SetResult:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Actualizado para responder al comentario:

No controlo la persona que llama-No puedo conseguir que usen un continuar-con variante: si pudiera, el problema no existiría en el primer lugar

No sabía que no controlabas a la persona que llamaba. Sin embargo, si no lo controlas, probablemente tampoco estés pasando el objeto TaskCompletionSource directamente a la persona que llama. Lógicamente, estaría pasando la parte token de la misma, es decir, tcs.Task. En cuyo caso, la solución podría ser aún más fácil, mediante la adición de otro método de extensión a la arriba:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Uso:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Esto en realidad funciona tanto para await y ContinueWith (fiddle) y está libre de trucos de reflexión.

 10
Author: noseratio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 10:30:08

¿Qué pasa con en lugar de hacer

var task = source.Task;

Haces esto en su lugar

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Por lo tanto, siempre está agregando una continuación que se ejecutará de forma asíncrona y luego no importa si los suscriptores desean una continuación en el mismo contexto. Es una especie de curtir la tarea, ¿no?

 3
Author: Ivan Zlatanov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-03-22 22:25:32

Si puede y está listo para usar reflexión, esto debería hacerlo;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
 3
Author: Fredou,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-03-23 03:14:05

Actualizado , he publicado una respuesta separada para tratar con ContinueWith en lugar de await (porque ContinueWith no se preocupa por el contexto de sincronización actual).

Podría usar un contexto de sincronización tonto para imponer asincronía sobre la continuación activada llamando a SetResult/SetCancelled/SetException en TaskCompletionSource. Creo que el contexto de sincronización actual (en el punto de await tcs.Task) es el criterio que TPL utiliza para decidir si hacer que dicha continuación sea síncrona o asíncrona.

Las siguientes obras para mí:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync se implementa así:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext es muy barato en términos de los gastos generales que agrega. De hecho, un enfoque muy similar es adoptado por la implementación de WPF Dispatcher.BeginInvoke.

TPL compara el contexto de sincronización de destino en el punto de await con el del punto de tcs.SetResult. Si el contexto de sincronización es el mismo (o no hay contexto de sincronización en ambos lugares), continuación se llama directamente, sincrónicamente. De lo contrario, se pone en cola usando SynchronizationContext.Post en el contexto de sincronización de destino, es decir, el comportamiento normal await. Lo que hace este enfoque es siempre imponer el comportamiento SynchronizationContext.Post (o una continuación de subproceso de grupo si no hay un contexto de sincronización de destino).

Actualizado , esto no funcionará para task.ContinueWith, porque ContinueWith no le importa el contexto de sincronización actual. Sin embargo, funciona para await task (fiddle ). También funciona para await task.ConfigureAwait(false).

OTOH, este enfoque funciona para ContinueWith.

 3
Author: noseratio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 11:47:34

El enfoque simular abortar se veía muy bien, pero condujo a los hilos de secuestro de TPL en algunos escenarios.

Entonces tuve una implementación que era similar a comprobando el objeto de continuación, pero simplemente comprobando cualquier continuación ya que en realidad hay demasiados escenarios para que el código dado funcione bien, pero eso significaba que incluso cosas como Task.Wait dieron lugar a una búsqueda de subprocesos.

En última instancia, después de inspeccionar lotes y lotes de IL, el único escenario seguro y útil es el escenario SetOnInvokeMres (continuación manual-reset-event-slim). Hay muchos otros escenarios:

  • algunos no son seguros, y conducen al secuestro de hilos
  • el resto no son útiles, ya que en última instancia conducen al grupo de subprocesos

Así que al final, opté por verificar un objeto de continuación no nulo; si es nulo, bien (sin continuaciones); si no es nulo, comprobación de caso especial para SetOnInvokeMres - si es eso: bien (seguro de lo contrario, deje que el grupo de subprocesos realice el TrySetComplete, sin decirle a la tarea que haga algo especial como abortar la suplantación. Task.Wait utiliza el enfoque SetOnInvokeMres, que es el escenario específico que queremos probar realmente difícil de no bloquear.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
 3
Author: Marc Gravell,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:10:50