Fully managed pool of worker threads
The Worker Threadpool in ShellBrowser Delphi
Our Worker Threadpool provides a fully managed pool of worker threads that handle tasks, or work items, asynchronously. Maintaining a pool of threads instead of starting a new thread for each task has several benefits:
- Performance increases.
- The latency caused by frequently creating and destroying threads for short-lived tasks is avoided.
- There is no manual thread management, so you can concentrate on the actual tasks.
The number of available threads is tuned automatically to the computing resources available to the program.
We’ve used the Threadpool internally for years, but after some recent refactoring we thought it might be interesting outside ShellBrowser, too.
So, with the following text, we hope to provide enough insight and examples for you to give it a try with our ShellBrowser Delphi Components.
In ShellBrowser, to use our Worker Threadpool or run the code samples below, simply add the unit Jam.Threading to the uses clause.
Basics explained
With ShellBrowser, there is always one global Threadpool instance, which is accessible via TWorkerPool.GlobalInstance.
However, you can also create new Threadpool instances.
Work items are added to any kind of Threadpool with a call to its AddWorkItem method.
Using the Priority argument, you can determine whether the work item is queued at the beginning (TPriority.Highest) or at the end (TPriority.Normal) of the work items that are already waiting to be processed. This option therefore also lets you implement a LIFO or FIFO approach in a Threadpool.
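The effect of queuing at the beginning or the end can be illustrated with a small standalone sketch. It does not use Jam.Threading at all: TSketchPriority and Enqueue are hypothetical stand-ins built on the RTL's TList<string>, and the real pool's internal queue may well be implemented differently.

```pascal
program QueueOrderSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections;

type
  // Hypothetical stand-in for the pool's priority type (not the Jam.Threading declaration).
  TSketchPriority = (spNormal, spHighest);

// Queues an item at the front (highest) or at the back (normal) of the waiting list.
procedure Enqueue(pQueue: TList<string>; const pItem: string; pPriority: TSketchPriority);
begin
  if pPriority = spHighest then
    pQueue.Insert(0, pItem)
  else
    pQueue.Add(pItem);
end;

var
  Queue: TList<string>;
  Item: string;
begin
  Queue := TList<string>.Create;
  try
    Enqueue(Queue, 'A', spNormal);
    Enqueue(Queue, 'B', spNormal);
    Enqueue(Queue, 'C', spHighest);
    // A worker would always pick up the item at index 0 next.
    for Item in Queue do
      Writeln(Item);
  finally
    Queue.Free;
  end;
end.
```

In this sketch the items come out in the order C, A, B. Queuing everything with the normal priority gives FIFO behaviour, while queuing everything with the highest priority gives LIFO behaviour.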
In the first argument, you pass an instance implementing the IWorkItem interface. You can implement your own work items but for ease of use there are some predefined WorkItem types. We’ll start out with these.
Making simple method calls and anonymous procedures async
The “TAsyncProcedure” is a simple WorkItem that can take two anonymous procedures: the first one is executed on one of the threads provided by the Threadpool. When the work item has finished, another procedure is called in the main thread.
This simple example illustrates how to send an email asynchronously using some TMail class:
procedure SendMail(pRecipient, pSubject, pBody: string);
var
  Mail: TMail;
begin
  Mail := TMail.Create();
  Mail.Recipient := pRecipient;
  Mail.Subject := pSubject;
  Mail.Body := pBody;
  TWorkerPool.GlobalInstance.AddWorkItem(TAsyncProcedure.Create(
    Mail.Send, // runs on a Threadpool thread
    nil,       // no cancellation indicator
    procedure
    begin
      // runs in the main thread once sending has finished
      UpdateUI();
      Mail.Free();
    end));
end;
The mail is sent in parallel via the Threadpool, and when sending has finished, the UI is updated in the main thread. Note that there is no need to free the TAsyncProcedure instance that is created. Like all work items we’ll introduce, it is ref-counted and is cleaned up automatically when no reference is left.
The middle “ICancellationIndicator” parameter, for which nil is passed in the example, is explained a little later.
Using IFuture / TFuture
A future can be used to calculate a result value asynchronously while doing other work in the current thread. Accessing the result value of a future using its Value property blocks until the result has been calculated.
In this example, we check whether a path is accessible. This usually takes only a few milliseconds; however, if the path is on a network drive that is unreachable, the underlying Windows API call may block for 10-30 seconds. Doing this in the main thread can freeze the user interface.
TPathExists = class(TFuture<Boolean>)
protected
  fPath: string;
  procedure DoWork(); override;
public
  constructor Create(pPath: string);
end;

constructor TPathExists.Create(pPath: string);
begin
  inherited;
  fPath := pPath;
end;

procedure TPathExists.DoWork();
begin
  // Executed on a Threadpool thread
  fResult := FileExists(fPath) or DirectoryExists(fPath);
end;
And how to use this class:
procedure LoadFile(pPath: string);
var
  PathExists: IFuture<Boolean>;
begin
  PathExists := TPathExists.Create(pPath);
  TWorkerPool.GlobalInstance.AddWorkItem(PathExists as IWorkItem);
  // Prepare something else that takes a while ...
  if not PathExists.Value then // blocks until the result is available
  begin
    ShowMessage('The given path does not exist: ' + pPath);
    Exit;
  end;
  // Load the file ...
end;
This time we have created a subclass of TFuture with all needed methods implemented.
Here is a similar example using an anonymous function again.
procedure LoadFile(pPath: string);
var
  lFuture: IFuture<Boolean>;
  lValid: Boolean;
begin
  lFuture := TFuture<Boolean>.Construct(
    function: Boolean
    begin
      Result := FileExists(pPath) or DirectoryExists(pPath);
    end, nil);
  TWorkerPool.GlobalInstance.AddWorkItem(lFuture as IWorkItem);
  // Wait at most 1000 ms; treat a timeout like a missing path
  if lFuture.TryGetValue(1000, lValid) <> TTaskStatus.Completed then
    lValid := False;
  if not lValid then
    ShowMessage('The given path does not exist or could not be determined in time: ' + pPath);
end;
Use the “TFuture<TResult>.Construct” method for anonymous implementations. Also, instead of potentially blocking the GUI thread through the IFuture.GetValue method, this example passes a timeout of one second to the TryGetValue method. This can be helpful if the program can safely continue with a default value. The function returns a TTaskStatus that indicates whether the work has completed correctly.
TFuture and TAsyncProcedure have a shortcut, that automatically uses the shared global Threadpool. So, instead of the above, you may also write:
TFuture<Boolean>.CreateAndQueue(
  function: Boolean
  begin
    Result := FileExists(pPath) or DirectoryExists(pPath);
  end, nil);
How to implement an IWorkItem
All tasks or work-items that are added to the Worker Threadpool need to implement the IWorkItem interface. We’ve already seen a custom implementation of TFuture in the preceding example.
If neither the TAsyncProcedure nor a TFuture fits your needs, you can implement your own work item class. The easiest way is to derive from our TBasicWorkItem class. The main difference from a future is that a work item can fire a callback event when its work is finished.
We will now implement the previous task as an IWorkItem that invokes a callback when it finishes.
TPathExists = class(TBasicWorkItem)
protected
  fPath: string;
  procedure DoWork(); override;
public
  FileDoesExist: Boolean;
  constructor Create(pPath: string; pCallBack: TWorkDoneEvent);
end;

constructor TPathExists.Create(pPath: string; pCallBack: TWorkDoneEvent);
begin
  inherited;
  fPath := pPath;
  OnWorkDone := pCallBack; // called when DoWork has finished
end;

procedure TPathExists.DoWork();
begin
  FileDoesExist := FileExists(fPath) or DirectoryExists(fPath);
end;
And how to use this class:
procedure TEditor.LoadFile(pPath: string);
begin
  TWorkerPool.GlobalInstance.AddWorkItem(TPathExists.Create(pPath, LoadExistingFile));
end;

procedure TEditor.LoadExistingFile(pWorkItem: IWorkItem);
begin
  if not (pWorkItem as TPathExists).FileDoesExist then
  begin
    ShowMessage('The file path does not exist: ' + (pWorkItem as TPathExists).fPath);
    Exit;
  end;
  // Load the file …
end;
By default, the callback is called synchronized in the UI thread. This can be configured, but the default is what most use cases need.
Note that for simple use cases like this one, there is no good reason to implement a WorkItem instead of using the anonymous TAsyncProcedure version or the TFuture explained before; however, there may be more complex use cases that can be encapsulated nicely in a subclassed WorkItem.
Cancelling work items
In the preceding examples, we have assumed that the process is still running when a work item has finished its asynchronous work, and also, that the result is still welcome when the asynchronous part is done.
This is of course not always true. A CancellationIndicator or a CancellationToken can be used to cancel tasks that are still in the queue or that are currently being processed. No methods or callbacks are executed on cancelled work items.
By supplying an ICancellationIndicator, the work of one or many work items can be aborted cleanly. There is only a single Boolean function, named IsCancellationRequested, to be implemented:
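// TInterfacedObject supplies the IInterface methods that a plain TObject lacks.
// It is ref-counted, so hold the instance in an interface-typed variable.
TPresenter = class(TInterfacedObject, ICancellationIndicator)
  fAborted: Boolean;
  function IsCancellationRequested(): Boolean;
  procedure Abort();
end;

function TPresenter.IsCancellationRequested(): Boolean;
begin
  Exit(fAborted);
end;

procedure TPresenter.Abort();
begin
  fAborted := True;
end;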
Alternatively, you may use our rich record type CancellationToken. It has a Cancel() method that signals cancellation and a Reset() method that resets its state so that it can be reused.
TDataScanner = class(TObject)
  fCancellationToken: CancellationToken;
  destructor Destroy(); override;
  procedure Start();
end;

destructor TDataScanner.Destroy();
begin
  // Cancel all work items that still use this token
  fCancellationToken.Cancel();
  inherited;
end;

procedure TDataScanner.Start();
begin
  fCancellationToken.Reset();
  // Do some work and supply fCancellationToken where an ICancellationIndicator is needed
end;
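As a rough illustration of cooperative cancellation in general, the following standalone sketch polls an indicator between units of work. ISketchCancellationIndicator, TSketchIndicator and ProcessItems are hypothetical names that only mirror the single-method shape described above; the real ICancellationIndicator is declared in Jam.Threading, and how the library's own work items check it is not shown here.

```pascal
program CancellationPollingSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  // Hypothetical stand-in with the same single-method shape as described in the text.
  ISketchCancellationIndicator = interface
    function IsCancellationRequested(): Boolean;
  end;

  // Reports cancellation once it has been asked more than a given number of times.
  TSketchIndicator = class(TInterfacedObject, ISketchCancellationIndicator)
  private
    fChecks: Integer;
    fCancelAfterChecks: Integer;
  public
    constructor Create(pCancelAfterChecks: Integer);
    function IsCancellationRequested(): Boolean;
  end;

constructor TSketchIndicator.Create(pCancelAfterChecks: Integer);
begin
  inherited Create();
  fCancelAfterChecks := pCancelAfterChecks;
end;

function TSketchIndicator.IsCancellationRequested(): Boolean;
begin
  Inc(fChecks);
  Result := fChecks > fCancelAfterChecks;
end;

// Processes up to pCount items and returns how many were handled before cancellation.
function ProcessItems(pCount: Integer; pCancel: ISketchCancellationIndicator): Integer;
var
  i: Integer;
begin
  Result := 0;
  for i := 1 to pCount do
  begin
    // Check before each unit of work so that a cancelled task stops promptly.
    if Assigned(pCancel) and pCancel.IsCancellationRequested() then
      Exit;
    Inc(Result); // stands in for the real work on one item
  end;
end;

begin
  Writeln(ProcessItems(10, TSketchIndicator.Create(3))); // stops after 3 items
  Writeln(ProcessItems(10, nil));                        // no indicator: all 10 items
end.
```

Checking the indicator once per item keeps the cost of polling low while still letting long-running work stop soon after cancellation is requested.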
Using your own Worker Threadpool
When to use your own Threadpool
Reasons for using your own Threadpool can be:
- Long-running background work that should be kept separate from other tasks. For example, the FileList in ShellBrowser uses its own Threadpool to search for files on large drives.
- Limiting the number of threads used for a certain task, for instance because a resource cannot handle more than one or two consumers or queries anyway. This pattern is applied for the thumbnail view of the ShellList, where delays in Windows API calls could otherwise have exhausted the main Threadpool very quickly.
- Needing to know when all work items of a certain type have been processed. There is an “OnAllWorkItemsFinished” event that is fired after processing; however, it is hardly usable if the Threadpool is shared by unrelated workloads. This pattern is frequently applied in UltraSearch and TreeSize.
How to use your own Threadpool
customThreadPoolConfig := TWorkerPoolConfig.Create();
customThreadPoolConfig.MaxRunningThreads := 2;
customThreadPoolConfig.MinRunningThreads := 2;
customThreadPoolConfig.OnAllWorkItemsFinished :=
  procedure(pWasCancelled: Boolean)
  begin
    OutputDebugString('Processed all workitems');
  end;
fThreadPool := TWorkerPool.CreateInstance(customThreadPoolConfig);
The settings are self-explanatory. Because the minimum and maximum are both set to 2, the custom Threadpool we have created always runs exactly two threads to handle the work items assigned to it.
Conclusion
A lot of the functionality and concepts introduced here will look familiar if you have worked with RAD Studio’s Parallel Programming Library.
In addition, there is support for cancellation tokens, affecting work items queued or running; a procedure that runs synchronized in the main thread after a work item has finished; configuration options for the Threadpool, such as MaxCPUUsage or the control of priority for each work item.
Please don’t hesitate to give it a try and share any feedback or suggestions you have. We are happy to consider it in future releases of ShellBrowser.
The Threadpool is available with the ShellBrowser Components Package. Haven’t used ShellBrowser Delphi Components yet? Why not give the 30-day free trial a try!
See our API documentation for further help.