///Task control encapsulation. Part of the OmniThreadLibrary project. ///Primoz Gabrijelcic /// ///This software is distributed under the BSD license. /// ///Copyright (c) 2009, Primoz Gabrijelcic ///All rights reserved. /// ///Redistribution and use in source and binary forms, with or without modification, ///are permitted provided that the following conditions are met: ///- Redistributions of source code must retain the above copyright notice, this /// list of conditions and the following disclaimer. ///- Redistributions in binary form must reproduce the above copyright notice, /// this list of conditions and the following disclaimer in the documentation /// and/or other materials provided with the distribution. ///- The name of the Primoz Gabrijelcic may not be used to endorse or promote /// products derived from this software without specific prior written permission. /// ///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE ///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES ///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; ///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT ///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /// /// /// Home : http://otl.17slon.com /// Support : http://otl.17slon.com/forum/ /// Author : Primoz Gabrijelcic /// E-Mail : primoz@gabrijelcic.org /// Blog : http://thedelphigeek.com /// Web : http://gp.17slon.com /// Contributors : GJ, Lee_Nover /// /// Creation date : 2008-06-12 /// Last modification : 2009-01-26 /// Version : 1.08 /// /// History: /// 1.08: 2009-01-26 /// - Implemented IOmniTaskControl.Enforced decorator. /// - Added TOmniWorker.ProcessMessages - a support for worker to recursively /// process messages inside message handlers. /// 1.07: 2009-01-19 /// - Implemented IOmniTaskControlList, a list of IOmniTaskControl interfaces. /// - TOmniTaskGroup reimplemented using IOmniTaskControlList. /// 1.06: 2008-12-15 /// - TOmniWorker's internal message loop can now be overridden at various places /// and even fully replaced with a custom code. /// 1.05a: 2008-11-17 /// - [Jamie] Fixed bug in TOmniTaskExecutor.Asy_SetTimerInt. /// 1.05: 2008-11-01 /// - IOmniTaskControl.Terminate kills the task thread if it doesn't terminate in /// the specified amount of time. /// 1.04a: 2008-10-06 /// - IOmniTaskControl.Invoke modified to return IOmniTaskControl. /// 1.04: 2008-10-05 /// - Implemented IOmniTaskControl.Invoke (six overloads), used for string- and /// pointer-based method dispatch (see demo 18 for more details and demo 19 /// for benchmarks). /// - Implemented two SetTimer overloads using new invocation methods. /// - Implemented IOmniTaskControl.SetQueue, which can be used to increase (or /// reduce) the size of the IOmniTaskControl<->IOmniTask communication queue. /// This function must be called before .SetMonitor, .RemoveMonitor, .Run or /// .Schedule. /// 1.03b: 2008-09-26 /// - More stringent Win32 API result checking. /// 1.03a: 2008-09-25 /// - Bug fixed: TOmniTaskControl.Schedule always scheduled task to the global /// thread pool. /// 1.03: 2008-09-20 /// - Implemented IOmniTaskGroup.SendToAll. This should be looked at as a temporary /// solution. IOmniTaskGroup should expose communication interface (just like /// IOmniTask and IOmniTaskControl) but in this case it should be one-to-many /// queue connecting IOmniTaskGroup's Comm to all tasks inside the group. /// 1.02: 2008-09-19 /// - Added enumerator to the IOmniTaskGroup interface. /// - Implemented IOmniTaskGroup.RegisterAllCommWith and .UnregisterAllCommFrom. /// - Bug fixed in TOmniTaskExecutor.Asy_DispatchMessages - program crashed if /// communications unregistered inside task's own timer method. /// - Setting timer interval resets timer countdown. /// 1.01: 2008-09-18 /// - Implemented SetTimer on the IOmniTask side. /// - Bug fixed: IOmniTaskGroup.RunAll was not returning a result. /// 1.0a: 2008-08-29 /// - Bug fixed: .MsgWait was not functional. /// 1.0: 2008-08-26 /// - First official release. /// ///Literature /// - Lock my Object… Please!, Allen Bauer, /// http://blogs.codegear.com/abauer/2008/02/19/38856 /// - Threading in C#, Joseph Albahari, http://www.albahari.com/threading/ /// - Coordination Data Structures Overview, Emad Omara, /// http://blogs.msdn.com/pfxteam/archive/2008/06/18/8620615.aspx /// - Erlang, http://en.wikipedia.org/wiki/Erlang_(programming_language) {$IF CompilerVersion >= 20} {$DEFINE OTL_Anonymous} {$IFEND} {$WARN SYMBOL_PLATFORM OFF} unit OtlTaskControl; interface uses Windows, SysUtils, Variants, Classes, SyncObjs, GpStuff, OtlCommon, OtlComm, OtlTask, OtlThreadPool; type IOmniTaskControl = interface; IOmniTaskControlMonitor = interface ['{20CB3AB7-04D8-454B-AEFE-CFCFF8F27301}'] function Detach(const task: IOmniTaskControl): IOmniTaskControl; function Monitor(const task: IOmniTaskControl): IOmniTaskControl; end; { IOmniTaskControlMonitor } {$TYPEINFO ON} {$METHODINFO ON} IOmniWorker = interface ['{CA63E8C2-9B0E-4BFA-A527-31B2FCD8F413}'] function GetImplementor: TObject; function GetTask: IOmniTask; procedure SetExecutor(executor: TObject); procedure SetTask(const value: IOmniTask); // procedure Cleanup; procedure DispatchMessage(var msg: TOmniMessage); procedure Timer; function Initialize: boolean; property Task: IOmniTask read GetTask write SetTask; property Implementor: TObject read GetImplementor; end; { IOmniWorker } TOmniWorker = class(TInterfacedObject, IOmniWorker) strict private owExecutor: TObject; {TOmniTaskExecutor} owTask : IOmniTask; strict protected procedure ProcessMessages; protected procedure Cleanup; virtual; procedure DispatchMessage(var msg: TOmniMessage); virtual; function GetImplementor: TObject; function GetTask: IOmniTask; function Initialize: boolean; virtual; procedure SetExecutor(executor: TObject); procedure SetTask(const value: IOmniTask); public procedure Timer; virtual; property Task: IOmniTask read GetTask write SetTask; property Implementor: TObject read GetImplementor; end; { TOmniWorker } {$TYPEINFO OFF} {$METHODINFO OFF} TOmniTaskProcedure = procedure(const task: IOmniTask); TOmniTaskMethod = procedure(const task: IOmniTask) of object; TOTLThreadPriority = (tpIdle, tpLowest, tpBelowNormal, tpNormal, tpAboveNormal, tpHighest); IOmniTaskGroup = interface; IOmniTaskControl = interface ['{881E94CB-8C36-4CE7-9B31-C24FD8A07555}'] function GetComm: IOmniCommunicationEndpoint; function GetExitCode: integer; function GetExitMessage: string; function GetLock: TSynchroObject; function GetName: string; function GetUniqueID: int64; // function Alertable: IOmniTaskControl; function ChainTo(const task: IOmniTaskControl; ignoreErrors: boolean = false): IOmniTaskControl; function Enforced(forceExecution: boolean = true): IOmniTaskControl; function Invoke(const msgMethod: pointer): IOmniTaskControl; overload; function Invoke(const msgMethod: pointer; msgData: array of const): IOmniTaskControl; overload; function Invoke(const msgMethod: pointer; msgData: TOmniValue): IOmniTaskControl; overload; function Invoke(const msgName: string): IOmniTaskControl; overload; function Invoke(const msgName: string; msgData: array of const): IOmniTaskControl; overload; function Invoke(const msgName: string; msgData: TOmniValue): IOmniTaskControl; overload; function Join(const group: IOmniTaskGroup): IOmniTaskControl; function Leave(const group: IOmniTaskGroup): IOmniTaskControl; function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskControl; function MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl; function RemoveMonitor: IOmniTaskControl; function Run: IOmniTaskControl; function Schedule(const threadPool: IOmniThreadPool = nil {default pool}): IOmniTaskControl; function SetMonitor(hWindow: THandle): IOmniTaskControl; function SetParameter(const paramName: string; const paramValue: TOmniValue): IOmniTaskControl; overload; function SetParameter(const paramValue: TOmniValue): IOmniTaskControl; overload; function SetParameters(const parameters: array of TOmniValue): IOmniTaskControl; function SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl; function SetTimer(interval_ms: cardinal; timerMessageID: integer = -1): IOmniTaskControl; overload; function SetTimer(interval_ms: cardinal; const timerMessageName: string): IOmniTaskControl; overload; function SetTimer(interval_ms: cardinal; const timerMethod: pointer): IOmniTaskControl; overload; function SetQueueSize(numMessages: integer): IOmniTaskControl; function Terminate(maxWait_ms: cardinal = INFINITE): boolean; //will kill thread after timeout function TerminateWhen(event: THandle): IOmniTaskControl; function WaitFor(maxWait_ms: cardinal): boolean; function WaitForInit: boolean; function WithCounter(const counter: IOmniCounter): IOmniTaskControl; function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskControl; // property Comm: IOmniCommunicationEndpoint read GetComm; property ExitCode: integer read GetExitCode; property ExitMessage: string read GetExitMessage; property Lock: TSynchroObject read GetLock; property Name: string read GetName; property UniqueID: int64 read GetUniqueID; end; { IOmniTaskControl } IOmniTaskControlListEnumerator = interface function GetCurrent: IOmniTaskControl; function MoveNext: boolean; property Current: IOmniTaskControl read GetCurrent; end; { IOmniTaskControlListEnumerator } IOmniTaskControlList = interface function Get(idxItem: integer): IOmniTaskControl; function GetCapacity: integer; function GetCount: integer; procedure Put(idxItem: integer; const value: IOmniTaskControl); procedure SetCapacity(const value: integer); procedure SetCount(const value: integer); // function Add(const item: IOmniTaskControl): integer; procedure Clear; procedure Delete(idxItem: integer); procedure Exchange(idxItem1, idxItem2: integer); function First: IOmniTaskControl; function GetEnumerator: IOmniTaskControlListEnumerator; function IndexOf(const item: IOmniTaskControl): integer; procedure Insert(idxItem: integer; const item: IOmniTaskControl); function Last: IOmniTaskControl; function Remove(const item: IOmniTaskControl): integer; property Capacity: Integer read GetCapacity write SetCapacity; property Count: integer read GetCount write SetCount; property Items[idxItem: integer]: IOmniTaskControl read Get write Put; default; end; { IOmniTaskControlList } //v1.1 extensions: // maybe: Comm: IOmniCommunicationEndpoint, which is actually one-to-many-to-one // function Sequential: IOmniTaskGroup; // function Parallel(useThreadPool: IOmniThreadPool): IOmniTaskGroup; // maybe: if one of group processes dies, TerminateAll should automatically happen? IOmniTaskGroup = interface ['{B36C08B4-0F71-422C-8613-63C4D04676B7}'] function Add(const taskControl: IOmniTaskControl): IOmniTaskGroup; function GetEnumerator: IOmniTaskControlListEnumerator; function RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup; function Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup; function RunAll: IOmniTaskGroup; procedure SendToAll(const msg: TOmniMessage); function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean; function UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup; function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean; end; { IOmniTaskGroup } {$IFDEF OTL_Anonymous} TOmniTaskFunction = reference to procedure (task: IOmniTask); function CreateTask(worker: TOmniTaskFunction; const taskName: string = ''): IOmniTaskControl; overload; {$ENDIF OTL_Anonymous} function CreateTask(worker: TOmniTaskProcedure; const taskName: string = ''): IOmniTaskControl; overload; function CreateTask(worker: TOmniTaskMethod; const taskName: string = ''): IOmniTaskControl; overload; function CreateTask(const worker: IOmniWorker; const taskName: string = ''): IOmniTaskControl; overload; // function CreateTask(worker: IOmniTaskGroup; const taskName: string = ''): IOmniTaskControl; overload; function CreateTaskGroup: IOmniTaskGroup; function CreateTaskControlList: IOmniTaskControlList; implementation uses Messages, TypInfo, ObjAuto, DetailedRTTI, DSiWin32, GpLists, GpStringHash, OtlEventMonitor; type TOmniInternalMessageType = (imtStringMsg, imtAddressMsg); TOmniInternalMessage = class strict private imInternalMessageType: TOmniInternalMessageType; public class function InternalType(const msg: TOmniMessage): TOmniInternalMessageType; constructor Create(internalMessageType: TOmniInternalMessageType); property InternalMessageType: TOmniInternalMessageType read imInternalMessageType; end; { TOmniInternalMessage } TOmniInternalStringMsg = class(TOmniInternalMessage) strict private ismMsgData: TOmniValue; ismMsgName: string; public class function CreateMessage(const msgName: string; msgData: TOmniValue): TOmniMessage; inline; class procedure UnpackMessage(const msg: TOmniMessage; var msgName: string; var msgData: TOmniValue); inline; constructor Create(const msgName: string; const msgData: TOmniValue); property MsgData: TOmniValue read ismMsgData; property MsgName: string read ismMsgName; end; { TOmniInternalStringMsg } TOmniInternalAddressMsg = class(TOmniInternalMessage) strict private ismMsgData : TOmniValue; ismMsgMethod: pointer; public class function CreateMessage(const msgMethod: pointer; msgData: TOmniValue): TOmniMessage; inline; class procedure UnpackMessage(const msg: TOmniMessage; var msgMethod: pointer; var msgData: TOmniValue); inline; constructor Create(const msgMethod: pointer; const msgData: TOmniValue); property MsgData: TOmniValue read ismMsgData; property MsgMethod: pointer read ismMsgMethod; end; { TOmniInternalAddressMsg } TOmniInvokeType = (itUnknown, itSelf, itSelfAndOmniValue, itSelfAndObject); TOmniInvokeSignature_Self = procedure(Self: TObject); TOmniInvokeSignature_Self_OmniValue = procedure(Self: TObject; var value: TOmniValue); TOmniInvokeSignature_Self_Object = procedure(Self: TObject; var obj: TObject); TOmniInvokeInfo = class strict private oiiAddress : pointer; oiiSignature: TOmniInvokeType; public constructor Create(methodAddr: pointer; methodSignature: TOmniInvokeType); property Address: pointer read oiiAddress; property Signature: TOmniInvokeType read oiiSignature; end; { TOmniInvokeInfo } TOmniTaskControlOption = (tcoAlertableWait, tcoMessageWait, tcoForceExecution); TOmniTaskControlOptions = set of TOmniTaskControlOption; TOmniExecutorType = (etNone, etMethod, etProcedure, etWorker, etFunction); TOmniTaskExecutor = class strict private type TOmniMessageInfo = record IdxFirstMessage : cardinal; IdxFirstTerminate: cardinal; IdxLastMessage : cardinal; IdxLastTerminate : cardinal; IdxRebuildHandles: cardinal; NumWaitHandles : cardinal; WaitFlags : DWORD; WaitHandles : array [0..63] of THandle; WaitWakeMask : DWORD; end; strict private oteCommList : TInterfaceList; oteCommRebuildHandles: THandle; oteExecutorType : TOmniExecutorType; oteExitCode : TGp4AlignedInt; oteExitMessage : string; {$IFDEF OTL_Anonymous} oteFunc : TOmniTaskFunction; {$ENDIF OTL_Anonymous} oteInternalLock : TOmniCS; oteLastTimer_ms : int64; oteMethod : TOmniTaskMethod; oteMethodHash : TGpStringObjectHash; oteMsgInfo : TOmniMessageInfo; oteOptions : TOmniTaskControlOptions; oteOptionsLock : TOmniCS; otePriority : TOTLThreadPriority; oteProc : TOmniTaskProcedure; oteTerminateHandles : TGpIntegerList; oteTimerInterval_ms : TGp4AlignedInt; oteTimerMessageID : TGp4AlignedInt; oteTimerMessageMethod: TGp4AlignedInt; oteTimerMessageName : string; oteWakeMask : DWORD; oteWaitHandlesGen : int64; oteWorkerInitialized : THandle; oteWorkerInitOK : boolean; oteWorkerIntf : IOmniWorker; strict protected procedure CallOmniTimer; procedure Cleanup; procedure DispatchMessages(const task: IOmniTask); procedure DispatchOmniMessage(msg: TOmniMessage); function GetExitCode: integer; inline; function GetExitMessage: string; procedure GetMethodAddrAndSignature(const methodName: string; var methodAddress: pointer; var methodSignature: TOmniInvokeType); procedure GetMethodNameFromInternalMessage(const msg: TOmniMessage; var msgName: string; var msgData: TOmniValue); function GetOptions: TOmniTaskControlOptions; function GetTimerInterval_ms: cardinal; inline; function GetTimerMessageID: integer; inline; function GetTimerMessageMethod: pointer; function GetTimerMessageName: string; procedure Initialize; procedure ProcessThreadMessages; procedure RaiseInvalidSignature(const methodName: string); procedure RemoveTerminationEvents(const srcMsgInfo: TOmniMessageInfo; var dstMsgInfo: TOmniMessageInfo); procedure SetOptions(const value: TOmniTaskControlOptions); procedure SetTimerInt(interval_ms: cardinal; timerMsgID: integer; const timerMsgName: string; const timerMsgMethod: pointer); procedure SetTimerInterval_ms(const value: cardinal); procedure SetTimerMessageID(const value: integer); procedure SetTimerMessageMethod(const value: pointer); procedure SetTimerMessageName(const value: string); protected function DispatchEvent(awaited: cardinal; const task: IOmniTask; var msgInfo: TOmniMessageInfo): boolean; virtual; procedure MainMessageLoop(const task: IOmniTask; var msgInfo: TOmniMessageInfo); virtual; procedure MessageLoopPayload; virtual; procedure ProcessMessages(task: IOmniTask); virtual; procedure RebuildWaitHandles(const task: IOmniTask; var msgInfo: TOmniMessageInfo); virtual; function TimeUntilNextTimer_ms: cardinal; virtual; function WaitForEvent(msgInfo: TOmniMessageInfo; timeout_ms: cardinal): cardinal; virtual; public constructor Create(const workerIntf: IOmniWorker); overload; constructor Create(method: TOmniTaskMethod); overload; constructor Create(proc: TOmniTaskProcedure); overload; {$IFDEF OTL_Anonymous} constructor Create(func: TOmniTaskFunction); overload; {$ENDIF OTL_Anonymous} destructor Destroy; override; procedure Asy_Execute(const task: IOmniTask); procedure Asy_RegisterComm(const comm: IOmniCommunicationEndpoint); procedure Asy_SetExitStatus(exitCode: integer; const exitMessage: string); procedure Asy_SetTimer(interval_ms: cardinal; timerMsgID: integer); overload; procedure Asy_SetTimer(interval_ms: cardinal; const timerMethod: pointer); overload; procedure Asy_SetTimer(interval_ms: cardinal; const timerMsgName: string); overload; procedure Asy_UnregisterComm(const comm: IOmniCommunicationEndpoint); procedure TerminateWhen(handle: THandle); function WaitForInit: boolean; property ExitCode: integer read GetExitCode; property ExitMessage: string read GetExitMessage; property Options: TOmniTaskControlOptions read GetOptions write SetOptions; property Priority: TOTLThreadPriority read otePriority write otePriority; property TimerInterval_ms: cardinal read GetTimerInterval_ms write SetTimerInterval_ms; property TimerMessageID: integer read GetTimerMessageID write SetTimerMessageID; property TimerMessageMethod: pointer read GetTimerMessageMethod write SetTimerMessageMethod; property TimerMessageName: string read GetTimerMessageName write SetTimerMessageName; property WakeMask: DWORD read oteWakeMask write oteWakeMask; property WorkerInitialized: THandle read oteWorkerInitialized; property WorkerInitOK: boolean read oteWorkerInitOK; property WorkerIntf: IOmniWorker read oteWorkerIntf; end; { TOmniTaskExecutor } TOmniSharedTaskInfo = class strict private ostiChainIgnoreErrors: boolean; ostiChainTo : IOmniTaskControl; ostiCommChannel : IOmniTwoWayChannel; ostiCounter : IOmniCounter; ostiLock : TSynchroObject; ostiMonitorWindow : THandle; ostiTaskName : string; ostiTerminatedEvent : THandle; ostiTerminateEvent : THandle; ostiUniqueID : int64; public property ChainIgnoreErrors: boolean read ostiChainIgnoreErrors write ostiChainIgnoreErrors; property ChainTo: IOmniTaskControl read ostiChainTo write ostiChainTo; property CommChannel: IOmniTwoWayChannel read ostiCommChannel write ostiCommChannel; property Counter: IOmniCounter read ostiCounter write ostiCounter; property Lock: TSynchroObject read ostiLock write ostiLock; property MonitorWindow: THandle read ostiMonitorWindow write ostiMonitorWindow; property TaskName: string read ostiTaskName write ostiTaskName; property TerminatedEvent: THandle read ostiTerminatedEvent write ostiTerminatedEvent; property TerminateEvent: THandle read ostiTerminateEvent write ostiTerminateEvent; property UniqueID: int64 read ostiUniqueID write ostiUniqueID; end; { TOmniSharedTaskInfo } TOmniTask = class(TInterfacedObject, IOmniTask, IOmniTaskExecutor) strict private otExecuting : boolean; otExecutor_ref : TOmniTaskExecutor; otParameters_ref: TOmniValueContainer; otSharedInfo : TOmniSharedTaskInfo; protected function GetComm: IOmniCommunicationEndpoint; inline; function GetCounter: IOmniCounter; function GetLock: TSynchroObject; function GetName: string; inline; function GetParam(idxParam: integer): TOmniValue; inline; function GetParamByName(const paramName: string): TOmniValue; inline; function GetTerminateEvent: THandle; inline; function GetUniqueID: int64; inline; procedure Terminate; public constructor Create(executor: TOmniTaskExecutor; parameters: TOmniValueContainer; sharedInfo: TOmniSharedTaskInfo); procedure Enforced(forceExecution: boolean = true); procedure Execute; procedure RegisterComm(const comm: IOmniCommunicationEndpoint); procedure SetExitStatus(exitCode: integer; const exitMessage: string); procedure SetTimer(interval_ms: cardinal; timerMessageID: integer = -1); overload; procedure SetTimer(interval_ms: cardinal; const timerMethod: pointer); overload; procedure SetTimer(interval_ms: cardinal; const timerMessageName: string); overload; function Stopped: boolean; procedure StopTimer; function Terminated: boolean; procedure UnregisterComm(const comm: IOmniCommunicationEndpoint); property Comm: IOmniCommunicationEndpoint read GetComm; property Counter: IOmniCounter read GetCounter; property Lock: TSynchroObject read GetLock; property Name: string read GetName; property Param[idxParam: integer]: TOmniValue read GetParam; property ParamByName[const paramName: string]: TOmniValue read GetParamByName; property TerminateEvent: THandle read GetTerminateEvent; property UniqueID: int64 read GetUniqueID; end; { TOmniTask } TOmniThread = class(TThread) // Factor this class into OtlThread unit? strict private otTask: IOmniTask; protected procedure Execute; override; public constructor Create(task: IOmniTask); property Task: IOmniTask read otTask; end; { TOmniThread } IOmniTaskControlInternals = interface ['{CE7B53E0-902E-413F-AB6E-B97E7F4B0AD5}'] function GetTerminatedEvent: THandle; function GetTerminateEvent: THandle; // property TerminatedEvent: THandle read GetTerminatedEvent; property TerminateEvent: THandle read GetTerminateEvent; end; { IOmniTaskControlInternals } TOmniTaskControl = class(TInterfacedObject, IOmniTaskControl, IOmniTaskControlInternals) strict private otcDestroyLock: boolean; otcExecutor : TOmniTaskExecutor; otcOwningPool : IOmniThreadPool; otcParameters : TOmniValueContainer; otcQueueLength: integer; otcSharedInfo : TOmniSharedTaskInfo; otcThread : TOmniThread; strict protected function CreateTask: IOmniTask; procedure EnsureCommChannel; inline; procedure Initialize(const taskName: string); protected function GetComm: IOmniCommunicationEndpoint; inline; function GetExitCode: integer; inline; function GetExitMessage: string; inline; function GetLock: TSynchroObject; function GetName: string; inline; function GetOptions: TOmniTaskControlOptions; function GetTerminatedEvent: THandle; function GetTerminateEvent: THandle; function GetUniqueID: int64; inline; procedure SetOptions(const value: TOmniTaskControlOptions); function SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl; public constructor Create(const worker: IOmniWorker; const taskName: string); overload; constructor Create(worker: TOmniTaskMethod; const taskName: string); overload; constructor Create(worker: TOmniTaskProcedure; const taskName: string); overload; {$IFDEF OTL_Anonymous} constructor Create(worker: TOmniTaskFunction; const taskName: string); overload; {$ENDIF OTL_Anonymous} destructor Destroy; override; function Alertable: IOmniTaskControl; function ChainTo(const task: IOmniTaskControl; ignoreErrors: boolean = false): IOmniTaskControl; function Enforced(forceExecution: boolean = true): IOmniTaskControl; function Invoke(const msgMethod: pointer): IOmniTaskControl; overload; inline; function Invoke(const msgMethod: pointer; msgData: array of const): IOmniTaskControl; overload; function Invoke(const msgMethod: pointer; msgData: TOmniValue): IOmniTaskControl; overload; inline; function Invoke(const msgName: string): IOmniTaskControl; overload; inline; function Invoke(const msgName: string; msgData: array of const): IOmniTaskControl; overload; function Invoke(const msgName: string; msgData: TOmniValue): IOmniTaskControl; overload; inline; function Join(const group: IOmniTaskGroup): IOmniTaskControl; function Leave(const group: IOmniTaskGroup): IOmniTaskControl; function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskControl; function MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl; function RemoveMonitor: IOmniTaskControl; function Run: IOmniTaskControl; function Schedule(const threadPool: IOmniThreadPool = nil {default pool}): IOmniTaskControl; function SetMonitor(hWindow: THandle): IOmniTaskControl; function SetParameter(const paramName: string; const paramValue: TOmniValue): IOmniTaskControl; overload; function SetParameter(const paramValue: TOmniValue): IOmniTaskControl; overload; function SetParameters(const parameters: array of TOmniValue): IOmniTaskControl; function SetQueueSize(numMessages: integer): IOmniTaskControl; function SetTimer(interval_ms: cardinal; timerMessageID: integer = -1): IOmniTaskControl; overload; function SetTimer(interval_ms: cardinal; const timerMethod: pointer): IOmniTaskControl; overload; function SetTimer(interval_ms: cardinal; const timerMessageName: string): IOmniTaskControl; overload; function Terminate(maxWait_ms: cardinal = INFINITE): boolean; //will kill thread after timeout function TerminateWhen(event: THandle): IOmniTaskControl; function WaitFor(maxWait_ms: cardinal): boolean; function WaitForInit: boolean; function WithCounter(const counter: IOmniCounter): IOmniTaskControl; function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskControl; property Comm: IOmniCommunicationEndpoint read GetComm; property ExitCode: integer read GetExitCode; property ExitMessage: string read GetExitMessage; property Lock: TSynchroObject read GetLock; property Name: string read GetName; property Options: TOmniTaskControlOptions read GetOptions write SetOptions; property UniqueID: int64 read GetUniqueID; end; { TOmniTaskControl } TOmniTaskControlList = class; TOmniTaskControlListEnumerator = class(TInterfacedObject, IOmniTaskControlListEnumerator) strict private otcleTaskEnum: TInterfaceListEnumerator; protected function GetCurrent: IOmniTaskControl; function MoveNext: boolean; public constructor Create(taskList: TInterfaceList); end; { TOmniTaskControlListEnumerator } TOmniTaskControlList = class(TInterfacedObject, IOmniTaskControlList) strict private otclList: TInterfaceList; protected function Get(idxItem: integer): IOmniTaskControl; function GetCapacity: integer; function GetCount: integer; procedure Put(idxItem: integer; const value: IOmniTaskControl); procedure SetCapacity(const value: integer); procedure SetCount(const value: integer); public constructor Create; destructor Destroy; override; function Add(const item: IOmniTaskControl): integer; procedure Clear; procedure Delete(idxItem: integer); procedure Exchange(idxItem1, idxItem2: integer); function First: IOmniTaskControl; function GetEnumerator: IOmniTaskControlListEnumerator; function IndexOf(const item: IOmniTaskControl): integer; procedure Insert(idxItem: integer; const item: IOmniTaskControl); function Last: IOmniTaskControl; function Remove(const item: IOmniTaskControl): integer; end; { TOmniTaskControlList } TOmniTaskGroup = class(TInterfacedObject, IOmniTaskGroup) strict private otgRegisteredWith: IOmniTask; otgTaskList : IOmniTaskControlList; strict protected procedure AutoUnregisterComms; procedure InternalUnregisterAllCommFrom(const task: IOmniTask); public constructor Create; destructor Destroy; override; function Add(const taskControl: IOmniTaskControl): IOmniTaskGroup; function GetEnumerator: IOmniTaskControlListEnumerator; function RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup; function Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup; function RunAll: IOmniTaskGroup; procedure SendToAll(const msg: TOmniMessage); function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean; function UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup; function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean; end; { TOmniTaskGroup } { exports } {$IFDEF OTL_Anonymous} function CreateTask(worker: TOmniTaskFunction; const taskName: string = ''): IOmniTaskControl; begin Result := TOmniTaskControl.Create(worker, taskName); end; { CreateTask } {$ENDIF OTL_Anonymous} function CreateTask(worker: TOmniTaskProcedure; const taskName: string): IOmniTaskControl; begin Result := TOmniTaskControl.Create(worker, taskName); end; { CreateTask } function CreateTask(worker: TOmniTaskMethod; const taskName: string): IOmniTaskControl; begin Result := TOmniTaskControl.Create(worker, taskName); end; { CreateTask } function CreateTask(const worker: IOmniWorker; const taskName: string): IOmniTaskControl; begin Result := TOmniTaskControl.Create(worker, taskName); end; { CreateTask } function CreateTaskGroup: IOmniTaskGroup; begin Result := TOmniTaskGroup.Create; end; { CreateTaskGroup } function CreateTaskControlList: IOmniTaskControlList; begin Result := TOmniTaskControlList.Create; end; { CreateTaskControlList } { TOmniInternalMessage } constructor TOmniInternalMessage.Create(internalMessageType: TOmniInternalMessageType); begin imInternalMessageType := internalMessageType; end; { TOmniInternalMessage.Create } class function TOmniInternalMessage.InternalType( const msg: TOmniMessage): TOmniInternalMessageType; begin Assert(msg.MsgID = COtlReservedMsgID); Result := TOmniInternalMessage(msg.MsgData.AsObject).InternalMessageType; end; { TOmniInternalMessage.InternalType } { TOmniInternalStringMsg } constructor TOmniInternalStringMsg.Create(const msgName: string; const msgData: TOmniValue); begin inherited Create(imtStringMsg); ismMsgName := msgName; ismMsgData := msgData; end; { TOmniInternalStringMsg.Create } class function TOmniInternalStringMsg.CreateMessage(const msgName: string; msgData: TOmniValue): TOmniMessage; begin Result := TOmniMessage.Create(COtlReservedMsgID, TOmniInternalStringMsg.Create(msgName, msgData)); end; { TOmniInternalStringMsg.CreateMessage } class procedure TOmniInternalStringMsg.UnpackMessage(const msg: TOmniMessage; var msgName: string; var msgData: TOmniValue); var stringMsg: TOmniInternalStringMsg; begin stringMsg := TOmniInternalStringMsg(msg.MsgData.AsObject); msgName := stringMsg.MsgName; msgData := stringMsg.MsgData; FreeAndNil(stringMsg) end; { TOmniInternalStringMsg.UnpackMessage } { TOmniInternalAddressMsg } constructor TOmniInternalAddressMsg.Create(const msgMethod: pointer; const msgData: TOmniValue); begin inherited Create(imtAddressMsg); ismMsgMethod := msgMethod; ismMsgData := msgData; end; { TOmniInternalAddressMsg.Create } class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer; msgData: TOmniValue): TOmniMessage; begin Result := TOmniMessage.Create(COtlReservedMsgID, TOmniInternalAddressMsg.Create(msgMethod, msgData)); end; { TOmniInternalAddressMsg.CreateMessage } class procedure TOmniInternalAddressMsg.UnpackMessage(const msg: TOmniMessage; var msgMethod: pointer; var msgData: TOmniValue); var addr: TOmniInternalAddressMsg; begin addr := TOmniInternalAddressMsg(msg.MsgData.AsObject); msgMethod := addr.MsgMethod; msgData := addr.MsgData; FreeAndNil(addr) end; { TOmniInternalAddressMsg.UnpackMessage } { TOmniTask } constructor TOmniTask.Create(executor: TOmniTaskExecutor; parameters: TOmniValueContainer; sharedInfo: TOmniSharedTaskInfo); begin inherited Create; otExecutor_ref := executor; otParameters_ref := parameters; otSharedInfo := sharedInfo; end; { TOmniTask.Create } procedure TOmniTask.Enforced(forceExecution: boolean); begin if forceExecution then otExecutor_ref.Options := otExecutor_ref.Options + [tcoForceExecution] else otExecutor_ref.Options := otExecutor_ref.Options - [tcoForceExecution]; end; { TOmniTask.Enforced } procedure TOmniTask.Execute; begin otExecuting := true; try try {$IFNDEF OTL_DontSetThreadName} SetThreadName(otSharedInfo.TaskName); {$ENDIF OTL_DontSetThreadName} if (tcoForceExecution in otExecutor_ref.Options) or (not Terminated) then try otExecutor_ref.Asy_Execute(Self); except on E: Exception do SetExitStatus(EXIT_EXCEPTION, E.ClassName + ': ' + E.Message); end; finally if otSharedInfo.MonitorWindow <> 0 then Win32Check(PostMessage(otSharedInfo.MonitorWindow, COmniTaskMsg_Terminated, integer(Int64Rec(UniqueID).Lo), integer(Int64Rec(UniqueID).Hi))); end; finally SetEvent(otSharedInfo.TerminatedEvent); end; if assigned(otSharedInfo.ChainTo) and (otSharedInfo.ChainIgnoreErrors or (otExecutor_ref.ExitCode = EXIT_OK)) then otSharedInfo.ChainTo.Run; // TODO 1 -oPrimoz Gabrijelcic : Should execute the chained task in the same thread (should work when run in a pool) otSharedInfo.ChainTo := nil; end; { TOmniTask.Execute } function TOmniTask.GetComm: IOmniCommunicationEndpoint; begin Result := otSharedInfo.CommChannel.Endpoint2; end; { TOmniTask.GetComm } function TOmniTask.GetCounter: IOmniCounter; begin Result := otSharedInfo.Counter; end; { TOmniTask.GetCounter } function TOmniTask.GetLock: TSynchroObject; begin Result := otSharedInfo.Lock; end; { TOmniTask.GetLock } function TOmniTask.GetName: string; begin Result := otSharedInfo.TaskName; end; { TOmniTask.GetName } function TOmniTask.GetParam(idxParam: integer): TOmniValue; begin Result := otParameters_ref.ParamByIdx(idxParam); end; { TOmniTask.GetParam } function TOmniTask.GetParamByName(const paramName: string): TOmniValue; begin Result := otParameters_ref.ParamByName(paramName); end; { TOmniTask.GetParamByName } function TOmniTask.GetTerminateEvent: THandle; begin Result := otSharedInfo.TerminateEvent; end; { TOmniTask.GetTerminateEvent } function TOmniTask.GetUniqueID: int64; begin Result := otSharedInfo.UniqueID; end; { TOmniTask.GetUniqueID } procedure TOmniTask.RegisterComm(const comm: IOmniCommunicationEndpoint); begin otExecutor_ref.Asy_RegisterComm(comm); end; { TOmniTask.RegisterComm } procedure TOmniTask.SetExitStatus(exitCode: integer; const exitMessage: string); begin otExecutor_ref.Asy_SetExitStatus(exitCode, exitMessage); end; { TOmniTask.SetExitStatus } procedure TOmniTask.SetTimer(interval_ms: cardinal; timerMessageID: integer); begin otExecutor_ref.Asy_SetTimer(interval_ms, timerMessageID); end; { TOmniTask.SetTimer } procedure TOmniTask.SetTimer(interval_ms: cardinal; const timerMethod: pointer); begin otExecutor_ref.Asy_SetTimer(interval_ms, timerMethod); end; { TOmniTask.SetTimer } procedure TOmniTask.SetTimer(interval_ms: cardinal; const timerMessageName: string); begin otExecutor_ref.Asy_SetTimer(interval_ms, timerMessageName); end; { TOmniTask.SetTimer } function TOmniTask.Stopped: boolean; begin Result := WaitForSingleObject(otSharedInfo.TerminatedEvent, 0) <> WAIT_TIMEOUT; end; { TOmniTask.Stopped } procedure TOmniTask.StopTimer; begin SetTimer(0); end; { TOmniTask.StopTimer } procedure TOmniTask.Terminate; begin SetEvent(otSharedInfo.TerminateEvent); if not otExecuting then //call Execute to run at least cleanup code Execute; end; { TOmniTask.Terminate } function TOmniTask.Terminated: boolean; begin Result := WaitForSingleObject(otSharedInfo.TerminateEvent, 0) <> WAIT_TIMEOUT; end; { TOmniTask.Terminated } procedure TOmniTask.UnregisterComm(const comm: IOmniCommunicationEndpoint); begin otExecutor_ref.Asy_UnregisterComm(comm); end; { TOmniTask.UnregisterComm } { TOmniWorker } procedure TOmniWorker.Cleanup; begin //do-nothing end; { TOmniWorker.Cleanup } procedure TOmniWorker.DispatchMessage(var msg: TOmniMessage); begin Dispatch(msg); end; { TOmniWorker.DispatchMessage } function TOmniWorker.GetImplementor: TObject; begin Result := Self; end; { TOmniWorker.GetImplementor } function TOmniWorker.GetTask: IOmniTask; begin Result := owTask; end; { TOmniWorker.GetTask } function TOmniWorker.Initialize: boolean; begin //do-nothing Result := true; end; { TOmniWorker.Initialize } procedure TOmniWorker.ProcessMessages; begin (owExecutor as TOmniTaskExecutor).ProcessMessages(Task); end; { TOmniWorker.ProcessMessages } procedure TOmniWorker.SetExecutor(executor: TObject); begin owExecutor := executor; end; { TOmniWorker.SetExecutor } procedure TOmniWorker.SetTask(const value: IOmniTask); begin owTask := value; end; { TOmniWorker.SetTask } procedure TOmniWorker.Timer; begin //do-nothing end; { TOmniWorker.Timer } { TOmniInvokeInfo } constructor TOmniInvokeInfo.Create(methodAddr: pointer; methodSignature: TOmniInvokeType); begin inherited Create; oiiAddress := methodAddr; oiiSignature := methodSignature; end; { TOmniInvokeInfo.Create } { TOmniTaskExecutor } constructor TOmniTaskExecutor.Create(const workerIntf: IOmniWorker); begin oteExecutorType := etWorker; oteWorkerIntf := workerIntf; workerIntf.SetExecutor(Self); Initialize; end; { TOmniTaskExecutor.Create } constructor TOmniTaskExecutor.Create(method: TOmniTaskMethod); begin oteExecutorType := etMethod; oteMethod := method; Initialize; end; { TOmniTaskExecutor.Create } constructor TOmniTaskExecutor.Create(proc: TOmniTaskProcedure); begin oteExecutorType := etProcedure; oteProc := proc; Initialize; end; { TOmniTaskExecutor.Create } {$IFDEF OTL_Anonymous} constructor TOmniTaskExecutor.Create(func: TOmniTaskFunction); begin oteExecutorType := etFunction; oteFunc := func; Initialize; end; { TOmniTaskExecutor.Create } {$ENDIF OTL_Anonymous} destructor TOmniTaskExecutor.Destroy; begin oteInternalLock.Acquire; try FreeAndNil(oteCommList); finally oteInternalLock.Release; end; FreeAndNil(oteTerminateHandles); FreeAndNil(oteMethodHash); DSiCloseHandleAndNull(oteCommRebuildHandles); DSiCloseHandleAndNull(oteWorkerInitialized); inherited; end; { TOmniTaskExecutor.Destroy } procedure TOmniTaskExecutor.Asy_Execute(const task: IOmniTask); const CThreadPriorityNum: array [TOTLThreadPriority] of integer = ( THREAD_PRIORITY_IDLE, THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL, THREAD_PRIORITY_NORMAL, THREAD_PRIORITY_ABOVE_NORMAL, THREAD_PRIORITY_HIGHEST); begin SetThreadPriority(GetCurrentThread, CThreadPriorityNum[Priority]); try case oteExecutorType of etMethod: oteMethod(task); etProcedure: oteProc(task); etFunction: {$IFDEF OTL_Anonymous} oteFunc(task); {$ELSE} raise Exception.Create('TOmniTaskExecutor.Asy_Execute: ' + 'Anonymous function execution is not supported on Delphi 2007'); {$ENDIF OTL_Anonymous} etWorker: DispatchMessages(task); else raise Exception.Create('TOmniTaskExecutor.Asy_Execute: Executor is not set'); end; finally Cleanup; end; end; { TOmniTaskExecutor.Asy_Execute } procedure TOmniTaskExecutor.Asy_RegisterComm(const comm: IOmniCommunicationEndpoint); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.Asy_RegisterComm: ' + 'Additional communication support is only available when working with an IOmniWorker'); oteInternalLock.Acquire; try if not assigned(oteCommList) then oteCommList := TInterfaceList.Create; oteCommList.Add(comm); SetEvent(oteCommRebuildHandles); finally oteInternalLock.Release; end; end; { TOmniTaskExecutor.Asy_RegisterComm } procedure TOmniTaskExecutor.Asy_SetExitStatus(exitCode: integer; const exitMessage: string); begin oteExitCode.Value := cardinal(exitCode); oteInternalLock.Acquire; try oteExitMessage := exitMessage; UniqueString(oteExitMessage); finally oteInternalLock.Release; end; end; { TOmniTaskExecutor.Asy_SetExitStatus } procedure TOmniTaskExecutor.Asy_SetTimer(interval_ms: cardinal; timerMsgID: integer); begin SetTimerInt(interval_ms, timerMsgID, '', nil); end; { TOmniTaskExecutor.Asy_SetTimer } procedure TOmniTaskExecutor.Asy_SetTimer(interval_ms: cardinal; const timerMethod: pointer); begin SetTimerInt(interval_ms, -1, '', timerMethod); end; { TOmniTaskExecutor.Asy_SetTimer } procedure TOmniTaskExecutor.Asy_SetTimer(interval_ms: cardinal; const timerMsgName: string); begin SetTimerInt(interval_ms, -1, timerMsgName, nil); end; { TOmniTaskExecutor.Asy_SetTimer } procedure TOmniTaskExecutor.Asy_UnregisterComm(const comm: IOmniCommunicationEndpoint); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.Asy_UnregisterComm: ' + 'Additional communication support is only available when working with an IOmniWorker'); oteInternalLock.Acquire; try oteCommList.Remove(comm); if oteCommList.Count = 0 then FreeAndNil(oteCommList); SetEvent(oteCommRebuildHandles); finally oteInternalLock.Release; end; end; { TOmniTaskExecutor.Asy_UnregisterComm } procedure TOmniTaskExecutor.CallOmniTimer; var msg : TOmniMessage; timerMsgID : integer; timerMsgMethod: pointer; timerMsgName : string; begin oteInternalLock.Acquire; try timerMsgName := oteTimerMessageName; UniqueString(timerMsgName); timerMsgID := integer(oteTimerMessageID.Value); timerMsgMethod := pointer(oteTimerMessageMethod.Value); finally oteInternalLock.Release; end; if (timerMsgID >= 0) or (timerMsgName <> '') or (timerMsgMethod <> nil) then begin if timerMsgID >= 0 then begin msg.MsgID := timerMsgID; msg.MsgData := TOmniValue.Null; end else if timerMsgName <> '' then msg := TOmniInternalStringMsg.CreateMessage(timerMsgName, TOmniValue.Null) else msg := TOmniInternalAddressMsg.CreateMessage(timerMsgMethod, TOmniValue.Null); DispatchOmniMessage(msg); end else if assigned(WorkerIntf) then WorkerIntf.Timer; end; { TOmniTaskExecutor.CallOmniTimer } procedure TOmniTaskExecutor.Cleanup; begin oteWorkerIntf := nil; end; { TOmniTaskExecutor.Cleanup } function TOmniTaskExecutor.DispatchEvent(awaited: cardinal; const task: IOmniTask; var msgInfo: TOmniMessageInfo): boolean; var gotMsg: boolean; msg : TOmniMessage; begin Result := false; if ((msgInfo.IdxFirstTerminate <> cardinal(-1)) and ((awaited >= msgInfo.IdxFirstTerminate) and (awaited <= msgInfo.IdxLastTerminate))) or (awaited = WAIT_ABANDONED) then Exit else if (awaited >= msgInfo.IdxFirstMessage) and (awaited <= msgInfo.IdxLastMessage) then begin if awaited = msgInfo.IdxFirstMessage then gotMsg := task.Comm.Receive(msg) else begin oteInternalLock.Acquire; try gotMsg := (oteCommList[awaited - msgInfo.IdxFirstMessage - 1] as IOmniCommunicationEndpoint).Receive(msg); finally oteInternalLock.Release; end; end; if gotMsg and assigned(WorkerIntf) then DispatchOmniMessage(msg); end // comm handles else if awaited = msgInfo.IdxRebuildHandles then RebuildWaitHandles(task, msgInfo) else if awaited = (WAIT_OBJECT_0 + msgInfo.NumWaitHandles) then //message ProcessThreadMessages else if awaited = WAIT_IO_COMPLETION then // do-nothing else if awaited = WAIT_TIMEOUT then begin if (TimerInterval_ms > 0) and ((DSiTimeGetTime64 - oteLastTimer_ms) >= TimerInterval_ms) then begin CallOmniTimer; oteLastTimer_ms := DSiTimeGetTime64; end; end //WAIT_TIMEOUT else //errors RaiseLastOSError; if WaitForSingleObject(oteCommRebuildHandles, 0) = WAIT_OBJECT_0 then //could get set inside timer or message handler RebuildWaitHandles(task, msgInfo); Result := true; end; { TOmniTaskExecutor.DispatchEvent } procedure TOmniTaskExecutor.DispatchMessages(const task: IOmniTask); begin try oteWorkerInitOK := false; try if assigned(WorkerIntf) then begin WorkerIntf.SetExecutor(Self); WorkerIntf.Task := task; if not WorkerIntf.Initialize then Exit; end; oteWorkerInitOK := true; finally SetEvent(WorkerInitialized); end; if tcoMessageWait in Options then oteMsgInfo.WaitWakeMask := WakeMask else oteMsgInfo.WaitWakeMask := 0; if tcoAlertableWait in Options then oteMsgInfo.WaitFlags := MWMO_ALERTABLE else oteMsgInfo.WaitFlags := 0; RebuildWaitHandles(task, oteMsgInfo); oteLastTimer_ms := DSiTimeGetTime64; MainMessageLoop(task, oteMsgInfo); finally if assigned(WorkerIntf) then begin WorkerIntf.Cleanup; WorkerIntf.Task := nil; end; end; end; { TOmniTaskExecutor.DispatchMessages } procedure TOmniTaskExecutor.DispatchOmniMessage(msg: TOmniMessage); var methodAddr : pointer; methodInfoObj : TObject; methodInfo : TOmniInvokeInfo absolute methodInfoObj; methodName : string; methodSignature: TOmniInvokeType; msgData : TOmniValue; obj : TObject; begin if msg.MsgID = COtlReservedMsgID then begin Assert(assigned(WorkerIntf)); GetMethodNameFromInternalMessage(msg, methodName, msgData); if methodName = '' then raise Exception.Create('TOmniTaskExecutor.DispatchOmniMessage: Method name not set'); if not assigned(oteMethodHash) then oteMethodHash := TGpStringObjectHash.Create(17, true); //usually there won't be many methods if not oteMethodHash.Find(methodName, methodInfoObj) then begin GetMethodAddrAndSignature(methodName, methodAddr, methodSignature); methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature); oteMethodHash.Add(methodName, methodInfo); end; case methodInfo.Signature of itSelf: TOmniInvokeSignature_Self(methodInfo.Address)(WorkerIntf.Implementor); itSelfAndOmniValue: TOmniInvokeSignature_Self_OmniValue(methodInfo.Address)(WorkerIntf.Implementor, msgData); itSelfAndObject: begin obj := msgData.AsObject; TOmniInvokeSignature_Self_Object(methodInfo.Address)(WorkerIntf.Implementor, obj); end else RaiseInvalidSignature(methodName); end; //case methodSignature end else WorkerIntf.DispatchMessage(msg); end; { TOmniTaskExecutor.DispatchMessage } function TOmniTaskExecutor.GetExitCode: integer; begin Result := oteExitCode; end; { TOmniTaskExecutor.GetExitCode } function TOmniTaskExecutor.GetExitMessage: string; begin oteInternalLock.Acquire; try Result := oteExitMessage; UniqueString(Result); finally oteInternalLock.Release; end; end; { TOmniTaskExecutor.GetExitMessage } procedure TOmniTaskExecutor.GetMethodAddrAndSignature(const methodName: string; var methodAddress: pointer; var methodSignature: TOmniInvokeType); const CShortLen = SizeOf(ShortString) - 1; var headerEnd : cardinal; methodInfoHeader: PMethodInfoHeader; paramNum : integer; params : PParamInfo; paramType : PTypeInfo; begin // with great thanks to Hallvar Vassbotn [http://hallvards.blogspot.com/2006/04/published-methods_27.html] // and David Glassborow [http://davidglassborow.blogspot.com/2006/05/class-rtti.html] methodInfoHeader := ObjAuto.GetMethodInfo(WorkerIntf.Implementor, ShortString(methodName)); methodAddress := WorkerIntf.Implementor.MethodAddress(methodName); // find the method info if not (assigned(methodInfoHeader) and assigned(methodAddress)) then raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' + 'Cannot find message method %s.%s', [WorkerIntf.Implementor.ClassName, methodName]); // check the RTTI sanity if methodInfoHeader.Len <= (SizeOf(TMethodInfoHeader) - CShortLen + Length(methodInfoHeader.Name)) then raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' + 'Class %d was compiled without RTTI', [WorkerIntf.Implementor.ClassName]); // we can only process procedures if assigned(methodInfoHeader.ReturnInfo.ReturnType) then raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' + 'Method %s.%s must not return result', [WorkerIntf.Implementor.ClassName, methodName]); // only limited subset of method signatures is allowed: // (Self), (Self, const TOmniValue), (Self, var TObject) headerEnd := cardinal(methodInfoHeader) + methodInfoHeader^.Len; params := PParamInfo(cardinal(methodInfoHeader) + SizeOf(methodInfoHeader^) - CShortLen + SizeOf(TReturnInfo) + Length(methodInfoHeader^.Name)); paramNum := 0; methodSignature := itUnknown; // Loop over the parameters while cardinal(params) < headerEnd do begin Inc(paramNum); paramType := params.ParamType^; if paramNum = 1 then if (params^.Flags <> []) or (paramType^.Kind <> tkClass) then RaiseInvalidSignature(methodName) else methodSignature := itSelf else if paramNum = 2 then //code says 'const' but GetMethodInfo says 'pfVar' :( if (params^.Flags * [pfConst, pfVar] <> []) and (paramType^.Kind = tkRecord) and (SameText(string(paramType^.Name), 'TOmniValue')) then methodSignature := itSelfAndOmniValue else if (params^.Flags = [pfVar]) and (paramType^.Kind = tkClass) then methodSignature := itSelfAndObject else RaiseInvalidSignature(methodName) else RaiseInvalidSignature(methodName); params := params.NextParam; end; end; { TOmniTaskExecutor.GetMethodAddrAndSignature } procedure TOmniTaskExecutor.GetMethodNameFromInternalMessage(const msg: TOmniMessage; var msgName: string; var msgData: TOmniValue); var internalType: TOmniInternalMessageType; method : pointer; begin internalType := TOmniInternalMessage.InternalType(msg); case internalType of imtStringMsg: TOmniInternalStringMsg.UnpackMessage(msg, msgName, msgData); imtAddressMsg: begin TOmniInternalAddressMsg.UnpackMessage(msg, method, msgData); msgName := WorkerIntf.Implementor.MethodName(method); if msgName = '' then raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' + 'Cannot find method name for method %p', [method]); end else raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' + 'Internal message type %s is not supported', [GetEnumName(TypeInfo(TOmniInternalMessageType), Ord(internalType))]); end; //case internalType end; { TOmniTaskExecutor.GetMethodNameFromInternalMessage } function TOmniTaskExecutor.GetOptions: TOmniTaskControlOptions; begin oteOptionsLock.Acquire; try Result := oteOptions; finally oteOptionsLock.Release; end; end; { TOmniTaskExecutor.GetOptions } function TOmniTaskExecutor.GetTimerInterval_ms: cardinal; begin Result := oteTimerInterval_ms.Value; end; { TOmniTaskExecutor.GetTimerInterval_ms } function TOmniTaskExecutor.GetTimerMessageID: integer; begin Result := integer(oteTimerMessageID.Value); end; { TOmniTaskExecutor.GetTimerMessageID } function TOmniTaskExecutor.GetTimerMessageMethod: pointer; begin Result := pointer(oteTimerMessageMethod.Value); end; { TOmniTaskExecutor.GetTimerMessageMethod } function TOmniTaskExecutor.GetTimerMessageName: string; begin oteInternalLock.Acquire; try Result := oteTimerMessageName; UniqueString(Result); finally oteInternalLock.Release; end; end; { TOmniTaskExecutor.GetTimerMessageName } procedure TOmniTaskExecutor.Initialize; begin oteWorkerInitialized := CreateEvent(nil, true, false, nil); Win32Check(oteWorkerInitialized <> 0); oteCommRebuildHandles := CreateEvent(nil, false, false, nil); Win32Check(oteCommRebuildHandles <> 0); oteTimerMessageID.Value := cardinal(-1); end; { TOmniTaskExecutor.Initialize } procedure TOmniTaskExecutor.MainMessageLoop(const task: IOmniTask; var msgInfo: TOmniMessageInfo); begin while DispatchEvent(WaitForEvent(msgInfo, TimeUntilNextTimer_ms), task, msgInfo) do MessageLoopPayload; end; { TOmniTaskExecutor.MainMessageLoop } procedure TOmniTaskExecutor.MessageLoopPayload; begin //placeholder that can be overridden end; { TOmniTaskExecutor.MessageLoopPayload } procedure TOmniTaskExecutor.ProcessMessages(task: IOmniTask); var awaited : cardinal; msgInfo : TOmniMessageInfo; waitHandlesGen: int64; begin RemoveTerminationEvents(oteMsgInfo, msgInfo); waitHandlesGen := oteWaitHandlesGen; repeat awaited := WaitForEvent(msgInfo, 0); if awaited = WAIT_TIMEOUT then Exit; if not DispatchEvent(awaited, task, msgInfo) then Exit; MessageLoopPayload; if waitHandlesGen <> oteWaitHandlesGen then //DispatchEvent just rebuilt our internal copy RebuildWaitHandles(task, oteMsgInfo); until false; end; { TOmniTaskExecutor.ProcessMessages } procedure TOmniTaskExecutor.ProcessThreadMessages; var msg: TMsg; begin while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) and (Msg.Message <> WM_QUIT) do begin TranslateMessage(Msg); DispatchMessage(Msg); end; end; { TOmniTaskControl.ProcessThreadMessages } procedure TOmniTaskExecutor.RaiseInvalidSignature(const methodName: string); begin raise Exception.CreateFmt('TOmniTaskExecutor: ' + 'Method %s.%s has invalid signature. Only following ' + 'signatures are supported: (Self), ' + '(Self, const TOmniValue), (Self, var TObject)', [WorkerIntf.Implementor.ClassName, methodName]); end; { TOmniTaskExecutor.RaiseInvalidSignature } procedure TOmniTaskExecutor.RebuildWaitHandles(const task: IOmniTask; var msgInfo: TOmniMessageInfo); var iHandle: integer; iIntf : integer; intf : IInterface; begin Inc(oteWaitHandlesGen); oteInternalLock.Acquire; try msgInfo.IdxFirstTerminate := 0; msgInfo.WaitHandles[0] := task.TerminateEvent; msgInfo.IdxLastTerminate := msgInfo.IdxFirstTerminate; if assigned(oteTerminateHandles) then for iHandle in oteTerminateHandles do begin Inc(msgInfo.IdxLastTerminate); if msgInfo.IdxLastTerminate > High(msgInfo.WaitHandles) then raise Exception.CreateFmt('TOmniTaskExecutor: ' + 'Cannot wait on more than %d handles', [High(msgInfo.WaitHandles)]); msgInfo.WaitHandles[msgInfo.IdxLastTerminate] := THandle(iHandle); end; msgInfo.IdxRebuildHandles := msgInfo.IdxLastTerminate + 1; msgInfo.WaitHandles[msgInfo.IdxRebuildHandles] := oteCommRebuildHandles; msgInfo.IdxFirstMessage := msgInfo.IdxRebuildHandles + 1; msgInfo.WaitHandles[msgInfo.IdxFirstMessage] := task.Comm.NewMessageEvent; msgInfo.IdxLastMessage := msgInfo.IdxFirstMessage; if assigned(oteCommList) then for iIntf := 0 to oteCommList.Count - 1 do begin intf := oteCommList[iIntf]; Inc(msgInfo.IdxLastMessage); if msgInfo.IdxLastMessage > High(msgInfo.WaitHandles) then raise Exception.CreateFmt('TOmniTaskExecutor: ' + 'Cannot wait on more than %d handles', [High(msgInfo.WaitHandles)]); msgInfo.WaitHandles[msgInfo.IdxLastMessage] := (intf as IOmniCommunicationEndpoint).NewMessageEvent; end; msgInfo.NumWaitHandles := msgInfo.IdxLastMessage + 1; finally oteInternalLock.Release; end; end; { RebuildWaitHandles } procedure TOmniTaskExecutor.RemoveTerminationEvents(const srcMsgInfo: TOmniMessageInfo; var dstMsgInfo: TOmniMessageInfo); var offset: cardinal; begin offset := srcMsgInfo.IdxLastTerminate + 1; dstMsgInfo.IdxFirstTerminate := cardinal(-1); dstMsgInfo.IdxLastTerminate := cardinal(-1); dstMsgInfo.IdxFirstMessage := srcMsgInfo.IdxFirstMessage - offset; dstMsgInfo.IdxLastMessage := srcMsgInfo.IdxLastMessage - offset; dstMsgInfo.IdxRebuildHandles := srcMsgInfo.IdxRebuildHandles - offset; dstMsgInfo.NumWaitHandles := srcMsgInfo.NumWaitHandles - offset; dstMsgInfo.WaitFlags := srcMsgInfo.WaitFlags; dstMsgInfo.WaitWakeMask := srcMsgInfo.WaitWakeMask; Move(srcMsgInfo.WaitHandles[offset], dstMsgInfo.WaitHandles[0], (Length(dstMsgInfo.WaitHandles) - integer(offset)) * SizeOf(THandle)); end; { TOmniTaskExecutor.RemoveTerminationEvents } procedure TOmniTaskExecutor.SetOptions(const value: TOmniTaskControlOptions); begin if (([tcoAlertableWait, tcoMessageWait] * Options) <> []) and (oteExecutorType <> etWorker) then raise Exception.Create('TOmniTaskExecutor.SetOptions: ' + 'Trying to set IOmniWorker specific option(s)'); oteOptionsLock.Acquire; try oteOptions := value; finally oteOptionsLock.Release; end; end; { TOmniTaskExecutor.SetOptions } procedure TOmniTaskExecutor.SetTimerInterval_ms(const value: cardinal); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.SetTimerInterval_ms: ' + 'Timer support is only available when working with an IOmniWorker'); oteTimerInterval_ms.Value := value; end; { TOmniTaskExecutor.SetTimerInterval_ms } procedure TOmniTaskExecutor.SetTimerInt(interval_ms: cardinal; timerMsgID: integer; const timerMsgName: string; const timerMsgMethod: pointer); begin oteInternalLock.Acquire; try oteTimerMessageID.Value := cardinal(timerMsgID); oteTimerInterval_ms.Value := cardinal(interval_ms); oteTimerMessageMethod.Value := cardinal(timerMsgMethod); oteTimerMessageName := timerMsgName; UniqueString(oteTimerMessageName); oteLastTimer_ms := DSiTimeGetTime64; finally oteInternalLock.Release; end; SetEvent(oteCommRebuildHandles); end; { TOmniTaskExecutor.SetTimerInt } procedure TOmniTaskExecutor.SetTimerMessageID(const value: integer); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.SetTimerMessageID: ' + 'Timer support is only available when working with an IOmniWorker'); oteTimerMessageID.Value := cardinal(value); end; { TOmniTaskExecutor.SetTimerMessageID } procedure TOmniTaskExecutor.SetTimerMessageMethod(const value: pointer); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.SetTimerMessageID: ' + 'Timer support is only available when working with an IOmniWorker'); oteTimerMessageMethod.Value := cardinal(value); end; { TOmniTaskExecutor.SetTimerMessageMethod } procedure TOmniTaskExecutor.SetTimerMessageName(const value: string); begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.SetTimerMessageID: ' + 'Timer support is only available when working with an IOmniWorker'); oteTimerMessageName := value; UniqueString(oteTimerMessageName); end; { TOmniTaskExecutor.SetTimerMessageName } procedure TOmniTaskExecutor.TerminateWhen(handle: THandle); begin Assert(SizeOf(THandle) = SizeOf(integer)); if not assigned(oteTerminateHandles) then oteTerminateHandles := TGpIntegerList.Create; oteTerminateHandles.Add(handle); end; { TOmniTaskExecutor.TerminateWhen } function TOmniTaskExecutor.TimeUntilNextTimer_ms: cardinal; var timeout_ms: int64; begin if TimerInterval_ms <= 0 then Result := INFINITE else begin timeout_ms := TimerInterval_ms - (DSiTimeGetTime64 - oteLastTimer_ms); if timeout_ms < 0 then timeout_ms := 0; Result := timeout_ms; end; end; { TOmniTaskExecutor.TimeUntilNextTimer_ms } function TOmniTaskExecutor.WaitForInit: boolean; begin if oteExecutorType <> etWorker then raise Exception.Create('TOmniTaskExecutor.WaitForInit: ' + 'Wait for init is only available when working with an IOmniWorker'); WaitForSingleObject(WorkerInitialized, INFINITE); Result := WorkerInitOK; end; { TOmniTaskExecutor.WaitForInit } function TOmniTaskExecutor.WaitForEvent(msgInfo: TOmniMessageInfo; timeout_ms: cardinal): cardinal; begin Result := MsgWaitForMultipleObjectsEx(msgInfo.NumWaitHandles, msgInfo.WaitHandles, timeout_ms, msgInfo.WaitWakeMask, msgInfo.WaitFlags); end; { TOmniTaskExecutor.WaitForEvent } { TOmniTaskControl } constructor TOmniTaskControl.Create(const worker: IOmniWorker; const taskName: string); begin otcExecutor := TOmniTaskExecutor.Create(worker); Initialize(taskName); end; { TOmniTaskControl.Create } constructor TOmniTaskControl.Create(worker: TOmniTaskMethod; const taskName: string); begin otcExecutor := TOmniTaskExecutor.Create(worker); Initialize(taskName); end; { TOmniTaskControl.Create } constructor TOmniTaskControl.Create(worker: TOmniTaskProcedure; const taskName: string); begin otcExecutor := TOmniTaskExecutor.Create(worker); Initialize(taskName); end; { TOmniTaskControl.Create } {$IFDEF OTL_Anonymous} constructor TOmniTaskControl.Create(worker: TOmniTaskFunction; const taskName: string); begin otcExecutor := TOmniTaskExecutor.Create(worker); Initialize(taskName); end; { TOmniTaskControl.Create } {$ENDIF OTL_Anonymous} destructor TOmniTaskControl.Destroy; begin { TODO : Do we need wait-and-kill mechanism here to prevent shutdown locks? } if assigned(otcThread) then begin Terminate; FreeAndNil(otcThread); end; if otcDestroyLock then begin otcSharedInfo.Lock.Free; otcSharedInfo.Lock := nil; end; FreeAndNil(otcExecutor); otcSharedInfo.CommChannel := nil; if otcSharedInfo.TerminateEvent <> 0 then begin CloseHandle(otcSharedInfo.TerminateEvent); otcSharedInfo.TerminateEvent := 0; end; if otcSharedInfo.TerminatedEvent <> 0 then begin CloseHandle(otcSharedInfo.TerminatedEvent); otcSharedInfo.TerminatedEvent := 0; end; FreeAndNil(otcParameters); FreeAndNil(otcSharedInfo); inherited Destroy; end; { TOmniTaskControl.Destroy } function TOmniTaskControl.Alertable: IOmniTaskControl; begin Options := Options + [tcoAlertableWait]; Result := Self; end; { TOmniTaskControl.Alertable } function TOmniTaskControl.ChainTo(const task: IOmniTaskControl; ignoreErrors: boolean): IOmniTaskControl; begin otcSharedInfo.ChainTo := task; otcSharedInfo.ChainIgnoreErrors := ignoreErrors; Result := Self; end; { TOmniTaskControl.ChainTo } function TOmniTaskControl.CreateTask: IOmniTask; begin EnsureCommChannel; Result := TOmniTask.Create(otcExecutor, otcParameters, otcSharedInfo); end; { TOmniTaskControl.CreateTask } function TOmniTaskControl.Enforced(forceExecution: boolean = true): IOmniTaskControl; begin if forceExecution then Options := Options + [tcoForceExecution] else Options := Options - [tcoForceExecution]; Result := Self; end; { TOmniTaskControl.Enforced } procedure TOmniTaskControl.EnsureCommChannel; begin if not assigned(otcSharedInfo.CommChannel) then otcSharedInfo.CommChannel := CreateTwoWayChannel(otcQueueLength); end; { TOmniTaskControl.EnsureCommChannel } function TOmniTaskControl.GetComm: IOmniCommunicationEndpoint; begin EnsureCommChannel; Result := otcSharedInfo.CommChannel.Endpoint1; end; { TOmniTaskControl.GetComm } function TOmniTaskControl.GetExitCode: integer; begin Result := otcExecutor.ExitCode; end; { TOmniTaskControl.GetExitCode } function TOmniTaskControl.GetExitMessage: string; begin Result := otcExecutor.ExitMessage; end; { TOmniTaskControl.GetExitMessage } function TOmniTaskControl.GetLock: TSynchroObject; begin Result := otcSharedInfo.Lock; end; { TOmniTaskControl.GetLock } function TOmniTaskControl.GetName: string; begin Result := otcSharedInfo.TaskName; end; { TOmniTaskControl.GetName } function TOmniTaskControl.GetOptions: TOmniTaskControlOptions; begin Result := otcExecutor.Options; end; { TOmniTaskControl.GetOptions } function TOmniTaskControl.GetTerminatedEvent: THandle; begin Result := otcSharedInfo.TerminatedEvent; end; { TOmniTaskControl.GetTerminatedEvent } function TOmniTaskControl.GetTerminateEvent: THandle; begin Result := otcSharedInfo.TerminateEvent; end; { TOmniTaskControl.GetTerminateEvent } function TOmniTaskControl.GetUniqueID: int64; begin Result := otcSharedInfo.UniqueID; end; { TOmniTaskControl.GetUniqueID } procedure TOmniTaskControl.Initialize; begin otcExecutor.Options := [tcoForceExecution]; otcQueueLength := CDefaultQueueSize; otcSharedInfo := TOmniSharedTaskInfo.Create; otcSharedInfo.TaskName := taskName; otcSharedInfo.UniqueID := OtlUID.Increment; otcParameters := TOmniValueContainer.Create; otcSharedInfo.TerminateEvent := CreateEvent(nil, true, false, nil); Win32Check(otcSharedInfo.TerminateEvent <> 0); otcSharedInfo.TerminatedEvent := CreateEvent(nil, true, false, nil); Win32Check(otcSharedInfo.TerminatedEvent <> 0); end; { TOmniTaskControl.Initialize } function TOmniTaskControl.Invoke(const msgMethod: pointer): IOmniTaskControl; begin Invoke(msgMethod, TOmniValue.Null); Result := Self; end; { TOmniTaskControl.Invoke } function TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: array of const): IOmniTaskControl; begin Invoke(msgMethod, OpenArrayToVarArray(msgData)); Result := Self; end; { TOmniTaskControl.Invoke } function TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue): IOmniTaskControl; begin Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData)); Result := Self; end; { TOmniTaskControl.Invoke } function TOmniTaskControl.Invoke(const msgName: string): IOmniTaskControl; begin Invoke(msgName, TOmniValue.Null); Result := Self; end; { TOmniCommunicationEndpoint.Invoke } function TOmniTaskControl.Invoke(const msgName: string; msgData: array of const): IOmniTaskControl; begin Invoke(msgName, OpenArrayToVarArray(msgData)); Result := Self; end; { TOmniCommunicationEndpoint.Invoke } function TOmniTaskControl.Invoke(const msgName: string; msgData: TOmniValue): IOmniTaskControl; begin Comm.Send(TOmniInternalStringMsg.CreateMessage(msgName, msgData)); Result := Self; end; { TOmniCommunicationEndpoint.Invoke } function TOmniTaskControl.Join(const group: IOmniTaskGroup): IOmniTaskControl; begin group.Add(Self); Result := Self; end; { TOmniTaskControl.Join } function TOmniTaskControl.Leave(const group: IOmniTaskGroup): IOmniTaskControl; begin group.Remove(Self); Result := Self; end; { TOmniTaskControl.Leave } function TOmniTaskControl.MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskControl; begin monitor.Monitor(Self); Result := Self; end; { TOmniTaskControl.MonitorWith } function TOmniTaskControl.MsgWait(wakeMask: DWORD): IOmniTaskControl; begin Options := Options + [tcoMessageWait]; otcExecutor.WakeMask := wakeMask; Result := Self; end; { TOmniTaskControl.MsgWait } function TOmniTaskControl.RemoveMonitor: IOmniTaskControl; begin otcSharedInfo.MonitorWindow := 0; EnsureCommChannel; otcSharedInfo.CommChannel.Endpoint2.RemoveMonitor; Result := Self; end; { TOmniTaskControl.RemoveMonitor } function TOmniTaskControl.Run: IOmniTaskControl; begin otcParameters.Lock; otcThread := TOmniThread.Create(CreateTask); otcThread.Resume; Result := Self; end; { TOmniTaskControl.Run } function TOmniTaskControl.Schedule(const threadPool: IOmniThreadPool): IOmniTaskControl; begin otcParameters.Lock; if assigned(threadPool) then otcOwningPool := threadPool else otcOwningPool := GlobalOmniThreadPool; (otcOwningPool as IOmniThreadPoolScheduler).Schedule(CreateTask); Result := Self; end; { TOmniTaskControl.Schedule } function TOmniTaskControl.SetMonitor(hWindow: THandle): IOmniTaskControl; begin if otcParameters.IsLocked then raise Exception.Create('TOmniTaskControl.SetMonitor: Monitor can only be assigned while task is not running'); otcSharedInfo.MonitorWindow := hWindow; EnsureCommChannel; otcSharedInfo.CommChannel.Endpoint2.SetMonitor(hWindow, COmniTaskMsg_NewMessage, integer(Int64Rec(UniqueID).Lo), integer(Int64Rec(UniqueID).Hi)); Result := Self; end; { TOmniTaskControl.SetMonitor } procedure TOmniTaskControl.SetOptions(const value: TOmniTaskControlOptions); begin otcExecutor.Options := value; end; { TOmniTaskControl.SetOptions } function TOmniTaskControl.SetParameter(const paramName: string; const paramValue: TOmniValue): IOmniTaskControl; begin otcParameters.Add(paramValue, paramName); Result := Self; end; { TOmniTaskControl.SetParameter } function TOmniTaskControl.SetParameter(const paramValue: TOmniValue): IOmniTaskControl; begin Result := SetParameter('', paramValue); end; { TOmniTaskControl.SetParameter } function TOmniTaskControl.SetParameters(const parameters: array of TOmniValue): IOmniTaskControl; begin otcParameters.Assign(parameters); Result := Self; end; { TOmniTaskControl.SetParameters } function TOmniTaskControl.SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl; begin otcExecutor.Priority := threadPriority; Result := Self; end; { TOmniTaskControl.SetPriority } function TOmniTaskControl.SetQueueSize(numMessages: integer): IOmniTaskControl; begin if assigned(otcSharedInfo.CommChannel) then raise Exception.Create('TOmniTaskControl.SetQueueSize: Cannot set queue size. ' + 'Queue already exists'); otcQueueLength := numMessages; Result := Self; end; { TOmniTaskControl.SetQueueSize } function TOmniTaskControl.SetTimer(interval_ms: cardinal; timerMessageID: integer): IOmniTaskControl; begin otcExecutor.Asy_SetTimer(interval_ms, timerMessageID); Result := Self; end; { TOmniTaskControl.SetTimer } function TOmniTaskControl.SetTimer(interval_ms: cardinal; const timerMethod: pointer): IOmniTaskControl; begin otcExecutor.Asy_SetTimer(interval_ms, timerMethod); Result := Self; end; { TOmniTaskControl.SetTimer } function TOmniTaskControl.SetTimer(interval_ms: cardinal; const timerMessageName: string): IOmniTaskControl; begin otcExecutor.Asy_SetTimer(interval_ms, timerMessageName); Result := Self; end; { TOmniTaskControl.SetTimer } function TOmniTaskControl.Terminate(maxWait_ms: cardinal): boolean; begin //TODO : reset executor and exit immediately if task was not started at all or raise exception? SetEvent(otcSharedInfo.TerminateEvent); Result := WaitFor(maxWait_ms); if not Result then begin if assigned(otcThread) then begin TerminateThread(otcThread.Handle, cardinal(-1)); otcThread := nil; end else if assigned(otcOwningPool) then begin otcOwningPool.Cancel(UniqueID); otcOwningPool := nil; end; end; end; { TOmniTaskControl.Terminate } function TOmniTaskControl.TerminateWhen(event: THandle): IOmniTaskControl; begin otcExecutor.TerminateWhen(event); Result := Self; end; { TOmniTaskControl.TerminateWhen } function TOmniTaskControl.WaitFor(maxWait_ms: cardinal): boolean; begin Result := (WaitForSingleObject(otcSharedInfo.TerminatedEvent, maxWait_ms) = WAIT_OBJECT_0); end; { TOmniTaskControl.WaitFor } function TOmniTaskControl.WaitForInit: boolean; begin Result := otcExecutor.WaitForInit; end; { TOmniTaskControl.WaitForInit } function TOmniTaskControl.WithCounter(const counter: IOmniCounter): IOmniTaskControl; begin otcSharedInfo.Counter := counter; Result := Self; end; { TOmniTaskControl.WithCounter } function TOmniTaskControl.WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskControl; begin otcSharedInfo.Lock := lock; otcDestroyLock := autoDestroyLock; end; { TOmniTaskControl.WithLock } { TOmniThread } constructor TOmniThread.Create(task: IOmniTask); begin inherited Create(true); otTask := task; end; { TOmniThread.Create } procedure TOmniThread.Execute; begin (otTask as IOmniTaskExecutor).Execute; end; { TOmniThread.Execute } { TOmniTaskControlListEnumerator } constructor TOmniTaskControlListEnumerator.Create(taskList: TInterfaceList); begin otcleTaskEnum := taskList.GetEnumerator; end; { TOmniTaskControlListEnumerator.Create } function TOmniTaskControlListEnumerator.GetCurrent: IOmniTaskControl; begin Result := otcleTaskEnum.GetCurrent as IOmniTaskControl; end; { TOmniTaskControlListEnumerator.GetCurrent } function TOmniTaskControlListEnumerator.MoveNext: boolean; begin Result := otcleTaskEnum.MoveNext; end; { TOmniTaskControlListEnumerator.MoveNext } { TOmniTaskControlList } constructor TOmniTaskControlList.Create; begin inherited Create; otclList := TInterfaceList.Create; end; { TOmniTaskControlList.Create } destructor TOmniTaskControlList.Destroy; begin FreeAndNil(otclList); inherited Destroy; end; { TOmniTaskControlList.Destroy } function TOmniTaskControlList.Add(const item: IOmniTaskControl): integer; begin Result := otclList.Add(item); end; { TOmniTaskControlList.Add } procedure TOmniTaskControlList.Clear; begin otclList.Clear; end; { TOmniTaskControlList.Clear } procedure TOmniTaskControlList.Delete(idxItem: integer); begin otclList.Delete(idxItem); end; { TOmniTaskControlList.Delete } procedure TOmniTaskControlList.Exchange(idxItem1, idxItem2: integer); begin otclList.Exchange(idxItem1, idxItem2); end; { TOmniTaskControlList.Exchange } function TOmniTaskControlList.First: IOmniTaskControl; begin Result := otclList.First as IOmniTaskControl; end; { TOmniTaskControlList.First } function TOmniTaskControlList.Get(idxItem: integer): IOmniTaskControl; begin Result := otclList[idxItem] as IOmniTaskControl; end; { TOmniTaskControlList.Get } function TOmniTaskControlList.GetCapacity: integer; begin Result := otclList.Capacity; end; { TOmniTaskControlList.GetCapacity } function TOmniTaskControlList.GetCount: integer; begin Result := otclList.Count; end; { TOmniTaskControlList.GetCount } function TOmniTaskControlList.GetEnumerator: IOmniTaskControlListEnumerator; begin Result := TOmniTaskControlListEnumerator.Create(otclList); end; { TOmniTaskControlList.GetEnumerator } function TOmniTaskControlList.IndexOf(const item: IOmniTaskControl): integer; begin Result := otclList.IndexOf(item); end; { TOmniTaskControlList.IndexOf } procedure TOmniTaskControlList.Insert(idxItem: integer; const item: IOmniTaskControl); begin otclList.Insert(idxItem, item); end; { TOmniTaskControlList.Insert } function TOmniTaskControlList.Last: IOmniTaskControl; begin Result := otclList.Last as IOmniTaskControl; end; { TOmniTaskControlList.Last } procedure TOmniTaskControlList.Put(idxItem: integer; const value: IOmniTaskControl); begin otclList[idxItem] := value; end; { TOmniTaskControlList.Put } function TOmniTaskControlList.Remove(const item: IOmniTaskControl): integer; begin Result := otclList.Remove(item); end; { TOmniTaskControlList.Remove } procedure TOmniTaskControlList.SetCapacity(const value: integer); begin otclList.Capacity := value; end; { TOmniTaskControlList.SetCapacity } procedure TOmniTaskControlList.SetCount(const value: integer); begin otclList.Count := value; end; { TOmniTaskControlList.SetCount } { TOmniTaskGroup } constructor TOmniTaskGroup.Create; begin inherited Create; otgTaskList := TOmniTaskControlList.Create; end; { TOmniTaskGroup.Create } destructor TOmniTaskGroup.Destroy; begin AutoUnregisterComms; inherited Destroy; end; { TOmniTaskGroup.Destroy } function TOmniTaskGroup.Add(const taskControl: IOmniTaskControl): IOmniTaskGroup; begin otgTaskList.Add(taskControl); Result := Self; end; { TOmniTaskGroup.Add } procedure TOmniTaskGroup.AutoUnregisterComms; begin if assigned(otgRegisteredWith) then InternalUnregisterAllCommFrom(otgRegisteredWith); end; { TOmniTaskGroup.AutoUnregisterComms } function TOmniTaskGroup.GetEnumerator: IOmniTaskControlListEnumerator; begin Result := otgTaskList.GetEnumerator; end; { TOmniTaskGroup.GetEnumerator } procedure TOmniTaskGroup.InternalUnregisterAllCommFrom(const task: IOmniTask); var groupTask: IOmniTaskControl; begin for groupTask in Self do task.UnregisterComm(groupTask.Comm); otgRegisteredWith := nil; end; { TOmniTaskGroup.InternalUnregisterAllCommFrom } function TOmniTaskGroup.RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup; var groupTask: IOmniTaskControl; begin AutoUnregisterComms; for groupTask in Self do task.RegisterComm(groupTask.Comm); otgRegisteredWith := task; Result := Self; end; { TOmniTaskGroup.RegisterAllCommWith } function TOmniTaskGroup.Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup; begin otgTaskList.Remove(taskControl); Result := Self; end; { TOmniTaskGroup.Remove } function TOmniTaskGroup.RunAll: IOmniTaskGroup; var iIntf: integer; begin for iIntf := 0 to otgTaskList.Count - 1 do (otgTaskList[iIntf] as IOMniTaskControl).Run; Result := Self; end; { TOmniTaskGroup.RunAll } procedure TOmniTaskGroup.SendToAll(const msg: TOmniMessage); var groupTask: IOmniTaskControl; begin for groupTask in Self do groupTask.Comm.Send(msg); end; { TOmniTaskGroup.SendToAll } function TOmniTaskGroup.TerminateAll(maxWait_ms: cardinal): boolean; var iIntf: integer; begin for iIntf := 0 to otgTaskList.Count - 1 do SetEvent((otgTaskList[iIntf] as IOmniTaskControlInternals).TerminateEvent); Result := WaitForAll(maxWait_ms); end; { TOmniTaskGroup.TerminateAll } function TOmniTaskGroup.UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup; begin InternalUnregisterAllCommFrom(task); Result := Self; end; { TOmniTaskGroup.UnregisterAllCommFrom } function TOmniTaskGroup.WaitForAll(maxWait_ms: cardinal = INFINITE): boolean; var iIntf : integer; waitHandles: array [0..63] of THandle; begin for iIntf := 0 to otgTaskList.Count - 1 do waitHandles[iIntf] := (otgTaskList[iIntf] as IOmniTaskControlInternals).TerminatedEvent; Result := WaitForMultipleObjects(otgTaskList.Count, @waitHandles, true, maxWait_ms) = WAIT_OBJECT_0; end; { TOmniTaskGroup.WaitForAll } end.