///Microlocking containers. Part of the OmniThreadLibrary project.
///TOmni[Base]Queue requires Pentium 4 processor (or newer) unless OTL_OLDCPU is defined.
///Primoz Gabrijelcic, GJ
///
///This software is distributed under the BSD license.
///
///Copyright (c) 2011 Primoz Gabrijelcic
///All rights reserved.
///
///Redistribution and use in source and binary forms, with or without modification,
///are permitted provided that the following conditions are met:
///- Redistributions of source code must retain the above copyright notice, this
/// list of conditions and the following disclaimer.
///- Redistributions in binary form must reproduce the above copyright notice,
/// this list of conditions and the following disclaimer in the documentation
/// and/or other materials provided with the distribution.
///- The name of the Primoz Gabrijelcic may not be used to endorse or promote
/// products derived from this software without specific prior written permission.
///
///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
///
///
/// Author : GJ, Primoz Gabrijelcic
/// Creation date : 2008-07-13
/// Last modification : 2011-12-18
/// Version : 3.0
///
/// History:
/// 3.0: 2011-12-19
/// - [GJ] Implemented 64-bit TOmni[Base]Bounded(Queue|Stack).
/// - Fixed TOmni[Base]Queue to work in 64-bit world.
/// 2.05: 2011-08-26
/// - Implemented TOmni[Base]Queue.IsEmpty. Keep in mind that the returned value may
/// not be valid for any amount of time if other threads are reading from/
/// writing to the queue.
/// 2.04: 2010-07-01
/// - Includes OTLOptions.inc.
/// 2.03a: 2010-05-06
/// - Fixed memory leak in TOmni[Base]Queue when queueing String, WideString,
/// Variant and Extended values.
/// 2.03: 2010-02-18
/// - Reversed head and tail because they were used illogically.
/// 2.02a: 2010-02-09
/// - Dynamically allocate head/tail structures so that they are always 8-aligned.
/// - Optimized algorithm using atomic move instead of atomic compare-and-swap in
/// some places (thanks to GJ).
/// 2.02: 2010-02-08
/// - New ABA- and MREW-free dynamic queue algorithm.
/// - Dynamic queue parameters are settable in the constructor.
/// 2.01: 2010-02-04
/// - Uses CAS8 instead of CAS32.
/// 2.0: 2009-12-25
/// - Implemented dynamically allocated, O(1) enqueue and dequeue, threadsafe,
/// microlocking queue. Class TOmniBaseQueue contains base implementation
/// while TOmniQueue adds notification support.
/// - Big class rename: TOmniBaseStack -> TOmniBaseBoundedStack,
/// TOmniStack -> TOmniBoundedStack, TOmniBaseQueue -> TOmniBaseBoundedQueue,
/// TOmniQueue -> TOmniBoundedQueue.
/// 1.02: 2009-12-22
/// - TOmniContainerSubject moved into OtlContainerObserver because it will also be
/// used in OtlCollections.
/// 1.01b: 2009-11-11
/// - [GJ] better fix for the initialization crash.
/// 1.01a: 2009-11-10
/// - Bug fixed: Initialization code could crash with range check error.
/// 1.01: 2008-10-26
/// - [GJ] Redesigned stack with better lock contention.
/// - [GJ] Totally redesigned queue, which is no longer based on stack and allows
/// multiple readers.
///
unit OtlContainers;
{$I OtlOptions.inc}
{$OPTIMIZATION ON}
{$WARN SYMBOL_PLATFORM OFF}
{$IFNDEF CPUX64}
{$DEFINE OTL_OLDCPU} // undefine if you're sure your code will only run on a CPU that supports SSE2 instruction set (more specifically, Move64 instruction)
{$ENDIF ~CPUX64}
//DEFINE DEBUG_OMNI_QUEUE to enable assertions in TOmniBaseQueue
interface
uses
Classes,
DSiWin32,
GpStuff,
OtlCommon,
OtlSync,
OtlContainerObserver;
const
CPartlyEmptyLoadFactor = 0.8; // When an element count drops below 80%, the container is considered 'partly empty'.
CAlmostFullLoadFactor = 0.9; // When an element count raises above 90%, the container is considered 'almost full'.
type
//When compiling with CompilerVersion < 23, NativeInt is declared here as an alias for integer.
{$IF CompilerVersion < 23} //pre-XE2
NativeInt = integer;
{$IFEND}
{:Lock-free, multiple writer, multiple reader, bounded stack.
Method notes below describe the behaviour of TOmniBaseBoundedStack, the implementation in this unit.
}
IOmniStack = interface ['{F4C57327-18A0-44D6-B95D-2D51A0EF32B4}']
//Removes all stored elements (their slots are returned to the free chain).
procedure Empty;
//Allocates storage for numElements elements of elementSize bytes each.
procedure Initialize(numElements, elementSize: integer);
//True when no element is stored.
function IsEmpty: boolean;
//True when no free slot is left.
function IsFull: boolean;
//Copies the top element into value; returns false when there was nothing to pop.
function Pop(var value): boolean;
//Copies value into a free slot and publishes it; returns false when no free slot was available.
function Push(const value): boolean;
end; { IOmniStack }
{:Lock-free, multiple writer, multiple reader ring buffer (bounded queue).
Method notes below describe the behaviour of TOmniBaseBoundedQueue, the implementation in this unit.
}
IOmniQueue = interface ['{AE6454A2-CDB4-43EE-9F1B-5A7307593EE9}']
//Copies the oldest element into value; returns false when the queue was empty.
function Dequeue(var value): boolean;
//Removes all stored elements (their data blocks are returned to the recycle ring).
procedure Empty;
//Copies value into a free data block and appends it; returns false when no free block was available.
function Enqueue(const value): boolean;
//Allocates storage for numElements elements of elementSize bytes each.
procedure Initialize(numElements, elementSize: integer);
//True when the public ring holds no element.
function IsEmpty: boolean;
//True when no further element can be enqueued.
function IsFull: boolean;
end; { IOmniQueue }
//Pointer paired with a pointer-sized reference word. The bounded containers compare
//and replace both fields together with a single CAS call. Bit 0 of Reference is used
//as the idle/busy flag; a busy value is the owner's GetThreadId + 1.
PReferencedPtr = ^TReferencedPtr;
TReferencedPtr = record
PData : pointer;
Reference: NativeInt;
end; { TReferencedPtr }
//Overlay type only: declared with a very large upper bound; the number of slots that
//really exist is decided by the allocation size computed in TOmniBaseBoundedQueue.Initialize.
TReferencedPtrBuffer = array [0..MaxInt shr 5] of TReferencedPtr;
//Header of one ring buffer used by TOmniBaseBoundedQueue, followed in memory by its slots.
// FirstIn     - PData points at the slot RemoveLink reads next
// LastIn      - PData points at the slot InsertLink writes next
// StartBuffer - first slot (wrap-around target)
// EndBuffer   - last slot; advancing past it wraps to StartBuffer
// Buffer      - the slots themselves; must stay the last field because it is allocated with a variable length
POmniRingBuffer = ^TOmniRingBuffer;
TOmniRingBuffer = packed record
FirstIn : TReferencedPtr;
Dummy : array[1..128 - SizeOf(TReferencedPtr)] of byte; // push LastIn into next cache line
LastIn : TReferencedPtr;
StartBuffer : pointer;
EndBuffer : pointer;
Buffer : TReferencedPtrBuffer;
end; { TOmniRingBuffer }
//Node of the singly linked chains used by TOmniBaseBoundedStack: a Next pointer
//immediately followed by the element's bytes. Data is a zero-size marker that only
//provides the address where the element starts.
POmniLinkedData = ^TOmniLinkedData;
TOmniLinkedData = packed record
Next: POmniLinkedData;
Data: record end; //user data, variable size
end; { TOmniLinkedData }
//Bounded stack built from two chains of preallocated nodes: the public chain holds
//pushed elements, the recycle chain holds free nodes. Push moves a node from recycle
//to public, Pop moves it back.
TOmniBaseBoundedStack = class abstract(TInterfacedObject, IOmniStack)
strict private
obsDataBuffer : pointer; //first element node (located right after the two chain heads)
obsElementSize : integer; //element size in bytes as passed to Initialize
obsNumElements : integer; //capacity as passed to Initialize
obsPublicChainP : PReferencedPtr; //head of the chain of stored elements; also the start of the allocated memory block
obsRecycleChainP: PReferencedPtr; //head of the chain of free nodes
class var
class var obsIsInitialized: boolean; //default is false
class var obsTaskPopLoops : NativeInt; //spin limit used by PopLink, set by MeasureExecutionTimes
class var obsTaskPushLoops: NativeInt; //spin limit used by PushLink, set by MeasureExecutionTimes
strict protected
//Measures PopLink/PushLink cost once per process and stores the spin limits.
procedure MeasureExecutionTimes;
//Detaches and returns the head node of chain, or nil when the chain is empty.
class function PopLink(var chain: TReferencedPtr): POmniLinkedData; static;
//Makes link the new head node of chain.
class procedure PushLink(const link: POmniLinkedData; var chain: TReferencedPtr); static;
public
destructor Destroy; override;
procedure Empty;
procedure Initialize(numElements, elementSize: integer); virtual;
function IsEmpty: boolean; inline;
function IsFull: boolean; inline;
function Pop(var value): boolean;
function Push(const value): boolean;
property ElementSize: integer read obsElementSize;
property NumElements: integer read obsNumElements;
end; { TOmniBaseBoundedStack }
//Bounded stack that additionally counts stored elements and notifies ContainerSubject
//on every insert/remove and when the count crosses the partly-empty/almost-full thresholds.
TOmniBoundedStack = class(TOmniBaseBoundedStack)
strict private
osAlmostFullCount : integer; //Push notifies coiNotifyOnAlmostFull when the count reaches this value
osContainerSubject: TOmniContainerSubject;
osInStackCount : TGp4AlignedInt; //number of stored elements, updated with Increment/Decrement
osPartlyEmptyCount: integer; //Pop notifies coiNotifyOnPartlyEmpty when the count drops to this value
public
constructor Create(numElements, elementSize: integer;
partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
almostFullLoadFactor: real = CAlmostFullLoadFactor);
destructor Destroy; override;
//Pop and Push are not virtual in the base class; these declarations hide the inherited ones.
function Pop(var value): boolean;
function Push(const value): boolean;
property ContainerSubject: TOmniContainerSubject read osContainerSubject;
end; { TOmniBoundedStack }
//Bounded FIFO queue built from two ring buffers of data-block pointers: the public
//ring holds enqueued blocks, the recycle ring holds free blocks. Enqueue moves a block
//from recycle to public, Dequeue moves it back.
TOmniBaseBoundedQueue = class abstract(TInterfacedObject, IOmniQueue)
strict private
obqDataBuffer : pointer; //storage for the element data blocks
obqElementSize : integer; //element size in bytes as passed to Initialize
obqNumElements : integer; //capacity as passed to Initialize
obqPublicRingBuffer : POmniRingBuffer; //ring of enqueued data blocks
obqRecycleRingBuffer: POmniRingBuffer; //ring of free data blocks
class var
class var obqIsInitialized : boolean; //default is false
class var obqTaskInsertLoops: NativeInt; //spin limit used by InsertLink, set by MeasureExecutionTimes
class var obqTaskRemoveLoops: NativeInt; //spin limit used by RemoveLink, set by MeasureExecutionTimes
strict protected
//Stores data into the slot at ringBuffer.LastIn and advances LastIn.
class procedure InsertLink(const data: pointer; const ringBuffer: POmniRingBuffer);
static;
//Measures RemoveLink/InsertLink cost once per process and stores the spin limits.
procedure MeasureExecutionTimes;
//Returns the pointer stored at ringBuffer.FirstIn and advances FirstIn; nil when the ring is empty.
class function RemoveLink(const ringBuffer: POmniRingBuffer): pointer; static;
public
destructor Destroy; override;
function Dequeue(var value): boolean;
procedure Empty;
function Enqueue(const value): boolean;
procedure Initialize(numElements, elementSize: integer); virtual;
function IsEmpty: boolean;
function IsFull: boolean;
property ElementSize: integer read obqElementSize;
property NumElements: integer read obqNumElements;
end; { TOmniBaseBoundedQueue }
//Bounded queue that additionally counts stored elements and notifies ContainerSubject
//on every insert/remove and when the count crosses the partly-empty/almost-full thresholds.
TOmniBoundedQueue = class(TOmniBaseBoundedQueue)
strict private
oqAlmostFullCount : integer; //Enqueue notifies coiNotifyOnAlmostFull when the count reaches this value
oqContainerSubject: TOmniContainerSubject;
oqInQueueCount : TGp4AlignedInt; //number of stored elements, updated with Increment/Decrement
oqPartlyEmptyCount: integer; //Dequeue notifies coiNotifyOnPartlyEmpty when the count drops to this value
public
constructor Create(numElements, elementSize: integer;
partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
almostFullLoadFactor: real = CAlmostFullLoadFactor);
destructor Destroy; override;
//Dequeue and Enqueue are not virtual in the base class; these declarations hide the inherited ones.
function Dequeue(var value): boolean;
function Enqueue(const value): boolean;
property ContainerSubject: TOmniContainerSubject read oqContainerSubject;
end; { TOmniBoundedQueue }
//State tag stored in every slot and in the head/tail pointers of the dynamic queue
//(TOmniBaseQueue). The transitions between tags are described in the 'TOmniQueue'
//algorithm comment in the implementation section.
TOmniQueueTag = (tagFree, tagAllocating, tagAllocated, tagRemoving,
tagEndOfList, tagExtending, tagBlockPointer, tagDestroying, tagHeader,
tagSentinel);
//One slot of a TOmniBaseQueue memory block: the stored value, the slot's state tag and
//the slot's Offset inside its block (the algorithm comment in the implementation
//section computes the first slot as 'tail - tail.Offset').
TOmniTaggedValue = packed record
Value : TOmniValue; //aligned for faster data access; overlaps with header's unreleased slot count, which must be pointer-aligned
Tag : TOmniQueueTag;
Offset : word;
{$IFDEF CPUX64}
Stuffing: array[1..4] of byte; // make TOmniTaggedValue 3 pointers big
{$ENDIF CPUX64}
//Atomically replaces Tag with newTag if it currently equals oldTag; returns the CAS8 result.
function CASTag(oldTag, newTag: TOmniQueueTag): boolean; inline;
end; { TOmniTaggedValue }
POmniTaggedValue = ^TOmniTaggedValue;
//Slot pointer plus state tag, padded so that the record is exactly two pointers in
//size; used for the head and tail pointers of TOmniBaseQueue.
TOmniTaggedPointer = packed record
Slot : POmniTaggedValue;
Tag : TOmniQueueTag;
{$IFNDEF CPUX64}
Stuffing: array [1..3] of byte; // record size must be congruent to 0 (mod 4)
{$ELSE CPUX64}
Stuffing: array [1..7] of byte; // record size must be congruent to 0 (mod 8)
{$ENDIF CPUX64}
//Replaces (Slot, Tag) with (newSlot, newTag) if they currently equal (oldSlot, oldTag); forwards to OtlSync.CAS.
function CAS(oldSlot: POmniTaggedValue; oldTag: TOmniQueueTag;
newSlot: POmniTaggedValue; newTag: TOmniQueueTag): boolean; inline;
//Stores (newSlot, newTag) via MoveDPtr without comparing against the previous content.
procedure Move(newSlot: POmniTaggedValue; newTag: TOmniQueueTag); inline;
end; { TOmniTaggedPointer }
POmniTaggedPointer = ^TOmniTaggedPointer;
///Dynamically allocated, O(1) enqueue and dequeue, threadsafe, microlocking queue.
///Storage is organised in blocks of TOmniTaggedValue slots; a small stack of
///preallocated blocks (obcMemStack) is filled by the constructor.
TOmniBaseQueue = class
strict private // keep 4-aligned
obcCachedBlock: POmniTaggedValue;
strict private
obcBlockSize : integer; //block size in bytes, rounded up to a whole number of slots
obcHeadPointer: POmniTaggedPointer;
obcMemStack : TOmniBaseBoundedStack; //cache of preallocated, partitioned blocks
obcNumSlots : integer; //number of slots per block
obcTailPointer: POmniTaggedPointer;
strict protected
{$IFDEF DEBUG_OMNI_QUEUE}
//Debug-only replacement for System.Assert: triggers a breakpoint (int 3) when condition is false.
procedure Assert(condition: boolean);
{$ENDIF DEBUG_OMNI_QUEUE}
strict protected
//Returns a partitioned block, taken from obcMemStack when one is cached, freshly allocated otherwise.
function AllocateBlock: POmniTaggedValue;
procedure Cleanup; virtual;
procedure Initialize; virtual;
function NextSlot(slot: POmniTaggedValue): POmniTaggedValue; inline;
procedure PartitionMemory(memory: POmniTaggedValue);
procedure PreallocateMemory;
function PrevSlot(slot: POmniTaggedValue): POmniTaggedValue; inline;
procedure ReleaseBlock(firstSlot: POmniTaggedValue; forceFree: boolean = false);
public
//blockSize is in bytes; numCachedBlocks blocks are preallocated into the cache.
constructor Create(blockSize: integer = 65536; numCachedBlocks: integer = 4);
destructor Destroy; override;
//Returns the next value; raises Exception when TryDequeue reports an empty queue.
function Dequeue: TOmniValue;
procedure Enqueue(const value: TOmniValue);
function IsEmpty: boolean;
function TryDequeue(var value: TOmniValue): boolean;
end; { TOmniBaseQueue }
//TOmniBaseQueue with notification support: exposes a ContainerSubject and overrides
//Cleanup/Initialize; Dequeue, Enqueue and TryDequeue hide the (non-virtual) inherited methods.
TOmniQueue = class(TOmniBaseQueue)
strict private
ocContainerSubject: TOmniContainerSubject;
strict protected
procedure Cleanup; override;
procedure Initialize; override;
public
function Dequeue: TOmniValue;
procedure Enqueue(const value: TOmniValue);
function TryDequeue(var value: TOmniValue): boolean;
property ContainerSubject: TOmniContainerSubject read ocContainerSubject;
end; { TOmniQueue }
implementation
uses
Windows,
SysUtils;
{$IFDEF CPUX64}
//Executes a breakpoint instruction (int 3). Only compiled for CPUX64, where
//TOmniBaseQueue.Assert calls it instead of using an inline asm statement.
procedure AsmInt3;
asm
.noframe
int 3
end; { AsmInt3 }
//Executes the CPU 'pause' instruction. Only compiled for CPUX64.
procedure AsmPause;
asm
.noframe
pause;
end; { AsmPause }
{$ENDIF CPUX64}
{ TOmniBaseBoundedStack }
destructor TOmniBaseBoundedStack.Destroy;
begin
  //obsPublicChainP is the start of the single memory block obtained in Initialize
  //(chain heads followed by all element nodes); it stays nil if Initialize never ran.
  if assigned(obsPublicChainP) then
    FreeMem(obsPublicChainP);
  inherited;
end; { TOmniBaseBoundedStack.Destroy }
procedure TOmniBaseBoundedStack.Empty;
//Drains the public chain, handing every node back to the recycle chain.
var
  node: POmniLinkedData;
begin
  node := PopLink(obsPublicChainP^);
  while assigned(node) do begin
    PushLink(node, obsRecycleChainP^);
    node := PopLink(obsPublicChainP^);
  end;
end; { TOmniBaseBoundedStack.Empty }
procedure TOmniBaseBoundedStack.Initialize(numElements, elementSize: integer);
//Allocates one memory block holding the two chain heads followed by numElements nodes,
//links all nodes into the recycle chain and leaves the public chain empty.
//  numElements - stack capacity, must be > 0
//  elementSize - size of one element in bytes, must be > 0
//Raises Exception when the allocated block is not pointer-aligned.
var
  bufferElementSize : integer;
  currElement       : POmniLinkedData;
  iElement          : integer;
  nextElement       : POmniLinkedData;
  roundedElementSize: integer;
begin
  Assert(SizeOf(NativeInt) = SizeOf(pointer));
  Assert(numElements > 0);
  Assert(elementSize > 0);
  obsNumElements := numElements;
  obsElementSize := elementSize;
  //calculate element size, round up to next aligned value
  roundedElementSize := (elementSize + SizeOf(pointer) - 1) AND NOT (SizeOf(pointer) - 1);
  //calculate buffer element size, round up to next aligned value
  bufferElementSize := ((SizeOf(TOmniLinkedData) + roundedElementSize) + SizeOf(pointer) - 1) AND NOT (SizeOf(pointer) - 1);
  //calculate DataBuffer
  GetMem(obsDataBuffer, bufferElementSize * numElements + 2 * SizeOf(TReferencedPtr));
  if NativeInt(obsDataBuffer) AND (SizeOf(pointer) - 1) <> 0 then begin
    //The block is not yet reachable through obsPublicChainP (which is what Destroy
    //frees), so release it here instead of leaking it.
    FreeMem(obsDataBuffer);
    obsDataBuffer := nil;
    raise Exception.Create('TOmniBaseContainer: obcBuffer is not aligned');
  end;
  //the two chain heads live at the start of the block, element nodes follow
  obsPublicChainP := obsDataBuffer;
  inc(NativeInt(obsDataBuffer), SizeOf(TReferencedPtr));
  obsRecycleChainP := obsDataBuffer;
  inc(NativeInt(obsDataBuffer), SizeOf(TReferencedPtr));
  //Format buffer to recycleChain, init obsRecycleChain and obsPublicChain.
  //At the beginning, all elements are linked into the recycle chain.
  obsRecycleChainP^.PData := obsDataBuffer;
  currElement := obsRecycleChainP^.PData;
  for iElement := 0 to obsNumElements - 2 do begin
    nextElement := POmniLinkedData(NativeInt(currElement) + bufferElementSize);
    currElement.Next := nextElement;
    currElement := nextElement;
  end;
  currElement.Next := nil; // terminate the chain
  obsPublicChainP^.PData := nil;
  MeasureExecutionTimes;
end; { TOmniBaseBoundedStack.Initialize }
function TOmniBaseBoundedStack.IsEmpty: boolean;
//The stack is empty when the public chain has no head node.
begin
  Result := (obsPublicChainP^.PData = nil);
end; { TOmniBaseBoundedStack.IsEmpty }
function TOmniBaseBoundedStack.IsFull: boolean;
//The stack is full when the recycle chain has no free node left.
begin
  Result := (obsRecycleChainP^.PData = nil);
end; { TOmniBaseBoundedStack.IsFull }
procedure TOmniBaseBoundedStack.MeasureExecutionTimes;
//Runs once per process (guarded by the class variable obsIsInitialized): times
//PopLink and PushLink on the recycle chain NumOfSamples times and stores the average
//of the four smallest timings of each into obsTaskPopLoops / obsTaskPushLoops, which
//PopLink and PushLink then use as spin limits.
//NOTE(review): the guard is a plain boolean; concurrent first-time initialization from
//several threads is not synchronized here - confirm callers do not rely on that.
const
NumOfSamples = 10;
var
TimeTestField: array [0..1] of array [1..NumOfSamples] of int64;
//Returns the sum of the 'count' smallest samples recorded for 'routine'. Each sample
//that was consumed is overwritten with MaxLongInt so that it is not selected again.
function GetMinAndClear(routine, count: cardinal): int64;
var
m: cardinal;
n: integer;
x: integer;
begin
Result := 0;
for m := 1 to count do begin
x:= 1;
for n:= 2 to NumOfSamples do
if TimeTestField[routine, n] < TimeTestField[routine, x] then
x := n;
Inc(Result, TimeTestField[routine, x]);
TimeTestField[routine, x] := MaxLongInt;
end;
end; { GetMinAndClear }
var
affinity : string;
currElement: POmniLinkedData;
n : integer;
begin { TOmniBaseBoundedStack.MeasureExecutionTimes }
if not obsIsInitialized then begin
//presumably restricts the thread to the first CPU of its affinity set while measuring;
//the original affinity is restored in the finally block
affinity := DSiGetThreadAffinity;
DSiSetThreadAffinity(affinity[1]);
try
//Calculate the TaskPop and TaskPush loop counters; the values depend on CPU speed!
obsTaskPopLoops := 1;
obsTaskPushLoops := 1;
for n := 1 to NumOfSamples do begin
DSiYield;
//Measure PopLink routine delay
TimeTestField[0, n] := GetCPUTimeStamp;
currElement := PopLink(obsRecycleChainP^);
TimeTestField[0, n] := GetCPUTimeStamp - TimeTestField[0, n];
//Measure PushLink routine delay
TimeTestField[1, n] := GetCPUTimeStamp;
PushLink(currElement, obsRecycleChainP^);
TimeTestField[1, n] := GetCPUTimeStamp - TimeTestField[1, n];
end;
//Calculate the average of the 4 smallest samples for the PopLink routine
obsTaskPopLoops := GetMinAndClear(0, 4) div 4;
//Calculate the average of the 4 smallest samples for the PushLink routine
obsTaskPushLoops := GetMinAndClear(1, 4) div 4;
obsIsInitialized := true;
finally DSiSetThreadAffinity(affinity); end;
end;
end; { TOmniBaseBoundedStack.MeasureExecutionTimes }
function TOmniBaseBoundedStack.Pop(var value): boolean;
//Copies the top element into value and recycles its node.
//Returns false (leaving value untouched) when the public chain is empty.
var
  node: POmniLinkedData;
begin
  node := PopLink(obsPublicChainP^);
  if assigned(node) then begin
    Move(node.Data, value, ElementSize);
    PushLink(node, obsRecycleChainP^);
    Result := true;
  end
  else
    Result := false;
end; { TOmniBaseBoundedStack.Pop }
class function TOmniBaseBoundedStack.PopLink(var chain: TReferencedPtr): POmniLinkedData;
//nil << Link.Next << Link.Next << ... << Link.Next
//FILO buffer logic ^------ < chainHead
//Advanced stack PopLink model with idle/busy status bit
//
//Detaches the head link from chain and returns it; returns nil when chain.PData is nil.
//chain.Reference acts as an ownership word: the caller first installs its own thread
//reference there, then replaces (PData, Reference) with (head.Next, 0) in one CAS.
var
AtStartReference: NativeInt;
CurrentReference: NativeInt;
TaskCounter : NativeInt;
ThreadReference : NativeInt;
label
TryAgain;
begin
//NOTE(review): relies on GetThreadId returning an even value so that +1 sets bit 0 - confirm
ThreadReference := GetThreadId + 1; //Reference.bit0 := 1
with chain do begin
TryAgain:
TaskCounter := obsTaskPopLoops;
//remember which busy value (if any) was present when this attempt started
AtStartReference := Reference OR 1; //Reference.bit0 := 1
//re-read Reference until the busy bit is clear or the spin budget is used up
repeat
CurrentReference := Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
//Start over when the chain is still busy and the busy value changed during the spin.
//Otherwise (idle, or the same busy value for the whole spin) try to install our reference.
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS(CurrentReference, ThreadReference, Reference)
then
goto TryAgain;
//Reference is set...
result := PData;
//Empty test
if result = nil then
CAS(ThreadReference, 0, Reference) //Clear Reference if task own reference
else
//unlink the head and clear the reference in one step; fails if either field changed meanwhile
if not CAS(result, ThreadReference, result.Next, 0, chain) then
goto TryAgain;
end;
end; { TOmniBaseBoundedStack.PopLink }
function TOmniBaseBoundedStack.Push(const value): boolean;
//Takes a free node from the recycle chain, copies value into it and publishes it.
//Returns false when no free node is available (stack is full).
var
  node: POmniLinkedData;
begin
  node := PopLink(obsRecycleChainP^);
  if assigned(node) then begin
    Move(value, node.Data, ElementSize);
    PushLink(node, obsPublicChainP^);
    Result := true;
  end
  else
    Result := false;
end; { TOmniBaseBoundedStack.Push }
class procedure TOmniBaseBoundedStack.PushLink(const link: POmniLinkedData; var chain: TReferencedPtr);
//Advanced stack PushLink model with idle/busy status bit
//
//Makes link the new head of chain. Only chain.PData is exchanged here; chain.Reference
//is read (to wait briefly while a PopLink holds it) but never written.
var
PMemData : pointer;
TaskCounter: NativeInt;
begin
with chain do begin
//wait for at most obsTaskPushLoops + 1 iterations while the busy bit is set, then proceed regardless
for TaskCounter := 0 to obsTaskPushLoops do
if (Reference AND 1 = 0) then
break;
//classic CAS push: point link at the current head, then swing the head to link
repeat
PMemData := PData;
link.Next := PMemData;
until CAS(PMemData, link, PData);
end;
end; { TOmniBaseBoundedStack.PushLink }
{ TOmniBoundedStack }
constructor TOmniBoundedStack.Create(numElements, elementSize: integer; partlyEmptyLoadFactor,
  almostFullLoadFactor: real);
//Creates a stack for numElements elements of elementSize bytes and derives the
//notification thresholds from the two load factors.

  //Converts a load factor into an element count, capped at numElements - 1.
  function ThresholdFor(loadFactor: real): integer;
  begin
    Result := Round(numElements * loadFactor);
    if Result >= numElements then
      Result := numElements - 1;
  end; { ThresholdFor }

begin
  inherited Create;
  Initialize(numElements, elementSize);
  osContainerSubject := TOmniContainerSubject.Create;
  osInStackCount.Value := 0;
  osPartlyEmptyCount := ThresholdFor(partlyEmptyLoadFactor);
  osAlmostFullCount := ThresholdFor(almostFullLoadFactor);
end; { TOmniBoundedStack.Create }
destructor TOmniBoundedStack.Destroy;
begin
  //release the notification subject first, then let the base class free the storage
  FreeAndNil(osContainerSubject);
  inherited;
end; { TOmniBoundedStack.Destroy }
function TOmniBoundedStack.Pop(var value): boolean;
//Pops via the base class; on success updates the element counter and notifies
//observers (every removal, plus once when the count is at or below the partly-empty threshold).
var
  remaining: integer;
begin
  Result := inherited Pop(value);
  if not Result then
    Exit;
  remaining := osInStackCount.Decrement;
  ContainerSubject.Notify(coiNotifyOnAllRemoves);
  if remaining <= osPartlyEmptyCount then
    ContainerSubject.NotifyOnce(coiNotifyOnPartlyEmpty);
end; { TOmniBoundedStack.Pop }
function TOmniBoundedStack.Push(const value): boolean;
//Pushes via the base class; on success updates the element counter and notifies
//observers (every insert, plus once when the count is at or above the almost-full threshold).
var
  stored: integer;
begin
  Result := inherited Push(value);
  if not Result then
    Exit;
  stored := osInStackCount.Increment;
  ContainerSubject.Notify(coiNotifyOnAllInserts);
  if stored >= osAlmostFullCount then
    ContainerSubject.NotifyOnce(coiNotifyOnAlmostFull);
end; { TOmniBoundedStack.Push }
{ TOmniBaseBoundedQueue }
destructor TOmniBaseBoundedQueue.Destroy;
begin
  //the three blocks allocated by Initialize: element storage and both ring buffers
  FreeMem(obqDataBuffer);
  FreeMem(obqPublicRingBuffer);
  FreeMem(obqRecycleRingBuffer);
  inherited;
end; { TOmniBaseBoundedQueue.Destroy }
function TOmniBaseBoundedQueue.Dequeue(var value): boolean;
//Copies the oldest element into value and hands its data block back to the recycle ring.
//Returns false (leaving value untouched) when the public ring is empty.
var
  block: pointer;
begin
  block := RemoveLink(obqPublicRingBuffer);
  if assigned(block) then begin
    Move(block^, value, ElementSize);
    InsertLink(block, obqRecycleRingBuffer);
    Result := true;
  end
  else
    Result := false;
end; { TOmniBaseBoundedQueue.Dequeue }
procedure TOmniBaseBoundedQueue.Empty;
//Drains the public ring, returning every data block to the recycle ring.
var
  block: pointer;
begin
  block := RemoveLink(obqPublicRingBuffer);
  while assigned(block) do begin
    InsertLink(block, obqRecycleRingBuffer);
    block := RemoveLink(obqPublicRingBuffer);
  end;
end; { TOmniBaseBoundedQueue.Empty }
function TOmniBaseBoundedQueue.Enqueue(const value): boolean;
//Takes a free data block from the recycle ring, copies value into it and appends it
//to the public ring. Returns false when no free block is available (queue is full).
var
  block: pointer;
begin
  block := RemoveLink(obqRecycleRingBuffer);
  if assigned(block) then begin
    Move(value, block^, ElementSize);
    InsertLink(block, obqPublicRingBuffer);
    Result := true;
  end
  else
    Result := false;
end; { TOmniBaseBoundedQueue.Enqueue }
procedure TOmniBaseBoundedQueue.Initialize(numElements, elementSize: integer);
//Allocates the element storage and both ring buffers. Each ring gets numElements + 1
//slots. The public ring starts empty (FirstIn = LastIn); the recycle ring starts with
//numElements queued data blocks (FirstIn at slot 0, LastIn at slot numElements).
//  numElements - queue capacity, must be > 0
//  elementSize - size of one element in bytes, must be > 0
var
  iSlot     : integer;
  ringBytes : cardinal;
  slotStride: integer;
begin
  Assert(SizeOf(NativeInt) = SizeOf(pointer));
  Assert(numElements > 0);
  Assert(elementSize > 0);
  obqNumElements := numElements;
  obqElementSize := elementSize;
  // element size rounded up to the next pointer-aligned value
  slotStride := (elementSize + SizeOf(pointer) - 1) AND NOT (SizeOf(pointer) - 1);
  // element storage: one data block per ring slot
  GetMem(obqDataBuffer, slotStride * numElements + slotStride);
  // ring buffer size: record header plus only the slots that are really needed
  ringBytes := SizeOf(TReferencedPtr) * (numElements + 1) +
    SizeOf(TOmniRingBuffer) - SizeOf(TReferencedPtrBuffer);
  obqPublicRingBuffer := AllocMem(ringBytes);
  Assert(NativeInt(obqPublicRingBuffer) mod (SizeOf(pointer) * 2) = 0,
    'TOmniBaseContainer: obcPublicRingBuffer is not 8-aligned');
  obqRecycleRingBuffer := AllocMem(ringBytes);
  Assert(NativeInt(obqRecycleRingBuffer) mod (SizeOf(pointer) * 2) = 0,
    'TOmniBaseContainer: obcRecycleRingBuffer is not 8-aligned');
  // public ring: empty
  with obqPublicRingBuffer^ do begin
    StartBuffer := @Buffer[0];
    EndBuffer := @Buffer[numElements];
    FirstIn.PData := StartBuffer;
    LastIn.PData := StartBuffer;
  end;
  // recycle ring: every slot is given its data block
  with obqRecycleRingBuffer^ do begin
    StartBuffer := @Buffer[0];
    EndBuffer := @Buffer[numElements];
    FirstIn.PData := StartBuffer;
    LastIn.PData := EndBuffer;
    for iSlot := 0 to numElements do
      Buffer[iSlot].PData := pointer(NativeInt(obqDataBuffer) + NativeInt(iSlot * slotStride));
  end;
  MeasureExecutionTimes;
end; { TOmniBaseBoundedQueue.Initialize }
class procedure TOmniBaseBoundedQueue.InsertLink(const data: pointer; const ringBuffer:
POmniRingBuffer);
//FIFO buffer logic
//Insert link to queue model with idle/busy status bit
//
//Stores data into the slot that ringBuffer.LastIn points at and advances LastIn by one
//slot (wrapping from EndBuffer to StartBuffer). LastIn.Reference is the ownership word:
//it is claimed with the caller's thread reference and cleared again by the final CAS.
var
AtStartReference: NativeInt;
CurrentLastIn : PReferencedPtr;
CurrentReference: NativeInt;
NewLastIn : PReferencedPtr;
TaskCounter : NativeInt;
ThreadReference : NativeInt;
label
TryAgain;
begin
//NOTE(review): relies on GetThreadId returning an even value so that +1 sets bit 0 - confirm
ThreadReference := GetThreadId + 1; //Reference.bit0 := 1
with ringBuffer^ do begin
TryAgain:
TaskCounter := obqTaskInsertLoops;
//remember which busy value (if any) was present when this attempt started
AtStartReference := LastIn.Reference OR 1; //Reference.bit0 := 1
//re-read the reference until the busy bit is clear or the spin budget is used up
repeat
CurrentReference := LastIn.Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
//Start over when still busy and the busy value changed during the spin; otherwise
//(idle, or the same busy value for the whole spin) try to install our reference.
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS(CurrentReference, ThreadReference, LastIn.Reference)
then
goto TryAgain;
//Reference is set...
CurrentLastIn := LastIn.PData;
//stamp the target slot with our reference (the expected value is read from the slot itself)
CAS(CurrentLastIn.Reference, ThreadReference, CurrentLastIn.Reference);
//proceed only if we still own LastIn and the slot still carries our stamp; the slot's
//PData is replaced by data in the same CAS
if (ThreadReference <> LastIn.Reference) or
not CAS(CurrentLastIn.PData, ThreadReference, data, ThreadReference, CurrentLastIn^)
then
goto TryAgain;
//Calculate ringBuffer next LastIn address
NewLastIn := pointer(NativeInt(CurrentLastIn) + SizeOf(TReferencedPtr));
if NativeInt(NewLastIn) > NativeInt(EndBuffer) then
NewLastIn := StartBuffer;
//Try to exchange and clear Reference if task own reference
if not CAS(CurrentLastIn, ThreadReference, NewLastIn, 0, LastIn) then
goto TryAgain;
end;
end; { TOmniBaseBoundedQueue.InsertLink }
function TOmniBaseBoundedQueue.IsEmpty: boolean;
//The public ring is empty when its read and write positions coincide.
begin
  with obqPublicRingBuffer^ do
    Result := (FirstIn.PData = LastIn.PData);
end; { TOmniBaseBoundedQueue.IsEmpty }
function TOmniBaseBoundedQueue.IsFull: boolean;
//Returns true when no further element can be enqueued: either the public ring has no
//free slot (the slot following LastIn is FirstIn) or the recycle ring has no free data
//block (its FirstIn equals its LastIn).
//The ring-full test previously compared the slot following LastIn with LastIn itself,
//which can never match in a ring of two or more slots; it now compares with FirstIn.
//The snapshot is not atomic, so the result may be stale while other threads are active.
var
  NewLastIn: pointer;
begin
  //position LastIn would move to after one more insert (with wrap-around)
  NewLastIn := pointer(NativeInt(obqPublicRingBuffer.LastIn.PData) + SizeOf(TReferencedPtr));
  if NativeInt(NewLastIn) > NativeInt(obqPublicRingBuffer.EndBuffer) then
    NewLastIn := obqPublicRingBuffer.StartBuffer;
  Result := (NativeInt(NewLastIn) = NativeInt(obqPublicRingBuffer.FirstIn.PData)) or
    (obqRecycleRingBuffer.FirstIn.PData = obqRecycleRingBuffer.LastIn.PData);
end; { TOmniBaseBoundedQueue.IsFull }
procedure TOmniBaseBoundedQueue.MeasureExecutionTimes;
//Runs once per process (guarded by the class variable obqIsInitialized): times
//RemoveLink and InsertLink on the recycle ring NumOfSamples times and stores the average
//of the four smallest timings of each into obqTaskRemoveLoops / obqTaskInsertLoops,
//which RemoveLink and InsertLink then use as spin limits.
//NOTE(review): the guard is a plain boolean; concurrent first-time initialization from
//several threads is not synchronized here - confirm callers do not rely on that.
const
NumOfSamples = 10;
var
TimeTestField: array [0..1] of array [1..NumOfSamples] of int64;
//Returns the sum of the 'count' smallest samples recorded for 'routine'. Each sample
//that was consumed is overwritten with MaxLongInt so that it is not selected again.
function GetMinAndClear(routine, count: cardinal): int64;
var
m: cardinal;
n: integer;
x: integer;
begin
Result := 0;
for m := 1 to count do begin
x:= 1;
for n:= 2 to NumOfSamples do
if TimeTestField[routine, n] < TimeTestField[routine, x] then
x := n;
Inc(Result, TimeTestField[routine, x]);
TimeTestField[routine, x] := MaxLongInt;
end;
end; { GetMinAndClear }
var
affinity : string;
currElement: pointer;
n : integer;
begin { TOmniBaseBoundedQueue.MeasureExecutionTimes }
if not obqIsInitialized then begin
//presumably restricts the thread to the first CPU of its affinity set while measuring;
//the original affinity is restored in the finally block
affinity := DSiGetThreadAffinity;
DSiSetThreadAffinity(affinity[1]);
try
//Calculate the TaskRemove and TaskInsert loop counters; the values depend on CPU speed!
obqTaskRemoveLoops := 1;
obqTaskInsertLoops := 1;
for n := 1 to NumOfSamples do begin
DSiYield;
//Measure RemoveLink routine delay
TimeTestField[0, n] := GetCPUTimeStamp;
currElement := RemoveLink(obqRecycleRingBuffer);
TimeTestField[0, n] := GetCPUTimeStamp - TimeTestField[0, n];
//Measure InsertLink routine delay
TimeTestField[1, n] := GetCPUTimeStamp;
InsertLink(currElement, obqRecycleRingBuffer);
TimeTestField[1, n] := GetCPUTimeStamp - TimeTestField[1, n];
end;
//average of the 4 smallest samples of each routine
obqTaskRemoveLoops := GetMinAndClear(0, 4) div 4;
obqTaskInsertLoops := GetMinAndClear(1, 4) div 4;
obqIsInitialized := true;
finally DSiSetThreadAffinity(affinity); end;
end;
end; { TOmniBaseBoundedQueue.MeasureExecutionTimes }
class function TOmniBaseBoundedQueue.RemoveLink(const ringBuffer: POmniRingBuffer): pointer;
//FIFO buffer logic
//Remove link from queue model with idle/busy status bit
//
//Returns the data pointer stored in the slot that ringBuffer.FirstIn points at and
//advances FirstIn by one slot (wrapping from EndBuffer to StartBuffer). Returns nil when
//FirstIn has caught up with LastIn (ring is empty). FirstIn.Reference is the ownership
//word: it is claimed with the caller's thread reference and cleared by the final CAS.
var
AtStartReference : NativeInt;
CurrentFirstIn : pointer;
CurrentReference : NativeInt;
NewFirstIn : pointer;
Reference : NativeInt;
TaskCounter : NativeInt;
label
TryAgain;
begin
//NOTE(review): relies on GetThreadId returning an even value so that +1 sets bit 0 - confirm
Reference := GetThreadId + 1; //Reference.bit0 := 1
with ringBuffer^ do begin
TryAgain:
TaskCounter := obqTaskRemoveLoops;
//remember which busy value (if any) was present when this attempt started
AtStartReference := FirstIn.Reference OR 1; //Reference.bit0 := 1
//re-read the reference until the busy bit is clear or the spin budget is used up
repeat
CurrentReference := FirstIn.Reference;
Dec(TaskCounter);
until (TaskCounter = 0) or (CurrentReference AND 1 = 0);
//Start over when still busy and the busy value changed during the spin; otherwise
//(idle, or the same busy value for the whole spin) try to install our reference.
if (CurrentReference AND 1 <> 0) and (AtStartReference <> CurrentReference) or
not CAS(CurrentReference, Reference, FirstIn.Reference)
then
goto TryAgain;
//Reference is set...
CurrentFirstIn := FirstIn.PData;
//Empty test
if CurrentFirstIn = LastIn.PData then begin
//Clear Reference if task own reference
CAS(Reference, 0, FirstIn.Reference);
Result := nil;
Exit;
end;
//Load Result
//(read before FirstIn is advanced; the CAS below fails and forces a retry if FirstIn changed meanwhile)
Result := PReferencedPtr(FirstIn.PData).PData;
//Calculate ringBuffer next FirstIn address
NewFirstIn := pointer(NativeInt(CurrentFirstIn) + SizeOf(TReferencedPtr));
if NativeInt(NewFirstIn) > NativeInt(EndBuffer) then
NewFirstIn := StartBuffer;
//Try to exchange and clear Reference if task own reference
if not CAS(CurrentFirstIn, Reference, NewFirstIn, 0, FirstIn) then
goto TryAgain;
end;
end; { TOmniBaseBoundedQueue.RemoveLink }
{ TOmniBoundedQueue }
constructor TOmniBoundedQueue.Create(numElements, elementSize: integer; partlyEmptyLoadFactor,
  almostFullLoadFactor: real);
//Creates a queue for numElements elements of elementSize bytes and derives the
//notification thresholds from the two load factors. Storage is allocated last.

  //Converts a load factor into an element count, capped at numElements - 1.
  function ThresholdFor(loadFactor: real): integer;
  begin
    Result := Round(numElements * loadFactor);
    if Result >= numElements then
      Result := numElements - 1;
  end; { ThresholdFor }

begin
  inherited Create;
  oqContainerSubject := TOmniContainerSubject.Create;
  oqInQueueCount.Value := 0;
  oqPartlyEmptyCount := ThresholdFor(partlyEmptyLoadFactor);
  oqAlmostFullCount := ThresholdFor(almostFullLoadFactor);
  Initialize(numElements, elementSize);
end; { TOmniBoundedQueue.Create }
destructor TOmniBoundedQueue.Destroy;
begin
  //release the notification subject first, then let the base class free the storage
  FreeAndNil(oqContainerSubject);
  inherited;
end; { TOmniBoundedQueue.Destroy }
function TOmniBoundedQueue.Dequeue(var value): boolean;
//Dequeues via the base class; on success updates the element counter and notifies
//observers (every removal, plus once when the count is at or below the partly-empty threshold).
var
  remaining: integer;
begin
  Result := inherited Dequeue(value);
  if not Result then
    Exit;
  remaining := oqInQueueCount.Decrement;
  ContainerSubject.Notify(coiNotifyOnAllRemoves);
  if remaining <= oqPartlyEmptyCount then
    ContainerSubject.NotifyOnce(coiNotifyOnPartlyEmpty);
end; { TOmniBoundedQueue.Dequeue }
function TOmniBoundedQueue.Enqueue(const value): boolean;
//Enqueues via the base class; on success updates the element counter and notifies
//observers (every insert, plus once when the count is at or above the almost-full threshold).
var
  stored: integer;
begin
  Result := inherited Enqueue(value);
  if not Result then
    Exit;
  stored := oqInQueueCount.Increment;
  ContainerSubject.Notify(coiNotifyOnAllInserts);
  if stored >= oqAlmostFullCount then
    ContainerSubject.NotifyOnce(coiNotifyOnAlmostFull);
end; { TOmniBoundedQueue.Enqueue }
(*
TOmniQueue
===============
tags:
tagFree
tagAllocating
tagAllocated
tagRemoving
tagEndOfList
tagExtending
tagBlockPointer
tagDestroying
tagHeader
tagSentinel
header contains:
tail
slot = 4/8 bytes (32/64-bit)
tag = 4/8 bytes
head
slot = 4/8 bytes
tag = 4/8 bytes
all are 4-aligned
slot contains:
TOmniValue = 13/17 bytes
tag = 1 byte
offset = 2 bytes
stuffing = 0/4 bytes
TOmniValues are 4/8-aligned
block is initialized to:
[tagHeader, num slots - 1, 0] [tagSentinel, 0, 1] [tagFree 0, 2] .. [tagFree, 0, num slots - 2] [tagEndOfList, 0, num slots - 1]
Enqueue:
repeat
head = header.head.slot
old_tag = header.head.tag
if header.head.CAS(head, tagFree, head, tagAllocating) then
break
else if header.head.CAS(head, tagEndOfList, head, tagExtending) then
break
else
yield
forever
if header.head.tag = tagAllocating then
store into slot
header.head.CAS(head, tagAllocating, NextSlot(head), NextSlot(head).tag)
else
allocate block // from cache, if possible
next = second data slot in the new block
set next to
set last slot in the original block to
header.head.CAS(head, tagExtending, next, next.tag)
// queue is now unlocked
preallocate block
Dequeue:
repeat
tail = header.tail.slot
old_tag = header.tail.tag
caughtTheHead := NextSlot(header.tail.slot) = header.head.slot;
if header.tail.CAS(tail, tagAllocated, tail, tagRemoving) then
tail.tag = tagRemoving
break
else if header.tail.Tag = tagSentinel then
if caughtTheHead then
return false
else if header.tail.CAS(tail, tagSentinel, tail, tagRemoving) then
tail.tag = tagRemoving
break
else if header.tail.CAS(tail, tagBlockPointer, tail, tagDestroying) then
tail.tag = tagDestroying
break
else
yield
forever
firstSlot = tail - tail.Offset // point to first slot
if old_tag in [tagSentinel, tagAllocated] then
next = NextSlot(tail)
if tag = tagAllocated then
fetch stored value
if caughtTheHead then
header.tail.CAS(tail, tagRemoving, tail, tagSentinel)
firstSlot = nil // do not decrement the header counter
else
header.tail.CAS(tail, tagRemoving, next, next.tag)
else
next = tail.value // points to the next block's sentinel
header.tail.CAS(tail, tagDestroying, next, tagSentinel)
old_tag = tagSentinel // force retry
// queue is now unlocked
if assigned(firstSlot) and (InterlockedDecrement(firstSlot.value) = 0) then
release block
if old_tag = tagSentinel
retry from beginning
*)
{$DEFINE USE_MOVEDPTR}
{$IFDEF DEBUG_OMNI_QUEUE}{$UNDEF USE_MOVEDPTR}{$ENDIF}
{$IFDEF OTL_OLDCPU}{$UNDEF USE_MOVEDPTR}{$ENDIF}
{ TOmniTaggedValue }
//Atomically replaces this slot's Tag with newTag, but only if it currently equals
//oldTag. Returns the result of CAS8 (presumably true when the swap happened - the
//helper is declared outside this unit).
function TOmniTaggedValue.CASTag(oldTag, newTag: TOmniQueueTag): boolean;
begin
Result := CAS8(Ord(oldTag), Ord(newTag), Tag);
end; { TOmniTaggedValue.CASTag }
{ TOmniTaggedPointer }
/// Compare-and-swap on the whole (slot, tag) pair: if the pair currently equals
/// (oldSlot, oldTag) it is replaced with (newSlot, newTag).
/// Returns the result of the underlying OtlSync.CAS call.
function TOmniTaggedPointer.CAS(oldSlot: POmniTaggedValue; oldTag: TOmniQueueTag;
  newSlot: POmniTaggedValue; newTag: TOmniQueueTag): boolean;
var
  desiredRef : NativeInt;
  expectedRef: NativeInt;
begin
  expectedRef := NativeInt(oldTag);
  desiredRef := NativeInt(newTag);
  Result := OtlSync.CAS(oldSlot, expectedRef, newSlot, desiredRef, Self);
end; { TOmniTaggedPointer.CAS }
/// Unconditionally overwrites the (slot, tag) pair with (newSlot, newTag) by way of
/// MoveDPtr. The queue uses it to release a lock it already holds.
procedure TOmniTaggedPointer.Move(newSlot: POmniTaggedValue; newTag: TOmniQueueTag);
var
  tagRef: NativeInt;
begin
  tagRef := Ord(newTag);
  MoveDPtr(newSlot, tagRef, Self);
end; { TOmniTaggedPointer.Move }
{ TOmniBaseQueue }
/// Creates the queue.
///   blockSize       - requested size of one memory block in bytes; rounded up to a
///                     whole number of TOmniTaggedValue slots.
///   numCachedBlocks - number of blocks that are allocated up front and kept in the
///                     block cache (obcMemStack).
/// Raises Exception when the resulting block would hold more than 65536 slots or fewer
/// than the four slots every block needs.
constructor TOmniBaseQueue.Create(blockSize: integer; numCachedBlocks: integer);
const
  // header + sentinel + one data slot + end-of-list marker; PartitionMemory and Enqueue
  // address these slots unconditionally, so a smaller block would be overrun
  CMinSlots = 4;
var
  iMem  : integer;
  memory: POmniTaggedValue;
begin
  inherited Create;
  // round the block size up to a multiple of the slot size
  obcBlockSize := (((blockSize-1) div SizeOf(TOmniTaggedValue)) + 1) * SizeOf(TOmniTaggedValue);
  obcNumSlots := obcBlockSize div SizeOf(TOmniTaggedValue);
  if obcNumSlots > 65536 then
    raise Exception.CreateFmt('TOmniBaseQueue.Create: Maximum block size is %d',
      [65536 * SizeOf(TOmniTaggedValue)]);
  if obcNumSlots < CMinSlots then
    raise Exception.CreateFmt('TOmniBaseQueue.Create: Minimum block size is %d',
      [CMinSlots * SizeOf(TOmniTaggedValue)]);
  obcMemStack := TOmniBaseBoundedStack.Create;
  obcMemStack.Initialize(numCachedBlocks, SizeOf(pointer));
  for iMem := 1 to numCachedBlocks do begin
    memory := AllocMem(obcBlockSize);
    PartitionMemory(memory);
    // Push must not live inside Assert(): with assertions disabled the whole call
    // would be compiled out, the cache would stay empty and the block would leak.
    if not obcMemStack.Push(memory) then begin
      FreeMem(memory);
      Assert(false);
    end;
  end;
  Initialize;
end; { TOmniBaseQueue.Create }
/// Removes and returns the value at the tail of the queue.
/// Raises Exception when TryDequeue reports that nothing could be dequeued.
function TOmniBaseQueue.Dequeue: TOmniValue;
begin
  if TryDequeue(Result) then
    Exit;
  raise Exception.Create('TOmniBaseQueue.Dequeue: Message queue is empty');
end; { TOmniBaseQueue.Dequeue }
/// Releases the queue contents (Cleanup), then frees every block still held in the
/// block cache and the cache itself.
destructor TOmniBaseQueue.Destroy;
var
  memory: pointer;
begin
  Cleanup;
  // Destroy also runs when the constructor raised before obcMemStack was created
  // (e.g. on an invalid block size), so the cache may still be nil here.
  if assigned(obcMemStack) then begin
    while obcMemStack.Pop(memory) do
      FreeMem(memory);
    FreeAndNil(obcMemStack);
  end;
  inherited;
end; { TOmniBaseQueue.Destroy }
/// Returns a partitioned memory block: one popped from the block cache when available,
/// otherwise a freshly allocated (zero-filled) and partitioned one.
function TOmniBaseQueue.AllocateBlock: POmniTaggedValue;
begin
  if obcMemStack.Pop(Result) then
    Exit; // cache hit
  Result := AllocMem(obcBlockSize);
  PartitionMemory(Result);
end; { TOmniBaseQueue.AllocateBlock }
{$IFDEF DEBUG_OMNI_QUEUE}
/// Debug-only replacement for Assert, compiled only when DEBUG_OMNI_QUEUE is defined.
/// Executes a breakpoint instruction (int 3) when the condition is false; no exception
/// is raised. Inside TOmniBaseQueue methods this method is what a plain Assert(...)
/// call resolves to in such builds.
procedure TOmniBaseQueue.Assert(condition: boolean);
begin
if not condition then
// 64-bit builds call the AsmInt3 helper; 32-bit builds use an inline asm statement
{$IFDEF CPUX64}AsmInt3;{$ELSE}asm int 3; end;{$ENDIF CPUX64}
end; { TOmniBaseQueue.Assert }
{$ENDIF DEBUG_OMNI_QUEUE}
/// Walks the slot chain from the tail, releases every value that is still stored,
/// frees every block on the way, and then frees the cached block and the head/tail
/// tagged pointers. Safe to call on a queue whose Initialize never ran.
procedure TOmniBaseQueue.Cleanup;
var
  pBlock: POmniTaggedValue;
  pSlot : POmniTaggedValue;
begin
  // The tail pointer is nil when the object was only partially constructed.
  if assigned(obcTailPointer) then
    pSlot := obcTailPointer.Slot
  else
    pSlot := nil;
  while assigned(pSlot) do begin
    if pSlot.Tag in [tagBlockPointer, tagEndOfList] then begin
      // last slot of a block: follow the stored link (nil when no next block was linked)
      pBlock := pSlot;
      pSlot := POmniTaggedValue(pSlot.Value.RawData^); //retrieveing Value.AsPointer raises exception
      Dec(pBlock, pBlock.Offset); // step back to the block header
      ReleaseBlock(pBlock, true);
    end
    else begin
      if (pSlot.Tag = tagAllocated) then
        pSlot.Value._ReleaseAndClear;
      Inc(pSlot);
    end;
  end;
  // Free the helper structures and clear the references so that nothing dangles.
  if assigned(obcCachedBlock) then begin
    FreeMem(obcCachedBlock);
    obcCachedBlock := nil;
  end;
  if assigned(obcTailPointer) then begin
    FreeMem(obcTailPointer);
    obcTailPointer := nil;
  end;
  if assigned(obcHeadPointer) then begin
    FreeMem(obcHeadPointer);
    obcHeadPointer := nil;
  end;
end; { TOmniBaseQueue.Cleanup }
/// Appends value to the head of the queue.
/// The head tagged pointer acts as a micro-lock: it is switched from tagFree to
/// tagAllocating (plain store into the current slot) or from tagEndOfList to
/// tagExtending (a new block must be linked in), the slot is filled, and the head is
/// then moved to the next slot, which releases the lock.
procedure TOmniBaseQueue.Enqueue(const value: TOmniValue);
var
  extension: POmniTaggedValue;
  next     : POmniTaggedValue;
  head     : POmniTaggedValue;
begin
  // acquire the head lock
  repeat
    head := obcHeadPointer.Slot;
    if (obcHeadPointer.Tag = tagFree)
       and obcHeadPointer.CAS(head, tagFree, head, tagAllocating)
    then
      break //repeat
    else if (obcHeadPointer.Tag = tagEndOfList)
            and obcHeadPointer.CAS(head, tagEndOfList, head, tagExtending)
    then
      break //repeat
    else  // very temporary condition, retry quickly
      {$IFDEF CPUX64}AsmPause;{$ELSE}asm pause; end;{$ENDIF ~CPUX64}
  until false;
  {$IFDEF DEBUG_OMNI_QUEUE} Assert(head = obcHeadPointer.Slot); {$ENDIF}
  if obcHeadPointer.Tag = tagAllocating then begin // enqueueing
    next := NextSlot(head);
    head.Value := value; // this works because the slot was initialized to zero when allocating
    {$IFNDEF DEBUG_OMNI_QUEUE}
    head.Tag := tagAllocated;
    {$ELSE}
    Assert(head.CASTag(tagFree, tagAllocated));
    {$ENDIF}
    // release the lock
    {$IFDEF USE_MOVEDPTR}
    obcHeadPointer.Move(next, next.Tag);
    {$ELSE}
    // The CAS must execute even when assertions are compiled out, otherwise the head
    // would stay locked forever; only the verification is delegated to Assert.
    if not obcHeadPointer.CAS(head, tagAllocating, next, next.Tag) then
      Assert(false);
    {$ENDIF}
  end
  else begin // allocating memory
    {$IFDEF DEBUG_OMNI_QUEUE} Assert(obcHeadPointer.Tag = tagExtending); {$ENDIF}
    extension := AllocateBlock; // returns pointer to the header
    Inc(extension, 2);          // move over header and sentinel to the first data slot
    {$IFNDEF DEBUG_OMNI_QUEUE}
    extension.Tag := tagAllocated;
    {$ELSE}
    Assert(extension.CASTag(tagFree, tagAllocated));
    {$ENDIF}
    extension.Value := value;   // this works because the slot was initialized to zero when allocating
    Dec(extension);             // forward reference points to the sentinel
    head.Value := extension;
    {$IFNDEF DEBUG_OMNI_QUEUE}
    head.Tag := tagBlockPointer;
    {$ELSE}
    Assert(head.CASTag(tagEndOfList, tagBlockPointer));
    {$ENDIF}
    Inc(extension, 2);          // get to the first free slot
    // release the lock
    {$IFDEF USE_MOVEDPTR}
    obcHeadPointer.Move(extension, extension.Tag);
    {$ELSE}
    // as above: never put the lock-releasing CAS inside Assert()
    if not obcHeadPointer.CAS(head, tagExtending, extension, extension.Tag) then
      Assert(false);
    {$ENDIF}
    PreallocateMemory;          // the queue is unlocked now; refill the block cache if it is empty
  end;
end; { TOmniBaseQueue.Enqueue }
/// Allocates the head and tail tagged pointers, takes the first block and positions
/// the tail on its sentinel and the head on the slot right after it. One more block is
/// then allocated and stored in obcCachedBlock.
procedure TOmniBaseQueue.Initialize;
var
  firstData: POmniTaggedValue;
  sentinel : POmniTaggedValue;
begin
  obcTailPointer := AllocMem(SizeOf(TOmniTaggedPointer));
  obcHeadPointer := AllocMem(SizeOf(TOmniTaggedPointer));
  // the tagged pointers are checked for double-pointer alignment, the cached block
  // reference for pointer alignment
  Assert(NativeInt(obcTailPointer) mod (2*SizeOf(pointer)) = 0);
  Assert(NativeInt(obcHeadPointer) mod (2*SizeOf(pointer)) = 0);
  Assert(NativeInt(@obcCachedBlock) mod SizeOf(pointer) = 0);
  sentinel := NextSlot(AllocateBlock); // skip the block header
  obcTailPointer.Slot := sentinel;
  obcTailPointer.Tag := sentinel.Tag;
  {$IFDEF DEBUG_OMNI_QUEUE}
  Assert(obcTailPointer.Tag = tagSentinel);
  Assert(PrevSlot(obcTailPointer.Slot).Tag = tagHeader);
  {$ENDIF DEBUG_OMNI_QUEUE}
  firstData := NextSlot(sentinel);
  obcHeadPointer.Slot := firstData;
  obcHeadPointer.Tag := firstData.Tag;
  {$IFDEF DEBUG_OMNI_QUEUE}
  Assert(obcHeadPointer.Tag = tagFree);
  {$ENDIF DEBUG_OMNI_QUEUE}
  obcCachedBlock := AllocateBlock; // pre-allocate memory
end; { TOmniBaseQueue.Initialize }
/// Returns true when the queue holds no value, false when the tail slot is allocated.
/// Follows the same steps as TryDequeue but never removes a value. It is not read-only,
/// though: it may lock the tail, step over a sentinel or a block pointer and release a
/// block whose slot counter dropped to zero.
function TOmniBaseQueue.IsEmpty: boolean;
var
  caughtTheHead: boolean;
  header       : POmniTaggedValue;
  next         : POmniTaggedValue;
  tag          : TOmniQueueTag;
  tail         : POmniTaggedValue;
begin
  // Basically the same as TryDequeue except that it doesn't dequeue anything.
  Result := false; // to keep compiler happy
  tag := tagSentinel;
  while tag = tagSentinel do begin
    repeat
      tail := obcTailPointer.Slot;
      caughtTheHead := NextSlot(obcTailPointer.Slot) = obcHeadPointer.Slot; // an approximation; we don't care if in a moment this won't be true anymore
      if (obcTailPointer.Tag = tagAllocated) then begin
        Result := false;
        Exit;
      end
      else if (obcTailPointer.Tag = tagSentinel) then begin
        if caughtTheHead then begin
          Result := true;
          Exit;
        end
        else if obcTailPointer.CAS(tail, tagSentinel, tail, tagRemoving) then begin
          tag := tagSentinel;
          break; //repeat
        end
      end
      else if (obcTailPointer.Tag = tagBlockPointer)
              and obcTailPointer.CAS(tail, tagBlockPointer, tail, tagDestroying) then
      begin
        tag := tagBlockPointer;
        break; //repeat
      end
      else
        {$IFDEF CPUX64}AsmPause;{$ELSE}asm pause; end;{$ENDIF ~CPUX64}
    until false;
    // the tail is locked from here on
    {$IFDEF DEBUG_OMNI_QUEUE} Assert(tail = obcTailPointer.Slot); {$ENDIF}
    header := tail;
    Dec(header, header.Offset); // step back to the block header
    {$IFDEF DEBUG_OMNI_QUEUE} Assert(header.Tag = tagHeader); {$ENDIF}
    {$IFDEF DEBUG_OMNI_QUEUE} Assert(tag <> tagAllocated); {$ENDIF}
    {$IFDEF DEBUG_OMNI_QUEUE} Assert(not caughtTheHead); {$ENDIF}
    if tag = tagSentinel then begin
      next := NextSlot(tail);
      // release the lock
      {$IFDEF USE_MOVEDPTR}
      obcTailPointer.Move(next, next.Tag);
      {$ELSE}
      // The CAS must execute even when assertions are compiled out, otherwise the tail
      // would stay locked forever; only the verification is delegated to Assert.
      if not obcTailPointer.CAS(tail, tagRemoving, next, next.Tag) then
        Assert(false);
      {$ENDIF}
    end
    else begin // releasing memory
      {$IFDEF DEBUG_OMNI_QUEUE} Assert(tag = tagBlockPointer); {$ENDIF}
      next := POmniTaggedValue(tail.Value.AsPointer); // next points to the sentinel
      {$IFDEF DEBUG_OMNI_QUEUE} Assert(next.Tag = tagSentinel); {$ENDIF}
      // release the lock
      {$IFDEF USE_MOVEDPTR}
      obcTailPointer.Move(next, tagSentinel);
      {$ELSE}
      // as above: never put the lock-releasing CAS inside Assert()
      if not obcTailPointer.CAS(tail, tagDestroying, next, tagSentinel) then
        Assert(false);
      {$ENDIF}
      tag := tagSentinel; // retry
    end;
    // one slot of this block has been consumed; release the block after the last one
    if assigned(header) and (InterlockedDecrement(PInteger(header)^) = 0) then
      ReleaseBlock(header);
  end; //while tag = tagSentinel
end; { TOmniBaseQueue.IsEmpty }
/// Returns the address of the slot that immediately follows the given slot in memory
/// (slot advanced by one TOmniTaggedValue). No bounds or tag checks are made.
function TOmniBaseQueue.NextSlot(slot: POmniTaggedValue): POmniTaggedValue;
begin
Result := slot;
Inc(Result); // typed-pointer increment: advances by one whole slot
end; { TOmniBaseQueue.NextSlot }
/// Lays out a block of obcNumSlots slots starting at memory:
///   slot 0             - header; its first integer holds the number of remaining slots
///   slot 1             - sentinel
///   slots 2 .. N-2     - data slots, only their Offset is written
///   slot N-1           - end-of-list marker
/// Every slot after the header stores its own index in Offset, i.e. its distance from
/// the header.
procedure TOmniBaseQueue.PartitionMemory(memory: POmniTaggedValue);
var
  cursor : POmniTaggedValue;
  lastIdx: integer;
  slotIdx: integer;
begin
  Assert(Ord(tagFree) = 0);
  lastIdx := obcNumSlots - 1;
  cursor := memory;
  // header
  cursor.Tag := tagHeader;
  PInteger(cursor)^ := lastIdx; // don't count the header
  Inc(cursor);
  // sentinel
  cursor.Tag := tagSentinel;
  cursor.Offset := 1;
  Inc(cursor);
  // data slots
  slotIdx := 2;
  while slotIdx < lastIdx do begin
    cursor.Offset := slotIdx;
    Inc(cursor);
    Inc(slotIdx);
  end;
  // end-of-list marker
  cursor.Tag := tagEndOfList;
  cursor.Offset := lastIdx;
end; { TOmniBaseQueue.PartitionMemory }
/// Refills the block cache with one freshly allocated, partitioned block, but only
/// when the cache is currently empty. If the block cannot be pushed it is freed again.
procedure TOmniBaseQueue.PreallocateMemory;
var
  spare: POmniTaggedValue;
begin
  if not obcMemStack.IsEmpty then
    Exit;
  spare := AllocMem(obcBlockSize);
  PartitionMemory(spare);
  if obcMemStack.Push(spare) then
    Exit;
  FreeMem(spare); // the cache rejected the block
end; { TOmniBaseQueue.PreallocateMemory }
/// Returns the address of the slot that immediately precedes the given slot in memory
/// (slot moved back by one TOmniTaggedValue). No bounds or tag checks are made.
function TOmniBaseQueue.PrevSlot(slot: POmniTaggedValue): POmniTaggedValue;
begin
Result := slot;
Dec(Result); // typed-pointer decrement: steps back by one whole slot
end; { TOmniBaseQueue.PrevSlot }
/// Disposes of a block identified by its header slot.
/// When forceFree is set or the block cache is full the memory is freed; otherwise the
/// block is zeroed, partitioned again and pushed into the cache (and freed after all
/// if the push fails).
procedure TOmniBaseQueue.ReleaseBlock(firstSlot: POmniTaggedValue; forceFree: boolean);
begin
  {$IFDEF DEBUG_OMNI_QUEUE}Assert(firstSlot.Tag = tagHeader);{$ENDIF DEBUG_OMNI_QUEUE}
  if (not forceFree) and (not obcMemStack.IsFull) then begin
    // recycle: bring the block back to its freshly-allocated state
    ZeroMemory(firstSlot, obcBlockSize);
    PartitionMemory(firstSlot);
    if obcMemStack.Push(firstSlot) then
      Exit;
  end;
  FreeMem(firstSlot);
end; { TOmniBaseQueue.ReleaseBlock }
/// Tries to remove the value at the tail of the queue.
/// Returns true and stores the value in the value parameter on success; returns false
/// when the tail sits on a sentinel directly in front of the head (queue empty).
/// The tail tagged pointer acts as a micro-lock: it is switched to tagRemoving or
/// tagDestroying while the slot is processed and released by moving it forward.
/// Sentinels and block pointers carry no value, so processing one of them triggers
/// another pass of the outer loop.
function TOmniBaseQueue.TryDequeue(var value: TOmniValue): boolean;
var
  caughtTheHead: boolean;
  tail         : POmniTaggedValue;
  header       : POmniTaggedValue;
  next         : POmniTaggedValue;
  tag          : TOmniQueueTag;
begin
  tag := tagSentinel;
  Result := true;
  while Result and (tag = tagSentinel) do begin
    // acquire the tail lock, or find out that the queue is empty
    repeat
      tail := obcTailPointer.Slot;
      caughtTheHead := NextSlot(obcTailPointer.Slot) = obcHeadPointer.Slot; // an approximation; we don't care if in a moment this won't be true anymore
      if (obcTailPointer.Tag = tagAllocated)
         and obcTailPointer.CAS(tail, tagAllocated, tail, tagRemoving) then
      begin
        tag := tagAllocated;
        break; //repeat
      end
      else if (obcTailPointer.Tag = tagSentinel) then begin
        if caughtTheHead then begin
          Result := false;
          break; //repeat
        end
        else if obcTailPointer.CAS(tail, tagSentinel, tail, tagRemoving) then begin
          tag := tagSentinel;
          break; //repeat
        end
      end
      else if (obcTailPointer.Tag = tagBlockPointer)
              and obcTailPointer.CAS(tail, tagBlockPointer, tail, tagDestroying) then
      begin
        tag := tagBlockPointer;
        break; //repeat
      end
      else
        {$IFDEF CPUX64}AsmPause;{$ELSE}asm pause; end;{$ENDIF ~CPUX64}
    until false;
    if Result then begin // dequeueing
      {$IFDEF DEBUG_OMNI_QUEUE} Assert(tail = obcTailPointer.Slot); {$ENDIF}
      header := tail;
      Dec(header, header.Offset); // step back to the block header
      {$IFDEF DEBUG_OMNI_QUEUE} Assert(header.Tag = tagHeader); {$ENDIF}
      if tag in [tagSentinel, tagAllocated] then begin
        next := NextSlot(tail);
        if tag = tagAllocated then begin // sentinel doesn't contain any useful value
          value := tail.Value;
          tail.Value._ReleaseAndClear;
        end;
        if caughtTheHead then begin
          // release the lock; as this is the last element, don't move forward
          {$IFDEF USE_MOVEDPTR}
          obcTailPointer.Move(tail, tagSentinel);
          {$ELSE}
          // The CAS must execute even when assertions are compiled out, otherwise the
          // tail would stay locked forever; only the verification is left to Assert.
          if not obcTailPointer.CAS(tail, tagRemoving, tail, tagSentinel) then
            Assert(false);
          {$ENDIF}
          header := nil; // do NOT decrement the counter; this slot will be retagged again
        end
        else begin
          // release the lock
          {$IFDEF USE_MOVEDPTR}
          obcTailPointer.Move(next, next.Tag);
          {$ELSE}
          // as above: never put the lock-releasing CAS inside Assert()
          if not obcTailPointer.CAS(tail, tagRemoving, next, next.Tag) then
            Assert(false);
          {$ENDIF}
        end;
      end
      else begin // releasing memory
        {$IFDEF DEBUG_OMNI_QUEUE} Assert(tag = tagBlockPointer); {$ENDIF}
        next := POmniTaggedValue(tail.Value.AsPointer); // next points to the sentinel
        {$IFDEF DEBUG_OMNI_QUEUE} Assert(next.Tag = tagSentinel); {$ENDIF}
        // release the lock
        {$IFDEF USE_MOVEDPTR}
        obcTailPointer.Move(next, tagSentinel);
        {$ELSE}
        // as above: never put the lock-releasing CAS inside Assert()
        if not obcTailPointer.CAS(tail, tagDestroying, next, tagSentinel) then
          Assert(false);
        {$ENDIF}
        tag := tagSentinel; // retry
      end;
      // one slot of this block has been consumed; release the block after the last one
      if assigned(header) and (InterlockedDecrement(PInteger(header)^) = 0) then
        ReleaseBlock(header);
    end;
  end; //while Result and (tag = tagSentinel)
end; { TOmniBaseQueue.TryDequeue }
{ initialization }
/// Creates a throw-away bounded stack and a throw-away bounded queue, initializes each
/// with 10 elements of 4 bytes and destroys them again.
/// NOTE(review): judging by the name, Initialize presumably measures timing information
/// as a side effect - confirm against TOmniBaseBounded(Stack|Queue).Initialize.
procedure InitializeTimingInfo;
var
  queue: TOmniBaseBoundedQueue;
  stack: TOmniBaseBoundedStack;
begin
  stack := TOmniBaseBoundedStack.Create;
  try
    stack.Initialize(10, 4); // enough for initialization
  finally
    FreeAndNil(stack); // released even when Initialize raises
  end;
  queue := TOmniBaseBoundedQueue.Create;
  try
    queue.Initialize(10, 4); // enough for initialization
  finally
    FreeAndNil(queue); // released even when Initialize raises
  end;
end; { InitializeTimingInfo }
/// Destroys the container subject, then lets the base queue release its memory.
procedure TOmniQueue.Cleanup;
begin
  FreeAndNil(ocContainerSubject);
  inherited Cleanup;
end; { TOmniQueue.Cleanup }
/// Dequeues a value through the inherited Dequeue and then notifies the container
/// subject with coiNotifyOnAllRemoves. The notification is only reached when the
/// inherited call returns; an exception raised there propagates before Notify runs.
function TOmniQueue.Dequeue: TOmniValue;
begin
Result := inherited Dequeue;
ContainerSubject.Notify(coiNotifyOnAllRemoves);
end; { TOmniQueue.Dequeue }
/// Enqueues the value through the inherited Enqueue and then notifies the container
/// subject with coiNotifyOnAllInserts.
procedure TOmniQueue.Enqueue(const value: TOmniValue);
begin
inherited Enqueue(value);
ContainerSubject.Notify(coiNotifyOnAllInserts);
end; { TOmniQueue.Enqueue }
/// Runs the base queue initialization first, then creates the container subject
/// used for insert/remove notifications.
procedure TOmniQueue.Initialize;
begin
  inherited Initialize;
  ocContainerSubject := TOmniContainerSubject.Create;
end; { TOmniQueue.Initialize }
/// Tries to dequeue a value through the inherited TryDequeue. The container subject is
/// notified with coiNotifyOnAllRemoves only when a value was actually removed.
function TOmniQueue.TryDequeue(var value: TOmniValue): boolean;
begin
  Result := inherited TryDequeue(value);
  if not Result then
    Exit; // nothing removed, nothing to report
  ContainerSubject.Notify(coiNotifyOnAllRemoves);
end; { TOmniQueue.TryDequeue }
initialization
// Sanity checks on the record layouts this unit relies on: a tagged value occupies
// three pointers on 64-bit and four pointers on 32-bit targets, a tagged pointer
// occupies exactly two pointers, and NativeInt is pointer-sized.
Assert(SizeOf(TOmniTaggedValue) = {$IFDEF CPUX64}3{$ELSE}4{$ENDIF}*SizeOf(pointer));
Assert(SizeOf(TOmniTaggedPointer) = 2*SizeOf(pointer));
Assert(SizeOf(pointer) = SizeOf(NativeInt));
// runs once at unit start-up
InitializeTimingInfo;
end.