///Lock-free containers. Part of the OmniThreadLibrary project.
///Primoz Gabrijelcic, GJ
///
///This software is distributed under the BSD license.
///
///Copyright (c) 2009 Primoz Gabrijelcic
///All rights reserved.
///
///Redistribution and use in source and binary forms, with or without modification,
///are permitted provided that the following conditions are met:
///- Redistributions of source code must retain the above copyright notice, this
/// list of conditions and the following disclaimer.
///- Redistributions in binary form must reproduce the above copyright notice,
/// this list of conditions and the following disclaimer in the documentation
/// and/or other materials provided with the distribution.
///- The name of the Primoz Gabrijelcic may not be used to endorse or promote
/// products derived from this software without specific prior written permission.
///
///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
///
///
/// Author : GJ, Primoz Gabrijelcic
/// Creation date : 2008-07-13
/// Last modification : 2009-11-11
/// Version : 1.01b
///
/// History:
/// 1.01b: 2009-11-11
/// - [GJ] better fix for the initialization crash.
/// 1.01a: 2009-11-10
/// - Bug fixed: Initialization code could crash with range check error.
/// 1.01: 2008-10-26
/// - [GJ] Redesigned stack with better lock contention.
/// - [GJ] Totally redesigned queue, which is no longer based on stack and allows
/// multiple readers.
///
{$WARN SYMBOL_PLATFORM OFF}
unit OtlContainers;
{$R-,O+,A8}
interface
uses
Classes,
DSiWin32,
GpStuff,
OtlCommon,
OtlSync,
OtlContainerObserver;
const
CPartlyEmptyLoadFactor = 0.8; // When an element count drops below 90%, the container is considered 'partly empty'.
CAlmostFullLoadFactor = 0.9; // When an element count raises above 90%, the container is considered 'almost full'.
type
TOmniContainerSubject = class
strict private
csListLocks : array [TOmniContainerObserverInterest] of TOmniMREW;
csObserverLists: array [TOmniContainerObserverInterest] of TList;
protected
procedure Notify(interest: TOmniContainerObserverInterest);
procedure NotifyOnce(interest: TOmniContainerObserverInterest);
procedure Rearm(interest: TOmniContainerObserverInterest);
public
constructor Create;
destructor Destroy; override;
procedure Attach(const observer: TOmniContainerObserver;
interest: TOmniContainerObserverInterest);
procedure Detach(const observer: TOmniContainerObserver;
interest: TOmniContainerObserverInterest);
end; { TOmniContainerSubject }
{:Lock-free, single writer, single reader, size-limited stack.
}
IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
procedure Empty;
procedure Initialize(numElements, elementSize: integer);
function IsEmpty: boolean;
function IsFull: boolean;
function Pop(var value): boolean;
function Push(const value): boolean;
end; { IOmniStack }
{:Lock-free, single writer, single reader ring buffer.
}
IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
function Dequeue(var value): boolean;
procedure Empty;
function Enqueue(const value): boolean;
procedure Initialize(numElements, elementSize: integer);
function IsEmpty: boolean;
function IsFull: boolean;
end; { IOmniQueue }
PReferencedPtr = ^TReferencedPtr;
TReferencedPtr = record
PData : pointer;
Reference: cardinal;
end; { TReferencedPtr }
TReferencedPtrBuffer = array [0..MaxInt shr 4] of TReferencedPtr;
POmniRingBuffer = ^TOmniRingBuffer;
TOmniRingBuffer = packed record
FirstIn : TReferencedPtr;
LastIn : TReferencedPtr;
StartBuffer : pointer;
EndBuffer : pointer;
Buffer : TReferencedPtrBuffer;
end; { TOmniRingBuffer }
POmniLinkedData = ^TOmniLinkedData;
TOmniLinkedData = packed record
Next: POmniLinkedData;
Data: record end; //user data, variable size
end; { TOmniLinkedData }
TOmniBaseStack = class abstract(TInterfacedObject, IOmniStack)
strict private
obsDataBuffer : pointer;
obsElementSize : integer;
obsNumElements : integer;
obsPublicChainP : PReferencedPtr;
obsRecycleChainP: PReferencedPtr;
class var obsIsInitialized: boolean; //default is false
class var obsTaskPopLoops : cardinal;
class var obsTaskPushLoops: cardinal;
strict protected
procedure MeasureExecutionTimes;
class function PopLink(var chain: TReferencedPtr): POmniLinkedData; static;
class procedure PushLink(const link: POmniLinkedData; var chain: TReferencedPtr); static;
public
destructor Destroy; override;
procedure Empty;
procedure Initialize(numElements, elementSize: integer); virtual;
function IsEmpty: boolean; inline;
function IsFull: boolean; inline;
function Pop(var value): boolean; virtual;
function Push(const value): boolean; virtual;
property ElementSize: integer read obsElementSize;
property NumElements: integer read obsNumElements;
end; { TOmniBaseStack }
TOmniStack = class(TOmniBaseStack)
strict private
osAlmostFullCount : integer;
osContainerSubject: TOmniContainerSubject;
osInStackCount : TGp4AlignedInt;
osPartlyEmptyCount: integer;
public
constructor Create(numElements, elementSize: integer;
partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
almostFullLoadFactor: real = CAlmostFullLoadFactor);
destructor Destroy; override;
function Pop(var value): boolean; override;
function Push(const value): boolean; override;
property ContainerSubject: TOmniContainerSubject read osContainerSubject;
end; { TOmniStack }
TOmniBaseQueue = class abstract(TInterfacedObject, IOmniQueue)
strict private
obqDataBuffer : pointer;
obqElementSize : integer;
obqNumElements : integer;
obqPublicRingBuffer : POmniRingBuffer;
obqRecycleRingBuffer : POmniRingBuffer;
class var obqTaskInsertLoops: cardinal; //default is false
class var obqTaskRemoveLoops: cardinal;
class var obqIsInitialized : boolean;
strict protected
class procedure InsertLink(const data: pointer; const ringBuffer: POmniRingBuffer);
static;
class function RemoveLink(const ringBuffer: POmniRingBuffer): pointer; static;
procedure MeasureExecutionTimes;
public
destructor Destroy; override;
function Dequeue(var value): boolean;
procedure Empty;
function Enqueue(const value): boolean;
procedure Initialize(numElements, elementSize: integer); virtual;
function IsEmpty: boolean;
function IsFull: boolean;
property ElementSize: integer read obqElementSize;
property NumElements: integer read obqNumElements;
end; { TOmniBaseQueue }
TOmniQueue = class(TOmniBaseQueue)
strict private
oqAlmostFullCount : integer;
oqContainerSubject: TOmniContainerSubject;
oqInQueueCount : TGp4AlignedInt;
oqPartlyEmptyCount: integer;
public
constructor Create(numElements, elementSize: integer;
partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
almostFullLoadFactor: real = CAlmostFullLoadFactor);
destructor Destroy; override;
function Dequeue(var value): boolean;
function Enqueue(const value): boolean;
property ContainerSubject: TOmniContainerSubject read oqContainerSubject;
end; { TOmniQueue }
implementation
uses
Windows,
SysUtils;
{ TOmniContainerSubject }
constructor TOmniContainerSubject.Create;
var
interest: TOmniContainerObserverInterest;
begin
inherited Create;
for interest := Low(TOmniContainerObserverInterest) to High(TOmniContainerObserverInterest) do
csObserverLists[interest] := TList.Create;
end; { TOmniContainerSubject.Create }
destructor TOmniContainerSubject.Destroy;
var
interest: TOmniContainerObserverInterest;
begin
for interest := Low(TOmniContainerObserverInterest) to High(TOmniContainerObserverInterest) do begin
csObserverLists[interest].Free;
csObserverLists[interest] := nil;
end;
inherited;
end; { TOmniContainerSubject.Destroy }
procedure TOmniContainerSubject.Attach(const observer: TOmniContainerObserver;
interest: TOmniContainerObserverInterest);
begin
csListLocks[interest].EnterWriteLock;
try
if csObserverLists[interest].IndexOf(observer) < 0 then
csObserverLists[interest].Add(observer);
finally csListLocks[interest].ExitWriteLock; end;
end; { TOmniContainerSubject.Attach }
procedure TOmniContainerSubject.Detach(const observer: TOmniContainerObserver;
interest: TOmniContainerObserverInterest);
begin
csListLocks[interest].EnterWriteLock;
try
csObserverLists[interest].Remove(observer);
finally csListLocks[interest].ExitWriteLock; end;
end; { TOmniContainerSubject.Detach }
procedure TOmniContainerSubject.Notify(interest: TOmniContainerObserverInterest);
var
iObserver: integer;
list : TList;
begin
{$R-}
csListLocks[interest].EnterReadLock;
try
list := csObserverLists[interest];
for iObserver := 0 to list.Count - 1 do begin
TOmniContainerObserver(list[iObserver]).Notify;
end;
finally csListLocks[interest].ExitReadLock; end;
{$R+}
end; { TOmniContainerSubject.Notify }
procedure TOmniContainerSubject.NotifyOnce(interest: TOmniContainerObserverInterest);
var
iObserver: integer;
list : TList;
observer : TOmniContainerObserver;
begin
{$R-}
csListLocks[interest].EnterReadLock;
try
list := csObserverLists[interest];
for iObserver := 0 to list.Count - 1 do begin
observer := TOmniContainerObserver(list[iObserver]);
if observer.CanNotify then begin
observer.Notify;
observer.Deactivate;
end;
end;
finally csListLocks[interest].ExitReadLock; end;
{$R+}
end; { TOmniContainerSubject.NotifyAndRemove }
procedure TOmniContainerSubject.Rearm(interest: TOmniContainerObserverInterest);
var
iObserver: integer;
list : TList;
begin
{$R-}
csListLocks[interest].EnterReadLock;
try
list := csObserverLists[interest];
for iObserver := 0 to list.Count - 1 do
TOmniContainerObserver(list[iObserver]).Activate;
finally csListLocks[interest].ExitReadLock; end;
{$R+}
end; { TOmniContainerSubject.Rearm }
{ TOmniBaseStack }
destructor TOmniBaseStack.Destroy;
begin
FreeMem(obsPublicChainP);
inherited;
end; { TOmniBaseStack.Destroy }
procedure TOmniBaseStack.Empty;
var
linkedData: POmniLinkedData;
begin
repeat
linkedData := PopLink(obsPublicChainP^);
if not assigned(linkedData) then
break; //repeat
PushLink(linkedData, obsRecycleChainP^);
until false;
end; { TOmniBaseStack.Empty }
procedure TOmniBaseStack.Initialize(numElements, elementSize: integer);
var
bufferElementSize: integer;
currElement : POmniLinkedData;
iElement : integer;
nextElement : POmniLinkedData;
begin
Assert(SizeOf(cardinal) = SizeOf(pointer));
Assert(numElements > 0);
Assert(elementSize > 0);
obsNumElements := numElements;
//calculate element size, round up to next 4-aligned value
obsElementSize := (elementSize + 3) AND NOT 3;
//calculate buffer element size, round up to next 4-aligned value
bufferElementSize := ((SizeOf(TOmniLinkedData) + obsElementSize) + 3) AND NOT 3;
//calculate DataBuffer
GetMem(obsDataBuffer, bufferElementSize * numElements + 2 * SizeOf(TReferencedPtr));
if cardinal(obsDataBuffer) AND 7 <> 0 then
raise Exception.Create('TOmniBaseContainer: obcBuffer is not 8-aligned');
obsPublicChainP := obsDataBuffer;
inc(cardinal(obsDataBuffer), SizeOf(TReferencedPtr));
obsRecycleChainP := obsDataBuffer;
inc(cardinal(obsDataBuffer), SizeOf(TReferencedPtr));
//Format buffer to recycleChain, init obsRecycleChain and obsPublicChain.
//At the beginning, all elements are linked into the recycle chain.
obsRecycleChainP^.PData := obsDataBuffer;
currElement := obsRecycleChainP^.PData;
for iElement := 0 to obsNumElements - 2 do begin
nextElement := POmniLinkedData(integer(currElement) + bufferElementSize);
currElement.Next := nextElement;
currElement := nextElement;
end;
currElement.Next := nil; // terminate the chain
obsPublicChainP^.PData := nil;
MeasureExecutionTimes;
end; { TOmniBaseStack.Initialize }
function TOmniBaseStack.IsEmpty: boolean;
begin
Result := not assigned(obsPublicChainP^.PData);
end; { TOmniBaseStack.IsEmpty }
function TOmniBaseStack.IsFull: boolean;
begin
Result := not assigned(obsRecycleChainP^.PData);
end; { TOmniBaseStack.IsFull }
procedure TOmniBaseStack.MeasureExecutionTimes;
const
NumOfSamples = 10;
var
TimeTestField: array [0..1] of array [1..NumOfSamples] of int64;
function GetMinAndClear(routine, count: cardinal): int64;
var
m: cardinal;
n: integer;
x: integer;
begin
Result := 0;
for m := 1 to count do begin
x:= 1;
for n:= 2 to NumOfSamples do
if TimeTestField[routine, n] < TimeTestField[routine, x] then
x := n;
Inc(Result, TimeTestField[routine, x]);
TimeTestField[routine, x] := MaxLongInt;
end;
end; { GetMinAndClear }
var
affinity : string;
currElement: POmniLinkedData;
n : integer;
begin { TOmniBaseStack.MeasureExecutionTimes }
if not obsIsInitialized then begin
affinity := DSiGetThreadAffinity;
DSiSetThreadAffinity(affinity[1]);
try
//Calculate TaskPopDelay and TaskPushDelay counter values depend on CPU speed!!!}
obsTaskPopLoops := 1;
obsTaskPushLoops := 1;
for n := 1 to NumOfSamples do begin
DSiYield;
//Measure RemoveLink rutine delay
TimeTestField[0, n] := GetCPUTimeStamp;
currElement := PopLink(obsRecycleChainP^);
TimeTestField[0, n] := GetCPUTimeStamp - TimeTestField[0, n];
//Measure InsertLink rutine delay
TimeTestField[1, n] := GetCPUTimeStamp;
PushLink(currElement, obsRecycleChainP^);
TimeTestField[1, n] := GetCPUTimeStamp - TimeTestField[1, n];
end;
//Calculate first 4 minimum average for RemoveLink rutine
obsTaskPopLoops := GetMinAndClear(0, 4) div 4;
//Calculate first 4 minimum average for InsertLink rutine
obsTaskPushLoops := GetMinAndClear(1, 4) div 4;
obsIsInitialized := true;
finally DSiSetThreadAffinity(affinity); end;
end;
end; { TOmniBaseStack.MeasureExecutionTimes }
function TOmniBaseStack.Pop(var value): boolean;
var
linkedData: POmniLinkedData;
begin
linkedData := PopLink(obsPublicChainP^);
Result := assigned(linkedData);
if not Result then
Exit;
Move(linkedData.Data, value, ElementSize);
PushLink(linkedData, obsRecycleChainP^);
end; { TOmniBaseStack.Pop }
class function TOmniBaseStack.PopLink(var chain: TReferencedPtr): POmniLinkedData;
//nil << Link.Next << Link.Next << ... << Link.Next
//FILO buffer logic ^------ < chainHead
//Advanced stack PopLink model with idle/busy status bit
var
AtStartReference: cardinal;
CurrentReference: cardinal;
TaskCounter : cardinal;
ThreadReference : cardinal;
label
TryAgain;
begin
ThreadReference := GetThreadId + 1; //Reference.bit0 := 1
with chain do begin
TryAgain:
TaskCounter := obsTaskPopLoops;
AtStartReference := Reference OR 1; //Reference.bit0 := 1
repeat
CurrentReference := Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS32(CurrentReference, ThreadReference, Reference)
then
goto TryAgain;
//Reference is set...
result := PData;
//Empty test
if result = nil then
CAS32(ThreadReference, 0, Reference) //Clear Reference if task own reference
else
if not CAS64(result, ThreadReference, result.Next, 0, chain) then
goto TryAgain;
end;
end; { TOmniBaseStack.PopLink }
function TOmniBaseStack.Push(const value): boolean;
var
linkedData: POmniLinkedData;
begin
linkedData := PopLink(obsRecycleChainP^);
Result := assigned(linkedData);
if not Result then
Exit;
Move(value, linkedData.Data, ElementSize);
PushLink(linkedData, obsPublicChainP^);
end; { TOmniBaseStack.Push }
class procedure TOmniBaseStack.PushLink(const link: POmniLinkedData; var chain: TReferencedPtr);
//Advanced stack PushLink model with idle/busy status bit
var
PMemData : pointer;
TaskCounter: cardinal;
begin
with chain do begin
for TaskCounter := 0 to obsTaskPushLoops do
if (Reference AND 1 = 0) then
break;
repeat
PMemData := PData;
link.Next := PMemData;
until CAS32(PMemData, link, PData);
end;
end; { TOmniBaseStack.PushLink }
{ TOmniStack }
constructor TOmniStack.Create(numElements, elementSize: integer; partlyEmptyLoadFactor,
almostFullLoadFactor: real);
begin
inherited Create;
Initialize(numElements, elementSize);
osContainerSubject := TOmniContainerSubject.Create;
osInStackCount.Value := 0;
osPartlyEmptyCount := Round(numElements * partlyEmptyLoadFactor);
if osPartlyEmptyCount >= numElements then
osPartlyEmptyCount := numElements - 1;
osAlmostFullCount := Round(numElements * almostFullLoadFactor);
if osAlmostFullCount >= numElements then
osAlmostFullCount := numElements - 1;
end; { TOmniStack.Create }
destructor TOmniStack.Destroy;
begin
FreeAndNil(osContainerSubject);
inherited;
end; { TOmniStack.Destroy }
function TOmniStack.Pop(var value): boolean;
var
countAfter: integer;
begin
Result := inherited Pop(value);
if Result then begin
countAfter := osInStackCount.Decrement; //' range check error??
ContainerSubject.Notify(coiNotifyOnAllRemoves);
if countAfter <= osPartlyEmptyCount then
ContainerSubject.NotifyOnce(coiNotifyOnPartlyEmpty);
end;
end; { TOmniStack.Pop }
function TOmniStack.Push(const value): boolean;
var
countAfter : integer;
begin
Result := inherited Push(value);
if Result then begin
countAfter := osInStackCount.Increment;
ContainerSubject.Notify(coiNotifyOnAllInserts);
if countAfter >= osAlmostFullCount then
ContainerSubject.NotifyOnce(coiNotifyOnAlmostFull);
end;
end; { TOmniStack.Push }
{ TOmniBaseQueue }
function TOmniBaseQueue.Dequeue(var value): boolean;
var
Data: pointer;
begin
Data := RemoveLink(obqPublicRingBuffer);
Result := assigned(Data);
if not Result then
Exit;
Move(Data^, value, ElementSize);
InsertLink(Data, obqRecycleRingBuffer);
end; { TOmniBaseQueue.Dequeue }
destructor TOmniBaseQueue.Destroy;
begin
FreeMem(obqDataBuffer);
FreeMem(obqPublicRingBuffer);
FreeMem(obqRecycleRingBuffer);
inherited;
end; { TOmniBaseQueue.Destroy }
procedure TOmniBaseQueue.Empty;
var
Data: pointer;
begin
repeat
Data := RemoveLink(obqPublicRingBuffer);
if assigned(Data) then
InsertLink(Data, obqRecycleRingBuffer)
else
break;
until false;
end; { TOmniBaseQueue.Empty }
function TOmniBaseQueue.Enqueue(const value): boolean;
var
Data: pointer;
begin
Data := RemoveLink(obqRecycleRingBuffer);
Result := assigned(Data);
if not Result then
Exit;
Move(value, Data^, ElementSize);
InsertLink(Data, obqPublicRingBuffer);
end; { TOmniBaseQueue.Enqueue }
procedure TOmniBaseQueue.Initialize(numElements, elementSize: integer);
var
n : integer;
ringBufferSize: cardinal;
begin
Assert(SizeOf(cardinal) = SizeOf(pointer));
Assert(numElements > 0);
Assert(elementSize > 0);
obqNumElements := numElements;
// calculate element size, round up to next 4-aligned value
obqElementSize := (elementSize + 3) AND NOT 3;
// allocate obqDataBuffer
GetMem(obqDataBuffer, elementSize * numElements + elementSize);
// allocate RingBuffers
ringBufferSize := SizeOf(TReferencedPtr) * (numElements + 1) +
SizeOf(TOmniRingBuffer) - SizeOf(TReferencedPtrBuffer);
obqPublicRingBuffer := AllocMem(ringBufferSize);
Assert(cardinal(obqPublicRingBuffer) mod 8 = 0,
'TOmniBaseContainer: obcPublicRingBuffer is not 8-aligned');
obqRecycleRingBuffer := AllocMem(ringBufferSize);
Assert(cardinal(obqRecycleRingBuffer) mod 8 = 0,
'TOmniBaseContainer: obcRecycleRingBuffer is not 8-aligned');
// set obqPublicRingBuffer head
obqPublicRingBuffer.FirstIn.PData := @obqPublicRingBuffer.Buffer[0];
obqPublicRingBuffer.LastIn.PData := @obqPublicRingBuffer.Buffer[0];
obqPublicRingBuffer.StartBuffer := @obqPublicRingBuffer.Buffer[0];
obqPublicRingBuffer.EndBuffer := @obqPublicRingBuffer.Buffer[numElements];
// set obqRecycleRingBuffer head
obqRecycleRingBuffer.FirstIn.PData := @obqRecycleRingBuffer.Buffer[0];
obqRecycleRingBuffer.LastIn.PData := @obqRecycleRingBuffer.Buffer[numElements];
obqRecycleRingBuffer.StartBuffer := @obqRecycleRingBuffer.Buffer[0];
obqRecycleRingBuffer.EndBuffer := @obqRecycleRingBuffer.Buffer[numElements];
// format obqRecycleRingBuffer
for n := 0 to numElements do
obqRecycleRingBuffer.Buffer[n].PData := pointer(cardinal(obqDataBuffer) + cardinal(n * elementSize));
MeasureExecutionTimes;
end; { TOmniBaseQueue.Initialize }
class procedure TOmniBaseQueue.InsertLink(const data: pointer; const ringBuffer:
POmniRingBuffer);
//FIFO buffer logic
//Insert link to queue model with idle/busy status bit
var
AtStartReference: cardinal;
CurrentLastIn : PReferencedPtr;
CurrentReference: cardinal;
NewLastIn : PReferencedPtr;
TaskCounter : cardinal;
ThreadReference : cardinal;
label
TryAgain;
begin
ThreadReference := GetThreadId + 1; //Reference.bit0 := 1
with ringBuffer^ do begin
TryAgain:
TaskCounter := obqTaskInsertLoops;
AtStartReference := LastIn.Reference OR 1; //Reference.bit0 := 1
repeat
CurrentReference := LastIn.Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS32(CurrentReference, ThreadReference, LastIn.Reference)
then
goto TryAgain;
//Reference is set...
CurrentLastIn := LastIn.PData;
CAS32(CurrentLastIn.Reference, ThreadReference, CurrentLastIn.Reference);
if (ThreadReference <> LastIn.Reference) or
not CAS64(CurrentLastIn.PData, ThreadReference, data, ThreadReference, CurrentLastIn^)
then
goto TryAgain;
//Calculate ringBuffer next LastIn address
NewLastIn := pointer(cardinal(CurrentLastIn) + SizeOf(TReferencedPtr));
if cardinal(NewLastIn) > cardinal(EndBuffer) then
NewLastIn := StartBuffer;
//Try to exchange and clear Reference if task own reference
if not CAS64(CurrentLastIn, ThreadReference, NewLastIn, 0, LastIn) then
goto TryAgain;
end;
end; { TOmniBaseQueue.InsertLink }
function TOmniBaseQueue.IsEmpty: boolean;
begin
Result := (obqPublicRingBuffer.FirstIn.PData = obqPublicRingBuffer.LastIn.PData);
end; { TOmniBaseQueue.IsEmpty }
function TOmniBaseQueue.IsFull: boolean;
var
NewLastIn: pointer;
begin
NewLastIn := pointer(cardinal(obqPublicRingBuffer.LastIn.PData) + SizeOf(TReferencedPtr));
if cardinal(NewLastIn) > cardinal(obqPublicRingBuffer.EndBuffer) then
NewLastIn := obqPublicRingBuffer.StartBuffer;
result := (cardinal(NewLastIn) = cardinal(obqPublicRingBuffer.LastIn.PData)) or
(obqRecycleRingBuffer.FirstIn.PData = obqRecycleRingBuffer.LastIn.PData);
end; { TOmniBaseQueue.IsFull }
procedure TOmniBaseQueue.MeasureExecutionTimes;
const
NumOfSamples = 10;
var
TimeTestField: array [0..1] of array [1..NumOfSamples] of int64;
function GetMinAndClear(routine, count: cardinal): int64;
var
m: cardinal;
n: integer;
x: integer;
begin
Result := 0;
for m := 1 to count do begin
x:= 1;
for n:= 2 to NumOfSamples do
if TimeTestField[routine, n] < TimeTestField[routine, x] then
x := n;
Inc(Result, TimeTestField[routine, x]);
TimeTestField[routine, x] := MaxLongInt;
end;
end; { GetMinAndClear }
var
affinity : string;
currElement: pointer;
n : integer;
begin { TOmniBaseQueue.MeasureExecutionTimes }
if not obqIsInitialized then begin
affinity := DSiGetThreadAffinity;
DSiSetThreadAffinity(affinity[1]);
try
//Calculate TaskPopDelay and TaskPushDelay counter values depend on CPU speed!!!}
obqTaskRemoveLoops := 1;
obqTaskInsertLoops := 1;
for n := 1 to NumOfSamples do begin
DSiYield;
//Measure RemoveLink rutine delay
TimeTestField[0, n] := GetCPUTimeStamp;
currElement := RemoveLink(obqRecycleRingBuffer);
TimeTestField[0, n] := GetCPUTimeStamp - TimeTestField[0, n];
//Measure InsertLink rutine delay
TimeTestField[1, n] := GetCPUTimeStamp;
InsertLink(currElement, obqRecycleRingBuffer);
TimeTestField[1, n] := GetCPUTimeStamp - TimeTestField[1, n];
end;
obqTaskRemoveLoops := GetMinAndClear(0, 4) div 4;
obqTaskInsertLoops := GetMinAndClear(1, 4) div 4;
obqIsInitialized := true;
finally DSiSetThreadAffinity(affinity); end;
end;
end; { TOmniBaseQueue.MeasureExecutionTimes }
class function TOmniBaseQueue.RemoveLink(const ringBuffer: POmniRingBuffer): pointer;
//FIFO buffer logic
//Remove link from queue model with idle/busy status bit
var
AtStartReference : cardinal;
CurrentFirstIn : pointer;
CurrentReference : cardinal;
NewFirstIn : pointer;
Reference : cardinal;
TaskCounter : cardinal;
label
TryAgain;
begin
Reference := GetThreadId + 1; //Reference.bit0 := 1
with ringBuffer^ do begin
TryAgain:
TaskCounter := obqTaskRemoveLoops;
AtStartReference := FirstIn.Reference OR 1; //Reference.bit0 := 1
repeat
CurrentReference := FirstIn.Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS32(CurrentReference, Reference, FirstIn.Reference)
then
goto TryAgain;
//Reference is set...
CurrentFirstIn := FirstIn.PData;
//Empty test
if CurrentFirstIn = LastIn.PData then begin
//Clear Reference if task own reference
CAS32(Reference, 0, FirstIn.Reference);
Result := nil;
Exit;
end;
//Load Result
Result := PReferencedPtr(FirstIn.PData).PData;
//Calculate ringBuffer next FirstIn address
NewFirstIn := pointer(cardinal(CurrentFirstIn) + SizeOf(TReferencedPtr));
if cardinal(NewFirstIn) > cardinal(EndBuffer) then
NewFirstIn := StartBuffer;
//Try to exchange and clear Reference if task own reference
if not CAS64(CurrentFirstIn, Reference, NewFirstIn, 0, FirstIn) then
goto TryAgain;
end;
end; { TOmniBaseQueue.RemoveLink }
{ TOmniQueue }
constructor TOmniQueue.Create(numElements, elementSize: integer; partlyEmptyLoadFactor,
almostFullLoadFactor: real);
begin
inherited Create;
oqContainerSubject := TOmniContainerSubject.Create;
oqInQueueCount.Value := 0;
oqPartlyEmptyCount := Round(numElements * partlyEmptyLoadFactor);
if oqPartlyEmptyCount >= numElements then
oqPartlyEmptyCount := numElements - 1;
oqAlmostFullCount := Round(numElements * almostFullLoadFactor);
if oqAlmostFullCount >= numElements then
oqAlmostFullCount := numElements - 1;
Initialize(numElements, elementSize);
end; { TOmniQueue.Create }
destructor TOmniQueue.Destroy;
begin
FreeAndNil(oqContainerSubject);
inherited;
end; { TOmniQueue.Destroy }
function TOmniQueue.Dequeue(var value): boolean;
var
countAfter: integer;
begin
Result := inherited Dequeue(value);
if Result then begin
countAfter := oqInQueueCount.Decrement;
ContainerSubject.Notify(coiNotifyOnAllRemoves);
if countAfter <= oqPartlyEmptyCount then
ContainerSubject.NotifyOnce(coiNotifyOnPartlyEmpty);
end;
end; { TOmniQueue.Dequeue }
function TOmniQueue.Enqueue(const value): boolean;
var
countAfter: integer;
begin
Result := inherited Enqueue(value);
if Result then begin
countAfter := oqInQueueCount.Increment;
ContainerSubject.Notify(coiNotifyOnAllInserts);
if countAfter >= oqAlmostFullCount then
ContainerSubject.NotifyOnce(coiNotifyOnAlmostFull);
end;
end; { TOmniQueue.Enqueue }
{ initialization }
procedure InitializeTimingInfo;
var
queue: TOmniBaseQueue;
stack: TOmniBaseStack;
begin
stack := TOmniBaseStack.Create;
stack.Initialize(10, 4); // enough for initialization
FreeAndNil(stack);
queue := TOmniBaseQueue.Create;
queue.Initialize(10, 4); // enough for initialization
FreeAndNil(queue);
end; { InitializeTimingInfo }
initialization
InitializeTimingInfo;
end.