mormot.core.threads--TSynThreadPool
mormot.core.threads--TSynThreadPool
{ ************ 面向服务器进程的线程池 }
TSynThreadPool = class; // 前向声明TSynThreadPool类
/// 定义了TSynThreadPool所使用的工作线程
TSynThreadPoolWorkThread = class(TSynThread)
protected
fOwner: TSynThreadPool; // 线程池所有者
fThreadNumber: integer; // 线程编号
{$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
fProcessingContext: pointer; // 正在处理的上下文,受fOwner.fSafe.Lock保护
fEvent: TSynEvent; // 同步事件
{$endif USE_WINIOCP}
procedure NotifyThreadStart(Sender: TSynThread); // 通知线程开始
procedure DoTask(Context: pointer); // 异常安全地调用fOwner.Task()
public
/// 初始化线程
constructor Create(Owner: TSynThreadPool); reintroduce;
/// 终结线程
destructor Destroy; override;
/// 循环等待并执行挂起的任务,通过调用fOwner.Task()
procedure Execute; override;
/// 关联的线程池
property Owner: TSynThreadPool
read fOwner;
end;
TSynThreadPoolWorkThreads = array of TSynThreadPoolWorkThread; // 线程池工作线程数组类型
/// 一个简单的线程池,用于例如快速处理HTTP/1.0请求
// - 在Windows上通过I/O完成端口实现,或在Linux/POSIX上使用经典的事件驱动方法
TSynThreadPool = class
protected
{$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
fSafe: TOSLightLock; // 使用更稳定的锁
{$endif USE_WINIOCP}
fWorkThread: TSynThreadPoolWorkThreads; // 工作线程数组
fWorkThreadCount: integer; // 工作线程数量
fRunningThreads: integer; // 正在运行的线程数量
fExceptionsCount: integer; // 异常计数(未在代码中明确使用,但可能用于调试或监控)
fContentionAbortDelay: integer; // 由于争用而拒绝连接的延迟时间(毫秒)
fOnThreadTerminate: TOnNotifyThread; // 线程终止通知事件
fOnThreadStart: TOnNotifyThread; // 线程开始通知事件
fContentionTime: Int64; // 等待队列可用槽位的总时间(毫秒)
fContentionAbortCount: cardinal; // 由于争用而中止的任务数
fContentionCount: cardinal; // 等待队列可用槽位的次数
fName: RawUtf8; // 线程池名称
fTerminated: boolean; // 线程池是否已终止
{$ifdef USE_WINIOCP} // 如果使用Windows I/O完成端口
fRequestQueue: THandle; // IOCP有其自己的内部队列
{$else}
fQueuePendingContext: boolean; // 当所有线程都忙时,是否应维护一个内部队列
fPendingContext: array of pointer; // 挂起的上下文数组
fPendingContextCount: integer; // 挂起的上下文数量
function GetPendingContextCount: integer; // 获取挂起上下文数量的函数
function PopPendingContext: pointer; // 从挂起上下文数组中弹出一个元素的函数
function QueueLength: integer; virtual; // 获取队列长度的虚拟函数(可能用于调试)
{$endif USE_WINIOCP}
/// 在I/O错误时结束线程
function NeedStopOnIOError: boolean; virtual;
/// 在通知后要执行的进程,这是一个抽象方法,需要被子类实现
procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: pointer); virtual; abstract;
/// 中止任务的进程
procedure TaskAbort(aContext: Pointer); virtual;
public
/// 使用指定的线程数初始化线程池
// - 抽象的Task()方法将由其中一个线程调用
// - 一个线程池最多可以关联256个线程
// - 在Windows上,可以可选地接受一个之前使用Windows重叠I/O(IOCP)打开的aOverlapHandle
// - 在POSIX上,如果aQueuePendingContext=true,则将挂起的上下文存储到内部队列中,
// 以便在队列未满时Push()返回true
{$ifdef USE_WINIOCP}
constructor Create(NumberOfThreads: integer = 32; aOverlapHandle: THandle = INVALID_HANDLE_VALUE; const aName: RawUtf8 = '');
{$else}
constructor Create(NumberOfThreads: integer = 32; aQueuePendingContext: boolean = false; const aName: RawUtf8 = '');
{$endif USE_WINIOCP}
/// 关闭线程池,释放所有关联的线程
destructor Destroy; override;
/// 让线程池处理一个指定的任务(作为指针)
// - 如果没有空闲线程可用,并且Create(aQueuePendingContext=false)被使用,则返回false(调用者稍后应重试)
// - 如果在Create中aQueuePendingContext为true,或使用了IOCP,则提供的上下文将被添加到内部列表,并在可能时处理
// - 如果aWaitOnContention默认为false,则在队列满时立即返回
// - 设置aWaitOnContention=true以等待最多ContentionAbortDelay毫秒并重试将任务排队
function Push(aContext: pointer; aWaitOnContention: boolean = false): boolean;
{$ifndef USE_WINIOCP}
/// 在Push()返回false后调用,以查看队列是否确实已满
// - 如果QueuePendingContext为false,则返回false
function QueueIsFull: boolean;
/// 如果所有线程都忙时,线程池是否应维护一个内部队列
// - 作为Create构造函数的参数提供
property QueuePendingContext: boolean
read fQueuePendingContext;
{$endif USE_WINIOCP}
/// 对此线程池中定义的线程的低级访问
property WorkThread: TSynThreadPoolWorkThreads
read fWorkThread;
published
/// 线程池中可用的线程数
// - 映射Create()参数,即默认为32
property WorkThreadCount: integer
read fWorkThreadCount;
/// 当前在此线程池中处理任务的线程数
// - 范围在0..WorkThreadCount之间
property RunningThreads: integer
read fRunningThreads;
/// 由于线程池争用而被拒绝的任务数
// - 如果此数字很高,请考虑设置更高的线程数,或分析并调整Task方法
property ContentionAbortCount: cardinal
read fContentionAbortCount;
/// 由于争用而拒绝连接的延迟时间(毫秒)
// - 默认为5000,即等待IOCP或aQueuePendingContext内部列表中有空间可用5秒
// - 在此延迟期间,不接受新的连接(即不调用Accept),以便负载均衡器可以检测到争用并切换到池中的另一个实例,
// 或直接客户端最终可能会拒绝其连接,因此不会开始发送数据
property ContentionAbortDelay: integer
read fContentionAbortDelay write fContentionAbortDelay;
/// 等待队列中可用槽位的总时间(毫秒)
// - 争用不会立即失败,但会重试直到ContentionAbortDelay
// - 此处的高数值需要对Task方法进行代码重构
property ContentionTime: Int64
read fContentionTime;
/// 线程池等待队列中可用槽位的次数
// - 争用不会立即失败,但会重试直到ContentionAbortDelay
// - 此处的高数值可能需要增加线程数
// - 使用此属性和ContentionTime来计算平均争用时间
property ContentionCount: cardinal
read fContentionCount;
{$ifndef USE_WINIOCP}
/// 当前等待分配给线程的输入任务数
property PendingContextCount: integer
read GetPendingContextCount;
{$endif USE_WINIOCP}
end;
{$M-} // 关闭内存管理消息
const
// 允许TSynThreadPoolWorkThread堆栈使用最多256 * 2MB = 512MB的RAM
THREADPOOL_MAXTHREADS = 256;
由于 TSynThreadPool
类是一个高度抽象且依赖于特定实现的类(如它可能使用Windows的I/O完成端口或Linux/POSIX的事件驱动机制),编写一个完整的例程代码可能会相当复杂,并且需要模拟或实际实现这些依赖项。然而,我可以提供一个简化的示例,该示例展示了如何创建 TSynThreadPool
实例、如何向其推送任务,并如何大致实现 Task
方法。
请注意,以下代码是一个高度简化的示例,并不包含所有 TSynThreadPool
类定义中的功能,特别是与Windows I/O完成端口或Linux/POSIX事件驱动机制相关的部分。此外,由于 TSynThreadPool
是一个假设的类(因为它不是Delphi标准库或广泛认可的第三方库的一部分),我将基于您提供的类定义来编写这个示例。
program TSynThreadPoolDemo;
{$MODE DELPHI}
uses
SysUtils, Classes; // 引入必要的单元
// 假设TSynThreadPool及其依赖项已经在某个单元中定义
// 这里我们使用一个占位符单元名YourThreadPoolUnit
uses YourThreadPoolUnit;
// 一个简单的任务上下文类(仅作为示例)
type
TMyTaskContext = record
Data: Integer;
end;
// TSynThreadPool的Task方法的实现类
type
TMyThreadPool = class(TSynThreadPool)
protected
procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer); override;
end;
{ TMyThreadPool }
procedure TMyThreadPool.Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer);
var
Ctx: PMyTaskContext;
begin
Ctx := PMyTaskContext(aContext);
WriteLn('Processing task with data: ', Ctx.Data);
// 在这里添加处理任务的代码
// ...
end;
var
Pool: TMyThreadPool;
Ctx: TMyTaskContext;
I: Integer;
begin
try
// 创建一个线程池实例,假设我们想要使用4个工作线程
Pool := TMyThreadPool.Create(4);
try
// 模拟向线程池推送一些任务
for I := 1 to 10 do
begin
Ctx.Data := I;
if not Pool.Push(@Ctx) then
begin
// 在这个简化的示例中,我们不会处理Push返回false的情况
// 在实际应用中,您可能需要等待、重试或将任务放入另一个队列中
WriteLn('Failed to push task to the pool (this should not happen in this simplified example)');
end;
end;
// 在这个简化的示例中,我们没有等待所有任务完成
// 在实际应用中,您可能需要等待线程池中的所有任务都完成后再继续
// ...
finally
// 销毁线程池实例,这将释放所有关联的资源
Pool.Free;
end;
except
on E: Exception do
WriteLn('An error occurred: ', E.Message);
end;
// 保持控制台窗口打开,直到用户按任意键
WriteLn('Press Enter to exit...');
ReadLn;
end.
// 注意:由于TSynThreadPool是一个假设的类,并且上述代码没有实现所有细节(如线程池的实际工作线程管理、任务队列等),
// 因此这个示例主要是为了展示如何使用该类(如果它存在的话)的大致结构。
// 在实际应用中,您需要根据TSynThreadPool类的具体实现来调整此代码。
重要说明:
- 占位符单元:在上述代码中,我使用了
YourThreadPoolUnit
作为包含TSynThreadPool
类定义的占位符单元名。在实际应用中,您需要将其替换为包含该类定义的实际单元名。 - 任务上下文:我定义了一个简单的
TMyTaskContext
记录类型来作为任务的上下文。在实际应用中,您可能需要根据需要定义更复杂的上下文类型。 - 错误处理:在
Push
方法返回false
的情况下,上述代码仅打印了一条消息,并没有采取任何恢复措施。在实际应用中,您可能需要实现更复杂的错误处理逻辑(如重试、等待或将任务放入另一个队列中)。 - 等待任务完成:上述代码没有等待线程池中的所有任务都完成。在实际应用中,您可能需要实现某种形式的等待机制(例如,使用同步事件或计数器)来确保所有任务都已完成后再继续执行后续代码。
- 线程池实现:由于
TSynThreadPool
是一个假设的类,并且其实现细节(如工作线程的管理、任务队列的实现等)并未在您的类定义中给出,因此上述代码仅提供了一个大致的框架。在实际应用中,您需要根据TSynThreadPool
类的具体实现来调整此代码。