///Thread pool implementation. Part of the OmniThreadLibrary project.
///Primoz Gabrijelcic
///
///This software is distributed under the BSD license.
///
///Copyright (c) 2009, Primoz Gabrijelcic
///All rights reserved.
///
///Redistribution and use in source and binary forms, with or without modification,
///are permitted provided that the following conditions are met:
///- Redistributions of source code must retain the above copyright notice, this
/// list of conditions and the following disclaimer.
///- Redistributions in binary form must reproduce the above copyright notice,
/// this list of conditions and the following disclaimer in the documentation
/// and/or other materials provided with the distribution.
///- The name of the Primoz Gabrijelcic may not be used to endorse or promote
/// products derived from this software without specific prior written permission.
///
///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
///
///
/// Home : http://otl.17slon.com
/// Support : http://otl.17slon.com/forum/
/// Author : Primoz Gabrijelcic
/// E-Mail : primoz@gabrijelcic.org
/// Blog : http://thedelphigeek.com
/// Web : http://gp.17slon.com
/// Contributors : GJ, Lee_Nover
///
/// Creation date : 2008-06-12
/// Last modification : 2009-01-26
/// Version : 2.0
///
/// History:
/// 2.0: 2009-01-26
/// - Reimplemented using OmniThreadLibrary :)
/// 1.0: 2008-08-26
/// - First official release.
///
/// WARNING Tasks must be scheduled from the owner thread! WARNING
/// (access to task queues is not synchronised)
unit OtlThreadPool;
interface
{ TODO 1 -oPrimoz Gabrijelcic : Rewrite using one ITaskControl to manage thread pool. }
{ TODO 1 -oPrimoz Gabrijelcic : Use OtlCommunication to send messages to the Monitor. }
// TODO 1 -oPrimoz Gabrijelcic : Should be monitorable by the OmniTaskEventDispatch
// TODO 3 -oPrimoz Gabrijelcic : Needs an async event reporting unexpected states (kill threads, for example)
uses
Windows,
SysUtils,
OtlTask;
const
CDefaultIdleWorkerThreadTimeout_sec = 10;
CDefaultWaitOnTerminate_sec = 30;
CMaxConcurrentWorkers = 60; // enforced by the TOmniWorker limitations
// this is not configurable - don't increment it and expect the code to magically work!
type
IOmniThreadPool = interface;
IOmniThreadPoolMonitor = interface ['{09EFADE8-3F14-4184-87CA-131100EC57E4}']
function Detach(const task: IOmniThreadPool): IOmniThreadPool;
function Monitor(const task: IOmniThreadPool): IOmniThreadPool;
end; { IOmniThreadPoolMonitor }
TThreadPoolOperation = (tpoCreateThread, tpoDestroyThread, tpoKillThread,
tpoWorkItemCompleted);
TOmniThreadPoolMonitorInfo = class
strict private
otpmiTaskID : int64;
otpmiThreadID : integer;
otpmiThreadPoolOperation: TThreadPoolOperation;
otpmiUniqueID : int64;
public
constructor Create(uniqueID: int64; threadPoolOperation: TThreadPoolOperation; threadID:
integer); overload;
constructor Create(uniqueID, taskID: int64); overload;
property TaskID: int64 read otpmiTaskID;
property ThreadPoolOperation: TThreadPoolOperation read otpmiThreadPoolOperation;
property ThreadID: integer read otpmiThreadID;
property UniqueID: int64 read otpmiUniqueID;
end; { TOmniThreadPoolMonitorInfo }
///Worker thread lifetime reporting handler.
TOTPWorkerThreadEvent = procedure(Sender: TObject; threadID: DWORD) of object;
IOmniThreadPool = interface ['{1FA74554-1866-46DD-AC50-F0403E378682}']
function GetIdleWorkerThreadTimeout_sec: integer;
function GetMaxExecuting: integer;
function GetMaxQueued: integer;
function GetMaxQueuedTime_sec: integer;
function GetMinWorkers: integer;
function GetName: string;
function GetOnWorkerThreadCreated_Asy: TOTPWorkerThreadEvent;
function GetOnWorkerThreadDestroying_Asy: TOTPWorkerThreadEvent;
function GetUniqueID: int64;
function GetWaitOnTerminate_sec: integer;
procedure SetIdleWorkerThreadTimeout_sec(value: integer);
procedure SetMaxExecuting(value: integer);
procedure SetMaxQueued(value: integer);
procedure SetMaxQueuedTime_sec(value: integer);
procedure SetMinWorkers(value: integer);
procedure SetName(const value: string);
procedure SetWaitOnTerminate_sec(value: integer);
procedure SetOnWorkerThreadCreated_Asy(const value: TOTPWorkerThreadEvent);
procedure SetOnWorkerThreadDestroying_Asy(const value: TOTPWorkerThreadEvent);
//
function Cancel(taskID: int64): boolean;
procedure CancelAll;
function CountExecuting: integer;
function CountQueued: integer;
function IsIdle: boolean;
function MonitorWith(const monitor: IOmniThreadPoolMonitor): IOmniThreadPool;
function RemoveMonitor: IOmniThreadPool;
function SetMonitor(hWindow: THandle): IOmniThreadPool;
property IdleWorkerThreadTimeout_sec: integer read GetIdleWorkerThreadTimeout_sec
write SetIdleWorkerThreadTimeout_sec;
property MaxExecuting: integer read GetMaxExecuting write SetMaxExecuting;
property MaxQueued: integer read GetMaxQueued write SetMaxQueued;
property MaxQueuedTime_sec: integer read GetMaxQueuedTime_sec write SetMaxQueuedTime_sec;
property MinWorkers: integer read GetMinWorkers write SetMinWorkers;
property Name: string read GetName write SetName;
property UniqueID: int64 read GetUniqueID;
property WaitOnTerminate_sec: integer read GetWaitOnTerminate_sec write
SetWaitOnTerminate_sec;
property OnWorkerThreadCreated_Asy: TOTPWorkerThreadEvent read
GetOnWorkerThreadCreated_Asy write SetOnWorkerThreadCreated_Asy;
property OnWorkerThreadDestroying_Asy: TOTPWorkerThreadEvent read
GetOnWorkerThreadDestroying_Asy write SetOnWorkerThreadDestroying_Asy;
end; { IOmniThreadPool }
IOmniThreadPoolScheduler = interface ['{B7F5FFEF-2704-4CE0-ABF1-B20493E73650}']
procedure Schedule(const task: IOmniTask);
end; { IOmniThreadPoolScheduler }
function CreateThreadPool(const threadPoolName: string): IOmniThreadPool;
function GlobalOmniThreadPool: IOmniThreadPool;
implementation
uses
Messages,
Classes,
Contnrs,
{$IFNDEF Unicode} //Tiburon provides own TStringBuilder class
HVStringBuilder,
{$ENDIF}
DSiWin32,
SpinLock,
GpStuff,
OtlCommon,
OtlComm,
OtlTaskControl,
OtlEventMonitor;
const
WM_REQUEST_COMPLETED = WM_USER;
MSG_RUN = 1;
MSG_THREAD_CREATED = 2;
MSG_THREAD_DESTROYING = 3;
MSG_COMPLETED = 4;
MSG_STOP = 5;
MSG_CANCEL_RESULT = 6;
type
{$IFNDEF Unicode}
TStringBuilder = HVStringBuilder.StringBuilder;
{$ENDIF}
TOTPWorkerThread = class;
TOmniThreadPool = class;
TOTPWorkItem = class
strict private
owiScheduled_ms: int64;
owiScheduledAt : TDateTime;
owiStartedAt : TDateTime;
owiTask : IOmniTask;
owiThread : TOTPWorkerThread;
strict protected
function GetUniqueID: int64;
public
constructor Create(const task: IOmniTask);
function Description: string;
procedure TerminateTask(exitCode: integer; const exitMessage: string);
property ScheduledAt: TDateTime read owiScheduledAt;
property Scheduled_ms: int64 read owiScheduled_ms;
property StartedAt: TDateTime read owiStartedAt write owiStartedAt;
property UniqueID: int64 read GetUniqueID;
property Task: IOmniTask read owiTask;
property Thread: TOTPWorkerThread read owiThread write owiThread;
end; { TOTPWorkItem }
TOTPWorkerThread = class(TThread)
strict private
owtCommChannel : IOmniTwoWayChannel;
owtNewWorkEvent : TDSiEventHandle;
owtRemoveFromPool : boolean;
owtStartIdle_ms : int64;
owtStartStopping_ms: int64;
owtStopped : boolean;
owtTerminateEvent : TDSiEventHandle;
owtWorkItem_ref : TOTPWorkItem;
owtWorkItemLock : TTicketSpinLock;
strict protected
function Comm: IOmniCommunicationEndpoint;
procedure ExecuteWorkItem(workItem: TOTPWorkItem);
function GetOwnerCommEndpoint: IOmniCommunicationEndpoint;
procedure Log(const msg: string; params: array of const);
public
constructor Create;
destructor Destroy; override;
function Asy_TerminateWorkItem(var workItem: TOTPWorkItem): boolean;
function Description: string;
procedure Execute; override;
function GetWorkItemInfo(var scheduledAt, startedAt: TDateTime;
var description: string): boolean;
function IsExecuting(taskID: int64): boolean;
procedure Stop;
function WorkItemDescription: string;
property NewWorkEvent: TDSiEventHandle read owtNewWorkEvent;
property OwnerCommEndpoint: IOmniCommunicationEndpoint read GetOwnerCommEndpoint;
property RemoveFromPool: boolean read owtRemoveFromPool;
property StartIdle_ms: int64 read owtStartIdle_ms write owtStartIdle_ms;
property StartStopping_ms: int64 read owtStartStopping_ms write owtStartStopping_ms; //always modified from the owner thread
property Stopped: boolean read owtStopped write owtStopped; //always modified from the owner thread
property TerminateEvent: TDSiEventHandle read owtTerminateEvent;
property WorkItem_ref: TOTPWorkItem read owtWorkItem_ref write owtWorkItem_ref; //address of the work item this thread is working on
end; { TOTPWorkerThread }
TOTPWorker = class(TOmniWorker)
strict private
owDestroying : boolean;
owIdleWorkers : TObjectList;
owMonitorSupport : IOmniMonitorSupport;
owName : string;
owOnThreadCreated : TOTPWorkerThreadEvent;
owOnThreadDestroying: TOTPWorkerThreadEvent;
owRunningWorkers : TObjectList;
owStoppingWorkers : TObjectList;
owUniqueID : int64;
owWorkItemQueue : TObjectList;
strict protected
function ActiveWorkItemDescriptions: string;
procedure ForwardThreadCreated(threadID: DWORD);
procedure ForwardThreadDestroying(threadID: DWORD; threadPoolOperation:
TThreadPoolOperation; worker: TOTPWorkerThread = nil);
procedure InternalStop;
function LocateThread(threadID: DWORD): TOTPWorkerThread;
procedure Log(const msg: string; const params: array of const);
function NumRunningStoppedThreads: integer;
procedure ProcessCompletedWorkItem(workItem: TOTPWorkItem);
procedure RequestCompleted(workItem: TOTPWorkItem; worker: TOTPWorkerThread);
procedure ScheduleNext(workItem: TOTPWorkItem);
procedure StopThread(worker: TOTPWorkerThread);
protected
procedure Cleanup; override;
function Initialize: boolean; override;
public
CountQueued : TGp4AlignedInt;
CountRunning : TGp4AlignedInt;
IdleWorkerThreadTimeout_sec: TGp4AlignedInt;
MaxExecuting : TGp4AlignedInt;
MaxQueued : TGp4AlignedInt;
MaxQueuedTime_sec : TGp4AlignedInt;
MinWorkers : TGp4AlignedInt;
WaitOnTerminate_sec : TGp4AlignedInt;
constructor Create(const name: string; uniqueID: int64);
published
// invoked from TOmniThreadPool
procedure Cancel(const params: TOmniValue);
procedure CancelAll(var doneSignal: TOmniWaitableValue);
procedure MaintainanceTimer;
procedure PruneWorkingQueue;
procedure RemoveMonitor;
procedure Schedule(var workItem: TOTPWorkItem);
procedure SetMonitor(const hWindow: TOmniValue);
procedure SetName(const name: TOmniValue);
procedure SetOnThreadCreated(const eventHandler: TOmniValue);
procedure SetOnThreadDestroying(const eventHandler: TOmniValue);
// invoked from TOTPWorkerThreads
procedure MsgCompleted(var msg: TOmniMessage); message MSG_COMPLETED;
procedure MsgThreadCreated(var msg: TOmniMessage); message MSG_THREAD_CREATED;
procedure MsgThreadDestroying(var msg: TOmniMessage); message MSG_THREAD_DESTROYING;
end; { TOTPWorker }
TOmniThreadPool = class(TInterfacedObject, IOmniThreadPool, IOmniThreadPoolScheduler)
strict private
otpOnThreadCreated : TOTPWorkerThreadEvent;
otpOnThreadDestroying: TOTPWorkerThreadEvent;
otpPoolName : string;
otpUniqueID : int64;
otpWorker : IOmniWorker;
otpWorkerTask : IOmniTaskControl;
strict protected
procedure Log(const msg: string; const params: array of const);
protected
function GetIdleWorkerThreadTimeout_sec: integer;
function GetMaxExecuting: integer;
function GetMaxQueued: integer;
function GetMaxQueuedTime_sec: integer;
function GetMinWorkers: integer;
function GetName: string;
function GetOnWorkerThreadCreated_Asy: TOTPWorkerThreadEvent;
function GetOnWorkerThreadDestroying_Asy: TOTPWorkerThreadEvent;
function GetUniqueID: int64;
function GetWaitOnTerminate_sec: integer;
procedure SetIdleWorkerThreadTimeout_sec(value: integer);
procedure SetMaxExecuting(value: integer);
procedure SetMaxQueued(value: integer);
procedure SetMaxQueuedTime_sec(value: integer);
procedure SetMinWorkers(value: integer);
procedure SetName(const value: string);
procedure SetOnWorkerThreadCreated_Asy(const value: TOTPWorkerThreadEvent);
procedure SetOnWorkerThreadDestroying_Asy(const value: TOTPWorkerThreadEvent);
procedure SetWaitOnTerminate_sec(value: integer);
function WorkerObj: TOTPWorker;
public
constructor Create(const name: string);
destructor Destroy; override;
function Cancel(taskID: int64): boolean;
procedure CancelAll;
function CountExecuting: integer;
function CountQueued: integer;
function IsIdle: boolean;
function MonitorWith(const monitor: IOmniThreadPoolMonitor): IOmniThreadPool;
function RemoveMonitor: IOmniThreadPool;
procedure Schedule(const task: IOmniTask);
function SetMonitor(hWindow: THandle): IOmniThreadPool;
property IdleWorkerThreadTimeout_sec: integer read GetIdleWorkerThreadTimeout_sec
write SetIdleWorkerThreadTimeout_sec;
property MaxExecuting: integer read GetMaxExecuting write SetMaxExecuting;
property MaxQueued: integer read GetMaxQueued write SetMaxQueued;
property MaxQueuedTime_sec: integer read GetMaxQueuedTime_sec write SetMaxQueuedTime_sec;
property MinWorkers: integer read GetMinWorkers write SetMinWorkers;
property Name: string read GetName write SetName;
property UniqueID: int64 read GetUniqueID;
property WaitOnTerminate_sec: integer read GetWaitOnTerminate_sec write
SetWaitOnTerminate_sec;
property OnWorkerThreadCreated_Asy: TOTPWorkerThreadEvent
read GetOnWorkerThreadCreated_Asy write SetOnWorkerThreadCreated_Asy;
property OnWorkerThreadDestroying_Asy: TOTPWorkerThreadEvent
read GetOnWorkerThreadDestroying_Asy write SetOnWorkerThreadDestroying_Asy;
end; { TOmniThreadPool }
const
CGlobalOmniThreadPoolName = 'GlobalOmniThreadPool';
var
GOmniThreadPool: IOmniThreadPool = nil;
{ exports }
function GlobalOmniThreadPool: IOmniThreadPool;
begin
if not assigned(GOmniThreadPool) then
GOmniThreadPool := CreateThreadPool(CGlobalOmniThreadPoolName);
Result := GOmniThreadPool;
end; { GlobalOmniThreadPool }
function CreateThreadPool(const threadPoolName: string): IOmniThreadPool;
begin
Result := TOmniThreadPool.Create(threadPoolName);
end; { CreateThreadPool }
{ TOmniThreadPoolMonitorInfo }
constructor TOmniThreadPoolMonitorInfo.Create(uniqueID: int64; threadPoolOperation:
TThreadPoolOperation; threadID: integer);
begin
otpmiUniqueID := uniqueID;
otpmiThreadPoolOperation := threadPoolOperation;
otpmiThreadID := threadID;
end; { TOmniThreadPoolMonitorInfo.Create }
constructor TOmniThreadPoolMonitorInfo.Create(uniqueID, taskID: int64);
begin
otpmiUniqueID := uniqueID;
otpmiThreadPoolOperation := tpoWorkItemCompleted;
otpmiTaskID := taskID;
end; { TOmniThreadPoolMonitorInfo.Create }
{ TOTPWorkItem }
constructor TOTPWorkItem.Create(const task: IOmniTask);
begin
inherited Create;
owiTask := task;
owiScheduledAt := Now;
owiScheduled_ms := DSiTimeGetTime64;
end; { TOTPWorkItem.Create }
function TOTPWorkItem.Description: string;
begin
Result := Format('%s:%d', [Task.Name, UniqueID]);
end; { TOTPWorkItem.Description }
function TOTPWorkItem.GetUniqueID: int64;
begin
if assigned(Task) then
Result := Task.UniqueID
else
Result := -1;
end; { TOTPWorkItem.GetUniqueID }
procedure TOTPWorkItem.TerminateTask(exitCode: integer; const exitMessage: string);
begin
if assigned(owiTask) then begin
owiTask.Enforced(false);
owiTask.SetExitStatus(exitCode, exitMessage);
owiTask.Terminate;
owiTask := nil;
end;
end; { TOTPWorkItem.TerminateTask }
{ TOTPWorkerThread }
constructor TOTPWorkerThread.Create;
begin
inherited Create(true);
{$IFDEF LogThreadPool}Log('Creating thread %s', [Description]);{$ENDIF LogThreadPool}
owtNewWorkEvent := CreateEvent(nil, false, false, nil);
owtTerminateEvent := CreateEvent(nil, false, false, nil);
owtWorkItemLock := TTicketSpinLock.Create;
owtCommChannel := CreateTwoWayChannel;
Resume;
end; { TOTPWorkerThread.Create }
destructor TOTPWorkerThread.Destroy;
begin
{$IFDEF LogThreadPool}Log('Destroying thread %s', [Description]);{$ENDIF LogThreadPool}
FreeAndNil(owtWorkItemLock);
DSiCloseHandleAndNull(owtTerminateEvent);
DSiCloseHandleAndNull(owtNewWorkEvent);
inherited Destroy;
end; { TOTPWorkerThread.Destroy }
///Take the work item ownership from the thread. Called asynchronously from the thread pool.
///True if thread should be killed.
///2008-07-26
function TOTPWorkerThread.Asy_TerminateWorkItem(var workItem: TOTPWorkItem): boolean;
begin
{$IFDEF LogThreadPool}Log('Asy_TerminateWorkItem thread %s', [Description]);{$ENDIF LogThreadPool}
Result := false;
owtWorkItemLock.Acquire;
try
if assigned(WorkItem_ref) then begin
{$IFDEF LogThreadPool}Log('Thread %s has work item', [Description]);{$ENDIF LogThreadPool}
workItem := WorkItem_ref;
WorkItem_ref := nil;
if assigned(workItem) and assigned(workItem.Task) and (not workItem.Task.Stopped) then begin
workItem.TerminateTask(EXIT_THREADPOOL_CANCELLED, 'Cancelled');
Result := true;
end
else if assigned(workItem) then
Result := false;
end;
finally owtWorkItemLock.Release end;
end; { TOTPWorkerThread.Asy_TerminateWorkItem }
function TOTPWorkerThread.Comm: IOmniCommunicationEndpoint;
begin
Result := owtCommChannel.Endpoint1;
end; { TOTPWorkerThread.Comm }
function TOTPWorkerThread.Description: string;
begin
if not assigned(Self) then
Result := ''
else
Result := Format('%p:%d', [pointer(Self), GetCurrentThreadID]);
end; { TOTPWorkerThread.Description }
procedure TOTPWorkerThread.Execute;
var
msg: TOmniMessage;
begin
{$IFDEF LogThreadPool}Log('>>>Execute thread %s', [Description]);{$ENDIF LogThreadPool}
Comm.Send(MSG_THREAD_CREATED, ThreadID);
try
while true do begin
if Comm.ReceiveWait(msg, INFINITE) then begin
case msg.MsgID of
MSG_RUN:
ExecuteWorkItem(TOTPWorkItem(msg.MsgData.AsObject));
MSG_STOP:
begin
Stop;
break; //while
end;
else
raise Exception.CreateFmt('TOTPWorkerThread.Execute: Unexpected message %d',
[msg.MsgID]);
end; //case
end; //if Comm.ReceiveWait
end; //while Comm.ReceiveWait()
finally Comm.Send(MSG_THREAD_DESTROYING, ThreadID); end;
{$IFDEF LogThreadPool}Log('<<Gently stop the worker thread.
procedure TOTPWorkerThread.Stop;
var
task: IOmniTask;
begin
{$IFDEF LogThreadPool}Log('Stop thread %s', [Description]);{$ENDIF LogThreadPool}
owtWorkItemLock.Acquire;
try
if assigned(WorkItem_ref) then begin
task := WorkItem_ref.Task;
if assigned(task) then
task.Terminate;
end;
finally owtWorkItemLock.Release end;
end; { TOTPWorkerThread.Stop }
function TOTPWorkerThread.WorkItemDescription: string;
begin
owtWorkItemLock.Acquire;
try
if assigned(WorkItem_ref) then begin
Result := WorkItem_ref.Description;
end
else
Result := '';
finally owtWorkItemLock.Release; end;
end; { TOTPWorkerThread.WorkItemDescription }
{ TOTPWorker }
constructor TOTPWorker.Create(const name: string; uniqueID: int64);
begin
inherited Create;
owName := name;
owUniqueID := uniqueID;
end; { TOTPWorker.Create }
function TOTPWorker.ActiveWorkItemDescriptions: string;
var
description : string;
iWorker : integer;
sbDescriptions: TStringBuilder;
scheduledAt : TDateTime;
startedAt : TDateTime;
worker : TOTPWorkerThread;
begin
sbDescriptions := TStringBuilder.Create;
try
for iWorker := 0 to owRunningWorkers.Count - 1 do begin
worker := TOTPWorkerThread(owRunningWorkers[iWorker]);
if worker.GetWorkItemInfo(scheduledAt, startedAt, description) then
sbDescriptions.
Append('[').Append(iWorker+1).Append('] ').
Append(FormatDateTime('hh:nn:ss', scheduledAt)).Append(' / ').
Append(FormatDateTime('hh:nn:ss', startedAt)).Append(' ').
Append(description);
end;
Result := sbDescriptions.ToString;
finally FreeAndNil(sbDescriptions); end;
end; { TGpThreadPool.ActiveWorkItemDescriptions }
///True: Normal exit, False: Thread was killed.
procedure TOTPWorker.Cancel(const params: TOmniValue);
var
endWait_ms : int64;
iWorker : integer;
taskID : int64;
wasTerminated: boolean;
worker : TOTPWorkerThread;
workItem : TOTPWorkItem;
begin
taskID := params[0];
wasTerminated := true;
for iWorker := 0 to owRunningWorkers.Count - 1 do begin
worker := TOTPWorkerThread(owRunningWorkers[iWorker]);
if worker.IsExecuting(taskID) then begin
{$IFDEF LogThreadPool}Log('Cancel request %d on thread %p:%d', [taskID, pointer(worker), worker.ThreadID]);{$ENDIF LogThreadPool}
owRunningWorkers.Delete(iWorker);
worker.Stop;
endWait_ms := DSiTimeGetTime64 + int64(WaitOnTerminate_sec)*1000;
while (DSiTimeGetTime64 < endWait_ms) and (not worker.Stopped) do begin
ProcessMessages;
Sleep(10);
end;
SuspendThread(worker.Handle);
if worker.Asy_TerminateWorkItem(workItem) then begin
ProcessCompletedWorkItem(workItem);
{$IFDEF LogThreadPool}Log('Terminating unstoppable thread %s, num idle = %d, num running = %d[%d]', [worker.Description, tpIdleWorkers.Count, tpRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
TerminateThread(worker.Handle, cardinal(-1));
ForwardThreadDestroying(worker.ThreadID, tpoKillThread, worker);
FreeAndNil(worker);
wasTerminated := false;
end
else begin
ResumeThread(worker.Handle);
owIdleWorkers.Add(worker);
{$IFDEF LogThreadPool}Log('Thread %s moved to the idle list, num idle = %d, num running = %d[%d]', [worker.Description, tpIdleWorkers.Count, tpRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
end;
break; //for
end;
end; //for iWorker
(VarToObj(params[1]) as TOmniWaitableValue).Signal(wasTerminated);
end; { TOTPWorker.Cancel }
procedure TOTPWorker.CancelAll(var doneSignal: TOmniWaitableValue);
begin
InternalStop;
doneSignal.Signal;
end; { TOTPWorker.CancelAll }
procedure TOTPWorker.Cleanup;
begin
owDestroying := true;
InternalStop;
FreeAndNil(owStoppingWorkers);
FreeAndNil(owRunningWorkers);
FreeAndNil(owIdleWorkers);
FreeAndNil(owWorkItemQueue);
end; { TOTPWorker.Cleanup }
procedure TOTPWorker.ForwardThreadCreated(threadID: DWORD);
begin
if assigned(owOnThreadCreated) then
owOnThreadCreated(Self, threadID);
owMonitorSupport.Notify(
TOmniThreadPoolMonitorInfo.Create(owUniqueID, tpoCreateThread, threadID));
end; { TOTPWorker.ForwardThreadCreated }
procedure TOTPWorker.ForwardThreadDestroying(threadID: DWORD; threadPoolOperation:
TThreadPoolOperation; worker: TOTPWorkerThread);
begin
if not assigned(worker) then
worker := LocateThread(threadID);
if assigned(worker) then begin
Task.UnregisterComm(worker.OwnerCommEndpoint);
Worker.Stopped := true;
end;
if assigned(owOnThreadDestroying) then
owOnThreadDestroying(Self, threadID);
owMonitorSupport.Notify(
TOmniThreadPoolMonitorInfo.Create(owUniqueID, threadPoolOperation, threadID));
end; { TOTPWorker.ForwardThreadDestroying }
function TOTPWorker.Initialize: boolean;
begin
owMonitorSupport := CreateOmniMonitorSupport;
owIdleWorkers := TObjectList.Create(false);
owRunningWorkers := TObjectList.Create(false);
CountRunning.Value := 0;
owStoppingWorkers := TObjectList.Create(false);
owWorkItemQueue := TObjectList.Create(false);
CountQueued.Value := 0;
IdleWorkerThreadTimeout_sec.Value := CDefaultIdleWorkerThreadTimeout_sec;
WaitOnTerminate_sec.Value := CDefaultWaitOnTerminate_sec;
MaxExecuting.Value := Length(DSiGetThreadAffinity);
Task.SetTimer(1000, @TOTPWorker.MaintainanceTimer);
Result := true;
end; { TOTPWorker.Initialize }
procedure TOTPWorker.InternalStop;
var
endWait_ms : int64;
iWorker : integer;
iWorkItem : integer;
queuedItems: TObjectList {of TOTPWorkItem};
worker : TOTPWorkerThread;
workItem : TOTPWorkItem;
begin
{$IFDEF LogThreadPool}Log('Terminating queued tasks', []);{$ENDIF LogThreadPool}
queuedItems := TObjectList.Create(false);
try
for iWorkItem := 0 to owWorkItemQueue.Count - 1 do
queuedItems.Add(owWorkItemQueue[iWorkItem]);
owWorkItemQueue.Clear;
CountQueued.Value := 0;
for iWorkItem := 0 to queuedItems.Count - 1 do begin
workItem := TOTPWorkItem(queuedItems[iWorkItem]);
workItem.TerminateTask(EXIT_THREADPOOL_CANCELLED, 'Cancelled');
RequestCompleted(workItem, nil);
end; //for iWorkItem
finally FreeAndNil(queuedItems); end;
{$IFDEF LogThreadPool}Log('Stopping all threads', []);{$ENDIF LogThreadPool}
for iWorker := 0 to owIdleWorkers.Count - 1 do
StopThread(TOTPWorkerThread(owIdleWorkers[iWorker]));
owIdleWorkers.Clear;
for iWorker := 0 to owRunningWorkers.Count - 1 do
StopThread(TOTPWorkerThread(owRunningWorkers[iWorker]));
owRunningWorkers.Clear;
CountRunning.Value := 0;
endWait_ms := DSiTimeGetTime64 + int64(WaitOnTerminate_sec)*1000;
while (endWait_ms > DSiTimeGetTime64) and (NumRunningStoppedThreads > 0) do begin
ProcessMessages;
Sleep(10);
end;
for iWorker := 0 to owStoppingWorkers.Count - 1 do begin
worker := TOTPWorkerThread(owStoppingWorkers[iWorker]);
worker.Asy_TerminateWorkItem(workItem);
FreeAndNil(worker);
end;
owStoppingWorkers.Clear;
end; { TOTPWorker.InternalStop }
function TOTPWorker.LocateThread(threadID: DWORD): TOTPWorkerThread;
var
oThread: pointer;
begin
for oThread in owRunningWorkers do begin
Result := TOTPWorkerThread(oThread);
if Result.ThreadID = threadID then
Exit;
end;
for oThread in owIdleWorkers do begin
Result := TOTPWorkerThread(oThread);
if Result.ThreadID = threadID then
Exit;
end;
for oThread in owStoppingWorkers do begin
Result := TOTPWorkerThread(oThread);
if Result.ThreadID = threadID then
Exit;
end;
Result := nil;
end; { TOTPWorker.LocateThread }
procedure TOTPWorker.Log(const msg: string; const params: array of const);
begin
// TODO 1 -oPrimoz Gabrijelcic : Pass log messages to the event monitor
{$IFDEF LogThreadPool}
// use whatever logger you want
{$ENDIF LogThreadPool}
end; { TOTPWorker.Log }
procedure TOTPWorker.MaintainanceTimer;
var
iWorker: integer;
worker : TOTPWorkerThread;
begin
PruneWorkingQueue;
if IdleWorkerThreadTimeout_sec > 0 then begin
iWorker := 0;
while (owIdleWorkers.CardCount > MinWorkers.Value) and (iWorker < owIdleWorkers.Count) do begin
worker := TOTPWorkerThread(owIdleWorkers[iWorker]);
if (worker.StartStopping_ms = 0) and
((worker.StartIdle_ms + int64(IdleWorkerThreadTimeout_sec)*1000) < DSiTimeGetTime64) then
begin
{$IFDEF LogThreadPool}Log('Destroying idle thread %s because it was idle for more than %d seconds', [worker.Description, IdleWorkerThreadTimeout_sec]);{$ENDIF LogThreadPool}
owIdleWorkers.Delete(iWorker);
StopThread(worker);
end
else
Inc(iWorker);
end; //while
end;
iWorker := 0;
while iWorker < owStoppingWorkers.Count do begin
worker := TOTPWorkerThread(owStoppingWorkers[iWorker]);
if worker.Stopped or
((worker.StartStopping_ms + int64(WaitOnTerminate_sec)*1000) < DSiTimeGetTime64) then
begin
if not worker.Stopped then begin
SuspendThread(worker.Handle);
if worker.Stopped then begin
ResumeThread(worker.Handle);
break; //while
end;
TerminateThread(worker.Handle, cardinal(-1));
ForwardThreadDestroying(worker.ThreadID, tpoKillThread, worker);
end
else begin
{$IFDEF LogThreadPool}Log('Removing stopped thread %s', [worker.Description]);{$ENDIF LogThreadPool}
end;
owStoppingWorkers.Delete(iWorker);
FreeAndNil(worker);
end
else
Inc(iWorker);
end;
end; { TOTPWorker.MaintainanceTimer }
procedure TOTPWorker.MsgCompleted(var msg: TOmniMessage);
begin
ProcessCompletedWorkItem(TOTPWorkItem(msg.MsgData.AsObject));
end; { TOTPWorker.MsgCompleted }
procedure TOTPWorker.MsgThreadCreated(var msg: TOmniMessage);
begin
ForwardThreadCreated(msg.MsgData);
end; { TOTPWorker.MsgThreadCreated }
procedure TOTPWorker.MsgThreadDestroying(var msg: TOmniMessage);
begin
ForwardThreadDestroying(msg.MsgData, tpoDestroyThread);
end; { TOTPWorker.MsgThreadDestroying }
///Counts number of threads in the 'stopping' queue that are still doing work.
///2007-07-10
function TOTPWorker.NumRunningStoppedThreads: integer;
var
iThread: integer;
worker : TOTPWorkerThread;
begin
Result := 0;
for iThread := 0 to owStoppingWorkers.Count - 1 do begin
worker := TOTPWorkerThread(owStoppingWorkers[iThread]);
if not Worker.Stopped then
Inc(Result);
end; //for iThread
end; { TOTPWorker.NumRunningStoppedThreads }
procedure TOTPWorker.ProcessCompletedWorkItem(workItem: TOTPWorkItem);
var
worker: TOTPWorkerThread;
begin
worker := workItem.Thread;
{$IFDEF LogThreadPool}Log('Thread %s completed request %s with status %s:%s',
[worker.Description, workItem.Description, GetEnumName(TypeInfo(TGpTPStatus), Ord(workItem.Status)), workItem.LastError]);{$ENDIF LogThreadPool}
if owDestroying then begin
FreeAndNil(workItem);
Exit;
end;
owMonitorSupport.Notify(
TOmniThreadPoolMonitorInfo.Create(owUniqueID, workItem.UniqueID));
{$IFDEF LogThreadPool}Log('Thread %s completed request %s with status %s:%s', [worker.Description, workItem.Description, GetEnumName(TypeInfo(TGpTPStatus), Ord(workItem.Status)), workItem.LastError]);{$ENDIF LogThreadPool}
{$IFDEF LogThreadPool}Log('Destroying %s', [workItem.Description]);{$ENDIF LogThreadPool}
FreeAndNil(workItem);
// if (not (owDestroying)) and (owStoppingWorkers.IndexOf(worker) >= 0) then begin
// owStoppingWorkers.Remove(worker);
// owIdleWorkers.Add(worker);
// end;
if owRunningWorkers.IndexOf(worker) < 0 then
worker := nil;
if assigned(worker) then begin // move it back to the idle queue
owRunningWorkers.Extract(worker);
CountRunning.Decrement;
if (not worker.RemoveFromPool) and (owRunningWorkers.CardCount < MaxExecuting.Value) then begin
worker.StartIdle_ms := DSiTimeGetTime64;
owIdleWorkers.Add(worker);
{$IFDEF LogThreadPool}Log('Thread %s moved back to the idle list, num idle = %d, num running = %d[%d]', [worker.Description, tpIdleWorkers.Count, tpRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
end
else begin
{$IFDEF LogThreadPool}Log('Destroying thread %s, num idle = %d, num running = %d[%d]', [worker.Description, tpIdleWorkers.Count, tpRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
StopThread(worker);
end;
end;
if (not owDestroying) and (owWorkItemQueue.Count > 0) and
((owIdleWorkers.Count > 0) or (owRunningWorkers.CardCount < MaxExecuting.Value)) then
begin
workItem := TOTPWorkItem(owWorkItemQueue[0]);
owWorkItemQueue.Delete(0);
CountQueued.Decrement;
{$IFDEF LogThreadPool}Log('Dequeueing %s ', [workItem.Description]);{$ENDIF LogThreadPool}
ScheduleNext(workItem);
end;
end; { TOTPWorker.ProcessCompletedWorkItem }
procedure TOTPWorker.PruneWorkingQueue;
var
errorMsg : string;
iWorkItem : integer;
maxWaitTime_ms: int64;
workItem : TOTPWorkItem;
begin
if MaxQueued.Value > 0 then begin
while owWorkItemQueue.CardCount > MaxQueued.Value do begin
workItem := TOTPWorkItem(owWorkItemQueue[owWorkItemQueue.Count - 1]);
{$IFDEF LogThreadPool}Log('Removing request %s from work item queue because queue length > %d', [workItem.Description, tpMaxQueueLength]);{$ENDIF LogThreadPool}
owWorkItemQueue.Delete(owWorkItemQueue.Count - 1);
CountQueued.Decrement;
errorMsg := Format('Execution queue is too long (%d work items)', [owWorkItemQueue.Count]);
workItem.TerminateTask(EXIT_THREADPOOL_QUEUE_TOO_LONG, errorMsg);
RequestCompleted(workItem, nil);
end; //while
end;
if MaxQueuedTime_sec.Value > 0 then begin
iWorkItem := 0;
while iWorkItem < owWorkItemQueue.Count do begin
workItem := TOTPWorkItem(owWorkItemQueue[iWorkItem]);
maxWaitTime_ms := workItem.Scheduled_ms + int64(MaxQueuedTime_sec.Value)*1000;
if maxWaitTime_ms > DSiTimeGetTime64 then
Inc(iWorkItem)
else begin
{$IFDEF LogThreadPool}Log('Removing request %s from work item queue because it is older than %d seconds', [workItem.Description, tpMaxQueuedTime_sec]);{$ENDIF LogThreadPool}
owWorkItemQueue.Delete(iWorkItem);
CountQueued.Decrement;
errorMsg := Format('Maximum queued time exceeded.' +
' Pool = %0:s, Now = %1:s, Max executing = %2:d,' +
' Removed entry queue time = %3:s, Removed entry description = %4:s.' +
' Active entries: %5:s',
[{0}owName, {1}FormatDateTime('hh:nn:ss', Now), {2}MaxExecuting.Value,
{3}FormatDateTime('hh:nn:ss', workItem.ScheduledAt),
{4}workItem.Description, {5}ActiveWorkItemDescriptions]);
workItem.TerminateTask(EXIT_THREADPOOL_STALE_TASK, errorMsg);
RequestCompleted(workItem, nil);
end;
end; //while
end;
end; { TOTPWorker.PruneWorkingQueue }
procedure TOTPWorker.RemoveMonitor;
begin
owMonitorSupport.RemoveMonitor;
end; { TOTPWorker.RemoveMonitor }
procedure TOTPWorker.RequestCompleted(workItem: TOTPWorkItem; worker: TOTPWorkerThread);
begin
workItem.Thread := worker;
ProcessCompletedWorkItem(workItem);
end; { TOTPWorker.RequestCompleted }
procedure TOTPWorker.Schedule(var workItem: TOTPWorkItem);
begin
ScheduleNext(workItem);
PruneWorkingQueue;
end; { TOTPWorker.Schedule }
procedure TOTPWorker.ScheduleNext(workItem: TOTPWorkItem);
var
worker: TOTPWorkerThread;
begin
worker := nil;
if owIdleWorkers.Count > 0 then begin
worker := TOTPWorkerThread(owIdleWorkers[owIdleWorkers.Count - 1]);
owIdleWorkers.Delete(owIdleWorkers.Count - 1);
owRunningWorkers.Add(worker);
CountRunning.Increment;
{$IFDEF LogThreadPool}Log('Allocated thread from idle pool, num idle = %d, num running = %d[%d]', [owIdleWorkers.Count, owRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
end
else if (MaxExecuting.Value <= 0) or (owRunningWorkers.CardCount < MaxExecuting.Value) then begin
if (owRunningWorkers.Count + owIdleWorkers.Count + owStoppingWorkers.Count) >= CMaxConcurrentWorkers then
raise Exception.CreateFmt('TOTPWorker.ScheduleNext: Cannot start more than %d threads ' +
'due to the implementation limitations', [CMaxConcurrentWorkers]);
worker := TOTPWorkerThread.Create;
Task.RegisterComm(worker.OwnerCommEndpoint);
owRunningWorkers.Add(worker);
CountRunning.Increment;
{$IFDEF LogThreadPool}Log('Created new thread %s, num idle = %d, num running = %d[%d]', [worker.Description, owIdleWorkers.Count, owRunningWorkers.Count, MaxExecuting]);{$ENDIF LogThreadPool}
end;
if assigned(worker) then begin
{$IFDEF LogThreadPool}Log('Started %s', [workItem.Description]);{$ENDIF LogThreadPool}
workItem.StartedAt := Now;
workItem.Thread := worker;
//worker.WorkItem_ref := workItem;
worker.OwnerCommEndpoint.Send(MSG_RUN, workItem);
end
else begin
{$IFDEF LogThreadPool}Log('Queued %s ', [workItem.Description]);{$ENDIF LogThreadPool}
owWorkItemQueue.Add(workItem);
CountQueued.Increment;
if (MaxQueued > 0) and (owWorkItemQueue.CardCount >= MaxQueued.Value) then
PruneWorkingQueue;
end;
end; { TOTPWorker.ScheduleNext }
procedure TOTPWorker.SetMonitor(const hWindow: TOmniValue);
begin
owMonitorSupport.SetMonitor(CreateOmniMonitorParams(hWindow, COmniPoolMsg, 0, 0));
end; { TOTPWorker.SetMonitor }
procedure TOTPWorker.SetName(const name: TOmniValue);
begin
owName := name;
end; { TOTPWorker.SetName }
procedure TOTPWorker.SetOnThreadCreated(const eventHandler: TOmniValue);
begin
TMethod(owOnThreadCreated).Code := pointer(cardinal(eventHandler[0]));
TMethod(owOnThreadCreated).Data := pointer(cardinal(eventHandler[1]));
end; { TOTPWorker.SetOnThreadCreated }
procedure TOTPWorker.SetOnThreadDestroying(const eventHandler: TOmniValue);
begin
TMethod(owOnThreadDestroying).Code := pointer(cardinal(eventHandler[0]));
TMethod(owOnThreadDestroying).Data := pointer(cardinal(eventHandler[1]));
end; { TOTPWorker.SetOnThreadDestroying }
///Move the thread to the 'stopping' list and tell it to CancelAll.
/// Thread is guaranted not to be in 'idle' or 'working' list when StopThread is called.
///2007-07-10
procedure TOTPWorker.StopThread(worker: TOTPWorkerThread);
begin
{$IFDEF LogThreadPool}Log('Stopping worker thread %s', [worker.Description]);{$ENDIF LogThreadPool}
owStoppingWorkers.Add(worker);
worker.StartStopping_ms := DSiTimeGetTime64;
worker.OwnerCommEndpoint.Send(MSG_STOP);
{$IFDEF LogThreadPool}Log('num stopped = %d', [tpStoppingWorkers.Count]);{$ENDIF LogThreadPool}
end; { TOTPWorker.StopThread }
{ TOmniThreadPool }
constructor TOmniThreadPool.Create(const name: string);
begin
inherited Create;
{$IFDEF LogThreadPool}Log('Creating thread pool %p [%s]', [pointer(self), name]);{$ENDIF LogThreadPool}
otpPoolName := name;
otpUniqueID := OtlUID.Increment;
otpWorker := TOTPWorker.Create(name, otpUniqueID);
otpWorkerTask := CreateTask(otpWorker, Format('OmniThreadPool manager %s', [name])).Run;
end; { TOmniThreadPool.Create }
destructor TOmniThreadPool.Destroy;
begin
{$IFDEF LogThreadPool}Log('Destroying thread pool %p', [pointer(self), otpPoolName]);{$ENDIF LogThreadPool}
otpWorkerTask.Terminate;
inherited;
end; { TOmniThreadPool.Destroy }
///True: Normal exit, False: Thread was killed.
function TOmniThreadPool.Cancel(taskID: int64): boolean;
var
res: TOmniWaitableValue;
begin
res := TOmniWaitableValue.Create;
try
otpWorkerTask.Invoke(@TOTPWorker.Cancel, [taskID, res]);
res.WaitFor(INFINITE);
Result := res.Value;
finally FreeAndNil(res); end;
end; { TOmniThreadPool.Cancel }
procedure TOmniThreadPool.CancelAll;
var
res: TOmniWaitableValue;
begin
res := TOmniWaitableValue.Create;
try
otpWorkerTask.Invoke(@TOTPWorker.CancelAll, res);
res.WaitFor(INFINITE);
finally FreeAndNil(res); end;
end; { TOmniThreadPool.CancelAll }
function TOmniThreadPool.CountExecuting: integer;
begin
Result := WorkerObj.CountRunning.Value;
end; { TOmniThreadPool.CountExecuting }
function TOmniThreadPool.CountQueued: integer;
begin
Result := WorkerObj.CountQueued.Value;
end; { TOmniThreadPool.CountQueued }
function TOmniThreadPool.GetIdleWorkerThreadTimeout_sec: integer;
begin
Result := WorkerObj.IdleWorkerThreadTimeout_sec.Value;
end; { TOmniThreadPool.GetIdleWorkerThreadTimeout_sec }
function TOmniThreadPool.GetMaxExecuting: integer;
begin
Result := WorkerObj.MaxExecuting.Value;
end; { TOmniThreadPool.GetMaxExecuting }
function TOmniThreadPool.GetMaxQueued: integer;
begin
Result := WorkerObj.MaxQueued.Value;
end; { TOmniThreadPool.GetMaxQueued }
function TOmniThreadPool.GetMaxQueuedTime_sec: integer;
begin
Result := WorkerObj.MaxQueuedTime_sec.Value;
end; { TOmniThreadPool.GetMaxQueuedTime_sec }
function TOmniThreadPool.GetMinWorkers: integer;
begin
Result := WorkerObj.MinWorkers.Value;
end; { TOmniThreadPool.GetMinWorkers }
function TOmniThreadPool.GetName: string;
begin
Result := otpPoolName;
end; { TOmniThreadPool.GetName }
function TOmniThreadPool.GetOnWorkerThreadCreated_Asy: TOTPWorkerThreadEvent;
begin
Result := otpOnThreadCreated;
end; { TOmniThreadPool.GetOnWorkerThreadCreated_Asy }
function TOmniThreadPool.GetOnWorkerThreadDestroying_Asy: TOTPWorkerThreadEvent;
begin
Result := otpOnThreadDestroying;
end; { TOmniThreadPool.GetOnWorkerThreadDestroying_Asy }
function TOmniThreadPool.GetUniqueID: int64;
begin
Result := otpUniqueID;
end; { TOmniThreadPool.GetUniqueID }
function TOmniThreadPool.GetWaitOnTerminate_sec: integer;
begin
Result := WorkerObj.WaitOnTerminate_sec.Value;
end; { TOmniThreadPool.GetWaitOnTerminate_sec }
function TOmniThreadPool.IsIdle: boolean;
begin
if CountQueued <> 0 then
Result := false
else if CountExecuting <> 0 then
Result := false
else
Result := true;
end; { TOmniThreadPool.IsIdle }
procedure TOmniThreadPool.Log(const msg: string; const params: array of const);
begin
{$IFDEF LogThreadPool}
// use whatever logger you want
{$ENDIF LogThreadPool}
end; { TGpThreadPool.Log }
function TOmniThreadPool.MonitorWith(const monitor: IOmniThreadPoolMonitor): IOmniThreadPool;
begin
monitor.Monitor(Self);
Result := Self;
end; { TOmniThreadPool.MonitorWith }
function TOmniThreadPool.RemoveMonitor: IOmniThreadPool;
begin
otpWorkerTask.Invoke(@TOTPWorker.RemoveMonitor);
Result := Self;
end; { TOmniThreadPool.RemoveMonitor }
procedure TOmniThreadPool.Schedule(const task: IOmniTask);
begin
otpWorkerTask.Invoke(@TOTPWorker.Schedule, TOTPWorkItem.Create(task));
end; { TOmniThreadPool.Schedule }
procedure TOmniThreadPool.SetIdleWorkerThreadTimeout_sec(value: integer);
begin
WorkerObj.IdleWorkerThreadTimeout_sec.Value := value;
end; { TOmniThreadPool.SetIdleWorkerThreadTimeout_sec }
procedure TOmniThreadPool.SetMaxExecuting(value: integer);
begin
if value > CMaxConcurrentWorkers then
raise Exception.CreateFmt('TOmniThreadPool.SetMaxExecuting: ' +
'MaxExecuting cannot be larger than %d due to the implementation limitations',
[CMaxConcurrentWorkers]);
WorkerObj.MaxExecuting.Value := value;
end; { TOmniThreadPool.SetMaxExecuting }
procedure TOmniThreadPool.SetMaxQueued(value: integer);
begin
WorkerObj.MaxQueued.Value := value;
otpWorkerTask.Invoke(@TOTPWorker.PruneWorkingQueue);
end; { TOmniThreadPool.SetMaxQueued }
procedure TOmniThreadPool.SetMaxQueuedTime_sec(value: integer);
begin
WorkerObj.MaxQueuedTime_sec.Value := value;
otpWorkerTask.Invoke(@TOTPWorker.PruneWorkingQueue);
end; { TOmniThreadPool.SetMaxQueuedTime_sec }
procedure TOmniThreadPool.SetMinWorkers(value: integer);
begin
WorkerObj.MinWorkers.Value := value;
end; { TOmniThreadPool.SetMinWorkers }
function TOmniThreadPool.SetMonitor(hWindow: THandle): IOmniThreadPool;
begin
otpWorkerTask.Invoke(@TOTPWorker.SetMonitor, hWindow);
Result := Self;
end; { TOmniThreadPool.SetMonitor }
procedure TOmniThreadPool.SetName(const value: string);
begin
otpPoolName := value;
otpWorkerTask.Invoke(@TOTPWorker.SetName, value);
end; { TOmniThreadPool.SetName }
procedure TOmniThreadPool.SetOnWorkerThreadCreated_Asy(const value:
TOTPWorkerThreadEvent);
begin
otpOnThreadCreated := value;
otpWorkerTask.Invoke(@TOTPWorker.SetOnThreadCreated,
[TMethod(value).Code, TMethod(value).Data]);
end; { TOmniThreadPool.SetOnWorkerThreadCreated_Asy }
procedure TOmniThreadPool.SetOnWorkerThreadDestroying_Asy(const value:
TOTPWorkerThreadEvent);
begin
otpOnThreadDestroying := value;
otpWorkerTask.Invoke(@TOTPWorker.SetOnThreadDestroying,
[TMethod(value).Code, TMethod(value).Data]);
end; { TOmniThreadPool.SetOnWorkerThreadDestroying_Asy }
procedure TOmniThreadPool.SetWaitOnTerminate_sec(value: integer);
begin
WorkerObj.WaitOnTerminate_sec.Value := value;
end; { TOmniThreadPool.SetWaitOnTerminate_sec }
function TOmniThreadPool.WorkerObj: TOTPWorker;
begin
Result := (otpWorker.Implementor as TOTPWorker);
end; { TOmniThreadPool.WorkerObj }
initialization
//assumptions made in the code above
Assert(SizeOf(pointer) = SizeOf(cardinal));
end.