///Lock-free containers. Part of the OmniThreadLibrary project. ///Primoz Gabrijelcic, GJ /// ///This software is distributed under the BSD license. /// ///Copyright (c) 2008, Primoz Gabrijelcic ///All rights reserved. /// ///Redistribution and use in source and binary forms, with or without modification, ///are permitted provided that the following conditions are met: ///- Redistributions of source code must retain the above copyright notice, this /// list of conditions and the following disclaimer. ///- Redistributions in binary form must reproduce the above copyright notice, /// this list of conditions and the following disclaimer in the documentation /// and/or other materials provided with the distribution. ///- The name of the Primoz Gabrijelcic may not be used to endorse or promote /// products derived from this software without specific prior written permission. /// ///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE ///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES ///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; ///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT ///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /// /// /// Author : GJ, Primoz Gabrijelcic /// Creation date : 2008-07-13 /// Last modification : 2008-10-26 /// Version : 1.01 /// /// History: /// 1.01: 2008-10-26 /// - [GJ] Redesigned stack with better lock contention. /// - [GJ] Totally redesigned queue, which is no longer based on stack and allows /// multiple readers. /// {$WARN SYMBOL_PLATFORM OFF} unit OtlContainers; {$R-,O+,A8} interface uses OtlCommon; type {:Lock-free, single writer, single reader, size-limited stack. } IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}'] procedure Empty; procedure Initialize(numElements, elementSize: integer); function Pop(var value): boolean; function Push(const value): boolean; function IsEmpty: boolean; function IsFull: boolean; end; { IOmniStack } {:Lock-free, single writer, single reader ring buffer. } IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}'] procedure Empty; procedure Initialize(numElements, elementSize: integer); function Enqueue(const value): boolean; function Dequeue(var value): boolean; function IsEmpty: boolean; function IsFull: boolean; end; { IOmniQueue } IOmniNotifySupport = interface ['{E5FFC739-669A-4931-B0DC-C5005A94A08B}'] function GetNewDataEvent: THandle; // procedure Signal; property NewDataEvent: THandle read GetNewDataEvent; end; { IOmniNotifySupport } TOmniContainerOption = (coEnableMonitor, coEnableNotify); TOmniContainerOptions = set of TOmniContainerOption; PReferencedPtr = ^TReferencedPtr; TReferencedPtr = record PData : pointer; Reference: cardinal; end; { TReferencedPtr } TReferencedPtrBuffer = array [0..MaxInt shr 4] of TReferencedPtr; POmniRingBuffer = ^TOmniRingBuffer; TOmniRingBuffer = packed record FirstIn : TReferencedPtr; LastIn : TReferencedPtr; StartBuffer : pointer; EndBuffer : pointer; Buffer : TReferencedPtrBuffer; end; { TOmniRingBuffer } POmniLinkedData = ^TOmniLinkedData; TOmniLinkedData = packed record Next: POmniLinkedData; Data: record end; //user data, variable size end; { TOmniLinkedData } TOmniBaseStack = class abstract(TInterfacedObject, IOmniStack) strict private obsDataBuffer : pointer; obsElementSize : integer; obsNumElements : integer; obsPublicChainP : PReferencedPtr; obsRecycleChainP: PReferencedPtr; class var obsIsInitialized: boolean; //default is false class var obsTaskPopLoops : cardinal; class var obsTaskPushLoops: cardinal; strict protected procedure MeasureExecutionTimes; class function PopLink(var chain: TReferencedPtr): POmniLinkedData; static; class procedure PushLink(const link: POmniLinkedData; var chain: TReferencedPtr); static; public destructor Destroy; override; procedure Empty; procedure Initialize(numElements, elementSize: integer); virtual; function IsEmpty: boolean; inline; function IsFull: boolean; inline; function Pop(var value): boolean; virtual; function Push(const value): boolean; virtual; property ElementSize: integer read obsElementSize; property NumElements: integer read obsNumElements; end; { TOmniBaseStack } TOmniStack = class(TOmniBaseStack, IOmniNotifySupport, IOmniMonitorSupport) strict private osMonitorSupport: IOmniMonitorSupport; osNotifySupport : IOmniNotifySupport; osOptions : TOmniContainerOptions; public constructor Create(numElements, elementSize: integer; options: TOmniContainerOptions = [coEnableMonitor, coEnableNotify]); function Pop(var value): boolean; override; function Push(const value): boolean; override; property MonitorSupport: IOmniMonitorSupport read osMonitorSupport implements IOmniMonitorSupport; property NotifySupport: IOmniNotifySupport read osNotifySupport implements IOmniNotifySupport; property Options: TOmniContainerOptions read osOptions; end; { TOmniStack } TOmniBaseQueue = class abstract(TInterfacedObject, IOmniQueue) strict private obqDataBuffer : pointer; obqElementSize : integer; obqNumElements : integer; obqPublicRingBuffer : POmniRingBuffer; obqRecycleRingBuffer: POmniRingBuffer; class var obqTaskInsertLoops: cardinal; //default is false class var obqTaskRemoveLoops: cardinal; class var obqIsInitialized : boolean; strict protected class procedure InsertLink(const data: pointer; const ringBuffer: POmniRingBuffer); static; class function RemoveLink(const ringBuffer: POmniRingBuffer): pointer; static; procedure MeasureExecutionTimes; public destructor Destroy; override; function Dequeue(var value): boolean; procedure Empty; function Enqueue(const value): boolean; procedure Initialize(numElements, elementSize: integer); virtual; function IsEmpty: boolean; function IsFull: boolean; property ElementSize: integer read obqElementSize; property NumElements: integer read obqNumElements; end; { TOmniBaseQueue } TOmniQueue = class(TOmniBaseQueue, IOmniNotifySupport, IOmniMonitorSupport) strict private oqMonitorSupport: IOmniMonitorSupport; oqNotifySupport : IOmniNotifySupport; oqOptions : TOmniContainerOptions; public constructor Create(numElements, elementSize: integer; options: TOmniContainerOptions = [coEnableMonitor, coEnableNotify]); function Dequeue(var value): boolean; function Enqueue(const value): boolean; property MonitorSupport: IOmniMonitorSupport read oqMonitorSupport implements IOmniMonitorSupport; property NotifySupport: IOmniNotifySupport read oqNotifySupport implements IOmniNotifySupport; property Options: TOmniContainerOptions read oqOptions; end; { TOmniQueue } implementation uses Windows, SysUtils, DSiWin32; type TOmniNotifySupport = class(TInterfacedObject, IOmniNotifySupport) strict private onsNewDataEvent: TDSiEventHandle; protected function GetNewDataEvent: THandle; public constructor Create; destructor Destroy; override; procedure Signal; property NewData: THandle read GetNewDataEvent; end; { TOmniNotifySupport } { Intel Atomic functions support } function AtomicCmpXchg4b(const Old4b, New4b: cardinal; var Destination): boolean; overload; //ATOMIC FUNCTION //begin // result := Old4b = PCardinal(Destination)^; // if result then // PCardinal(Destination)^ := New4b; //end; asm lock cmpxchg dword ptr [Destination], New4b setz al end; { AtomicCmpXchg4b } function AtomicCmpXchg4b(const Old4b, New4b: pointer; var Destination): boolean; overload; //ATOMIC FUNCTION //begin // result := Old4b = PPointer(Destination)^; // if result then // PPointer(Destination)^ := New4b; //end; asm lock cmpxchg dword ptr [Destination], New4b setz al end; { AtomicCmpXchg4b } function AtomicCmpXchg8b(const OldPData: pointer; OldReference: cardinal; NewPData: pointer; NewReference: cardinal; var Destination: TReferencedPtr): boolean; //ATOMIC FUNCTION //begin // result := (Destination.PData = OldPData) and (Destination.Reference = OldReference); // if result then // begin // Destination.PData := NewPData; // Destination.Reference := NewReference; // end; //end; asm push edi push ebx mov ebx, NewPData mov ecx, NewReference mov edi, Destination lock cmpxchg8b qword ptr [edi] setz al pop ebx pop edi end; { AtomicCmpXchg8b } function GetThreadId: cardinal; //result := GetCurrentThreadId; asm mov eax, fs:[$18] //eax := thread information block mov eax, [eax + $24] //eax := thread id end; { GetThreadId } function GetTimeStamp: int64; //result := QueryPerformanceCounter; asm rdtsc end; { GetTimeStamp } { TOmniNotifySupport } constructor TOmniNotifySupport.Create; begin inherited Create; onsNewDataEvent := CreateEvent(nil, false, false, nil); Win32Check(onsNewDataEvent <> 0); end; { TOmniNotifySupport.Create } destructor TOmniNotifySupport.Destroy; begin DSiCloseHandleAndNull(onsNewDataEvent); inherited Destroy; end; { TOmniNotifySupport.Destroy } function TOmniNotifySupport.GetNewDataEvent: THandle; begin Result := onsNewDataEvent; end; { TOmniNotifySupport.GetNewDataEvent } procedure TOmniNotifySupport.Signal; begin Win32Check(SetEvent(onsNewDataEvent)); end; { TOmniNotifySupport.Signal } { TOmniBaseStack } destructor TOmniBaseStack.Destroy; begin FreeMem(obsPublicChainP); inherited; end; { TOmniBaseStack.Destroy } procedure TOmniBaseStack.Empty; var linkedData: POmniLinkedData; begin repeat linkedData := PopLink(obsPublicChainP^); if not assigned(linkedData) then break; //repeat PushLink(linkedData, obsRecycleChainP^); until false; end; { TOmniBaseStack.Empty } procedure TOmniBaseStack.Initialize(numElements, elementSize: integer); var bufferElementSize: integer; currElement : POmniLinkedData; iElement : integer; nextElement : POmniLinkedData; begin Assert(SizeOf(cardinal) = SizeOf(pointer)); Assert(numElements > 0); Assert(elementSize > 0); obsNumElements := numElements; //calculate element size, round up to next 4-aligned value obsElementSize := (elementSize + 3) AND NOT 3; //calculate buffer element size, round up to next 4-aligned value bufferElementSize := ((SizeOf(TOmniLinkedData) + obsElementSize) + 3) AND NOT 3; //calculate DataBuffer GetMem(obsDataBuffer, bufferElementSize * numElements + 2 * SizeOf(TReferencedPtr)); if cardinal(obsDataBuffer) AND 7 <> 0 then raise Exception.Create('TOmniBaseContainer: obcBuffer is not 8-aligned'); obsPublicChainP := obsDataBuffer; inc(cardinal(obsDataBuffer), SizeOf(TReferencedPtr)); obsRecycleChainP := obsDataBuffer; inc(cardinal(obsDataBuffer), SizeOf(TReferencedPtr)); //Format buffer to recycleChain, init obsRecycleChain and obsPublicChain. //At the beginning, all elements are linked into the recycle chain. obsRecycleChainP^.PData := obsDataBuffer; currElement := obsRecycleChainP^.PData; for iElement := 0 to obsNumElements - 2 do begin nextElement := POmniLinkedData(integer(currElement) + bufferElementSize); currElement.Next := nextElement; currElement := nextElement; end; currElement.Next := nil; // terminate the chain obsPublicChainP^.PData := nil; MeasureExecutionTimes; end; { TOmniBaseStack.Initialize } function TOmniBaseStack.IsEmpty: boolean; begin Result := not assigned(obsPublicChainP^.PData); end; { TOmniBaseStack.IsEmpty } function TOmniBaseStack.IsFull: boolean; begin Result := not assigned(obsRecycleChainP^.PData); end; { TOmniBaseStack.IsFull } procedure TOmniBaseStack.MeasureExecutionTimes; const NumOfSamples = 10; var TimeTestField: array [0..2] of array [1..NumOfSamples] of int64; function GetMinAndClear(Rutine, Count: cardinal): int64; var m: cardinal; n: integer; x: integer; begin result := 0; for m := 1 to Count do begin x:= 1; for n:= 2 to NumOfSamples do if TimeTestField[Rutine, n] < TimeTestField[Rutine, x] then x := n; Inc(result, TimeTestField[Rutine, x]); TimeTestField[Rutine, x] := MaxLongInt; end; end; { GetMinAndClear } var affinity : string; currElement: POmniLinkedData; n : integer; begin { TOmniBaseStack.MeasureExecutionTimes } if not obsIsInitialized then begin affinity := DSiGetThreadAffinity; DSiSetThreadAffinity(affinity[1]); try //Calculate TaskPopDelay and TaskPushDelay counter values depend on CPU speed!!!} obsTaskPopLoops := 1; obsTaskPushLoops := 1; for n := 1 to NumOfSamples do begin //Measure RemoveLink rutine delay TimeTestField[0, n] := GetTimeStamp; currElement := PopLink(obsRecycleChainP^); TimeTestField[0, n] := GetTimeStamp - TimeTestField[0, n]; //Measure InsertLink rutine delay TimeTestField[1, n] := GetTimeStamp; PushLink(currElement, obsRecycleChainP^); TimeTestField[1, n] := GetTimeStamp - TimeTestField[1, n]; //Measure GetTimeStamp rutine delay TimeTestField[2, n] := GetTimeStamp; TimeTestField[2, n] := GetTimeStamp - TimeTestField[2, n]; end; //Calculate first 4 minimum average for GetTimeStamp n := GetMinAndClear(2, 4); //Calculate first 4 minimum average for RemoveLink rutine obsTaskPopLoops := (GetMinAndClear(0, 4) - n) div 2; //Calculate first 4 minimum average for InsertLink rutine obsTaskPushLoops := (GetMinAndClear(1, 4) - n) div 4; obsIsInitialized := true; finally DSiSetThreadAffinity(affinity); end; end; end; { TOmniBaseStack.MeasureExecutionTimes } function TOmniBaseStack.Pop(var value): boolean; var linkedData: POmniLinkedData; begin linkedData := PopLink(obsPublicChainP^); Result := assigned(linkedData); if not Result then Exit; Move(linkedData.Data, value, ElementSize); PushLink(linkedData, obsRecycleChainP^); end; { TOmniBaseStack.Pop } class function TOmniBaseStack.PopLink(var chain: TReferencedPtr): POmniLinkedData; //nil << Link.Next << Link.Next << ... << Link.Next //FILO buffer logic ^------ < chainHead //Advanced stack PopLink model with idle/busy status bit var AtStartReference: cardinal; CurrentReference: cardinal; TaskCounter : cardinal; ThreadReference : cardinal; label TryAgain; begin ThreadReference := GetThreadId + 1; //Reference.bit0 := 1 with chain do begin TryAgain: TaskCounter := obsTaskPopLoops; AtStartReference := Reference OR 1; //Reference.bit0 := 1 repeat CurrentReference := Reference; Dec(TaskCounter); until (TaskCounter = 0) or (CurrentReference AND 1 = 0); if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or not AtomicCmpXchg4b(CurrentReference, ThreadReference, Reference) then goto TryAgain; //Reference is set... result := PData; //Empty test if result = nil then AtomicCmpXchg4b(ThreadReference, 0, Reference) //Clear Reference if task own reference else if not AtomicCmpXchg8b(result, ThreadReference, result.Next, 0, chain) then goto TryAgain; end; end; { TOmniBaseStack.PopLink } function TOmniBaseStack.Push(const value): boolean; var linkedData: POmniLinkedData; begin linkedData := PopLink(obsRecycleChainP^); Result := assigned(linkedData); if not Result then Exit; Move(value, linkedData.Data, ElementSize); PushLink(linkedData, obsPublicChainP^); end; { TOmniBaseStack.Push } class procedure TOmniBaseStack.PushLink(const link: POmniLinkedData; var chain: TReferencedPtr); //Advanced stack PushLink model with idle/busy status bit var PMemData : pointer; TaskCounter: cardinal; begin with chain do begin for TaskCounter := 0 to obsTaskPushLoops do if (Reference AND 1 = 0) then break; repeat PMemData := PData; link.Next := PMemData; until AtomicCmpXchg4b(PMemData, link, PData); end; end; { TOmniBaseStack.PushLink } { TOmniStack } constructor TOmniStack.Create(numElements, elementSize: integer; options: TOmniContainerOptions); begin inherited Create; Initialize(numElements, elementSize); osOptions := options; if coEnableMonitor in Options then osMonitorSupport := CreateOmniMonitorSupport; if coEnableNotify in Options then osNotifySupport := TOmniNotifySupport.Create; end; { TOmniStack.Create } function TOmniStack.Pop(var value): boolean; begin Result := inherited Pop(value); if Result then if coEnableNotify in Options then osNotifySupport.Signal; end; { TOmniStack.Pop } function TOmniStack.Push(const value): boolean; begin Result := inherited Push(value); if Result then begin if coEnableNotify in Options then osNotifySupport.Signal; if coEnableMonitor in Options then osMonitorSupport.Notify; end; end; { TOmniStack.Push } { TOmniBaseQueue } function TOmniBaseQueue.Dequeue(var value): boolean; var Data: pointer; begin Data := RemoveLink(obqPublicRingBuffer); Result := assigned(Data); if not Result then Exit; Move(Data^, value, ElementSize); InsertLink(Data, obqRecycleRingBuffer); end; { TOmniBaseQueue.Dequeue } destructor TOmniBaseQueue.Destroy; begin FreeMem(obqDataBuffer); FreeMem(obqPublicRingBuffer); FreeMem(obqRecycleRingBuffer); inherited; end; { TOmniBaseQueue.Destroy } procedure TOmniBaseQueue.Empty; var Data: pointer; begin repeat Data := RemoveLink(obqPublicRingBuffer); if assigned(Data) then InsertLink(Data, obqRecycleRingBuffer) else break; until false; end; { TOmniBaseQueue.Empty } function TOmniBaseQueue.Enqueue(const value): boolean; var Data: pointer; begin Data := RemoveLink(obqRecycleRingBuffer); Result := assigned(Data); if not Result then Exit; Move(value, Data^, ElementSize); InsertLink(Data, obqPublicRingBuffer); end; { TOmniBaseQueue.Enqueue } procedure TOmniBaseQueue.Initialize(numElements, elementSize: integer); var n : integer; ringBufferSize: cardinal; begin Assert(SizeOf(cardinal) = SizeOf(pointer)); Assert(numElements > 0); Assert(elementSize > 0); obqNumElements := numElements; // calculate element size, round up to next 4-aligned value obqElementSize := (elementSize + 3) AND NOT 3; // allocate obqDataBuffer GetMem(obqDataBuffer, elementSize * numElements + elementSize); // allocate RingBuffers ringBufferSize := SizeOf(TReferencedPtr) * (numElements + 1) + SizeOf(TOmniRingBuffer) - SizeOf(TReferencedPtrBuffer); obqPublicRingBuffer := AllocMem(ringBufferSize); Assert(cardinal(obqPublicRingBuffer) mod 8 = 0, 'TOmniBaseContainer: obcPublicRingBuffer is not 8-aligned'); obqRecycleRingBuffer := AllocMem(ringBufferSize); Assert(cardinal(obqRecycleRingBuffer) mod 8 = 0, 'TOmniBaseContainer: obcRecycleRingBuffer is not 8-aligned'); // set obqPublicRingBuffer head obqPublicRingBuffer.FirstIn.PData := @obqPublicRingBuffer.Buffer[0]; obqPublicRingBuffer.LastIn.PData := @obqPublicRingBuffer.Buffer[0]; obqPublicRingBuffer.StartBuffer := @obqPublicRingBuffer.Buffer[0]; obqPublicRingBuffer.EndBuffer := @obqPublicRingBuffer.Buffer[numElements]; // set obqRecycleRingBuffer head obqRecycleRingBuffer.FirstIn.PData := @obqRecycleRingBuffer.Buffer[0]; obqRecycleRingBuffer.LastIn.PData := @obqRecycleRingBuffer.Buffer[numElements]; obqRecycleRingBuffer.StartBuffer := @obqRecycleRingBuffer.Buffer[0]; obqRecycleRingBuffer.EndBuffer := @obqRecycleRingBuffer.Buffer[numElements]; // format obqRecycleRingBuffer for n := 0 to numElements do obqRecycleRingBuffer.Buffer[n].PData := pointer(cardinal(obqDataBuffer) + cardinal(n * elementSize)); MeasureExecutionTimes; end; { TOmniBaseQueue.Initialize } class procedure TOmniBaseQueue.InsertLink(const data: pointer; const ringBuffer: POmniRingBuffer); //FIFO buffer logic //Insert link to queue model with idle/busy status bit var AtStartReference: cardinal; CurrentLastIn : PReferencedPtr; CurrentReference: cardinal; NewLastIn : PReferencedPtr; TaskCounter : cardinal; ThreadReference : cardinal; label TryAgain; begin ThreadReference := GetThreadId + 1; //Reference.bit0 := 1 with ringBuffer^ do begin TryAgain: TaskCounter := obqTaskInsertLoops; AtStartReference := LastIn.Reference OR 1; //Reference.bit0 := 1 repeat CurrentReference := LastIn.Reference; Dec(TaskCounter); until (TaskCounter = 0) or (CurrentReference AND 1 = 0); if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or not AtomicCmpXchg4b(CurrentReference, ThreadReference, LastIn.Reference) then goto TryAgain; //Reference is set... CurrentLastIn := LastIn.PData; AtomicCmpXchg4b(CurrentLastIn.Reference, ThreadReference, CurrentLastIn.Reference); if (ThreadReference <> LastIn.Reference) or not AtomicCmpXchg8b(CurrentLastIn.PData, ThreadReference, data, ThreadReference, CurrentLastIn^) then goto TryAgain; //Calculate ringBuffer next LastIn address NewLastIn := pointer(cardinal(CurrentLastIn) + SizeOf(TReferencedPtr)); if cardinal(NewLastIn) > cardinal(EndBuffer) then NewLastIn := StartBuffer; //Try to exchange and clear Reference if task own reference if not AtomicCmpXchg8b(CurrentLastIn, ThreadReference, NewLastIn, 0, LastIn) then goto TryAgain; end; end; { TOmniBaseQueue.InsertLink } function TOmniBaseQueue.IsEmpty: boolean; begin Result := (obqPublicRingBuffer.FirstIn.PData = obqPublicRingBuffer.LastIn.PData); end; { TOmniBaseQueue.IsEmpty } function TOmniBaseQueue.IsFull: boolean; var NewLastIn: pointer; begin NewLastIn := pointer(cardinal(obqPublicRingBuffer.LastIn.PData) + SizeOf(TReferencedPtr)); if cardinal(NewLastIn) > cardinal(obqPublicRingBuffer.EndBuffer) then NewLastIn := obqPublicRingBuffer.StartBuffer; result := (cardinal(NewLastIn) > cardinal(obqPublicRingBuffer.LastIn.PData)) or (obqRecycleRingBuffer.FirstIn.PData = obqRecycleRingBuffer.LastIn.PData); end; { TOmniBaseQueue.IsFull } procedure TOmniBaseQueue.MeasureExecutionTimes; const NumOfSamples = 10; var TimeTestField: array [0..2]of array [1..NumOfSamples] of int64; function GetMinAndClear(Rutine, Count: cardinal): int64; var m: cardinal; n: integer; x: integer; begin result := 0; for m := 1 to Count do begin x:= 1; for n:= 2 to NumOfSamples do if TimeTestField[Rutine, n] < TimeTestField[Rutine, x] then x := n; Inc(result, TimeTestField[Rutine, x]); TimeTestField[Rutine, x] := MaxLongInt; end; end; { GetMinAndClear } var affinity : string; currElement: pointer; n : integer; begin { TOmniBaseQueue.MeasureExecutionTimes } if not obqIsInitialized then begin affinity := DSiGetThreadAffinity; DSiSetThreadAffinity(affinity[1]); try //Calculate TaskPopDelay and TaskPushDelay counter values depend on CPU speed!!!} obqTaskRemoveLoops := 1; obqTaskInsertLoops := 1; for n := 1 to NumOfSamples do begin //Measure RemoveLink rutine delay TimeTestField[0, n] := GetTimeStamp; currElement := RemoveLink(obqRecycleRingBuffer); TimeTestField[0, n] := GetTimeStamp - TimeTestField[0, n]; //Measure InsertLink rutine delay TimeTestField[1, n] := GetTimeStamp; InsertLink(currElement, obqRecycleRingBuffer); TimeTestField[1, n] := GetTimeStamp - TimeTestField[1, n]; //Measure GetTimeStamp rutine delay TimeTestField[2, n] := GetTimeStamp; TimeTestField[2, n] := GetTimeStamp - TimeTestField[2, n]; end; //Calculate first 4 minimum average for GetTimeStamp n := GetMinAndClear(2, 4); //Calculate first 4 minimum average for RemoveLink rutine obqTaskRemoveLoops := (GetMinAndClear(0, 4) - n) div 4; //Calculate first 4 minimum average for InsertLink rutine obqTaskInsertLoops := (GetMinAndClear(1, 4) - n) div 4; obqIsInitialized := true; finally DSiSetThreadAffinity(affinity); end; end; end; { TOmniBaseQueue.MeasureExecutionTimes } class function TOmniBaseQueue.RemoveLink(const ringBuffer: POmniRingBuffer): pointer; //FIFO buffer logic //Remove link from queue model with idle/busy status bit var AtStartReference : cardinal; CurrentFirstIn : pointer; CurrentReference : cardinal; NewFirstIn : pointer; Reference : cardinal; TaskCounter : cardinal; label TryAgain; begin Reference := GetThreadId + 1; //Reference.bit0 := 1 with ringBuffer^ do begin TryAgain: TaskCounter := obqTaskRemoveLoops; AtStartReference := FirstIn.Reference OR 1; //Reference.bit0 := 1 repeat CurrentReference := FirstIn.Reference; Dec(TaskCounter); until (TaskCounter = 0) or (CurrentReference AND 1 = 0); if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or not AtomicCmpXchg4b(CurrentReference, Reference, FirstIn.Reference) then goto TryAgain; //Reference is set... CurrentFirstIn := FirstIn.PData; //Empty test if CurrentFirstIn = LastIn.PData then begin //Clear Reference if task own reference AtomicCmpXchg4b(Reference, 0, FirstIn.Reference); Result := nil; Exit; end; //Load Result Result := PReferencedPtr(FirstIn.PData).PData; //Calculate ringBuffer next FirstIn address NewFirstIn := pointer(cardinal(CurrentFirstIn) + SizeOf(TReferencedPtr)); if cardinal(NewFirstIn) > cardinal(EndBuffer) then NewFirstIn := StartBuffer; //Try to exchange and clear Reference if task own reference if not AtomicCmpXchg8b(CurrentFirstIn, Reference, NewFirstIn, 0, FirstIn) then goto TryAgain; end; end; { TOmniBaseQueue.RemoveLink } { TOmniQueue } constructor TOmniQueue.Create(numElements, elementSize: integer; options: TOmniContainerOptions); begin inherited Create; Initialize(numElements, elementSize); oqOptions := options; if coEnableMonitor in Options then oqMonitorSupport := CreateOmniMonitorSupport; if coEnableNotify in Options then oqNotifySupport := TOmniNotifySupport.Create; end; { TOmniQueue.Create } function TOmniQueue.Dequeue(var value): boolean; begin Result := inherited Dequeue(value); if Result then if coEnableNotify in Options then oqNotifySupport.Signal; end; { TOmniQueue.Dequeue } function TOmniQueue.Enqueue(const value): boolean; begin Result := inherited Enqueue(value); if Result then begin if coEnableNotify in Options then oqNotifySupport.Signal; if coEnableMonitor in Options then oqMonitorSupport.Notify; end; end; { TOmniQueue.Enqueue } { initialization } procedure InitializeTimingInfo; var queue: TOmniBaseQueue; stack: TOmniBaseStack; begin stack := TOmniBaseStack.Create; stack.Initialize(10, 4); // enough for initialization FreeAndNil(stack); queue := TOmniBaseQueue.Create; queue.Initialize(10, 4); // enough for initialization FreeAndNil(queue); end; { InitializeTimingInfo } initialization InitializeTimingInfo; end.