///Two-way interprocess communication channel. Part of the OmniThreadLibrary project.
///Primoz Gabrijelcic
///
///This software is distributed under the BSD license.
///
///Copyright (c) 2008, Primoz Gabrijelcic
///All rights reserved.
///
///Redistribution and use in source and binary forms, with or without modification,
///are permitted provided that the following conditions are met:
///- Redistributions of source code must retain the above copyright notice, this
/// list of conditions and the following disclaimer.
///- Redistributions in binary form must reproduce the above copyright notice,
/// this list of conditions and the following disclaimer in the documentation
/// and/or other materials provided with the distribution.
///- The name of the Primoz Gabrijelcic may not be used to endorse or promote
/// products derived from this software without specific prior written permission.
///
///THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
///ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
///WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
///DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
///ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
///(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
///LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
///ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
///(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
///SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
///
///
/// Author : Primoz Gabrijelcic
/// Contributors : GJ, Lee_Nover
/// Creation date : 2008-06-12
/// Last modification : 2008-10-05
/// Version : 1.03
///
/// History:
/// 1.03: 2008-10-05
/// - Added two overloaded versions of IOmniCommunicationEndpoint.ReceivedWait,
/// which are just simple wrappers for WaitForSingleObject(NewMessageEvent) +
/// Receive.
/// - Defined OmniThreadLibrary-reserved message ID $FFFF.
/// 1.02: 2008-09-26
/// - Better default queue calculation that takes into account OtlContainers
/// overhead and FastMM4 granulation.
/// 1.01: 2008-09-20
/// - Added two TOmniMessage constructors.
///
unit OtlComm;
interface
uses
SyncObjs,
SpinLock,
GpStuff,
DSiWin32,
OtlCommon,
OtlContainers;
const
//reserved for internal OTL messaging
COtlReservedMsgID = $FFFF;
type
{$A4}
TOmniMessage = record
MsgID : word;
MsgData: TOmniValue;
constructor Create(aMsgID: word; aMsgData: TOmniValue); overload;
constructor Create(aMsgID: word); overload;
end; { TOmniMessage }
const
//calculate default queue size so that the queue memory gets as close to 64 KB as possible
CDefaultQueueSize = $FF00{adjusted for FastMM4 granularity} div (SizeOf(TOmniMessage) + 4{SizeOf(POmniLinkedData)}); {3264 entries}
type
IOmniCommunicationEndpoint = interface ['{910D329C-D049-48B9-B0C0-9434D2E57870}']
function GetNewMessageEvent: THandle;
//
function Receive(var msg: TOmniMessage): boolean; overload;
function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
function ReceiveWait(var msg: TOmniMessage; timeout_ms: cardinal): boolean; overload;
function ReceiveWait(var msgID: word; var msgData: TOmniValue; timeout_ms: cardinal): boolean; overload;
procedure RemoveMonitor;
procedure Send(const msg: TOmniMessage); overload;
procedure Send(msgID: word); overload;
procedure Send(msgID: word; msgData: array of const); overload;
procedure Send(msgID: word; msgData: TOmniValue); overload;
procedure SetMonitor(hWindow: THandle; msg: cardinal; messageWParam, messageLParam:
integer);
property NewMessageEvent: THandle read GetNewMessageEvent;
end; { IOmniCommunicationEndpoint }
IOmniTwoWayChannel = interface ['{3ED1AB88-4209-4E01-AA79-A577AD719520}']
function Endpoint1: IOmniCommunicationEndpoint;
function Endpoint2: IOmniCommunicationEndpoint;
end; { IOmniTwoWayChannel }
{:Fixed-size ring buffer of TOmniValues references.
}
TOmniMessageQueue = class(TOmniQueue)
public
constructor Create(numMessages: integer); reintroduce;
function Dequeue: TOmniMessage; reintroduce;
function Enqueue(const value: TOmniMessage): boolean; reintroduce;
end; { TOmniMessageQueue }
function CreateTwoWayChannel(numElements: integer = CDefaultQueueSize):
IOmniTwoWayChannel;
implementation
uses
Windows,
SysUtils,
Variants,
{$IFDEF DEBUG}OtlCommBufferTest,{$ENDIF}
OtlEventMonitor;
type
TOmniCommunicationEndpoint = class(TInterfacedObject, IOmniCommunicationEndpoint)
strict private
ceReader_ref: TOmniMessageQueue;
ceWriter_ref: TOmniMessageQueue;
protected
function GetNewMessageEvent: THandle;
public
constructor Create(readQueue, writeQueue: TOmniMessageQueue);
function Receive(var msg: TOmniMessage): boolean; overload; inline;
function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload; inline;
function ReceiveWait(var msg: TOmniMessage; timeout_ms: cardinal): boolean; overload; inline;
function ReceiveWait(var msgID: word; var msgData: TOmniValue; timeout_ms: cardinal):
boolean; overload; inline;
procedure RemoveMonitor; inline;
procedure Send(msgID: word); overload; inline;
procedure Send(msgID: word; msgData: array of const); overload;
procedure Send(msgID: word; msgData: TOmniValue); overload; inline;
procedure Send(const msg: TOmniMessage); overload; inline;
procedure SetMonitor(hWindow: THandle; msg: cardinal; messageWParam, messageLParam:
integer); inline;
property NewMessageEvent: THandle read GetNewMessageEvent;
end; { TOmniCommunicationEndpoint }
TOmniTwoWayChannel = class(TInterfacedObject, IOmniTwoWayChannel)
strict private
twcEndpoint : array [1..2] of IOmniCommunicationEndpoint;
twcLock : TSynchroObject;
twcMessageQueueSize: integer;
twcUnidirQueue : array [1..2] of TOmniMessageQueue;
strict protected
procedure CreateBuffers; inline;
public
constructor Create(messageQueueSize: integer);
destructor Destroy; override;
function Endpoint1: IOmniCommunicationEndpoint; inline;
function Endpoint2: IOmniCommunicationEndpoint; inline;
end; { TOmniTwoWayChannel }
{ exports }
function CreateTwoWayChannel(numElements: integer): IOmniTwoWayChannel;
begin
Result := TOmniTwoWayChannel.Create(numElements);
end; { CreateTwoWayChannel }
{ TOmniMessage }
constructor TOmniMessage.Create(aMsgID: word; aMsgData: TOmniValue);
begin
MsgID := aMsgID;
MsgData := aMsgData;
end; { TOmniMessage.Create }
constructor TOmniMessage.Create(aMsgID: word);
begin
MsgID := aMsgID;
MsgData := TOmniValue.Null;
end; { TOmniMessage.Create }
{ TOmniMessageQueue }
constructor TOmniMessageQueue.Create(numMessages: integer);
begin
inherited Create(numMessages, SizeOf(TOmniMessage));
end; { TOmniMessageQueue.Create }
function TOmniMessageQueue.Dequeue: TOmniMessage;
var
tmp: TOmniMessage;
begin
tmp.MsgData.RawZero;
if not inherited Dequeue(tmp) then
raise Exception.Create('TOmniMessageQueue.Dequeue: Message queue is empty');
Result := tmp;
end; { TOmniMessageQueue.Dequeue }
function TOmniMessageQueue.Enqueue(const value: TOmniMessage): boolean;
var
tmp: TOmniMessage;
begin
tmp := value;
Result := inherited Enqueue(tmp);
tmp.MsgData.RawZero;
end; { TOmniMessageQueue.Enqueue }
{ TOmniCommunicationEndpoint }
constructor TOmniCommunicationEndpoint.Create(readQueue, writeQueue: TOmniMessageQueue);
begin
inherited Create;
ceReader_ref := readQueue;
ceWriter_ref := writeQueue;
end; { TOmniCommunicationEndpoint.Create }
function TOmniCommunicationEndpoint.GetNewMessageEvent: THandle;
begin
Result := ceReader_ref.NotifySupport.NewDataEvent;
end; { TOmniCommunicationEndpoint.GetNewMessageEvent }
function TOmniCommunicationEndpoint.Receive(var msgID: word; var msgData:
TOmniValue): boolean;
var
msg: TOmniMessage;
begin
Result := Receive(msg);
if Result then begin
msgID := msg.msgID;
msgData := msg.msgData;
end;
end; { TOmniCommunicationEndpoint.Receive }
function TOmniCommunicationEndpoint.Receive(var msg: TOmniMessage): boolean;
begin
Result := not ceReader_ref.IsEmpty;
if Result then
msg := ceReader_ref.Dequeue;
end; { TOmniCommunicationEndpoint.Receive }
function TOmniCommunicationEndpoint.ReceiveWait(var msg: TOmniMessage; timeout_ms:
cardinal): boolean;
begin
Result := (WaitForSingleObject(NewMessageEvent, timeout_ms) = WAIT_OBJECT_0);
if Result then
Result := Receive(msg);
end; { TOmniCommunicationEndpoint.ReceiveWait }
function TOmniCommunicationEndpoint.ReceiveWait(var msgID: word; var msgData: TOmniValue;
timeout_ms: cardinal): boolean;
begin
Result := (WaitForSingleObject(NewMessageEvent, timeout_ms) = WAIT_OBJECT_0);
if Result then
Result := Receive(msgID, msgData);
end; { TOmniCommunicationEndpoint.ReceiveWait }
procedure TOmniCommunicationEndpoint.RemoveMonitor;
begin
ceWriter_ref.MonitorSupport.RemoveMonitor;
end; { TOmniCommunicationEndpoint.RemoveMonitor }
procedure TOmniCommunicationEndpoint.Send(const msg: TOmniMessage);
begin
if not ceWriter_ref.Enqueue(msg) then
raise Exception.Create('TOmniCommunicationEndpoint.Send: Queue is full');
end; { TOmniCommunicationEndpoint.Send }
procedure TOmniCommunicationEndpoint.Send(msgID: word; msgData: TOmniValue);
var
msg: TOmniMessage;
begin
msg.msgID := msgID;
msg.msgData := msgData;
Send(msg);
end; { TOmniCommunicationEndpoint.Send }
procedure TOmniCommunicationEndpoint.Send(msgID: word; msgData: array of const);
begin
Send(msgID, OpenArrayToVarArray(msgData));
end; { TOmniCommunicationEndpoint.Send }
procedure TOmniCommunicationEndpoint.Send(msgID: word);
begin
Send(msgID, TOmniValue.Null);
end; { TOmniCommunicationEndpoint.Send }
procedure TOmniCommunicationEndpoint.SetMonitor(hWindow: THandle; msg: cardinal;
messageWParam, messageLParam: integer);
begin
ceWriter_ref.MonitorSupport.SetMonitor(CreateOmniMonitorParams(
hWindow, msg, messageWParam, messageLParam));
end; { TOmniCommunicationEndpoint.SetMonitor }
{ TOmniTwoWayChannel }
constructor TOmniTwoWayChannel.Create(messageQueueSize: integer);
begin
inherited Create;
twcMessageQueueSize := messageQueueSize;
twcLock := TTicketSpinLock.Create;
end; { TOmniTwoWayChannel.Create }
destructor TOmniTwoWayChannel.Destroy;
begin
twcUnidirQueue[1].Free;
twcUnidirQueue[1] := nil;
twcUnidirQueue[2].Free;
twcUnidirQueue[2] := nil;
FreeAndNil(twcLock);
inherited;
end; { TOmniTwoWayChannel.Destroy }
procedure TOmniTwoWayChannel.CreateBuffers;
begin
if twcUnidirQueue[1] = nil then
twcUnidirQueue[1] := TOmniMessageQueue.Create(twcMessageQueueSize);
if twcUnidirQueue[2] = nil then
twcUnidirQueue[2] := TOmniMessageQueue.Create(twcMessageQueueSize);
end; { TOmniTwoWayChannel.CreateBuffers }
function TOmniTwoWayChannel.Endpoint1: IOmniCommunicationEndpoint;
begin
Assert((cardinal(@twcEndpoint[1]) AND 3) = 0);
if twcEndpoint[1] = nil then begin
twcLock.Acquire;
try
if twcEndpoint[1] = nil then begin
CreateBuffers;
twcEndpoint[1] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[1], twcUnidirQueue[2]);
end;
finally twcLock.Release; end;
end;
Result := twcEndpoint[1];
end; { TOmniTwoWayChannel.Endpoint1 }
function TOmniTwoWayChannel.Endpoint2: IOmniCommunicationEndpoint;
begin
Assert((cardinal(@twcEndpoint[2]) AND 3) = 0);
if twcEndpoint[2] = nil then begin
twcLock.Acquire;
try
if twcEndpoint[2] = nil then begin
CreateBuffers;
twcEndpoint[2] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[2], twcUnidirQueue[1]);
end;
finally twcLock.Release; end;
end;
Result := twcEndpoint[2];
end; { TOmniTwoWayChannel.Endpoint2 }
end.