Groups | Search | Server Info | Keyboard shortcuts | Login | Register [http] [https] [nntp] [nntps]


Groups > comp.programming.threads > #1920

Concurrent FIFO queue...

From aminer <aminer@toto.net>
Newsgroups comp.programming.threads, comp.programming
Subject Concurrent FIFO queue...
Date 2013-10-13 16:26 -0700
Organization albasani.net
Message-ID <l3evl5$dvu$1@news.albasani.net> (permalink)

Cross-posted to 2 groups.

Show all headers | View raw


Hello,

I have thought more, and i have come with a solution
for a concurrent FIFO queue that satisfies many requirements, it
minimizes efficiently the cache-coherence traffic, it is
FIFO fair(it uses FIFO fair locks) and it is energy efficient on the 
pop() side: it will not spin-wait when there no items in the queue but
it will wait on a manual reset event object, and this is energy efficient.

I have benchmarked it and it gives 1.65 millions pop()
per second and 0.8 million push() per second.

Here is the FIFO queue, i will soon put it on my website.


{****************************************************************************
* 
     *
*                          FIFO MPMC Queue 
     *
* 
     *
* 
     *
* Language:             FPC Pascal v2.2.0+ / Delphi 5+ 
     *
* 
     *
* Required switches:    none 
     *
* 
     *
* Authors:  Amine Moulay Ramdane 
     *
*
* 
     *
* 
     *
* 
     *
* Version:              1.0 
     *
*
*        Send bug reports and feedback to  aminer @@ videotron @@ ca 
     *
*   You can always get the latest version/revision of this package from 
     *
* 
     *
*           http://pages.videotron.com/aminer/ 
     *
* 
     *
* Description:  Algorithm to handle an FIFO MPMC queue 
     *
* 
     *
*  This program is distributed in the hope that it will be useful, 
     *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of 
     *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
     *
* 
     *
* 
     *
*****************************************************************************
*                      BEGIN LICENSE BLOCK 
     *

{ changelog
v.1.0
}


unit FIFOQUEUE_MPMC;


interface
{$IFDEF FPC}
{$ASMMODE intel}
{$ENDIF}


uses
LW_ALOCK,SpinLock,SemaCondvar,msync,syncobjs,sysutils;

{$I defines.inc}

type

{$IFDEF CPU64}
long = qword;
{$ENDIF CPU64}
{$IFDEF CPU32}
long = longword;
{$ENDIF CPU32}


   tNodeQueue = tObject;
   typecache1  = array[0..15] of longword;

  // TLockfree_MPMC = class(TFreelist)
   TFIFOQUEUE_MPMC = class
   private
       tail:longword;
       tmp1:typecache1;
       head: long;
       fMask : long;
       fSize : long;
       tab : array of tNodeQueue;
       lock1,lock2:TALOCK;
       event:TSimpleEvent;
       lock3:TSpinlock;
       count1:long;
       procedure setobject(lp : long;const aobject : tNodeQueue);
       function getLength:long;
       function getSize:long;
       function getObject(lp : long):tNodeQueue;
   public
       constructor create(aPower : long =20);  {allocate tab with size 
equal 2^aPower, for 20 size is equal 1048576}
       destructor Destroy; override;
       function push(tm : tNodeQueue):boolean;
       function pop(var obj:tNodeQueue):boolean;
       property length : long read getLength;
       property count: long read getLength;
       property size : long read getSize;

   end;


implementation

{$IF defined(CPU64) }
function LockedCompareExchange(CompareVal, NewVal: long; var Target: 
long): long; overload;
asm
mov rax, rcx
lock cmpxchg [r8], rdx
end;
{$IFEND}
{$IF defined(CPU32) }
function LockedCompareExchange(CompareVal, NewVal: long; var 
Target:long): long; overload;
asm
lock cmpxchg [ecx], edx
end;
{$IFEND}


function CAS(var Target:long;Comp ,Exch : long): boolean;
var ret:long;
begin

ret:=LockedCompareExchange(Comp,Exch,Target);
if ret=comp
  then result:=true
  else result:=false;

end; { CAS }



function LockedIncLong(var Target: long): long;
asm
         {$IFDEF CPU32}
         // --> EAX Target
         // <-- EAX Result
         MOV     ECX, EAX
         MOV     EAX, 1
         //sfence
        LOCK XADD [ECX], EAX
         inc     eax
         {$ENDIF CPU32}
         {$IFDEF CPU64}
         // --> RCX Target
         // <-- EAX Result
         MOV     rax, 1
         //sfence
         LOCK XADD [rcx], rax
         INC     rax
         {$ENDIF CPU64}
end;

function LockedDecLong(var Target: long): long;
asm
         {$IFDEF CPU32}
         // --> EAX Target
         // <-- EAX Result
         MOV     ECX, EAX
         MOV     EAX, -1
         //sfence
        LOCK XADD [ECX], EAX
         dec     eax
         {$ENDIF CPU32}
         {$IFDEF CPU64}
         // --> RCX Target
         // <-- EAX Result
         MOV     rax, -1
         //sfence
         LOCK XADD [rcx], rax
         dec     rax
         {$ENDIF CPU64}
end;

constructor TFIFOQUEUE_MPMC.create(aPower : long );
begin
   if (aPower < 0) or (aPower > high(long))
     then
      begin
       writeln('Constructor''s argument incorrect');
        halt;
      end;

{$IFDEF CPU64}
fMask:=not($FFFFFFFFFFFFFFFF shl aPower);
{$ENDIF CPU64}
{$IFDEF CPU32}
fMask:=not($FFFFFFFF shl aPower);
{$ENDIF CPU32}

   fSize:=(1 shl aPower);
   setLength(tab,1 shl aPower);
   tail:=0;
   head:=0;
   lock1:=TALOCK.create(100);
   lock2:=TALOCK.create(100);
   lock3:=TSpinlock.create;
   event:=TSimpleEvent.create;
   count1:=0;

end;

destructor  TFIFOQUEUE_MPMC.Destroy;

begin
  lock1.free;
  lock2.free;
  lock3.free;
  event.free;
  setLength(tab,0);
  inherited Destroy;
end;


procedure TFIFOQUEUE_MPMC.setObject(lp : long;const aobject : tNodeQueue);
begin
   tab[lp and fMask]:=aObject;
end;

function TFIFOQUEUE_MPMC.getObject(lp : long):tNodeQueue;
begin
   result:=tab[lp and fMask];
end;


function TFIFOQUEUE_MPMC.push(tm : tNodeQueue):boolean;//stdcall;
begin

lock1.enter;
result:=true;
if getlength >= fsize
   then
       begin
           result:=false;
          lock1.leave;
          exit;
       end;

   setObject(tail,tm);
     tail:=(tail+1);
     lock3.enter;
     inc(count1);
     event.setevent;
     lock3.leave;
       lock1.leave;
end;


function TFIFOQUEUE_MPMC.pop(var obj:tNodeQueue):boolean;
var b:long;

begin
if self.count=0
then
  begin
   event.waitfor(INFINITE);
   lock3.enter;
   if count1=0 then event.resetevent;
   lock3.leave;
end;

lock2.enter;

   if tail<>head
    then
     begin
      obj:=getObject(head);
      head:=(head+1);
      result:=true;
      lock3.enter;
      dec(count1);
      lock3.leave;
      lock2.leave;
       exit;
     end
    else
        begin
         result:=false;
         lock2.leave;

        end;
end;


function TFIFOQUEUE_MPMC.getLength:long;
var head1,tail1:long;
begin
head1:=head;
tail1:=tail;
   if tail1 < head1
        then result:= (High(long)-head1)+(1+tail1)
        else result:=(tail1-head1);
end;

function TFIFOQUEUE_MPMC.getSize:long;

begin
   result:=fSize;
end;

end.

Back to comp.programming.threads | Previous | Next | Find similar


Thread

Concurrent FIFO queue... aminer <aminer@toto.net> - 2013-10-13 16:26 -0700

csiph-web