TThreadedQueue no es capaz de múltiples consumidores?


Tratando de usar el TThreadedQueue (Genéricos.Colecciones) en un esquema de consumo múltiple de un solo productor. (Delphi-XE). La idea es insertar objetos en una cola y dejar que varios hilos de trabajo drenen la cola.

No funciona como se esperaba, sin embargo. Cuando dos o más hilos de trabajo están llamando a PopItem, las violaciones de acceso se lanzan desde el TThreadedQueue.

Si la llamada a PopItem se serializa con una sección crítica, todo está bien.

Seguramente el TThreadedQueue ¿debería ser capaz de manejar múltiples consumidores, así que me estoy perdiendo algo o es esto un error puro en TThreadedQueue ?

Aquí hay un ejemplo simple para producir el error.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Update : El error en TMonitor que causaba que TThreadedQueue se bloqueara se solucionó en Delphi XE2.

Actualización 2 : La prueba anterior enfatizó la cola en el estado vacío. Darian Miller encontró que hacer hincapié en la cola en estado completo, todavía podría reproducir el error en XE2. El error una vez de nuevo está en el TMonitor. Vea su respuesta a continuación para más información. Y también un enlace al QC101114.

Actualización 3 : Con la actualización 4 de Delphi-XE2 hubo una corrección anunciada para TMonitor que curaría los problemas en TThreadedQueue. Mis pruebas hasta ahora no son capaces de reproducir ningún error en TThreadedQueue más. Se probaron subprocesos de un solo productor/múltiples consumidores cuando la cola está vacía y llena. También se probaron varios productores / múltiples consumidores. Varié los hilos de lector y los hilos de escritor de 1 a 100 sin ningún problema. Pero conociendo la historia, reto a otros a romper TMonitor.

Author: LU RD, 2011-02-01

5 answers

Bueno, es difícil estar seguro sin muchas pruebas, pero ciertamente parece que esto es un error, ya sea en TThreadedQueue o en TMonitor. De cualquier manera está en el RTL y no en tu código. Debe archivar esto como un informe de control de calidad y usar el ejemplo anterior como el código "cómo reproducir".

 19
Author: Mason Wheeler,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-01-31 23:45:11

Te recomiendo usar OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary cuando se trabaja con hilos, paralelismo, etc. Primoz hizo un muy buen trabajo, y en el sitio encontrará una gran cantidad de documentación útil.

 10
Author: RBA,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-12-05 15:06:25

Su ejemplo parece funcionar bien bajo XE2, pero si llenamos su cola, falla con AV en un PushItem. (Probado con XE2 Update1)

Para reproducir, simplemente aumente la creación de su tarea de 100 a 1100 (la profundidad de la cola se estableció en 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

Esto muere para mí cada vez en Windows 7. Inicialmente intenté un esfuerzo continuo para probarlo, y falló en el bucle 30...luego en el bucle 16...luego en 65 por lo que en diferentes intervalos, pero constantemente falló en algún momento.

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;
 4
Author: Darian Miller,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-11-17 22:08:36

Busqué la clase TThreadedQueue pero no parece tenerla en mi D2009. No voy a suicidarme exactamente por esto: el soporte de subprocesos Delphi siempre ha sido un error.. errm... 'no óptimo' y sospecho que TThreadedQueue no es diferente:)

¿Por qué usar genéricos para objetos P-C (Productor / Consumidor)? Un simple descendiente de TObjectQueue funcionará bien - ha estado usando esto durante décadas-funciona bien con múltiples productores / consumidores:

unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.
 3
Author: Martin James,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-02-10 02:52:39

No creo que TThreadedQueue se supone que soporte a múltiples consumidores. Es un FIFO, según el archivo de ayuda. Tengo la impresión de que hay un hilo empujando y otro (¡solo uno!) aparecer.

 1
Author: Giel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-02-01 13:27:45