Abstract
How can one write code to block (wait) on multiple synchro events at once in a multi-threaded application? We are, of course, talking about Delphi. By “multiple synchro events at once”, I mean that the thread unblocks on some logical combination of the statuses of the member events. Here I use the word “event” in a very general sense, not to be confused with the more specific sense of Embarcadero’s “TEvent”. In the general sense, by “event” or “synchro event”, I mean the broad class of synchro objects such as semaphores (TSemaphore) and events (TEvent).
The most common logical combination is the “OR” operation. In other words, how can we block on multiple synchro events and be unblocked when the first event is signaled or a time-out occurs? And when we are finally unblocked, how can we know which event was the signaled one?
The Task
Write some re-usable library code, in Delphi XE7+, to block on multiple synchro events at once, with time-out. The code must:
- be cross-platform,
- be bullet-proof, and
- have a really simple API.
The wait function should allow for optional time-out, detect which entity caused the signal, and should leverage operating system capabilities for optimal efficiency.
In this post, I will focus on a particular subset of this task, where:
- all the synchro events are semaphores (e.g. TSemaphore, but not TEvent)
- access to the semaphores is restricted to this library. In other words, they are unnamed.
- the wait condition is simply to wait for the chronologically first semaphore to be signaled, or for a time-out to occur, whichever comes first.
However, I will also address, in less detail, the more general task.
But Why?
One of the big bugbears in multi-threaded programming is that if a thread is blocked on some semaphore for normal operational purposes, then it can’t check whether it is time to shut down gracefully until it is unblocked, and it is unblocked only when that semaphore is signaled. But if it is time to shut down, then it is quite likely that the semaphore on which it is blocked will never again be signaled, precisely because it is time to shut down. For this reason, a lot of multi-threaded applications (whether written in Delphi or not) have problems shutting down gracefully. I call this the “block-and-check-terminate” problem. Some readers may be quick to respond that this is a non-issue: with “proper” design, your programs will never have a “block-and-check-terminate” problem. While this may well be true, in my experience, designing real applications to avoid “block-and-check-terminate” problems, without a generic solution, is complex and tedious.
What is needed is a generic cross-platform solution.
What have others done?
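To make the problem concrete, here is a minimal sketch of the usual workaround: polling with a short time-out. TWorker and the 100 ms interval are hypothetical names and values chosen for illustration. The worker notices Terminate only because it wakes up periodically. With an infinite wait it could stay blocked indefinitely.

```pascal
program BlockAndCheckTerminateSketch;
{$APPTYPE CONSOLE}
uses
  System.Classes, System.SyncObjs;

type
  // Hypothetical worker thread, for illustration only.
  TWorker = class( TThread)
  private
    FWork: TSemaphore;
  protected
    procedure Execute; override;
  public
    constructor Create( Work: TSemaphore);
  end;

constructor TWorker.Create( Work: TSemaphore);
begin
  FWork := Work;
  inherited Create( False)
end;

procedure TWorker.Execute;
begin
  while not Terminated do
    // FWork.WaitFor( INFINITE) here could never notice Terminate.
    // A short time-out does notice it, at the cost of wake-ups and latency.
    if FWork.WaitFor( 100) = wrSignaled then
      begin
      // ... process one unit of work ...
      end
end;

var
  Work: TSemaphore;
  Worker: TWorker;
begin
  Work := TSemaphore.Create( nil, 0, 1, '', False);
  Worker := TWorker.Create( Work);
  Work.Release;      // post one unit of work
  Worker.Terminate;  // request a graceful shut-down
  Worker.WaitFor;    // returns once the worker's current wait, if any, ends
  Worker.Free;
  Work.Free;
  WriteLn( 'Worker shut down')
end.
```

Polling works, but it trades shut-down latency against wasted wake-ups, which is why a true wait on "work OR shut-down" is preferable.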
Jedi
The Jedi Component Library, in its multi-thread component code, provided a wrapper for thread and semaphore. It associated a special semaphore with each thread. The thread semaphore would start life unsignaled. A call to terminate the thread would cause the special semaphore to be signaled. Whenever the thread waited on some regular semaphore, the wrapper would call the Windows WaitForMultipleObjects() function on two semaphores: the explicit semaphore and the thread’s special semaphore. If the thread was terminated while it was blocked on some operational semaphore, the thread would unblock, detect the condition and handle it properly.
It was an elegant solution to the most common subset of our task. The problem, though, is that it only works on Windows. Android, iOS and OS X have no o/s API equivalent to WaitForMultipleObjects(). I don’t know why. Maybe mobile applications just have less need for within-app parallelism? If you search StackOverflow, there are more than a few questions asking how to wait on multiple semaphores, on Android or iOS, and there are no real solutions given. This is just my opinion. If you disagree, please post a comment below.
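The idea can be sketched with the RTL's own Windows-only wrapper, THandleObject.WaitForMultiple. This is not the Jedi code. WaitForWorkOrShutdown and Demo are hypothetical names, and the sketch only shows the two-semaphore wait described above.

```pascal
program WaitForEitherSketch;
{$APPTYPE CONSOLE}
uses
  System.SyncObjs;

{$IFDEF MSWINDOWS}
// Hypothetical helper: block until Shutdown or Work is signaled,
// or until the time limit passes.
function WaitForWorkOrShutdown( Work, Shutdown: THandleObject;
  TimeLimit: cardinal; out Which: THandleObject): TWaitResult;
var
  Handles: THandleObjectArray;
begin
  SetLength( Handles, 2);
  Handles[ 0] := Shutdown;  // listed first, so it wins when both are signaled
  Handles[ 1] := Work;
  result := THandleObject.WaitForMultiple( Handles, TimeLimit, False, Which)
end;

procedure Demo;
var
  Work, Shutdown: TSemaphore;
  Which: THandleObject;
begin
  Work := TSemaphore.Create( nil, 0, 1, '', False);
  Shutdown := TSemaphore.Create( nil, 0, 1, '', False);
  try
    Shutdown.Release;  // what a "terminate" wrapper would do
    if (WaitForWorkOrShutdown( Work, Shutdown, 1000, Which) = wrSignaled)
       and (Which = Shutdown) then
      WriteLn( 'Unblocked by the shut-down semaphore')
  finally
    Shutdown.Free;
    Work.Free
  end
end;
{$ENDIF}

begin
{$IFDEF MSWINDOWS}
  Demo
{$ELSE}
  WriteLn( 'WaitForMultiple is only available on Windows')
{$ENDIF}
end.
```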
Just Kill threads
I’ve seen this done. When the program needs to shut down, it just kills its non-main threads with an explicit kill command. Any self-respecting developer will be very uncomfortable with this solution, and its degree of safety is very situational.
Design Around
Design around the issue, so that no thread ever has to wait on multiple conditions. Good luck with that.
Design Above
Use a threading library, like OTL, that provides enough higher-level structures for parallel tasking that there is no design need for lower-level structures such as semaphores and events. You could do this, but it is a bit limiting. I think even with OTL, you will still run into requirements where you need to wait on multiple semaphores.
Count-down Latch
This SO post suggests using a TCountdownEvent object (SyncObjs unit). When either of the component semaphores is signaled, also signal the count-down latch. To wait on the first of either, wait on the latch. The limitations of this solution are:
- The code that signals the component semaphores has to know about the latch rules. That’s not a general solution.
- What happens when something wants to wait directly on a component semaphore? The relationship between the component semaphores and the latch is destroyed. So either you have code to deal with this (complex), or the latch solution is only used in situations where the only consumer of either semaphore is the consumer of both.
- It may be inefficient (CPU-wise) on Windows.
Offered Solution
The solution that I offer has a story that goes like this …
Take the synchro classes that you want to use (TSemaphore, TEvent etc), and wrap them so that you take control of their WaitFor() and Signal() methods.
Imagine a new object, called a “Condition”. A condition is a synchro object which is a composite view of an arbitrary set of member synchro objects. This is the Composite Pattern applied to synchro objects. The condition is considered signaled according to some rule of your design, based on the signal status of the members. For example, it could be deemed signaled when one or more members are signaled, but clear when all members are clear. Apply a rule that you cannot directly signal the condition, because that would be meaningless.
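A composition rule is just a predicate over the members' resource counts. The sketch below shows the “one or more members” (OR) rule next to a hypothetical “all members” (AND) rule. The function names are illustrative and are not part of the library.

```pascal
program ConditionRuleSketch;
{$APPTYPE CONSOLE}

// "OR" rule: signaled when at least one member has a non-zero count.
function AnyMemberSignaled( const Counts: array of cardinal): boolean;
var
  Count: cardinal;
begin
  result := False;
  for Count in Counts do
    if Count > 0 then
      exit( True)
end;

// Hypothetical "AND" rule: signaled only when every member has a non-zero count.
// An empty member set is treated as not signaled.
function AllMembersSignaled( const Counts: array of cardinal): boolean;
var
  Count: cardinal;
begin
  result := Length( Counts) > 0;
  for Count in Counts do
    if Count = 0 then
      exit( False)
end;

begin
  WriteLn( AnyMemberSignaled( [0, 2, 0]));
  WriteLn( AllMembersSignaled( [0, 2, 0]));
  WriteLn( AllMembersSignaled( [1, 2, 1]))
end.
```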
The condition is implemented by a private semaphore. When a member semaphore is signaled, the aforementioned condition semaphore is signaled (or not, if you have a different composition rule). When a member semaphore is successfully waited on, from code outside of the condition, the condition status needs to be updated atomically. When the condition is waited for, and unblocked, resource counts need to be decremented from the contributing signaler.
Construction, destruction and operation of the condition need to happen transparently, as a side effect of direct operations on the member semaphores. We achieve this by each condition keeping a record of its member semaphores and their contribution to the resource count, and by each member semaphore keeping a list of the conditions that it is entangled in. Construction and destruction of conditions need to update the entanglement lists safely and correctly.
A single critical section (the Gate) is shared by all synchro objects and conditions that might be entangled together. The gate must be passed in as a construction parameter.
When the operating system call WaitForMultipleObjects() is available, it is used instead of the private condition semaphore. This call will be available if and only if:
- the operating system is win32/64; and
- all member objects are descendants of THandleObject (and thus have a windows “handle”); and
- the client specifies so via a construction parameter.
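As a rough usage sketch, assuming the unit SBD.TL.SyncObjs2 from this post compiles and behaves as described, waiting on “work OR shut-down” might look like the following. Demo, Work, Shutdown and Either are illustrative names. The condition and its temporaries are released inside the procedure, before the shared gate is freed.

```pascal
program WaitAnyUsageSketch;
{$APPTYPE CONSOLE}
uses
  System.SyncObjs, SBD.TL.SyncObjs2;

procedure Demo( Gate: TCriticalSection);
var
  Work, Shutdown, Which: TSBDSemaphore;
  Either: ISynchroCondition;
begin
  Work := TSBDSemaphore.Create( Gate, 0, 1);
  Shutdown := TSBDSemaphore.Create( Gate, 0, 1);
  try
    // Compose the two single-member conditions into one "first of either" condition.
    // False = do not constrain the condition to the Windows multi-wait API.
    Either := Work.AsConditions( False).Join( Shutdown.AsConditions( False));
    Shutdown.Signal;
    Which := nil;
    if Either.WaitFor( 1000, Which) = wrSignaled then
      begin
      if Which = Shutdown then
        WriteLn( 'Unblocked by Shutdown')
      else
        WriteLn( 'Unblocked by Work')
      end
    else
      WriteLn( 'Timed out or failed')
  finally
    Either := nil;  // release the condition before its members
    Shutdown.Free;
    Work.Free
  end
end;

var
  Gate: TCriticalSection;
begin
  Gate := TCriticalSection.Create;
  try
    Demo( Gate)
  finally
    Gate.Free  // the gate must outlive every semaphore and condition that uses it
  end
end.
```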
The Full Source Code
Listing 1 below shows a solution for the aforementioned subset of the task. To implement conditions other than “unblock on first chronological member”, override the TSyncroCondition methods marked as virtual.
unit SBD.TL.SyncObjs2;
interface
uses System.SyncObjs, Generics.Collections, SysUtils;
const
Forever = System.INFINITE;
TimeOut = cardinal( $FFFFFFFF);
WaitForError = 0;
type
TSynchoConditionList = class;
ISynchroCondition = interface;
TSBDSemaphore = class
private
FEntangledConditions: TSynchoConditionList;
FGate: TCriticalSection;
FCount: cardinal;
FMax: cardinal;
FBase: TSemaphore;
function BaseWaitFor( TimeLimit: cardinal): TWaitResult;
procedure BaseSignal;
procedure WaitedVia_External;
public
constructor Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
destructor Destroy; override;
function WaitFor( TimeLimit: cardinal): TWaitResult;
function Signal: boolean;
function Count: cardinal;
function AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
end;
ISynchroCondition = interface
['{CFD0DD74-6EB4-4CCF-9EE1-8BBAC759151A}']
function WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
function Join( const Addend: ISynchroCondition): ISynchroCondition;
end;
IInterfaceHelper = interface
['{40D9B899-AF13-4FC1-AF04-23E8416E1FEE}']
function AsObject: TObject;
end;
TSyncroCondition = class( TInterfacedObject, ISynchroCondition, IInterfaceHelper)
protected
// Override these methods to implement conditions other than "first chronological".
function ConditionIsSignaled: boolean; virtual;
function ConditionWillBeSignaledAfterSignal( Sem: TSBDSemaphore):boolean; virtual;
function ConditionWillBeSignaledAfterWait ( Sem: TSBDSemaphore):boolean; virtual;
function WaitCanUseOS_API: boolean; virtual;
function OS_API_WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult; virtual;
private
FisSignalled: boolean;
FWillBeSignaled: boolean;
FSignals: TDictionary<TSBDSemaphore,cardinal>;
FGate: TCriticalSection;
FisBroken: boolean;
FBase: TSemaphore;
FInited: boolean;
FConstrainedToAllHandleSyncros: boolean;
constructor CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
destructor Destroy; override;
procedure Presignal ( Sem: TSBDSemaphore);
procedure Postsignal( Sem: TSBDSemaphore);
procedure PostWait ( Sem: TSBDSemaphore);
procedure BaseSignal;
procedure BaseWaitForever;
function BaseWaitFor( TimeLimit: cardinal): TWaitResult;
function WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
function FindASignallingContributor( var Sem: TSBDSemaphore): boolean;
procedure NotifyMemberDestroyed( Member: TSBDSemaphore);
function AsObject: TObject;
function Join( const Addend: ISynchroCondition): ISynchroCondition;
procedure CheckInit;
end;
TSynchoConditionList = class( TList<TSyncroCondition>)
public
end;
implementation
function TSBDSemaphore.Count: cardinal;
begin
result := FCount
end;
constructor TSBDSemaphore.Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
begin
Assert( MaxCount >= 1);
Assert( InitialCount <= MaxCount);
FGate := Gate;
FBase := TSemaphore.Create( nil, InitialCount, MaxCount, '', False);
FEntangledConditions := TSynchoConditionList.Create;
FCount := InitialCount;
FMax := MaxCount
end;
destructor TSBDSemaphore.Destroy;
var
Cnd: TSyncroCondition;
begin
FGate.Enter;
try
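for Cnd in FEntangledConditions do
Cnd.NotifyMemberDestroyed( self);
// The entanglement list is owned by this semaphore, so release it here.
FEntangledConditions.Free;
FBase.Free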
finally
FGate.Leave
end;
inherited
end;
function TSBDSemaphore.Signal: boolean;
var
Cnd: TSyncroCondition;
Saturated: boolean;
begin
FGate.Enter;
try
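Saturated := FCount >= FMax;
// Return True only when the count was actually incremented.
result := not Saturated;
if not Saturated then
begin
Inc( FCount);
for Cnd in FEntangledConditions do
Cnd.Presignal( self);
// Release the base semaphore only when it is below its maximum.
BaseSignal;
for Cnd in FEntangledConditions do
Cnd.Postsignal( self)
end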
finally
FGate.Leave
end;
end;
procedure TSyncroCondition.Presignal( Sem: TSBDSemaphore);
begin
Assert( FGate = Sem.FGate);
CheckInit;
FWillBeSignaled := ConditionWillBeSignaledAfterSignal( Sem);
FSignals[ Sem] := FSignals[ Sem] + 1;
if (not FisSignalled) and FWillBeSignaled then
BaseSignal;
end;
procedure TSyncroCondition.BaseWaitForever;
begin
BaseWaitFor( Forever)
end;
constructor TSyncroCondition.CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
begin
FisBroken := False;
FGate := Origin.FGate;
FConstrainedToAllHandleSyncros := AConstrainToHandleSyncros;
{$IFDEF MSWINDOWS}
if FConstrainedToAllHandleSyncros then
Assert( Origin.FBase is THandleObject);
{$ENDIF MSWINDOWS}
FInited := False;
FSignals := TDictionary<TSBDSemaphore,cardinal>.Create;
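FGate.Enter;
try
FSignals.Add( Origin, Origin.FCount);
// Register with the member so that its Signal and WaitFor calls notify this condition.
Origin.FEntangledConditions.Add( self)
finally
FGate.Leave
end;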
FisSignalled := False;
FWillBeSignaled := False;
FBase := nil
end;
procedure TSyncroCondition.CheckInit;
var
InitialCount: cardinal;
begin
if FInited then exit;
FInited := True;
FisSignalled := ConditionIsSignaled;
FWillBeSignaled := FisSignalled;
// The private semaphore mirrors the condition: available (1) when signaled, empty (0) otherwise.
if FisSignalled then
InitialCount := 1
else
InitialCount := 0;
if not WaitCanUseOS_API then
FBase := TSemaphore.Create( nil, InitialCount, 1, '', False)
end;
destructor TSyncroCondition.Destroy;
var
Member: TSBDSemaphore;
begin
FGate.Enter;
try
FisBroken := True;
for Member in FSignals.Keys do
Member.FEntangledConditions.Remove( self);
FSignals.Free;
FBase.Free
finally
FGate.Leave
end;
inherited
end;
procedure TSyncroCondition.NotifyMemberDestroyed( Member: TSBDSemaphore);
begin
if FGate <> Member.FGate then
FisBroken := True;
if FisBroken then exit;
FGate.Enter;
try
if FSignals.ContainsKey( Member) then
begin
FisBroken := True;
FSignals.Remove( Member)
end
finally
FGate.Leave
end
end;
function TSyncroCondition.OS_API_WaitFor(
TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
{$IFDEF MSWINDOWS}
var
HandleObjs: THandleObjectArray;
SignaledObj: THandleObject;
Member: TSBDSemaphore;
i: integer;
{$ENDIF MSWINDOWS}
begin
{$IFDEF MSWINDOWS}
SetLength( HandleObjs, FSignals.Count);
i := -1;
for Member in FSignals.Keys do
begin
Inc( i);
HandleObjs[ i] := Member.FBase as THandleObject
end;
result := THandleObject.WaitForMultiple( HandleObjs, TimeLimit, False, SignaledObj, False, 0);
if result = wrSignaled then
begin
for Member in FSignals.Keys do
begin
if SignaledObj <> Member.FBase then continue;
// Report which member satisfied the wait, then update its bookkeeping.
Contributor := Member;
Member.WaitedVia_External;
break
end
end
{$ELSE}
result := wrError
{$ENDIF MSWINDOWS}
end;
function TSyncroCondition.WaitCanUseOS_API: boolean;
begin
{$IFDEF MSWINDOWS}
result := FConstrainedToAllHandleSyncros;
{$ELSE}
result := False
{$ENDIF MSWINDOWS}
end;
function TSyncroCondition.WaitFor(
TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
var
TimeLeft: cardinal;
AfterWaitClock: TDateTime;
Elapsed: cardinal; // milliseconds
Confirmed: boolean;
doRetry: boolean;
Count: cardinal;
begin
if FisBroken then
begin
result := wrAbandoned;
exit
end;
CheckInit;
if WaitCanUseOS_API then
result := OS_API_WaitFor( TimeLimit, Contributor)
else
begin
TimeLeft := TimeLimit;
repeat
result := BaseWaitFor( TimeLeft);
if result <> wrSignaled then break;
if TimeLeft > 0 then
AfterWaitClock := Now;
FGate.Enter;
try
Confirmed := FindASignallingContributor( Contributor);
if Confirmed then
begin
result := Contributor.BaseWaitFor( 0);
Confirmed := result = wrSignaled
end;
doRetry := result = wrTimeOut;
// An infinite time limit must stay infinite across retries.
if doRetry and (TimeLeft > 0) and (TimeLeft <> Forever) then
begin
Elapsed := Trunc( (Now - AfterWaitClock) * MSecsPerDay);
if TimeLeft > Elapsed then
Dec( TimeLeft, Elapsed)
else
TimeLeft := 0
end;
if Confirmed then
begin
Count := FSignals[ Contributor];
if Count > 0 then
FSignals[ Contributor] := Count - 1
end;
if ConditionIsSignaled then
BaseSignal;
finally
FGate.Leave
end;
if doRetry then
Sleep(1)
until not doRetry
end
end;
procedure TSyncroCondition.Postsignal( Sem: TSBDSemaphore);
begin
CheckInit;
if FisSignalled and (not FWillBeSignaled) then
BaseWaitForever
end;
procedure TSBDSemaphore.WaitedVia_External;
var
Cnd: TSyncroCondition;
Saturated: boolean;
begin
FGate.Enter;
try
Saturated := FCount = 0;
if not Saturated then
begin
Dec( FCount);
for Cnd in FEntangledConditions do
Cnd.PostWait( self)
end
finally
FGate.Leave
end;
end;
function TSBDSemaphore.WaitFor( TimeLimit: cardinal): TWaitResult;
begin
result := BaseWaitFor( TimeLimit);
if result = wrSignaled then
WaitedVia_External
end;
procedure TSyncroCondition.PostWait( Sem: TSBDSemaphore);
var
Count: cardinal;
begin
Assert( FGate = Sem.FGate);
Assert( not FisBroken);
CheckInit;
FWillBeSignaled := ConditionWillBeSignaledAfterWait( Sem);
Count := FSignals[ Sem];
if Count > 0 then
FSignals[ Sem] := Count - 1;
if FisSignalled <> FWillBeSignaled then
begin
if FWillBeSignaled then
BaseSignal
else
BaseWaitForever
end
end;
function TSBDSemaphore.AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
begin
result := TSyncroCondition.CreateWithOne( self, AConstrainToHandleSyncros);
end;
procedure TSBDSemaphore.BaseSignal;
begin
FBase.Release
end;
function TSBDSemaphore.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
result := FBase.WaitFor( TimeLimit)
end;
function TSyncroCondition.FindASignallingContributor(
var Sem: TSBDSemaphore): boolean;
var
Pair: TPair<TSBDSemaphore,cardinal>;
begin
result := not FisBroken;
if not result then exit;
result := False;
for Pair in FSignals do
begin
result := Pair.Value > 0;
if not result then continue;
Sem := Pair.Key;
break
end
end;
function TSyncroCondition.AsObject: TObject;
begin
result := self
end;
function TSyncroCondition.Join(
const Addend: ISynchroCondition): ISynchroCondition;
var
Composite, Friend: TSyncroCondition;
Pair: TPair<TSBDSemaphore,cardinal>;
begin
FGate.Enter;
try
Composite := TSyncroCondition.Create;
// Give the composite its gate and its own member table before anything can fail.
Composite.FGate := FGate;
Composite.FSignals := TDictionary<TSBDSemaphore,cardinal>.Create;
result := Composite;
Friend := (Addend as IInterfaceHelper).AsObject as TSyncroCondition;
Assert( FGate = Friend.FGate);
Composite.FisBroken := FisBroken or Friend.FisBroken;
// Copy this condition's members, then merge in the addend's members.
for Pair in FSignals do
Composite.FSignals.Add( Pair.Key, Pair.Value);
for Pair in Friend.FSignals do
begin
if Composite.FSignals.ContainsKey( Pair.Key) then
Composite.FSignals[ Pair.Key] := Composite.FSignals[ Pair.Key] + Pair.Value
else
Composite.FSignals.Add( Pair.Key, Pair.Value)
end;
// Entangle the new composite (not self) with every member.
for Pair in Composite.FSignals do
if Pair.Key.FEntangledConditions.IndexOf( Composite) = -1 then
Pair.Key.FEntangledConditions.Add( Composite);
Composite.FisSignalled := Composite.ConditionIsSignaled;
Composite.FWillBeSignaled := Composite.FisSignalled;
Composite.FInited := False;
Composite.FBase := nil;
{$IFDEF MSWINDOWS}
Assert( FConstrainedToAllHandleSyncros = Friend.FConstrainedToAllHandleSyncros);
{$ENDIF MSWINDOWS}
Composite.FConstrainedToAllHandleSyncros := FConstrainedToAllHandleSyncros;
finally
FGate.Leave
end;
end;
procedure TSyncroCondition.BaseSignal;
begin
if assigned( FBase) then
FBase.Release;
FisSignalled := True
end;
function TSyncroCondition.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
if assigned( FBase) then
result := FBase.WaitFor( TimeLimit)
else
result := wrError;
FisSignalled := False
end;
function TSyncroCondition.ConditionIsSignaled: boolean;
var
Pair: TPair<TSBDSemaphore,cardinal>;
begin
result := False;
for Pair in FSignals do
begin
result := Pair.Value > 0;
if not result then continue;
break
end
end;
function TSyncroCondition.ConditionWillBeSignaledAfterSignal(
Sem: TSBDSemaphore): boolean;
begin
result := True
end;
function TSyncroCondition.ConditionWillBeSignaledAfterWait(
Sem: TSBDSemaphore): boolean;
var
Pair: TPair<TSBDSemaphore,cardinal>;
Count, Sum: cardinal;
begin
result := False;
Sum := 0;
for Pair in FSignals do
begin
Count := Pair.Value;
if (Pair.Key = Sem) and (Count > 0) then
Dec( Count);
Inc( Sum, Count);
result := Sum > 0;
if not result then continue;
break
end
end;
end.