How to wait on multiple events (Delphi)

Abstract

How can one write code to block (wait) on multiple synchro events at once, in a multi-threaded application? We are, of course, talking about Delphi. By “multiple synchro events at once”, I mean that the thread unblocks on some logical combination of the statuses of the member events. Here I use the word “event” in a very general sense, not to be confused with the more specific sense of the word, as in Embarcadero’s “TEvent”. In the general sense, by “event” or “synchro event”, I mean the broad class of synchro objects such as semaphores (TSemaphore) and events (TEvent).

The most common logical combination is the “OR” operation. In other words, how can we block on multiple synchro events and be unblocked when the first event is signaled or a time-out occurs? And when we are finally unblocked, how can we know which event was the signaled one?

The Task

Write some reusable library code, in Delphi XE7+, to block on multiple synchro events at once, with time-out. The code must:

  • be cross-platform;
  • be bullet-proof; and
  • have a really simple API.

The wait function should allow for optional time-out, detect which entity caused the signal, and should leverage operating system capabilities for optimal efficiency.

In this post, I will focus on a particular subset of this task, where:

  • all the synchro events are semaphores (e.g. TSemaphore, but not TEvent);
  • access to the semaphores is restricted to this library; in other words, they are unnamed; and
  • the wait condition is simply to wait for the chronologically first semaphore to be signaled, or for a time-out, whichever comes first.

However, I will also address, in less detail, the more general task.

But Why?

One of the big bug-bears in multi-threaded programming is that if a thread is blocked on some semaphore for normal operational purposes, then it can’t check whether it is time to shut down gracefully until it is unblocked, and only a signal on that semaphore will unblock it. But if it is time to shut down, then it is quite likely that the semaphore on which it is blocked will never be signaled again, precisely because the application is shutting down. For this reason, a lot of multi-threaded applications (whether written in Delphi or not) have problems shutting down gracefully. I call this the “block-and-check-terminate” problem. Some readers may be quick to respond that this is a non-issue: with “proper” design, your programs will never have a “block-and-check-terminate” problem. While this may well be true, in my experience, designing real applications to avoid the problem, without a generic solution, is complex and tedious.

What is needed, is a generic cross-platform solution.

What Have Others Done?

Jedi

The Jedi Component Library, in its multi-threading component code, provided a wrapper for threads and semaphores. It associated a special semaphore with each thread. The thread’s semaphore would start life unsignaled, and a call to terminate the thread would signal it. Whenever the thread waited on some regular semaphore, the wrapper would call the Windows WaitForMultipleObjects() function on two semaphores: the explicit semaphore and the thread’s special semaphore. If the thread was terminated while it was blocked on an operational semaphore, it would unblock, detect the condition and handle it properly.
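The Jedi-style pattern can be sketched with Delphi’s own wrapper around WaitForMultipleObjects(), THandleObject.WaitForMultiple (the same call Listing 1 uses on Windows). This is a minimal, Windows-only console sketch, not the Jedi code itself: the program and variable names are assumptions, and I have swapped the special semaphore for a manual-reset TEvent, because an event stays signaled and so every later wait also sees the terminate request.

```pascal
program TwoObjectWait;
{$APPTYPE CONSOLE}
{$IFNDEF MSWINDOWS}
  {$MESSAGE FATAL 'This sketch relies on WaitForMultipleObjects (Windows only).'}
{$ENDIF}
uses
  System.SysUtils, System.SyncObjs;

var
  WorkSem: TSemaphore;       // the "operational" semaphore
  TerminateEvt: TEvent;      // assumption: stands in for the thread's special semaphore
  Objs: THandleObjectArray;
  Signaled: THandleObject;
  WR: TWaitResult;
begin
  WorkSem      := TSemaphore.Create( nil, 0, 100, '', False);
  TerminateEvt := TEvent.Create( nil, True, False, '');  // manual reset, initially clear
  try
    // When several objects are signaled, WaitForMultipleObjects reports the
    // one with the lowest array index, so the terminate object goes first
    // to give shutdown priority over pending work.
    SetLength( Objs, 2);
    Objs[0] := TerminateEvt;
    Objs[1] := WorkSem;

    TerminateEvt.SetEvent;   // simulate a call to terminate the thread

    // Wait for ANY object (AAll = False), for at most one second.
    WR := THandleObject.WaitForMultiple( Objs, 1000, False, Signaled, False, 0);
    if WR <> wrSignaled then
      Writeln( 'Timed out or failed')
    else if Signaled = TerminateEvt then
      Writeln( 'Terminate requested: clean up and exit')
    else
      Writeln( 'Work item available');
  finally
    TerminateEvt.Free;
    WorkSem.Free
  end
end.
```

In a real thread, the wait would sit inside the Execute loop, and the terminate object would be signaled from an override of TerminatedSet or from the code that calls Terminate.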

It was an elegant solution to the most common subset of our task. The problem, though, is that it only works on Windows. Android, iOS and OSX have no O/S API equivalent to WaitForMultipleObjects(). I don’t know why. Maybe mobile applications just have less need for in-app parallelism? If you search StackOverflow, there are more than a few questions asking how to wait on multiple semaphores on Android or iOS, and no real solutions are given. This is just my opinion. If you disagree, please post a comment below.

Just Kill Threads

I’ve seen this done. When the program needs to shut down, it simply kills its non-main threads with an explicit kill command. Any self-respecting developer will be very uncomfortable with this solution, and its degree of safety is very situational.

Design Around

Design around the issue, so that no thread ever has to wait on multiple conditions. Good luck with that.

Design Above

Use a threading library, like OTL (OmniThreadLibrary), that provides enough higher-level structures for parallel tasking that there is no design need for lower-level structures such as semaphores and events. You could do this, but it is a bit limiting. I think that even with OTL, you will still run into requirements where you need to wait on multiple semaphores.

Count-down Latch

This SO post suggests using a TCountdownEvent object (SyncObjs unit). When either of the component semaphores is signaled, also signal the count-down latch. To wait on the first of either, wait on the latch. The limitations of this solution are:

  • The code that signals the component semaphores has to know about the latch rules. That’s not a general solution.
  • What happens when something wants to wait directly on a component semaphore? The relationship between the component semaphores and the latch is destroyed. So you either need code to deal with this (complex), or the latch solution can only be used where the only consumer of either semaphore is the consumer of both.
  • It may be inefficient (CPU-wise) on Windows.

Offered Solution

The solution that I offer has a story that goes like this …

Take the synchro classes that you want to use (TSemaphore, TEvent, etc.), and wrap them so that you take control of their WaitFor() and Signal() methods.

Imagine a new object, called a “Condition”. A condition is a synchro object which is a composite view of an arbitrary set of member synchro objects. This is the Composite Pattern applied to synchro objects. The condition is considered signaled according to some rule of your design, based on the signal status of the members. For example, it could be deemed signaled when one or more members are signaled, and clear when all members are clear. Apply a rule that you can’t directly signal the condition, because that would be meaningless.

The condition is implemented by a private semaphore. When a member semaphore is signaled, the condition semaphore is signaled (or not, if you have a different composition rule). When a member semaphore is successfully waited on by code outside the condition, the condition status must be updated atomically. When a wait on the condition succeeds, the resource count of the contributing member must be decremented.
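To make the bookkeeping rule concrete, here is a single-threaded model of the “unblock on first” rule. It is only a model: there are no threads, no gate and no OS objects, and TMember, SignalMember and TryWaitCondition are hypothetical stand-ins for the per-member counts that Listing 1 keeps in FSignals. Members are scanned in a fixed order.

```pascal
program ConditionModel;
{$APPTYPE CONSOLE}
uses
  System.SysUtils;

type
  // Hypothetical stand-in for one entry of a condition's bookkeeping:
  // a member and its contribution to the condition's resource count.
  TMember = record
    Name: string;
    Count: Cardinal;
  end;

var
  Members: array[0..1] of TMember;

// "First chronological" rule: signaled while any member holds a count.
function ConditionIsSignaled: Boolean;
var
  i: Integer;
begin
  Result := False;
  for i := Low( Members) to High( Members) do
    if Members[i].Count > 0 then
      Exit( True);
end;

// Signalling a member raises its contribution by one.
procedure SignalMember( Index: Integer);
begin
  Inc( Members[Index].Count);
end;

// A successful wait takes one unit from a contributing member and reports
// which member it was. Returns False where the real code would block or
// time out.
function TryWaitCondition( out Contributor: string): Boolean;
var
  i: Integer;
begin
  Contributor := '';
  for i := Low( Members) to High( Members) do
    if Members[i].Count > 0 then
    begin
      Dec( Members[i].Count);
      Contributor := Members[i].Name;
      Exit( True);
    end;
  Result := False;
end;

var
  Who: string;
begin
  Members[0].Name  := 'Work';
  Members[0].Count := 0;
  Members[1].Name  := 'Terminate';
  Members[1].Count := 0;

  Assert( not ConditionIsSignaled);
  SignalMember( 1);                   // e.g. a shutdown request
  Assert( ConditionIsSignaled);
  if TryWaitCondition( Who) then
    Writeln( 'Unblocked by ', Who);   // Unblocked by Terminate
  Assert( not ConditionIsSignaled);   // the unit has been consumed
end.
```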

Construction, destruction and operation of the condition needs to happen transparently, from direct operations on the member semaphores. We achieve this by each condition keeping a record of its member semaphores and their contribution to the resource count; and also, each member semaphore keeping a record of the list of conditions that it is entangled in. Construction and destruction of conditions needs to safely and correctly update entanglement lists.

A single critical section (the Gate) is shared by all synchro objects and conditions that might be entangled together. The gate must be passed in as a construction parameter.

When the operating system call WaitForMultipleObjects() is available, it is used instead of the private condition semaphore. This path is taken if and only if:

  1. the operating system is Win32/Win64; and
  2. all member objects are descendants of THandleObject (and thus have a Windows “handle”); and
  3. the client requests it via a construction parameter (AConstrainToHandleSyncros in the listing).

The Full Source Code

Listing 1 below shows a solution for the aforementioned subset of the task. To implement conditions other than “unblock on first chronological member”, override the TSyncroCondition methods marked as virtual.

unit SBD.TL.SyncObjs2;
interface
uses System.SyncObjs, System.Generics.Collections, System.SysUtils;

const
  Forever = System.INFINITE;
  TimeOut = cardinal( $FFFFFFFF);
  WaitForError = 0;

type

  TSynchoConditionList = class;

  ISynchroCondition = interface;
  TSBDSemaphore = class
    private
      FEntangledConditions: TSynchoConditionList;
      FGate: TCriticalSection;
      FCount: cardinal;
      FMax: cardinal;
      FBase: TSemaphore;

      function  BaseWaitFor( TimeLimit: cardinal): TWaitResult;
      procedure BaseSignal;
      procedure WaitedVia_External;

    public
      constructor Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
      destructor Destroy; override;
      function  WaitFor( TimeLimit: cardinal): TWaitResult;
      function  Signal: boolean;
      function  Count: cardinal;
      function  AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
    end;

  ISynchroCondition = interface
    ['{CFD0DD74-6EB4-4CCF-9EE1-8BBAC759151A}']
      function WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
      function Join( const Addend: ISynchroCondition): ISynchroCondition;
    end;

  IInterfaceHelper = interface
    ['{40D9B899-AF13-4FC1-AF04-23E8416E1FEE}']
      function AsObject: TObject;
  end;

  TSyncroCondition = class( TInterfacedObject, ISynchroCondition, IInterfaceHelper)
    protected
      // Override these methods to implement conditions other that "first chronological".
      function  ConditionIsSignaled: boolean;                                    virtual;
      function  ConditionWillBeSignaledAfterSignal( Sem: TSBDSemaphore):boolean; virtual;
      function  ConditionWillBeSignaledAfterWait  ( Sem: TSBDSemaphore):boolean; virtual;
      function  WaitCanUseOS_API: boolean;                                       virtual;
      function  OS_API_WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult; virtual;

    private
      FisSignalled: boolean;
      FWillBeSignaled: boolean;
      FSignals: TDictionary<TSBDSemaphore,cardinal>;
      FGate: TCriticalSection;
      FisBroken: boolean;
      FBase: TSemaphore;
      FInited: boolean;
      FConstrainedToAllHandleSyncros: boolean;

      constructor CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
      destructor Destroy; override;

      procedure Presignal ( Sem: TSBDSemaphore);
      procedure Postsignal( Sem: TSBDSemaphore);
      procedure PostWait  ( Sem: TSBDSemaphore);
      procedure BaseSignal;
      procedure BaseWaitForever;
      function  BaseWaitFor( TimeLimit: cardinal): TWaitResult;
      function  WaitFor( TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
      function  FindASignallingContributor( var Sem: TSBDSemaphore): boolean;
      procedure NotifyMemberDestroyed( Member: TSBDSemaphore);
      function  AsObject: TObject;
      function  Join( const Addend: ISynchroCondition): ISynchroCondition;
      procedure CheckInit;
    end;

  TSynchoConditionList = class( TList<TSyncroCondition>)
    public
    end;

implementation




function TSBDSemaphore.Count: cardinal;
begin
result := FCount
end;

constructor TSBDSemaphore.Create( Gate: TCriticalSection; InitialCount, MaxCount: cardinal);
begin
Assert( MaxCount >= 1);
Assert( InitialCount <= MaxCount);
FGate := Gate;
FBase := TSemaphore.Create( nil, InitialCount, MaxCount, '', False);
FEntangledConditions := TSynchoConditionList.Create;
FCount := InitialCount;
FMax   := MaxCount
end;

destructor TSBDSemaphore.Destroy;
var
  Cnd: TSyncroCondition;
begin
FGate.Enter;
try
  for Cnd in FEntangledConditions do
    Cnd.NotifyMemberDestroyed( self);
  FEntangledConditions.Free;  // the entanglement list is owned by the semaphore
  FBase.Free
finally
  FGate.Leave
  end;
inherited
end;

function TSBDSemaphore.Signal: boolean;
var
  Cnd: TSyncroCondition;
  Saturated: boolean;
begin
FGate.Enter;
try
  Saturated := FCount >= FMax;
  result    := not Saturated;  // False: the semaphore was already at MaxCount
  if Saturated then exit;      // releasing a full TSemaphore would fail
  Inc( FCount);
  for Cnd in FEntangledConditions do
    Cnd.Presignal( self);
  BaseSignal;
  for Cnd in FEntangledConditions do
    Cnd.Postsignal( self)
finally
  FGate.Leave
  end;
end;

procedure TSyncroCondition.Presignal( Sem: TSBDSemaphore);
begin
Assert( FGate = Sem.FGate);
CheckInit;
FWillBeSignaled := ConditionWillBeSignaledAfterSignal( Sem);
FSignals[ Sem] := FSignals[ Sem] + 1;
if (not FisSignalled) and FWillBeSignaled then
  BaseSignal;
end;

procedure TSyncroCondition.BaseWaitForever;
begin
BaseWaitFor( Forever)
end;

constructor TSyncroCondition.CreateWithOne( Origin: TSBDSemaphore; AConstrainToHandleSyncros: boolean);
begin
inherited Create;
FisBroken := False;
FGate     := Origin.FGate;
FConstrainedToAllHandleSyncros := AConstrainToHandleSyncros;
{$IFDEF MSWINDOWS}
if FConstrainedToAllHandleSyncros then
  Assert( Origin.FBase is THandleObject);
{$ENDIF MSWINDOWS}
FInited   := False;
FSignals  := TDictionary<TSBDSemaphore,cardinal>.Create;
FisSignalled    := False;
FWillBeSignaled := False;
FBase := nil;
// Entangle: the member must know about this condition, so that its
// Signal/WaitFor calls keep the condition's bookkeeping up to date.
FGate.Enter;
try
  FSignals.Add( Origin, Origin.FCount);
  Origin.FEntangledConditions.Add( self)
finally
  FGate.Leave
  end
end;

procedure TSyncroCondition.CheckInit;
var
  InitialCount: cardinal;
begin
if FInited then exit;
FInited := True;
FisSignalled    := ConditionIsSignaled;
FWillBeSignaled := FisSignalled;
if FisSignalled then
    InitialCount := 1   // already signaled: one token for the first waiter
  else
    InitialCount := 0;
if not WaitCanUseOS_API then
  FBase := TSemaphore.Create( nil, InitialCount, 1, '', False)
end;


destructor TSyncroCondition.Destroy;
var
  Member: TSBDSemaphore;
begin
FGate.Enter;
try
  FisBroken := True;
  for Member in FSignals.Keys do
    Member.FEntangledConditions.Remove( self);
  FSignals.Free;
  FBase.Free
finally
  FGate.Leave
  end;
inherited
end;

procedure TSyncroCondition.NotifyMemberDestroyed( Member: TSBDSemaphore);
begin
if FGate <> Member.FGate then
  FisBroken := True;
// Always forget the member, even if the condition is already broken;
// otherwise the condition's destructor would later touch a freed semaphore.
FGate.Enter;
try
  if FSignals.ContainsKey( Member) then
    begin
    FisBroken := True;
    FSignals.Remove( Member)
    end
finally
  FGate.Leave
  end
end;

function TSyncroCondition.OS_API_WaitFor(
  TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
{$IFDEF MSWINDOWS}
var
  HandleObjs: THandleObjectArray;
  SignaledObj: THandleObject;
  Member: TSBDSemaphore;
  i: integer;
{$ENDIF MSWINDOWS}
begin
Contributor := nil;
{$IFDEF MSWINDOWS}
  // Collect the native handles of all members and wait for any one of them.
  SetLength( HandleObjs, FSignals.Count);
  i := -1;
  for Member in FSignals.Keys do
    begin
    Inc( i);
    HandleObjs[ i] := Member.FBase as THandleObject
    end;
  result := THandleObject.WaitForMultiple( HandleObjs, TimeLimit, False, SignaledObj, False, 0);
  if result = wrSignaled then
    for Member in FSignals.Keys do
      begin
      if SignaledObj <> Member.FBase then continue;
      // The OS has already consumed one unit; report the member and
      // bring the bookkeeping of all entangled conditions up to date.
      Contributor := Member;
      Member.WaitedVia_External;
      break
      end
{$ELSE}
  result := wrError
{$ENDIF MSWINDOWS}
end;

function TSyncroCondition.WaitCanUseOS_API: boolean;
begin
{$IFDEF MSWINDOWS}
result := FConstrainedToAllHandleSyncros;
{$ELSE}
result := False
{$ENDIF MSWINDOWS}
end;

function TSyncroCondition.WaitFor(
  TimeLimit: cardinal; var Contributor: TSBDSemaphore): TWaitResult;
var
  TimeLeft: cardinal;
  AfterWaitClock: TDateTime;
  Elapsed: cardinal;   // milliseconds; must be ordinal to be used with Dec()
  Confirmed: boolean;
  doRetry: boolean;
  isTimed: boolean;
begin
Contributor := nil;
if FisBroken then
  begin
  result := wrAbandoned;
  exit
  end;
CheckInit;
if WaitCanUseOS_API then
    result := OS_API_WaitFor( TimeLimit, Contributor)
  else
    begin
    TimeLeft       := TimeLimit;
    AfterWaitClock := 0;
    repeat
      result := BaseWaitFor( TimeLeft);
      if result <> wrSignaled then break;
      // Only a finite, non-zero limit needs to shrink between retries.
      isTimed := (TimeLeft <> 0) and (TimeLeft <> Forever);
      if isTimed then
        AfterWaitClock := Now;
      FGate.Enter;
      try
        Confirmed := FindASignallingContributor( Contributor);
        if Confirmed then
            begin
            // Take the unit from the contributing member itself.
            result    := Contributor.BaseWaitFor( 0);
            Confirmed := result = wrSignaled
            end
          else if FisBroken then
            result := wrAbandoned
          else
            result := wrTimeOut;  // spurious wake-up: nothing to take, try again
        doRetry := result = wrTimeOut;
        if doRetry and isTimed then
          begin
          Elapsed := Trunc( (Now - AfterWaitClock) * MSecsPerDay);
          if TimeLeft > Elapsed then
              Dec( TimeLeft, Elapsed)
            else
              TimeLeft := 0
          end;
        if Confirmed then
            // Decrements the member's count and runs PostWait on every
            // condition entangled with it, including this one.
            Contributor.WaitedVia_External
          else
            begin
            Contributor := nil;
            if ConditionIsSignaled and (not FisSignalled) then
              BaseSignal   // hand the token back for the next waiter
            end
      finally
        FGate.Leave
        end;
      if doRetry then
        Sleep( 1)
    until not doRetry
    end
end;

procedure TSyncroCondition.Postsignal( Sem: TSBDSemaphore);
begin
CheckInit;
if FisSignalled and (not FWillBeSignaled) then
  BaseWaitForever
end;

procedure TSBDSemaphore.WaitedVia_External;
var
  Cnd: TSyncroCondition;
  Saturated: boolean;
begin
FGate.Enter;
try
  Saturated := FCount = 0;
  if not Saturated then
    begin
    Dec( FCount);
    for Cnd in FEntangledConditions do
      Cnd.PostWait( self)
    end
finally
  FGate.Leave
  end;
end;

function TSBDSemaphore.WaitFor( TimeLimit: cardinal): TWaitResult;
begin
result := BaseWaitFor( TimeLimit);
if result = wrSignaled then
  WaitedVia_External
end;


procedure TSyncroCondition.PostWait( Sem: TSBDSemaphore);
var
  Count: cardinal;
begin
Assert( FGate = Sem.FGate);
if FisBroken then exit;  // a member has been destroyed; the condition is dead
CheckInit;
FWillBeSignaled := ConditionWillBeSignaledAfterWait( Sem);
Count           := FSignals[ Sem];
if Count > 0 then
  FSignals[ Sem] := Count - 1;
if FisSignalled <> FWillBeSignaled then
  begin
  if FWillBeSignaled then
      BaseSignal
    else
      BaseWaitForever
  end
end;


function TSBDSemaphore.AsConditions( AConstrainToHandleSyncros: boolean): ISynchroCondition;
begin
// CreateWithOne also entangles the new condition with this semaphore.
result := TSyncroCondition.CreateWithOne( self, AConstrainToHandleSyncros)
end;

procedure TSBDSemaphore.BaseSignal;
begin
FBase.Release
end;

function TSBDSemaphore.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
result := FBase.WaitFor( TimeLimit)
end;


function TSyncroCondition.FindASignallingContributor(
  var Sem: TSBDSemaphore): boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
result := not FisBroken;
if not result then exit;
result := False;
for Pair in FSignals do
  begin
  result := Pair.Value > 0;
  if not result then continue;
  Sem := Pair.Key;
  break
  end
end;

function TSyncroCondition.AsObject: TObject;
begin
result := self
end;

function TSyncroCondition.Join(
  const Addend: ISynchroCondition): ISynchroCondition;
var
  Composite, Friend: TSyncroCondition;
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
FGate.Enter;
try
  Friend := (Addend as IInterfaceHelper).AsObject as TSyncroCondition;
  Assert( FGate = Friend.FGate);
  {$IFDEF MSWINDOWS}
  Assert( FConstrainedToAllHandleSyncros = Friend.FConstrainedToAllHandleSyncros);
  {$ENDIF MSWINDOWS}
  Composite := TSyncroCondition.Create;
  result    := Composite;  // the interface reference now owns the composite
  Composite.FGate     := FGate;
  Composite.FSignals  := TDictionary<TSBDSemaphore,cardinal>.Create;
  Composite.FisBroken := FisBroken or Friend.FisBroken;
  Composite.FConstrainedToAllHandleSyncros := FConstrainedToAllHandleSyncros;
  // Union of both member sets. A member shared by both operands is
  // counted once, at its current resource count.
  for Pair in FSignals do
    Composite.FSignals.AddOrSetValue( Pair.Key, Pair.Key.FCount);
  for Pair in Friend.FSignals do
    Composite.FSignals.AddOrSetValue( Pair.Key, Pair.Key.FCount);
  // Entangle the composite (not self) with every member.
  for Pair in Composite.FSignals do
    Pair.Key.FEntangledConditions.Add( Composite);
  Composite.FisSignalled    := Composite.ConditionIsSignaled;
  Composite.FWillBeSignaled := Composite.FisSignalled;
  Composite.FInited         := False;
  Composite.FBase           := nil
finally
  FGate.Leave
  end
end;

procedure TSyncroCondition.BaseSignal;
begin
if assigned( FBase) then
  FBase.Release;
FisSignalled := True
end;

function TSyncroCondition.BaseWaitFor( TimeLimit: cardinal): TWaitResult;
begin
if assigned( FBase) then
    result := FBase.WaitFor( TimeLimit)
  else
    result := wrError;
FisSignalled := False
end;


function TSyncroCondition.ConditionIsSignaled: boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
begin
result := False;
for Pair in FSignals do
  begin
  result := Pair.Value > 0;
  if not result then continue;
  break
  end
end;

function TSyncroCondition.ConditionWillBeSignaledAfterSignal(
  Sem: TSBDSemaphore): boolean;
begin
result := True
end;

function TSyncroCondition.ConditionWillBeSignaledAfterWait(
  Sem: TSBDSemaphore): boolean;
var
  Pair: TPair<TSBDSemaphore,cardinal>;
  Count, Sum: cardinal;
begin
result := False;
Sum    := 0;
for Pair in FSignals do
  begin
  Count := Pair.Value;
  if (Pair.Key = Sem) and (Count > 0) then
    Dec( Count);
  Inc( Sum, Count);
  result := Sum > 0;
  if not result then continue;
  break
  end
end;


end.
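A minimal usage sketch of Listing 1 follows, assuming the unit compiles in your project. It shows the intended flow: one shared gate, two member semaphores, a joined condition, and a wait that reports which member unblocked it. The program name and the Work/Stop roles are illustrative assumptions, and the sketch has not been exercised on every platform.

```pascal
program ConditionDemo;
{$APPTYPE CONSOLE}
uses
  System.SysUtils, System.SyncObjs,
  SBD.TL.SyncObjs2;   // Listing 1

var
  Gate: TCriticalSection;
  Work, Stop: TSBDSemaphore;
  CondWork, CondStop, Cond: ISynchroCondition;
  Contributor: TSBDSemaphore;
  WR: TWaitResult;
begin
  // One gate, shared by everything that may be entangled together.
  Gate := TCriticalSection.Create;
  Work := TSBDSemaphore.Create( Gate, 0, 100);
  Stop := TSBDSemaphore.Create( Gate, 0, 1);
  try
    // Passing True instead of False asks for the WaitForMultipleObjects
    // path on Windows; False uses the portable condition semaphore.
    CondWork := Work.AsConditions( False);
    CondStop := Stop.AsConditions( False);
    Cond     := CondWork.Join( CondStop);
    CondWork := nil;   // only the composite is needed from here on
    CondStop := nil;

    Stop.Signal;       // e.g. a shutdown request from another thread

    WR := Cond.WaitFor( 1000, Contributor);
    if (WR = wrSignaled) and (Contributor = Stop) then
      Writeln( 'Shutdown requested')
    else if WR = wrSignaled then
      Writeln( 'Work item available')
    else
      Writeln( 'Timed out or abandoned');
  finally
    Cond := nil;       // release the condition before its members
    Stop.Free;
    Work.Free;
    Gate.Free
  end
end.
```

Releasing the condition before freeing its members is the tidy order. If a member is destroyed first, the listing marks the condition as broken, and later waits on it return wrAbandoned.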


This entry was posted in Delphi.
