Parallel procedures/ja

From Free Pascal wiki
Jump to: navigation, search

Deutsch (de) English (en) français (fr) 日本語 (ja) русский (ru)

概略

このページでは、MTProcs ユニットを用いて一つの手続きを並列動作させる方法を記述します。これを用いると手続きの並列動作が簡単になり、並列処理アルゴリズムの実装が容易になります。

並列手続き/並列メソッドは並列処理アルゴリズムにしばしば見られ、これを最初からサポートしているプログラミング言語もあります(例えば gcc の OpenMP) 。FPC にこの種の言語機能を加えるプランについては、OpenMP support をご覧下さい。言語にこの種の機能を加えると、プログラムの長さが短くなり、オーヴァヘッドが減るようなコードをコンパイラが生成できるようにできるかもしれません。一方、ある単一スレッドプログラムの中の部分的なコードを並列処理コードに変換するためには沢山の方法があります。単純なアプローチでは速度低下をもたらすことがしばしばなのです。良い結果を得るには、コンパイラが伺い知れないいくつかのパラメタを指定する必要があります。例えば、OpenMP および OpenCL にある膨大な設定例と議論をご覧下さい。並列処理アルゴリズムが必要なら、MTProcs が実装の役に立ちます。

MTProcs を入手する

mtprocs.pas ユニットは、multithreadprocslaz.lpk パッケージの中にあります。このパッケージは svn から得られます:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

例によって: IDEで一度 multithreadprocslaz.lpk を開きます。これで IDE はパッケージのパスを覚えます。プロジェクトの中でパッケージを使うには: IDE Menu / Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project とするか、プロジェクトインスペクタから追加します。

簡単な例

短い例を出します。何も役に立つことはしませんが、並列手続きの見かけと呼出し方がわかります:

program Test;
 
{$mode objfpc}{$H+}
 
uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;
 
// 簡単な並列手続き
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // 何かする
end;
 
begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil) // アドレス、開始する添字、終了する添字、オプションのデータ
end.

出力はこんな感じになるでしょう:

2
3
1
4
5

短い注意書きをいくつか。詳細は後述します。

  • Multithreaded Application Tutorial/ja で述べられているように、UNIXでのマルチスレッディングには cthreads ユニットが必要です。
  • 実行速度の面から、cmem ヒープマネジャの利用を勧めます。ただしこの例では差はでません。
  • 並列手続き DoSomethingParallel がとる引数は個数が固定であり、予め定義されています。
  • Index で、この呼出しが受け持つべき作業部分が決まります。
  • Data はポインタで、ProcThreadPool.DoParallel の四番目の引数として与えられたものです。このパラメタはオプションであり、何に使ってもかまいませんし、利用しなくてもかまいません。
  • Item はスレッドプールの持つ、より高度な特徴にアクセスするために使います。
  • ProcThreadPool.DoParallel は通常の手続き呼出しのように働きます。最後まで実行したとき - つまり、全スレッドが作業を完了したら -

リターンします。

  • この出力は、マルチスレッドに典型的な振る舞いを示します: 呼出しの順序は決まっていません。順序は実行する毎に変わり得ます。

特徴

手続きの並列実行とはこういうことです:

  • ある手続きまたはメソッドを、任意に設定した StartIndex から EndIndex までの間の Index(添字)について実行します。
  • 一つ以上のスレッドがそれらの添字について並列に動作します。例えば Index が 1 から 10 までだとして、スレッドプールの中で利用できるスレッドが三つあったとすると、これら三つのスレッドがそれぞれ異なった添字について同時に動作します。一つのスレッドの呼出し(一つの添字に対する処理)が終わる毎に、そのスレッドには新たな添字が割り当てられ、スレッドはそれを実行します。結果はこんなふうになるでしょう: スレッド1が添字 3, 5, 7 を、スレッド2が 1, 6, 8, 9 を、スレッド3が 2, 4, 10 を。
  • スレッド数は実行中に変化し得、最小数は保証されていません。最悪の場合、全ての添字を一つのスレッドで実行することにもなり得ます - メインスレッド自体で。
  • スレッドの最大数は動作中のシステムに応じた適切な値に初期化されます。次のように手動で設定することもできます。
    ProcThreadPool.MaxThreadCount := 8;
  • スレッドの最大数は手続き毎に設定することができます。
  • 並列手続き/メソッドは、再帰的に並列手続き/メソッドを呼出すことが可能です。
  • スレッドは再利用されます。つまり、各添字に対し消滅と生成を繰り返すのではなく、プールされています。2コアのプロセッサでは、プールには二つのスレッド - メインスレッドと、追加のスレッド - があり、処理全体に投入されます。

オーヴァヘッド、速度低下

オーヴァヘッドは、システムに強く依存しています(コアの数と種類、共有メモリの種類、クリティカルセクションの実行速度、キャッシュの大きさ)。一般的なヒントとしては:

  • 作業の各チャンク(添字)には、少なくとも数ミリ秒かかります。
  • オーヴァヘッドは再帰の深さとは関係がありません。

マルチスレッディングのオーヴァヘッドは、単純にこんにちのコンピュータのアーキテクチャによるものであり、MTProcs ユニットとは無関係です:

  • スレッドを一つ生成してプログラムがマルチスレッドになった途端に、メモリマネジャはクリティカルセクションを使わざるをえなくなります。ここで実行速度が低下します。そのスレッドでなにもしなくても、プログラムの実行速度は低下するでしょう。
  • いくつかのシステムが持つ cmem ヒープマネジャはマルチスレッドでははるかに高速です。私が Intel CPU を用いた OS X で試みたベンチマークでは、速度は10倍も違いました。
  • 文字列とインターフェースはグローバルな参照カウンタを持っています(訳注: どこからも参照されなくなったら自動的に dispose できるように)。アクセスする毎にクリティカルセクションが必要になります。文字列を複数のスレッドで処理すると、大幅な速度低下を齎します。代わりに PChar を使ってください。
  • 作業の各チャンク(添字)はそれぞれメモリ上の離れた部分を処理するようにすべきです。そうしないとキャッシュのアップデートが交錯します。
  • 広過ぎるメモリ領域を扱わないでください。システムによっては、スレッド一つでメモリバスの速度を使い切ってしまいます。メモリバスが最大速度に達すると、それ以上スレッドを立ち上げても速度は向上せず、むしろ低下します。

手続きに用いるスレッドの最大数を設定する

一つの手続きに対するスレッドの最大数を、五番目の引数として指定することができます。

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // アドレス、添字の開始値、添字の終了値、
      // オプション: データ(ここでは無し)、オプション: スレッドの最大数(ここでは2)
end.

これは、複数のスレッドが同一のデータに対し作用するときに有用です。スレッド数が多過ぎると、キャッシュの衝突が増加し、各スレッドの実行速度が低下します。あるいは、アルゴリズムが WaitForIndex を多用する場合、実際に動作しているスレッドは少ないということもありえます。余ったスレッドは他のタスク向けにできるはずのものです。

添字を待つ / 順番に実行する

時折、ある添字の処理が、それより前の添字の処理の結果に依存する場合があります。例えば、添字 5 の計算を行い、その結果を添字 3 の結果と組み合わせて用いるような場合です。こういう時は WaitForIndex を使います:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... 添字 'Index' の処理を行う ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... 計算処理 ...
  end;
end;

WaitForIndex は添字を引数としてとります。その添字は、現在の添字あるいは範囲よりも小さい値です。実行が上手くいっている場合は true を返します。戻り値が false となった場合は、他のスレッドのどれかで例外が発生しています。

更に拡張された関数 WaitForIndexRange があります。これは添字の全域について待ちます:

if not Item.WaitForIndexRange(3,5) then exit; // 3, 4, 5を待つ

例外

あるスレッドで例外が発生すると、他のスレッドは正常に終了するでしょうが、新たな添字の処理は行われません。スレッドプールは全スレッドの実効終了を待ち、例外を惹起させます。このため、try..except 文を通常通り用いることができます:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

複数の例外が発生した場合は、最初のもののみが立ち上がります。全ての例外を処理するには、各並列手続きの中に try..except を加えてください。

Synchronize

GUI の更新など、メインスレッドで関数を呼び出したい場合は、クラスメソッドの TThread.Synchronize を用いることができます。このメソッドは現在の TThread とメソッドのアドレスを引数としてとります。MTProc の 1.2 版から、CurrentThread というスレッド変数が提供されています。これを用いると synchronize の呼出しが容易になります:

TThread.Synchronize(CurrentThread,@YourMethod);

これで、メインイヴェント待ち行列にイヴェントをポストし、メインスレッドが指定したメソッドを実行するまで待ちます。生の FPC プログラムはイヴェント待ち行列をもたないことをよく思い出してください。イヴェント待ち行列を持つのは、LCL あるいは fpGUI プログラムです。

TThread の子孫を自前で生成する場合は、Execute メソッドの中でこの変数に代入する必要があります。たとえば:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...作業...
end;

例: 並列ループ

この例では、ループを並列手続きに変換する方法をステップを追って解説します。ある整数の配列 BigArray の最大値の計算例です。

オリジナルのループ

type
  TArrayOfInteger = array of integer;
 
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

作業を分割する

作業は n 個のスレッドに均等に分配されるべきです。そこで、BigArray を大きさの等しい複数のブロックに分割し、外側のループで各ブロックを舐めるようにします。典型的には n はシステムの持つ CPU やコアの数です。MTProcs にはブロックサイズと数を決めるための便利な関数があります:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

追加の行はいかなるループにも使えます。ツールを使って自動化してもいいでしょう。これで作業は小さな部分に分割されました。ここからは各部分をより独立させなければなりません。

訳注: 上述のように、スレッドの切り替えには時間がかかります(プロセスを起動する程ではありませんが)。そのため、

DoParallel(なんとか, 1, 10000000, nil)

のような呼出しは非常に不利です。スレッドの切り替えが0.1ミリ秒ですむ場合でも、何もしなくても1000秒かかります。従って各スレッドの中で十分長い処理を行うべきですし、しかも、全てのスレッドが同時に処理を終えると効率がよくなります。そのため、比較的小数の、互いに等しい大きさのブロックに分割すると有利です。

ProcThreadPool は MTProcs ユニット内で宣言された変数です。MTProcs ユニットの initialization 節で生成・初期化されます。コンピュータの中で共通に用いられるものなので自分でインスタンス化する必要はなく、この変数をそのまま使えばいいわけです。

この変数が持つメソッド CalcBlockSize は次のように三つの引数をとり、それぞれデータの大きさ、分割数、分割後の各ブロックの大きさです。

procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt);

コア数が 2(でハイパースレッディングのない)の機械で

ProcThreadPool.CalcBlockSize(1000, BlockCount, BlockSize);

のように呼び出すと、コア数を自動的に検出し、BlockCount には 2 が、BlockSize には 500 が代入されます。

CalcBlock は、

procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt; out BlockStart, BlockEnd: PtrInt);

と定義されていて、Index 番目のブロックが元のデータの添字のいくつからいくつまでに当るかを計算します。LoopLength、BlockSize は CalcBlockSize に渡した/から得たものを入れてください。元データの長さ (LoopLength) が BlockCount で割り切れなかった場合は、最後のブロックの長さが BlockSize より短くなりますが、CalcBlockSize はその調整も行います。CalcBlock は全ての添字が 0 から始まることを前提としています。

局所変数と共有変数

ループ内の各変数について、全てのスレッドから使える共有変数にするか、各スレッド用の局所変数にするかを決めなければなりません。共有変数である BlockCount と BlockSize は読み出すだけで変更しないので、これ以上の作業は不要です。しかし、Result のような共有変数は全てのスレッドによって書き換えられるでしょう。同期(つまりはクリティカルセクション)を用いてこれを実現することができますが、低速です。別の方法として、各スレッドがローカルにコピーを持っていて、最後にそれらの局所変数を組み合わせることもできます。

ここでは変数 Result を配列にしてしまい、最後に組み合わせることにします:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // shared variables
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // local variables
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // 局所変数用を割り当てるための空間を用意する
  // 各ブロックの最大値を求める
  for Index:=0 to BlockCount-1 do begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // ブロック間の最大値を求める
  // (何百ものスレッドがある場合は、もっとよい解決法がある)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
 
  FreeMem(BlockMax);
end;

このアプローチは単刀直入であり、自動化が可能ですが、それでもなおプログラマの関与がある程度必要です。

DoParallel

最後のステップは内側のループを手続き内手続きにもちこんで、ループを DoParallelLocalProc 呼出しで置き換えることです。

注意: FPC はいまもなお局所手続き型を定義することを認めていません。そのため、危険なタイプキャストを用いる必要があります。

function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
 
  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // ブロック内最大値を求める
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // 作業を等しい大きさのブロックに分割する
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // 局所/スレッド変数を割り当てる
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // 各ブロックについて最大値を求める
  ProcThreadPool.DoParallelLocalProc(@FindMaximumParallel,0,BlockCount-1);// 注意: 安全ではないタイプキャスト
  // 全ブロックの最大値を求める
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

ほとんどがコピペ作業であり、これまた自動化が可能でしょう。

例: 並列ソート

ユニット MTPutils には ParallelSortFPList 関数があります。これは MTProcs を用いて TFPList を並列にソートします。比較用の関数を与える必要があります。

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; 
                             MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

この関数は並列マージソートのアルゴリズムを用いています。引数 MaxThreadCount は DoParallel に渡されます。0 はシステム既定値を用いることを意味します。

オプションとして、各スレッドでソートを行うための自前のソート用関数 (OnSortPart) を与えることができます。例えば、各ブロックをクイックソートして、それらをマージすることもできます。そうすれば、並列クイックソートを得ることになります。クイックソートの実装例は TFPList.Sort をご覧下さい。

実験的

1.0.1以降、入れ子になった手続きを呼び出せる新たな DoParallelLocalProc が用意されました。コンパイラはいまだに入れ子になった手続きの呼出しをサポートしていないので、危険なタイプキャストが必要なことに注意してください。例えば:

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;
 
  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // 局所変数と引数へのアクセスが可能に!
  end;
 
var
  i: Integer;
begin
  ProcThreadPool.DoParallelLocalProc(@SubProc,1,2); // 注意: SubProcの型チェックは行われない
                 // DoSomething は手続き。メソッドは不可。
end;


これで沢山のリファクタリングが不要となり、並列手続きの可読性が上がります。

関連項目