Parallel procedures/zh CN

From Lazarus wiki
Jump to navigationJump to search

Deutsch (de) English (en) 日本語 (ja) русский (ru) 中文(中国大陆) (zh_CN)

概述

本页描述如何使用MTProcs单元以并行方式运行单个过程,这简化了并行过程的运行以及并行算法的实现。

并行过程和方法经常出现在并行算法中,一些编程语言为它们提供了内置支持(例如gcc中的OpenMP)。有关向FPC添加此类语言特性的计划,请参阅OpenMP支持。将这些内容嵌入到语言中可以节省一些键入,并允许编译器生成开销较少的代码。另一方面,有许多方法可以将单线程代码段转换为并行代码。事实上,简单的方法往往会减慢代码速度。为了获得好的结果,必须指定一些编译器无法猜测的参数。有关示例,请参阅OpenMPOpenCL的大量设置和讨论。你需要并行算法。MTProcs有助于实现并行算法。

获取MTProcs

单元mtprocs.pasmultithreadprocslaz.lpk包的一部分,该包不需要其他包,只需FPC >= 2.6.0。

你可以在sourceforge上找到其源代码:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

或者在Lazarus的components/multithreadprocs中。

一如既往:在IDE中打开一次multithreadprocslaz.lpk包,以便它学习路径。 要在你的项目中使用该包:使用IDE Menu / Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project将其添加到当前项目中。或者通过项目检查器添加。

简单示例

以下是一个简短的示例,它不做任何有用的事情,但演示了并行过程的样子以及如何调用它:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// 一个简单的并行过程
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // 执行一些工作
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // 方法地址,起始索引,结束索引,可选数据
end.

输出将是类似这样的:

2
3
1
4
5

以下是一些简短的说明。详细信息稍后提供。

  • 在Unix下,多线程需要单元cthreads,如Multithreaded Application Tutorial中所述。
  • 出于速度考虑,建议使用cmem堆管理器,尽管在此示例中没有任何差异。
  • 并行过程DoSomethingParallel获取一些固定和预定义的参数。
  • Index定义了应由此次调用完成的工作块。
  • Data是指针,作为第四个参数传递给ProcThreadPool.DoParallel。它是可选的,你可以自由地使用它来做任何事情。
  • Item可用于访问线程池的一些更复杂的功能。
  • ProcThreadPool.DoParallel的工作就像一个正常的过程调用。当它完全运行时返回 - 这意味着所有线程都已完成其工作。
  • 输出显示了典型的多线程行为:调用的顺序是不确定的。几次运行可能会导致不同的顺序。

特性

并行运行过程意味着:

  • 过程或方法以从任意StartIndex到任意EndIndex的Index执行。
  • 一个或多个线程并行执行这些索引。例如,如果Index从1运行到10,并且池中有3个线程可用,那么3个线程将同时运行三个不同的索引。每次线程完成一个调用(一个索引)时,它都会分配下一个索引并运行它。结果可能是:线程1执行索引3,5,7,线程2执行1,6,8,9,线程3运行2,4,10。
  • 线程的数量可能在运行期间变化,并且没有最小线程数的保证。在最坏的情况下,所有索引都将由一个线程执行 - 线程本身。
  • 最大线程数以对当前系统的良好猜测初始化。它可以通过
ProcThreadPool.MaxThreadCount := 8;

随时手动设置。

  • 你可以为每个过程设置最大线程数。
  • 并行过程(或方法)可以递归地调用并行过程(或方法)。
  • 线程是重用的,这意味着它们不是为每个索引创建和销毁的,而是有一个全局线程池。在双核处理器上,将有两个线程完成所有工作 - 主线程和池中的一个额外线程。

开销,减速

开销在很大程度上取决于系统(核心的数量和类型,共享内存的类型,关键部分的速度,缓存大小)。以下是一些一般提示:

  • 每个工作块(索引)应该至少占用几毫秒。
  • 开销与并行过程的递归级别无关。

与MTProcs单元无关,但仅由当今计算机体系结构产生的多线程开销:

  • 一旦创建一个线程,你的程序就变成多线程,内存管理器必须使用关键部分,这会减慢速度。因此,即使你不使用线程做任何事情,你的程序也可能会变慢。
  • 在一些系统上,对于多线程,cmem堆管理器要快得多。在我的基准测试中,特别是在Intel系统上,特别是在OS X下,速度差异可能超过10倍。
  • 字符串和接口是全局引用计数的。每次访问都需要一个关键部分。因此,在多个线程中处理字符串几乎不会提高速度。请使用PChars代替。
  • 每个工作块(索引)应该在内存的不相交部分上工作,以避免跨缓存更新。
  • 不要处理大量内存。在一些系统上,单个线程的速度就足够填满内存总线速度。当达到内存总线的最大速度时,任何进一步的线程都会减慢速度而不是加快速度。

设置过程的最大线程数

你可以将过程的最大线程数指定为第五个参数。

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // 地址,起始索引,结束索引,
      // 可选:数据(此处为nil),可选:最大线程数(此处为2)
end.

当线程在同一数据上工作,并且太多线程会创建太多缓存冲突,从而相互减慢速度时,这可能会很有用。或者,当算法大量使用WaitForIndex时,以便只有少数线程可以实际工作。然后,线程可以用于其他任务。

等待索引/按顺序执行

有时,一个Index依赖于前一个Index的结果。例如,一个常见的任务是首先计算块5,然后将结果与块3的结果组合。为此,请使用WaitForIndex方法:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... 计算块号'Index' ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... 计算 ...
  end;
end;

WaitForIndex接受一个低于当前Index的Index或一个范围作为参数。如果它返回true,则一切按预期工作。如果它返回false,则表示其他线程之一发生了异常。

有一个扩展函数WaitForIndexRange,它等待整个Index范围:

if not Item.WaitForIndexRange(3,5) then exit; // 等待3,4和5

异常

如果其中一个线程中发生异常,其他线程将正常完成,但不会开始新的Index。池等待所有线程完成,然后引发异常。这就是为什么你可以像往常一样使用try..except

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

如果有多个异常,则只会引发第一个异常。要处理所有异常,请在并行方法中添加try..except。

同步

当你想在主线程中调用一个函数时,例如更新一些gui元素,你可以使用类方法TThread.Synchronize。它接受当前TThread和一个方法地址作为参数。自1.2版起,mtprocs提供了一个线程变量CurrentThread,它简化了同步:

TThread.Synchronize(CurrentThread,@YourMethod);

这将在主事件队列上发布一个事件,并等待直到主线程执行了你的方法。请记住,一个裸露的fpc程序没有事件队列。LCL或fpgui程序有。

如果你创建了自己的TThread后代,你应该在你的Execute方法中设置这个变量。例如:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;  // 设置当前线程为自身
  ...工作...  // 执行相关工作
end;

示例:并行循环

本示例逐步解释如何将循环转换为并行过程。该示例计算整数数组“BigArray”的最大值。

原始循环

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

分配工作

工作应平均分配给n个线程。为此,将BigArray分割成大小相等的块,并且外层循环遍历每个块。通常,n是系统中的CPU/核心数。MTProcs有一些实用函数用于计算块大小和数量:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); // 计算块大小和数量
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); // 计算每个块的起始和结束索引
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

添加的几行代码可用于任何循环。最终可以编写一个工具来自动化此过程。

现在工作已被分割成较小的部分。接下来,这些部分必须变得更加独立。

局部变量和共享变量

对于循环中使用的每个变量,必须确定它是由所有线程使用的共享变量,还是每个线程使用其自己的局部变量。 共享变量BlockCount和BlockSize仅被读取且不会更改,因此无需对它们进行处理。 但是,像Result这样的共享变量将被所有线程更改。这可以通过同步(例如临界区)来实现,但速度较慢,或者每个线程使用局部副本,然后合并这些局部变量。

以下是一个解决方案,将“Result”变量替换为数组,最后进行合并:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // 共享变量
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // 局部变量
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // 为局部变量分配空间
  // 计算每个块的最大值
  for Index:=0 to BlockCount-1 do begin
    // 计算块的最大值
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // 计算所有块的最大值
  // (如果有数百个线程,则有更好的解决方案)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

这种方法简单明了,可以自动化。但是,此过程将需要程序员提供一些提示。

DoParallel

最后一步是将内部循环移入子过程,并用对DoParallelNested的调用替换循环。

...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // 计算块的最大值
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // 将工作分割成大小相等的块
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // 分配局部/线程变量
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // 计算每个块的最大值
  ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
  // 计算所有块的最大值
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

这大多是复制和粘贴,因此同样可以自动化。

示例:并行排序

单元mtputils包含函数“ParallelSortFPList”,该函数使用mtprocs对TFPList进行并行排序。必须提供比较函数。

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

此函数使用并行MergeSort算法。参数MaxThreadCount传递给DoParallel。0表示使用系统默认值。

可选地,您可以提供自己的排序函数(OnSortPart)来对每个单线程的部分进行排序。例如,您可以通过QuickSort对块进行排序,然后再进行合并。这样您就拥有了一个“并行QuickSort”。有关QuickSort的示例实现,请参阅TFPList.Sort。

使用嵌套过程

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // 可以访问局部变量和参数!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;

这可以节省大量重构工作,并使并行过程更加可读。

See also