Parallel procedures/ja

From Free Pascal wiki
Revision as of 14:51, 18 March 2014 by TheCreativeCAT (talk | contribs) (→‎Exceptions: Translation added)
Jump to navigationJump to search

Deutsch (de) English (en) 日本語 (ja) русский (ru)

概略

このページでは、MTProcs ユニットを用いて一つの手続きを並列動作させる方法を記述します。これを用いると手続きの並列動作が簡単になり、並列処理アルゴリズムの実装が容易になります。

並列手続き/並列メソッドは並列処理アルゴリズムにしばしば見られ、これを最初からサポートしているプログラミング言語もあります(例えば gcc の OpenMP) 。FPC にこの種の言語機能を加えるプランについては、OpenMP support をご覧下さい。言語にこの種の機能を加えると、プログラムの長さが短くなり、オーヴァヘッドが減るようなコードをコンパイラが生成できるようにできるかもしれません。一方、ある単一スレッドプログラムの中の部分的なコードを並列処理コードに変換するためには沢山の方法があります。単純なアプローチでは速度低下をもたらすことがしばしばなのです。良い結果を得るには、コンパイラが伺い知れないいくつかのパラメタを指定する必要があります。例えば、OpenMP および OpenCL にある膨大な設定例と議論をご覧下さい。並列処理アルゴリズムが必要なら、MTProcs が実装の役に立ちます。

MTProcs を入手する

mtprocs.pas ユニットは、multithreadprocslaz.lpk パッケージの中にあります。このパッケージは svn から得られます:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

例によって: IDEで一度 multithreadprocslaz.lpk を開きます。これで IDE はパッケージのパスを覚えます。プロジェクトの中でパッケージを使うには: IDE Menu / Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project とするか、プロジェクトインスペクタから追加します。

簡単な例

短い例を出します。何も役に立つことはしませんが、並列手続きの見かけと呼出し方がわかります:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// 簡単な並列手続き
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // 何かする
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil) // アドレス、開始する添字、終了する添字、オプションのデータ
end.

出力はこんな感じになるでしょう:

2
3
1
4
5

短い注意書きをいくつか。詳細は後述します。

  • Multithreaded Application Tutorial/ja で述べられているように、UNIXでのマルチスレッディングには cthreads ユニットが必要です。
  • 実行速度の面から、cmem ヒープマネジャの利用を勧めます。ただしこの例では差はでません。
  • 並列手続き DoSomethingParallel がとる引数は個数が固定であり、予め定義されています。
  • Index で、この呼出しが受け持つべき作業部分が決まります。
  • Data はポインタで、ProcThreadPool.DoParallel の四番目の引数として与えられたものです。このパラメタはオプションであり、何に使ってもかまいませんし、利用しなくてもかまいません。
  • Item はスレッドプールの持つ、より高度な特徴にアクセスするために使います。
  • ProcThreadPool.DoParallel は通常の手続き呼出しのように働きます。最後まで実行したとき - つまり、全スレッドが作業を完了したら -

リターンします。

  • この出力は、マルチスレッドに典型的な振る舞いを示します: 呼出しの順序は決まっていません。順序は実行する毎に変わり得ます。

特徴

手続きの並列実行とはこういうことです:

  • ある手続きまたはメソッドを、任意に設定した StartIndex から EndIndex までの間の Index(添字)について実行します。
  • 一つ以上のスレッドがそれらの添字について並列に動作します。例えば Index が 1 から 10 までだとして、スレッドプールの中で利用できるスレッドが三つあったとすると、これら三つのスレッドがそれぞれ異なった添字について同時に動作します。一つのスレッドの呼出し(一つの添字に対する処理)が終わる毎に、そのスレッドには新たな添字が割り当てられ、スレッドはそれを実行します。結果はこんなふうになるでしょう: スレッド1が添字 3, 5, 7 を、スレッド2が 1, 6, 8, 9 を、スレッド3が 2, 4, 10 を。
  • スレッド数は実行中に変化し得、最小数は保証されていません。最悪の場合、全ての添字を一つのスレッドで実行することにもなり得ます - メインスレッド自体で。
  • スレッドの最大数は動作中のシステムに応じた適切な値に初期化されます。
    ProcThreadPool.MaxThreadCount := 8;
    のように手動で設定することもできます。
  • スレッドの最大数は手続き毎に設定することができます。
  • 並列手続き/メソッドは、再帰的に並列手続き/メソッドを呼出すことが可能です。
  • スレッドは再利用されます。つまり、各添字に対し消滅と生成を繰り返すのではなく、プールされています。2コアのプロセッサでは、プールには二つのスレッドがあり - メインスレッドと、おまけのスレッド - 処理全体に投入されます。

オーヴァヘッド、速度低下

オーヴァヘッドは、システムに強く依存しています(コアの数と種類、共有メモリの種類、クリティカルセクションの実行速度、キャッシュの大きさ)。一般的なヒントとしては:

  • 作業の各チャンク(添字)には、少なくとも数ミリ秒かかります。
  • オーヴァヘッドは再帰の深さとは関係がありません。

マルチスレッディングのオーヴァヘッドは、単純にこんにちのコンピュータのアーキテクチャによるものであり、MTProcs ユニットとは無関係です:

  • スレッドを一つ生成してプログラムがマルチスレッドになった途端に、メモリマネジャはクリティカルセクションを使わざるをえなくなります。ここで実行速度が低下します。そのスレッドでなにもしなくても、プログラムの実行速度は低下するでしょう。
  • いくつかのシステムが持つ cmem ヒープマネジャはマルチスレッドでははるかに高速です。私が Intel CPU を用いた OS X で試みたベンチマークでは、速度は10倍も違いました。
  • 文字列とインターフェースはグローバルな参照カウンタを持っています(訳注: どこからも参照されなくなったら自動的に dispose できるように)。アクセスする毎にクリティカルセクションが必要になります。文字列を複数のスレッドで処理すると、大幅な速度低下を齎します。代わりに PChar を使ってください。
  • 作業の各チャンク(添字)はそれぞれメモリ上の離れた部分を処理するようにすべきです。そうしないとキャッシュのアップデートが交錯します。
  • 広過ぎるメモリ領域を扱わないでください。システムによっては、スレッド一つでメモリバスの速度を使い切ってしまいます。メモリバスが最大速度に達すると、それ以上スレッドを立ち上げても速度は向上せず、むしろ低下します。

手続きに用いるスレッドの最大数を設定する

一つの手続きに対するスレッドの最大数を、五番目の引数として指定することができます。

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // アドレス、添字の開始値、添字の終了値、
      // オプション: データ(ここでは無し)、オプション: スレッドの最大数(ここでは2)
end.

これは、複数のスレッドが同一のデータに対し作用するときに有用です。スレッド数が多過ぎると、キャッシュの衝突が増加し、各スレッドの実行速度が低下します。あるいは、アルゴリズムが WaitForIndex を多用する場合、実際に動作しているスレッドは少ないということもありえます。余ったスレッドは他のタスク向けにできるはずのものです。

添字を待つ / 順番に実行する

時折、ある添字の処理が、それより前の添字の処理の結果に依存する場合があります。例えば、添字 5 の計算を行い、その結果を添字 3 の結果と組み合わせて用いるような場合です。こういう時は WaitForIndex を使います:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... 添字 'Index' の処理を行う ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... 計算処理 ...
  end;
end;

WaitForIndex は添字を引数としてとります。その添字は、現在の添字あるいは範囲よりも小さい値です。実行が上手くいっている場合は true を返します。戻り値が false となった場合は、他のスレッドのどれかで例外が発生しています。

更に拡張された関数 WaitForIndexRange があります。これは添字の全域について待ちます:

if not Item.WaitForIndexRange(3,5) then exit; // 3, 4, 5を待つ

例外

あるスレッドで例外が発生すると、他のスレッドは正常に終了するでしょうが、新たな添字の処理は行われません。スレッドプールは全スレッドの実効終了を待ち、例外を惹起させます。このため、try..except 文を通常通り用いることができます:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

複数の例外が発生した場合は、最初のもののみが立ち上がります。全ての例外を処理するには、各並列手続きの中に try..except を加えてください。

Synchronize

When you want to call a function in the main thread, for example to update some gui element, you can use the class method TThread.Synchronize. It takes as arguments the current TThread and the address of a method. Since 1.2 mtprocs provides a threadvar CurrentThread, which makes synchronizing simple:

TThread.Synchronize(CurrentThread,@YourMethod);

This will post an event on the main event queue and wait until the main thread has executed your method. Keep in mind that a bare fpc program does not have an event queue. A LCL or fpgui program has it.

If you create your own TThread descendants, you should set the variable in your Execute method. For example:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...work...
end;

Example: Parallel loop

This example explains step by step how to convert a loop into a parallel procedure. The example computes the maximum number of an integer array BigArray.

The original loop

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

Splitting the work

The work should be equally distributed over n threads. For this the BigArray is split into equally sized blocks and an outer loop runs over every block. Typically n is the number of cpus/cores in the system. MTProcs has some utility functions to compute the block size and count:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

The added lines can be used for any loop. Eventually a tool can be written to automate this.

The work is now split into smaller pieces. Now the pieces must become more independent.

Local and shared variables

For each used variable in the loop you must decide whether it is shared variable used by all threads or if each thread uses its own local variable. The shared variables BlockCount and BlockSize are only read and do not change, so no work is needed for them. But a shared variable like Result will be changed by all threads. This can either be achieved with synchronization (e.g. critical section), which is slow, or each thread use a local copy and these local variable are later combined.

Here is a solution replacing the Result variable with an array, which is combined in the end:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // shared variables
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // local variables
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // allocate space for local variables
  // compute maximum for each block
  for Index:=0 to BlockCount-1 do begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // compute maximum of all blocks
  // (if you have hundreds of threads there are better solutions)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

This approach is straightforward and could be automated. The process will need some hints from the programmer though.

DoParallel

The final step is to move the inner loop into a sub procedure and replace the loop with a call to DoParallelLocalProc.

Beware: FPC does not yet allow to define local procedure types, so an unsafe type cast must be used for this.

function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // split work into equally sized blocks
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // allocate local/thread variables
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // compute maximum for each block
  ProcThreadPool.DoParallelLocalProc(@FindMaximumParallel,0,BlockCount-1);// Beware: unsafe type
  // compute maximum of all blocks
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

This was mostly copy and paste, so again this could be automated.

Example: parallel sort

The unit mtputils contains the function ParallelSortFPList which uses mtprocs to sort a TFPList in parallel. A compare function must be given.

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel. A 0 means to use the system default.

Optionally you can provide your own sort function (OnSortPart) to sort the part of each single thread. For example you can sort the blocks via QuickSort, which are then merged. Then you have a parallel QuickSort. See TFPList.Sort for an example implementation of QuickSort.

Experimental

Since 1.0.1 there is a new DoParallelLocalProc which allows to call nested procedures of procedures. Beware that calling nested procedures is not yet supported by the compiler and therefore one has to use unsafe type casts. For example:

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // accessing local variables and parameters is possible!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelLocalProc(@SubProc,1,2); // BEWARE: no type checking is done for SubProc.
                 // DoSomething must be a procedure, not a method.
end;

This can save a lot of refactoring and makes the parallel procedure much more readable.

See also