Abstract
How can one write code to block (wait) on multiple synchro events at once, in a multi-threaded application? We are, of course, talking about Delphi. By “multiple synchro events at once”, I mean that the thread unblocks on some logical combination of the statuses of the member events. Here I use the word “event” in a very general sense, not to be confused with the more specific sense of the word, as in Embarcadero’s “TEvent”. In the general sense, by “event” or “synchro event”, I mean the broad class of synchro objects, such as the semaphore (TSemaphore) and the event (TEvent).
The typical and most common logical combination is the “OR” operation. In other words, how can we block on multiple synchro events and be unblocked when the first event is signaled, or when a time-out occurs? When we are finally unblocked, how can we know which event was the signaled one?
The Task
Write some reusable library code, in Delphi XE7+, to block on multiple synchro events at once, with a time-out. The code must:
- be cross-platform,
- be bullet-proof, and
- have a really simple API.
The wait function should allow an optional time-out, report which member caused the signal, and leverage operating system capabilities for optimal efficiency.
In this post, I will focus on a particular subset of this task, where:
- all the synchro events are semaphores (e.g. TSemaphore, but not TEvent);
- access to the semaphores is restricted to this library; in other words, they are unnamed; and
- the wait condition is simply to wait for the first semaphore (chronologically) to be signaled, or for a time-out to occur, whichever comes first.
However, I will also address, in less detail, the more general task.
But Why?
One of the big bugbears in multi-threaded programming is that if a thread is blocked on some semaphore for normal operational purposes, then it can’t check whether it is time to shut down gracefully until it is unblocked, and unblocking is driven by the semaphore being signaled. But if it is time to shut down gracefully, then it is quite likely that the semaphore the thread is blocked on will never be signaled again, precisely because it is time to shut down. For this reason, a lot of multi-threaded applications (whether written in Delphi or not) have problems shutting down gracefully. I call this the “block-and-check-terminate” problem. Some readers may be quick to respond that this is a non-issue: with “proper” design, your programs will never have a “block-and-check-terminate” problem. While this may well be true, I have found that in real applications, designing to avoid “block-and-check-terminate” problems without a generic solution is complex and tedious.
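The problem can be reproduced in a few lines. This is a minimal sketch, assuming a hypothetical TWorker thread that consumes work items from a TSemaphore; the Ready and Done events exist only so that the demo can observe the thread from outside. After Terminate, the thread stays blocked, and it only notices the request once somebody releases the work semaphore.

```pascal
program BlockAndCheckTerminate;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical worker: blocks on a work semaphore, then re-checks Terminated.
  TWorker = class(TThread)
  private
    FWork: TSemaphore;
    FReady, FDone: TEvent; // demo-only hand-shakes
  protected
    procedure Execute; override;
  public
    constructor Create(AWork: TSemaphore; AReady, ADone: TEvent);
  end;

constructor TWorker.Create(AWork: TSemaphore; AReady, ADone: TEvent);
begin
  FWork := AWork;
  FReady := AReady;
  FDone := ADone;
  inherited Create(False); // start running immediately
end;

procedure TWorker.Execute;
begin
  try
    repeat
      FReady.SetEvent;
      // Terminated is only looked at again after this call returns.
      FWork.WaitFor(INFINITE);
    until Terminated;
  finally
    FDone.SetEvent;
  end;
end;

var
  Work: TSemaphore;
  Ready, Done: TEvent;
  Worker: TWorker;
  StuckAfterTerminate, FinishedAfterSignal: Boolean;
begin
  Work := TSemaphore.Create(nil, 0, 1, '');
  Ready := TEvent.Create(nil, True, False, '');
  Done := TEvent.Create(nil, True, False, '');
  Worker := TWorker.Create(Work, Ready, Done);
  try
    Ready.WaitFor(INFINITE);
    Worker.Terminate; // ask for a graceful shut-down...
    StuckAfterTerminate := Done.WaitFor(200) = wrTimeout; // ...but the thread is still blocked
    Work.Release;     // only a signal on the work semaphore lets it notice
    FinishedAfterSignal := Done.WaitFor(5000) = wrSignaled;
    Writeln('Stuck after Terminate: ', StuckAfterTerminate);
    Writeln('Finished after Release: ', FinishedAfterSignal);
  finally
    Worker.Free; // TThread.Destroy waits for Execute to return
    Done.Free;
    Ready.Free;
    Work.Free;
  end;
end.
```

Without the Work.Release call, even Worker.Free would hang, because TThread's destructor waits for the still-blocked thread.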
What is needed, is a generic cross-platform solution.
What have others done
Jedi
The Jedi Component Library, in its multi-thread component code, provided a wrapper for threads and semaphores. It associated a special semaphore with each thread. The thread semaphore would start life unsignaled, and a call to terminate the thread would signal it. Whenever the thread waited on some regular semaphore, the wrapper would call the Windows WaitForMultipleObjects() function on two semaphores: the explicit semaphore and the thread’s special semaphore. If the thread was terminated while it was blocked on some operational semaphore, the thread would unblock, detect the condition and handle it properly.
It was an elegant solution to the most common subset of our task. The problem, though, is that it only works on Windows. Android, iOS and OS X have no OS API equivalent to WaitForMultipleObjects(). I don’t know why. Maybe mobile applications just have less need for within-app parallelism? If you search StackOverflow, there are more than a few questions asking how to wait on multiple semaphores on Android or iOS, and no real solutions are given. This is just my opinion. If you disagree, please post a comment below.
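For illustration, here is a Windows-only sketch in the spirit of the Jedi approach, built on THandleObject.WaitForMultiple from System.SyncObjs (the same call Listing 1 uses). WaitForWorkOrStop is a hypothetical helper, not the Jedi library's API. One deliberate swap: the "terminate" object is a manual-reset TEvent rather than a semaphore, on the assumption that a shut-down request should stay visible to every later wait instead of being consumed by the first one.

```pascal
program WaitWorkOrStop;

{$APPTYPE CONSOLE}

{$IFNDEF MSWINDOWS}
  {$MESSAGE FATAL 'WaitForMultipleObjects is only available on Windows'}
{$ENDIF}

uses
  System.SysUtils, System.SyncObjs;

// Hypothetical helper (not the Jedi API). Stop is listed first: when several
// handles are signaled, WaitForMultipleObjects reports the lowest index, so a
// shut-down request wins over pending work.
function WaitForWorkOrStop(Work: TSemaphore; Stop: TEvent; TimeLimit: Cardinal;
  out StopRequested: Boolean): TWaitResult;
var
  Handles: THandleObjectArray;
  Signaled: THandleObject;
begin
  SetLength(Handles, 2);
  Handles[0] := Stop;
  Handles[1] := Work;
  Result := THandleObject.WaitForMultiple(Handles, TimeLimit, False, Signaled);
  StopRequested := (Result = wrSignaled) and (Signaled = Stop);
end;

var
  Work: TSemaphore;
  Stop: TEvent;
  R1, R2, R3: TWaitResult;
  S1, S2, S3: Boolean;
begin
  Work := TSemaphore.Create(nil, 1, 10, '');   // one work item pending
  Stop := TEvent.Create(nil, True, False, ''); // manual-reset: stays signaled once set
  try
    R1 := WaitForWorkOrStop(Work, Stop, 0, S1); // consumes the work item
    R2 := WaitForWorkOrStop(Work, Stop, 0, S2); // nothing pending: time-out
    Stop.SetEvent;
    R3 := WaitForWorkOrStop(Work, Stop, 0, S3); // shut-down is reported
    Writeln(Ord(R1), ' ', S1, ' / ', Ord(R2), ' / ', Ord(R3), ' ', S3);
  finally
    Stop.Free;
    Work.Free;
  end;
end.
```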
Just Kill threads
I’ve seen this done. When the program needs to shut down, it just kills its non-main threads with an explicit kill command. Any self-respecting developer will be very uncomfortable with this solution, and its degree of safety is very situational.
Design Around
Design around the issue, so that no thread ever has to wait on multiple conditions. Good luck with that.
Design Above
Use a threading library, such as OTL (OmniThreadLibrary), that provides enough higher-level structures for parallel tasking that there is no design need for lower-level structures such as semaphores and events. You could do this, but it is a bit limiting. I think even with OTL, you will still run into requirements where you need to wait on multiple semaphores.
Count-down Latch
This SO post suggests using a TCountdownEvent object (SyncObjs unit). When either of the component semaphores is signaled, also signal the count-down latch. To wait on the first of either, wait on the latch. The limitations of this solution are:
- The code that signals the component semaphores has to know about the latch rules. That’s not a general solution.
- What happens when something wants to wait directly on a component semaphore? The relationship between the component semaphores and the latch is destroyed. So you either need code to deal with this (complex), or the latch solution can only be used where the only consumer of either semaphore is the consumer of both.
- It may be inefficient (CPU-wise) on Windows.
Offered Solution
The solution that I offer has a story that goes like this …
Take the synchro classes that you want to use (TSemaphore, TEvent etc), and wrap them so that you take control of their WaitFor() and Signal() methods.
Imagine a new object, called a “Condition”. A condition is a synchro object which is a composite view of an arbitrary set of member synchro objects: the Composite Pattern applied to synchro objects. The condition is considered signaled according to some rule of your design, based on the signal status of the members. For example, it could be deemed signaled when one or more members are signaled, and clear when all members are clear. Apply the rule that you can’t directly signal the condition, because that would be meaningless.
The condition is implemented by a private semaphore. When a member semaphore is signaled, the condition’s private semaphore is signaled too (or not, if you have a different composition rule). When a member semaphore is successfully waited on by code outside the condition, the condition status needs to be updated atomically. When a wait on the condition unblocks, the resource count of the contributing member needs to be decremented.
Construction, destruction and operation of the condition needs to happen transparently, from direct operations on the member semaphores. We achieve this by each condition keeping a record of its member semaphores and their contribution to the resource count; and also, each member semaphore keeping a record of the list of conditions that it is entangled in. Construction and destruction of conditions needs to safely and correctly update entanglement lists.
A single critical section (the Gate) is shared by all synchro objects and conditions that might be entangled together. The gate must be passed in as a constructor parameter.
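The private-semaphore-plus-gate idea can be shown in miniature. This is a minimal sketch, not the classes of Listing 1: TFirstOfN is a hypothetical name, members are reduced to counters indexed 0..N-1, and it assumes nobody waits on a member directly. That restriction is what lets one counting private semaphore replace the entanglement bookkeeping. Also note that "first" here means the lowest-numbered member with a pending signal, which is not strictly chronological.

```pascal
program FirstOfNSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs;

type
  // Hypothetical, cut-down condition. Assumes every wait goes through the
  // condition: members are never waited on directly.
  TFirstOfN = class
  private
    FGate: TCriticalSection;   // shared gate; guards FCounts
    FCounts: TArray<Cardinal>; // pending signals per member
    FMaxPerMember: Cardinal;
    FPrivate: TSemaphore;      // total pending signals across all members
  public
    constructor Create(AGate: TCriticalSection; MemberCount: Integer; MaxPerMember: Cardinal);
    destructor Destroy; override;
    function Signal(Member: Integer): Boolean;
    function WaitFor(TimeLimit: Cardinal; out Member: Integer): TWaitResult;
  end;

constructor TFirstOfN.Create(AGate: TCriticalSection; MemberCount: Integer; MaxPerMember: Cardinal);
var
  i: Integer;
begin
  inherited Create;
  Assert((MemberCount >= 1) and (MaxPerMember >= 1));
  FGate := AGate;
  FMaxPerMember := MaxPerMember;
  SetLength(FCounts, MemberCount);
  for i := 0 to High(FCounts) do
    FCounts[i] := 0;
  FPrivate := TSemaphore.Create(nil, 0, MemberCount * Integer(MaxPerMember), '');
end;

destructor TFirstOfN.Destroy;
begin
  FPrivate.Free; // the gate belongs to the caller
  inherited;
end;

function TFirstOfN.Signal(Member: Integer): Boolean;
begin
  FGate.Enter;
  try
    // Like a full semaphore, a saturated member ignores the signal.
    Result := FCounts[Member] < FMaxPerMember;
    if Result then
    begin
      Inc(FCounts[Member]);
      FPrivate.Release; // released only after the count it stands for exists
    end;
  finally
    FGate.Leave;
  end;
end;

function TFirstOfN.WaitFor(TimeLimit: Cardinal; out Member: Integer): TWaitResult;
var
  i: Integer;
begin
  Member := -1;
  // Block on the private semaphore only; the gate is never held while blocked.
  Result := FPrivate.WaitFor(TimeLimit);
  if Result <> wrSignaled then
    Exit;
  FGate.Enter;
  try
    // Each unit taken from FPrivate matches a count added in Signal, so some
    // member must still have one: take the lowest-numbered.
    for i := 0 to High(FCounts) do
      if FCounts[i] > 0 then
      begin
        Dec(FCounts[i]);
        Member := i;
        Break;
      end;
  finally
    FGate.Leave;
  end;
end;

var
  Gate: TCriticalSection;
  Cond: TFirstOfN;
  R1, R2: TWaitResult;
  M1, M2: Integer;
  S1, S2: Boolean;
begin
  Gate := TCriticalSection.Create;
  Cond := TFirstOfN.Create(Gate, 2, 1);
  try
    Cond.Signal(1);
    R1 := Cond.WaitFor(1000, M1); // member 1 caused the wake-up
    R2 := Cond.WaitFor(0, M2);    // nothing pending: time-out, M2 = -1
    S1 := Cond.Signal(0);         // accepted
    S2 := Cond.Signal(0);         // member 0 already at its maximum of 1
    Writeln(Ord(R1), ' ', M1, ' / ', Ord(R2), ' ', M2, ' / ', S1, ' ', S2);
  finally
    Cond.Free;
    Gate.Free;
  end;
end.
```

The invariant that keeps this simple is that a unit is added to FPrivate only after the matching per-member count exists, so a woken waiter always finds some count to take. Allowing direct waits on members, as Listing 1 does, is exactly what breaks that invariant and forces the per-condition bookkeeping.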
When the operating system call WaitForMultipleObjects() is available, it is used instead of the private condition semaphore. This call will be available if and only if:
- the operating system is win32/64; and
- all member objects are descendants of THandleObject (and thus have a windows “handle”); and
- the client specifies so via a construction parameter.
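The three conditions above reduce to a small predicate. The following is a sketch only, with CanUseWaitForMultiple as a hypothetical helper (Listing 1 folds the same decision into WaitCanUseOS_API and an assertion in the constructor). It relies on TSemaphore and TEvent descending from THandleObject, while TCriticalSection does not.

```pascal
program OsApiCheck;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs;

// Hypothetical helper mirroring the three conditions; not part of Listing 1.
function CanUseWaitForMultiple(const Members: array of TSynchroObject;
  ClientOptsIn: Boolean): Boolean;
{$IFDEF MSWINDOWS}
var
  i: Integer;
{$ENDIF}
begin
{$IFDEF MSWINDOWS}
  // The client must have asked for it...
  Result := ClientOptsIn;
  if not Result then
    Exit;
  // ...and every member must wrap a Windows handle.
  for i := Low(Members) to High(Members) do
    if not (Members[i] is THandleObject) then
      Exit(False);
{$ELSE}
  // Only Win32/Win64 offers WaitForMultipleObjects().
  Result := False;
{$ENDIF}
end;

var
  Sem: TSemaphore;
  Gate: TCriticalSection;
  SemOnly, NoOptIn, WithGate: Boolean;
begin
  Sem := TSemaphore.Create(nil, 0, 1, '');
  Gate := TCriticalSection.Create;
  try
    SemOnly := CanUseWaitForMultiple([Sem], True);
    NoOptIn := CanUseWaitForMultiple([Sem], False);
    WithGate := CanUseWaitForMultiple([Sem, Gate], True); // TCriticalSection has no handle
    Writeln(SemOnly, ' ', NoOptIn, ' ', WithGate);
  finally
    Gate.Free;
    Sem.Free;
  end;
end.
```

WaitForMultipleObjects also caps a single wait at MAXIMUM_WAIT_OBJECTS (64) handles, so a production version of this check would probably need to compare the member count against that limit too.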
The Full Source Code
Listing 1 below shows a solution for the aforementioned subset of the task. To implement conditions other than “unblock on first chronological member”, override the TSyncroCondition methods marked as virtual.
unit SBD.TL.SyncObjs2;

interface

uses
  System.SyncObjs, System.Generics.Collections, System.SysUtils;

const
  Forever = System.INFINITE;
  TimeOut = cardinal( $FFFFFFFF);
  WaitForError = 0;

type
  TSynchoConditionList = class;
  ISynchroCondition = interface;

  TSBDSemaphore = class
  private
    FEntangledConditions: TSynchoConditionList; // conditions this semaphore is a member of
    FGate: TCriticalSection;                    // shared with every entangled condition
    FCount: cardinal;                           // mirrors the base semaphore's count
    FMax: cardinal;
    FBase: TSemaphore;
    function BaseWaitFor( TimeLimit: cardinal): TWaitResult;
    procedure BaseSignal;
    procedure WaitedVia_External;
  public
    constructor Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
    destructor Destroy; override;
    function WaitFor( TimeLimit: cardinal): TWaitResult;
    function Signal: boolean;
    function Count: cardinal;
    function AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
  end;

  ISynchroCondition = interface
    ['{CFD0DD74-6EB4-4CCF-9EE1-8BBAC759151A}']
    function WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
    function Join( const Addend: ISynchroCondition): ISynchroCondition;
  end;

  IInterfaceHelper = interface
    ['{40D9B899-AF13-4FC1-AF04-23E8416E1FEE}']
    function AsObject: TObject;
  end;

  TSyncroCondition = class( TInterfacedObject, ISynchroCondition, IInterfaceHelper)
  protected
    // Override these methods to implement conditions other than "first chronological".
    function ConditionIsSignaled: boolean; virtual;
    function ConditionWillBeSignaledAfterSignal( Sem: TSBDSemaphore): boolean; virtual;
    function ConditionWillBeSignaledAfterWait( Sem: TSBDSemaphore): boolean; virtual;
    function WaitCanUseOS_API: boolean; virtual;
    function OS_API_WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult; virtual;
  private
    FisSignalled: boolean;
    FWillBeSignaled: boolean;
    FSignals: TDictionary<TSBDSemaphore,cardinal>; // member -> pending count
    FGate: TCriticalSection;
    FisBroken: boolean;
    FBase: TSemaphore;                             // private binary semaphore (nil in OS-API mode)
    FInited: boolean;
    FConstrainedToAllHandleSyncros: boolean;
    constructor CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
    procedure Presignal( Sem: TSBDSemaphore);
    procedure Postsignal( Sem: TSBDSemaphore);
    procedure PostWait( Sem: TSBDSemaphore);
    procedure BaseSignal;
    procedure BaseDrain;
    function BaseWaitFor( TimeLimit: cardinal): TWaitResult;
    function WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
    function FindASignallingContributor( var Sem: TSBDSemaphore): boolean;
    procedure NotifyMemberDestroyed( Member: TSBDSemaphore);
    function AsObject: TObject;
    function Join( const Addend: ISynchroCondition): ISynchroCondition;
    procedure CheckInit;
  public
    destructor Destroy; override;
  end;

  TSynchoConditionList = class( TList<TSyncroCondition>)
  end;

implementation

{ TSBDSemaphore }

constructor TSBDSemaphore.Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
begin
  inherited Create;
  Assert( MaxCount >= 1);
  Assert( InitialCount <= MaxCount);
  FGate := Gate;
  FBase := TSemaphore.Create( nil, InitialCount, MaxCount, '', False);
  FEntangledConditions := TSynchoConditionList.Create;
  FCount := InitialCount;
  FMax := MaxCount
end;

destructor TSBDSemaphore.Destroy;
var
  Cnd: TSyncroCondition;
begin
  FGate.Enter;
  try
    for Cnd in FEntangledConditions do
      Cnd.NotifyMemberDestroyed( self);
    FEntangledConditions.Free;
    FBase.Free
  finally
    FGate.Leave
  end;
  inherited
end;

function TSBDSemaphore.Count: cardinal;
begin
  result := FCount
end;

function TSBDSemaphore.Signal: boolean;
var
  Cnd: TSyncroCondition;
begin
  FGate.Enter;
  try
    // A saturated semaphore ignores the signal; releasing it would raise.
    result := FCount < FMax;
    if not result then
      exit;
    Inc( FCount);
    for Cnd in FEntangledConditions do
      Cnd.Presignal( self);
    BaseSignal;
    for Cnd in FEntangledConditions do
      Cnd.Postsignal( self)
  finally
    FGate.Leave
  end
end;

function TSBDSemaphore.WaitFor( TimeLimit: cardinal): TWaitResult;
begin
  result := BaseWaitFor( TimeLimit);
  if result = wrSignaled then
    WaitedVia_External
end;

procedure TSBDSemaphore.WaitedVia_External;
var
  Cnd: TSyncroCondition;
begin
  // Bring FCount and every entangled condition into step with the base semaphore.
  FGate.Enter;
  try
    if FCount = 0 then
      exit;
    Dec( FCount);
    for Cnd in FEntangledConditions do
      Cnd.PostWait( self)
  finally
    FGate.Leave
  end
end;

function TSBDSemaphore.AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
begin
  result := TSyncroCondition.CreateWithOne( self, AConstrainToHandleSyncros)
end;

procedure TSBDSemaphore.BaseSignal;
begin
  FBase.Release
end;

function TSBDSemaphore.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
  result := FBase.WaitFor( TimeLimit)
end;

{ TSyncroCondition }

constructor TSyncroCondition.CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
begin
  inherited Create;
  FisBroken := False;
  FGate := Origin.FGate;
  FConstrainedToAllHandleSyncros := AConstrainToHandleSyncros;
  {$IFDEF MSWINDOWS}
  if FConstrainedToAllHandleSyncros then
    Assert( Origin.FBase is THandleObject);
  {$ENDIF MSWINDOWS}
  FInited := False;
  FisSignalled := False;
  FWillBeSignaled := False;
  FBase := nil;
  FSignals := TDictionary<TSBDSemaphore,cardinal>.Create;
  FGate.Enter;
  try
    FSignals.Add( Origin, Origin.FCount);
    Origin.FEntangledConditions.Add( self) // entangle, so that signals reach this condition
  finally
    FGate.Leave
  end
end;

destructor TSyncroCondition.Destroy;
var
  Member: TSBDSemaphore;
begin
  FGate.Enter;
  try
    FisBroken := True;
    for Member in FSignals.Keys do
      Member.FEntangledConditions.Remove( self);
    FSignals.Free;
    FBase.Free
  finally
    FGate.Leave
  end;
  inherited
end;

procedure TSyncroCondition.CheckInit;
var
  InitialCount: cardinal;
begin
  // Lazy initialisation; callers hold the gate.
  if FInited then
    exit;
  FInited := True;
  FisSignalled := ConditionIsSignaled;
  FWillBeSignaled := FisSignalled;
  if FisSignalled then
    InitialCount := 1 // already signaled: a waiter must get through at once
  else
    InitialCount := 0;
  if not WaitCanUseOS_API then
    FBase := TSemaphore.Create( nil, InitialCount, 1, '', False)
end;

procedure TSyncroCondition.Presignal( Sem: TSBDSemaphore);
begin
  Assert( FGate = Sem.FGate);
  CheckInit;
  FWillBeSignaled := ConditionWillBeSignaledAfterSignal( Sem);
  FSignals[ Sem] := FSignals[ Sem] + 1;
  if (not FisSignalled) and FWillBeSignaled then
    BaseSignal
end;

procedure TSyncroCondition.Postsignal( Sem: TSBDSemaphore);
begin
  CheckInit;
  if FisSignalled and (not FWillBeSignaled) then
    BaseDrain
end;

procedure TSyncroCondition.PostWait( Sem: TSBDSemaphore);
var
  Count: cardinal;
begin
  Assert( FGate = Sem.FGate);
  if FisBroken then
    exit; // a broken condition can no longer be waited on
  CheckInit;
  FWillBeSignaled := ConditionWillBeSignaledAfterWait( Sem);
  Count := FSignals[ Sem];
  if Count > 0 then
    FSignals[ Sem] := Count - 1;
  if FisSignalled <> FWillBeSignaled then
    begin
    if FWillBeSignaled then
      BaseSignal
    else
      BaseDrain
    end
end;

procedure TSyncroCondition.BaseSignal;
begin
  if FisSignalled then
    exit; // the private semaphore has a maximum count of 1
  if assigned( FBase) then
    FBase.Release;
  FisSignalled := True
end;

procedure TSyncroCondition.BaseDrain;
begin
  // Take back the private token without blocking. Waiting forever here, while
  // holding the gate, would deadlock if a condition waiter already took it.
  BaseWaitFor( 0)
end;

function TSyncroCondition.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
  if not assigned( FBase) then
    begin
    // OS-API mode: no private semaphore, just track the flag.
    FisSignalled := False;
    exit( wrError)
    end;
  result := FBase.WaitFor( TimeLimit);
  if result = wrSignaled then
    FisSignalled := False // only a successful wait consumes the token
end;

function TSyncroCondition.WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
var
  TimeLeft, Elapsed: cardinal;
  StartClock: TDateTime;
  Confirmed, doRetry: boolean;
begin
  FGate.Enter;
  try
    if FisBroken then
      exit( wrAbandoned);
    CheckInit
  finally
    FGate.Leave
  end;
  if WaitCanUseOS_API then
    exit( OS_API_WaitFor( TimeLimit, Contributor));
  TimeLeft := TimeLimit;
  StartClock := Now;
  repeat
    result := BaseWaitFor( TimeLeft);
    if result <> wrSignaled then
      break;
    FGate.Enter;
    try
      if FisBroken then
        exit( wrAbandoned);
      Confirmed := FindASignallingContributor( Contributor)
               and (Contributor.BaseWaitFor( 0) = wrSignaled);
      if Confirmed then
        Contributor.WaitedVia_External // updates FCount and every entangled condition, including this one
      else if ConditionIsSignaled then
        BaseSignal;                    // hand back the token for another attempt
      doRetry := not Confirmed;
      if doRetry and (TimeLimit <> Forever) then
        begin
        // Charge the whole time spent so far, including time blocked above.
        Elapsed := Trunc( (Now - StartClock) * MSecsPerDay);
        if TimeLimit > Elapsed then
          TimeLeft := TimeLimit - Elapsed
        else
          TimeLeft := 0
        end
    finally
      FGate.Leave
    end;
    if doRetry then
      Sleep( 1)
  until not doRetry
end;

function TSyncroCondition.OS_API_WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
{$IFDEF MSWINDOWS}
var
  HandleObjs: THandleObjectArray;
  SignaledObj: THandleObject;
  Member: TSBDSemaphore;
  i: integer;
{$ENDIF MSWINDOWS}
begin
  {$IFDEF MSWINDOWS}
  FGate.Enter;
  try
    SetLength( HandleObjs, FSignals.Count);
    i := -1;
    for Member in FSignals.Keys do
      begin
      Inc( i);
      HandleObjs[ i] := Member.FBase as THandleObject
      end
  finally
    FGate.Leave
  end;
  result := THandleObject.WaitForMultiple( HandleObjs, TimeLimit, False, SignaledObj, False, 0);
  if result = wrSignaled then
    begin
    FGate.Enter;
    try
      for Member in FSignals.Keys do
        if SignaledObj = Member.FBase then
          begin
          Contributor := Member; // report which member caused the wake-up
          Member.WaitedVia_External;
          break
          end
    finally
      FGate.Leave
    end
    end
  {$ELSE}
  result := wrError
  {$ENDIF MSWINDOWS}
end;

function TSyncroCondition.WaitCanUseOS_API: boolean;
begin
  {$IFDEF MSWINDOWS}
  result := FConstrainedToAllHandleSyncros
  {$ELSE}
  result := False
  {$ENDIF MSWINDOWS}
end;

function TSyncroCondition.FindASignallingContributor( var Sem: TSBDSemaphore): boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
  result := False;
  if FisBroken then
    exit;
  for Pair in FSignals do
    if Pair.Value > 0 then
      begin
      Sem := Pair.Key;
      exit( True)
      end
end;

procedure TSyncroCondition.NotifyMemberDestroyed( Member: TSBDSemaphore);
begin
  // A condition that has lost a member can never be waited on again, and it
  // must forget the member even if it was already broken.
  FisBroken := True;
  FGate.Enter;
  try
    FSignals.Remove( Member)
  finally
    FGate.Leave
  end
end;

function TSyncroCondition.AsObject: TObject;
begin
  result := self
end;

function TSyncroCondition.Join( const Addend: ISynchroCondition): ISynchroCondition;
var
  Composite, Friend: TSyncroCondition;
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
  FGate.Enter;
  try
    Composite := TSyncroCondition.Create;
    result := Composite;
    Friend := (Addend as IInterfaceHelper).AsObject as TSyncroCondition;
    Assert( FGate = Friend.FGate);
    {$IFDEF MSWINDOWS}
    Assert( FConstrainedToAllHandleSyncros = Friend.FConstrainedToAllHandleSyncros);
    {$ENDIF MSWINDOWS}
    Composite.FisBroken := FisBroken or Friend.FisBroken;
    Composite.FGate := FGate;
    Composite.FConstrainedToAllHandleSyncros := FConstrainedToAllHandleSyncros;
    Composite.FSignals := TDictionary<TSBDSemaphore,cardinal>.Create;
    for Pair in FSignals do
      Composite.FSignals.Add( Pair.Key, Pair.Value);
    for Pair in Friend.FSignals do
      if not Composite.FSignals.ContainsKey( Pair.Key) then // a shared member is counted once
        Composite.FSignals.Add( Pair.Key, Pair.Value);
    // Entangle the new composite (not self) with every member.
    for Pair in Composite.FSignals do
      Pair.Key.FEntangledConditions.Add( Composite);
    Composite.FInited := False; // CheckInit works out the signal state lazily
    Composite.FBase := nil
  finally
    FGate.Leave
  end
end;

function TSyncroCondition.ConditionIsSignaled: boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
  // "First chronological" rule: signaled while any member has a pending count.
  for Pair in FSignals do
    if Pair.Value > 0 then
      exit( True);
  result := False
end;

function TSyncroCondition.ConditionWillBeSignaledAfterSignal( Sem: TSBDSemaphore): boolean;
begin
  result := True
end;

function TSyncroCondition.ConditionWillBeSignaledAfterWait( Sem: TSBDSemaphore): boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
  Count: cardinal;
begin
  // Still signaled after Sem loses one count if any member keeps a positive count.
  for Pair in FSignals do
    begin
    Count := Pair.Value;
    if (Pair.Key = Sem) and (Count > 0) then
      Dec( Count);
    if Count > 0 then
      exit( True)
    end;
  result := False
end;

end.