Parallel procedures/zh CN
│
Deutsch (de) │
English (en) │
日本語 (ja) │
русский (ru) │
中文(中国大陆) (zh_CN) │
概述
本页描述如何使用MTProcs单元以并行方式运行单个过程,这简化了并行过程的运行以及并行算法的实现。
并行过程和方法经常出现在并行算法中,一些编程语言为它们提供了内置支持(例如gcc中的OpenMP)。有关向FPC添加此类语言特性的计划,请参阅OpenMP支持。将这些内容嵌入到语言中可以节省一些键入,并允许编译器生成开销较少的代码。另一方面,有许多方法可以将单线程代码段转换为并行代码。事实上,简单的方法往往会减慢代码速度。为了获得好的结果,必须指定一些编译器无法猜测的参数。有关示例,请参阅OpenMP和OpenCL的大量设置和讨论。你需要并行算法。MTProcs有助于实现并行算法。
获取MTProcs
单元mtprocs.pas是multithreadprocslaz.lpk包的一部分,该包不需要其他包,只需FPC >= 2.6.0。
你可以在sourceforge上找到其源代码:
svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs
或者在Lazarus的components/multithreadprocs中。
一如既往:在IDE中打开一次multithreadprocslaz.lpk包,以便它学习路径。 要在你的项目中使用该包:使用IDE Menu / Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project将其添加到当前项目中。或者通过项目检查器添加。
简单示例
以下是一个简短的示例,它不做任何有用的事情,但演示了并行过程的样子以及如何调用它:
program Test;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
// 一个简单的并行过程
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: Integer;
begin
writeln(Index);
for i:=1 to Index*1000000 do ; // 执行一些工作
end;
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // 方法地址,起始索引,结束索引,可选数据
end.
输出将是类似这样的:
2 3 1 4 5
以下是一些简短的说明。详细信息稍后提供。
- 在Unix下,多线程需要单元cthreads,如Multithreaded Application Tutorial中所述。
- 出于速度考虑,建议使用cmem堆管理器,尽管在此示例中没有任何差异。
- 并行过程DoSomethingParallel获取一些固定和预定义的参数。
- Index定义了应由此次调用完成的工作块。
- Data是指针,作为第四个参数传递给ProcThreadPool.DoParallel。它是可选的,你可以自由地使用它来做任何事情。
- Item可用于访问线程池的一些更复杂的功能。
- ProcThreadPool.DoParallel的工作就像一个正常的过程调用。当它完全运行时返回 - 这意味着所有线程都已完成其工作。
- 输出显示了典型的多线程行为:调用的顺序是不确定的。几次运行可能会导致不同的顺序。
特性
并行运行过程意味着:
- 过程或方法以从任意StartIndex到任意EndIndex的Index执行。
- 一个或多个线程并行执行这些索引。例如,如果Index从1运行到10,并且池中有3个线程可用,那么3个线程将同时运行三个不同的索引。每次线程完成一个调用(一个索引)时,它都会分配下一个索引并运行它。结果可能是:线程1执行索引3,5,7,线程2执行1,6,8,9,线程3运行2,4,10。
- 线程的数量可能在运行期间变化,并且没有最小线程数的保证。在最坏的情况下,所有索引都将由一个线程执行 - 线程本身。
- 最大线程数以对当前系统的良好猜测初始化。它可以通过
ProcThreadPool.MaxThreadCount := 8;
随时手动设置。
- 你可以为每个过程设置最大线程数。
- 并行过程(或方法)可以递归地调用并行过程(或方法)。
- 线程是重用的,这意味着它们不是为每个索引创建和销毁的,而是有一个全局线程池。在双核处理器上,将有两个线程完成所有工作 - 主线程和池中的一个额外线程。
开销,减速
开销在很大程度上取决于系统(核心的数量和类型,共享内存的类型,关键部分的速度,缓存大小)。以下是一些一般提示:
- 每个工作块(索引)应该至少占用几毫秒。
- 开销与并行过程的递归级别无关。
与MTProcs单元无关,但仅由当今计算机体系结构产生的多线程开销:
- 一旦创建一个线程,你的程序就变成多线程,内存管理器必须使用关键部分,这会减慢速度。因此,即使你不使用线程做任何事情,你的程序也可能会变慢。
- 在一些系统上,对于多线程,cmem堆管理器要快得多。在我的基准测试中,特别是在Intel系统上,特别是在OS X下,速度差异可能超过10倍。
- 字符串和接口是全局引用计数的。每次访问都需要一个关键部分。因此,在多个线程中处理字符串几乎不会提高速度。请使用PChars代替。
- 每个工作块(索引)应该在内存的不相交部分上工作,以避免跨缓存更新。
- 不要处理大量内存。在一些系统上,单个线程的速度就足够填满内存总线速度。当达到内存总线的最大速度时,任何进一步的线程都会减慢速度而不是加快速度。
设置过程的最大线程数
你可以将过程的最大线程数指定为第五个参数。
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // 地址,起始索引,结束索引,
// 可选:数据(此处为nil),可选:最大线程数(此处为2)
end.
当线程在同一数据上工作,并且太多线程会创建太多缓存冲突,从而相互减慢速度时,这可能会很有用。或者,当算法大量使用WaitForIndex时,以便只有少数线程可以实际工作。然后,线程可以用于其他任务。
等待索引/按顺序执行
有时,一个Index依赖于前一个Index的结果。例如,一个常见的任务是首先计算块5,然后将结果与块3的结果组合。为此,请使用WaitForIndex方法:
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
... 计算块号'Index' ...
if Index=5 then begin
if not Item.WaitForIndex(3) then exit;
... 计算 ...
end;
end;
WaitForIndex接受一个低于当前Index的Index或一个范围作为参数。如果它返回true,则一切按预期工作。如果它返回false,则表示其他线程之一发生了异常。
有一个扩展函数WaitForIndexRange,它等待整个Index范围:
if not Item.WaitForIndexRange(3,5) then exit; // 等待3,4和5
异常
如果其中一个线程中发生异常,其他线程将正常完成,但不会开始新的Index。池等待所有线程完成,然后引发异常。这就是为什么你可以像往常一样使用try..except:
try
...
ProcThreadPool.DoParallel(...);
...
except
On E: Exception do ...
end;
如果有多个异常,则只会引发第一个异常。要处理所有异常,请在并行方法中添加try..except。
同步
当你想在主线程中调用一个函数时,例如更新一些gui元素,你可以使用类方法TThread.Synchronize。它接受当前TThread和一个方法地址作为参数。自1.2版起,mtprocs提供了一个线程变量CurrentThread,它简化了同步:
TThread.Synchronize(CurrentThread,@YourMethod);
这将在主事件队列上发布一个事件,并等待直到主线程执行了你的方法。请记住,一个裸露的fpc程序没有事件队列。LCL或fpgui程序有。
如果你创建了自己的TThread后代,你应该在你的Execute方法中设置这个变量。例如:
procedure TYourThread.Execute;
begin
CurrentThread:=Self; // 设置当前线程为自身
...工作... // 执行相关工作
end;
示例:并行循环
本示例逐步解释如何将循环转换为并行过程。该示例计算整数数组“BigArray”的最大值。
原始循环
type
TArrayOfInteger = array of integer;
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
i: PtrInt;
begin
Result:=BigArray[0];
for i:=1 to length(BigArray)-1 do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
分配工作
工作应平均分配给n个线程。为此,将BigArray分割成大小相等的块,并且外层循环遍历每个块。通常,n是系统中的CPU/核心数。MTProcs有一些实用函数用于计算块大小和数量:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
Result:=BigArray[0];
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); // 计算块大小和数量
for Index:=0 to BlockCount-1 do begin
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); // 计算每个块的起始和结束索引
for i:=BlockStart to BlockEnd do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
end;
添加的几行代码可用于任何循环。最终可以编写一个工具来自动化此过程。
现在工作已被分割成较小的部分。接下来,这些部分必须变得更加独立。
局部变量和共享变量
对于循环中使用的每个变量,必须确定它是由所有线程使用的共享变量,还是每个线程使用其自己的局部变量。 共享变量BlockCount和BlockSize仅被读取且不会更改,因此无需对它们进行处理。 但是,像Result这样的共享变量将被所有线程更改。这可以通过同步(例如临界区)来实现,但速度较慢,或者每个线程使用局部副本,然后合并这些局部变量。
以下是一个解决方案,将“Result”变量替换为数组,最后进行合并:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
// 共享变量
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
// 局部变量
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // 为局部变量分配空间
// 计算每个块的最大值
for Index:=0 to BlockCount-1 do begin
// 计算块的最大值
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do begin
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
end;
// 计算所有块的最大值
// (如果有数百个线程,则有更好的解决方案)
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
这种方法简单明了,可以自动化。但是,此过程将需要程序员提供一些提示。
DoParallel
最后一步是将内部循环移入子过程,并用对DoParallelNested的调用替换循环。
...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: integer;
BlockStart, BlockEnd: PtrInt;
begin
// 计算块的最大值
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
var
Index: PtrInt;
begin
// 将工作分割成大小相等的块
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
// 分配局部/线程变量
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
// 计算每个块的最大值
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
// 计算所有块的最大值
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
这大多是复制和粘贴,因此同样可以自动化。
示例:并行排序
单元mtputils包含函数“ParallelSortFPList”,该函数使用mtprocs对TFPList进行并行排序。必须提供比较函数。
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
此函数使用并行MergeSort算法。参数MaxThreadCount传递给DoParallel。0表示使用系统默认值。
可选地,您可以提供自己的排序函数(OnSortPart)来对每个单线程的部分进行排序。例如,您可以通过QuickSort对块进行排序,然后再进行合并。这样您就拥有了一个“并行QuickSort”。有关QuickSort的示例实现,请参阅TFPList.Sort。
使用嵌套过程
procedure DoSomething(Value: PtrInt);
var
p: array[1..2] of Pointer;
procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
p[Index]:=Pointer(Value); // 可以访问局部变量和参数!
end;
var
i: Integer;
begin
ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;
这可以节省大量重构工作,并使并行过程更加可读。