Difference between revisions of "Parallel procedures/ja"

From Free Pascal wiki
Jump to navigationJump to search
m (Fixed syntax highlighting; removed categories included in template)
 
(9 intermediate revisions by 2 users not shown)
Line 18: Line 18:
 
短い例を出します。何も役に立つことはしませんが、並列手続きの見かけと呼出し方がわかります:
 
短い例を出します。何も役に立つことはしませんが、並列手続きの見かけと呼出し方がわかります:
  
<syntaxhighlight>
+
<syntaxhighlight lang=pascal>
 
program Test;
 
program Test;
  
Line 67: Line 67:
 
* 一つ以上のスレッドがそれらの添字について並列に動作します。例えば Index が 1 から 10 までだとして、スレッドプールの中で利用できるスレッドが三つあったとすると、これら三つのスレッドがそれぞれ異なった添字について同時に動作します。一つのスレッドの呼出し(一つの添字に対する処理)が終わる毎に、そのスレッドには新たな添字が割り当てられ、スレッドはそれを実行します。結果はこんなふうになるでしょう: スレッド1が添字 3, 5, 7 を、スレッド2が 1, 6, 8, 9 を、スレッド3が 2, 4, 10 を。
 
* 一つ以上のスレッドがそれらの添字について並列に動作します。例えば Index が 1 から 10 までだとして、スレッドプールの中で利用できるスレッドが三つあったとすると、これら三つのスレッドがそれぞれ異なった添字について同時に動作します。一つのスレッドの呼出し(一つの添字に対する処理)が終わる毎に、そのスレッドには新たな添字が割り当てられ、スレッドはそれを実行します。結果はこんなふうになるでしょう: スレッド1が添字 3, 5, 7 を、スレッド2が 1, 6, 8, 9 を、スレッド3が 2, 4, 10 を。
 
* スレッド数は実行中に変化し得、最小数は保証されていません。最悪の場合、全ての添字を一つのスレッドで実行することにもなり得ます - メインスレッド自体で。
 
* スレッド数は実行中に変化し得、最小数は保証されていません。最悪の場合、全ての添字を一つのスレッドで実行することにもなり得ます - メインスレッド自体で。
* スレッドの最大数は動作中のシステムに応じた適切な値に初期化されます。<syntaxhighlight>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight> のように手動で設定することもできます。
+
* スレッドの最大数は動作中のシステムに応じた適切な値に初期化されます。次のように手動で設定することもできます。<syntaxhighlight lang=pascal>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight>
 
* スレッドの最大数は手続き毎に設定することができます。
 
* スレッドの最大数は手続き毎に設定することができます。
 
* 並列手続き/メソッドは、再帰的に並列手続き/メソッドを呼出すことが可能です。
 
* 並列手続き/メソッドは、再帰的に並列手続き/メソッドを呼出すことが可能です。
* スレッドは再利用されます。つまり、各添字に対し消滅と生成を繰り返すのではなく、プールされています。2コアのプロセッサでは、プールには二つのスレッドがあり - メインスレッドと、おまけのスレッド - 処理全体に投入されます。
+
* スレッドは再利用されます。つまり、各添字に対し消滅と生成を繰り返すのではなく、プールされています。2コアのプロセッサでは、プールには二つのスレッド - メインスレッドと、追加のスレッド - があり、処理全体に投入されます。
  
 
= オーヴァヘッド、速度低下 =
 
= オーヴァヘッド、速度低下 =
Line 89: Line 89:
 
一つの手続きに対するスレッドの最大数を、五番目の引数として指定することができます。
 
一つの手続きに対するスレッドの最大数を、五番目の引数として指定することができます。
  
<syntaxhighlight>begin
+
<syntaxhighlight lang=pascal>begin
 
   ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // アドレス、添字の開始値、添字の終了値、
 
   ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // アドレス、添字の開始値、添字の終了値、
 
       // オプション: データ(ここでは無し)、オプション: スレッドの最大数(ここでは2)
 
       // オプション: データ(ここでは無し)、オプション: スレッドの最大数(ここでは2)
Line 100: Line 100:
 
時折、ある添字の処理が、それより前の添字の処理の結果に依存する場合があります。例えば、添字 5 の計算を行い、その結果を添字 3 の結果と組み合わせて用いるような場合です。こういう時は WaitForIndex を使います:
 
時折、ある添字の処理が、それより前の添字の処理の結果に依存する場合があります。例えば、添字 5 の計算を行い、その結果を添字 3 の結果と組み合わせて用いるような場合です。こういう時は WaitForIndex を使います:
  
<syntaxhighlight>
+
<syntaxhighlight lang=pascal>
 
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
begin
 
begin
Line 114: Line 114:
  
 
更に拡張された関数 WaitForIndexRange があります。これは添字の全域について待ちます:
 
更に拡張された関数 WaitForIndexRange があります。これは添字の全域について待ちます:
<syntaxhighlight>if not Item.WaitForIndexRange(3,5) then exit; // 3, 4, 5を待つ</syntaxhighlight>
+
<syntaxhighlight lang=pascal>if not Item.WaitForIndexRange(3,5) then exit; // 3, 4, 5を待つ</syntaxhighlight>
  
= Exceptions =
+
= 例外 =
  
If an exception occur in one of the threads, the other threads will finish normally, but will not start a new Index. The pool waits for all threads to finish and will then raise the exception. That's why you can use ''try..except'' like always:
+
あるスレッドで例外が発生すると、他のスレッドは正常に終了するでしょうが、新たな添字の処理は行われません。スレッドプールは全スレッドの実効終了を待ち、例外を惹起させます。このため、''try..except'' 文を通常通り用いることができます:
  
<syntaxhighlight>try
+
<syntaxhighlight lang=pascal>
 +
try
 
   ...
 
   ...
 
   ProcThreadPool.DoParallel(...);
 
   ProcThreadPool.DoParallel(...);
Line 126: Line 127:
 
except
 
except
 
   On E: Exception do ...
 
   On E: Exception do ...
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
If there are multiple exceptions, only the first exception will be raised. To handle all exceptions, add a try..except inside your parallel method.
+
複数の例外が発生した場合は、最初のもののみが立ち上がります。全ての例外を処理するには、各並列手続きの中に try..except を加えてください。
  
 
= Synchronize =
 
= Synchronize =
  
When you want to call a function in the main thread, for example to update some gui element, you can use the class method ''TThread.Synchronize''. It takes as arguments the current TThread and the address of a method. Since 1.2 mtprocs provides a threadvar '''CurrentThread''', which makes synchronizing simple:
+
GUI の更新など、メインスレッドで関数を呼び出したい場合は、クラスメソッドの ''TThread.Synchronize'' を用いることができます。このメソッドは現在の TThread とメソッドのアドレスを引数としてとります。MTProc の 1.2 版から、'''CurrentThread''' というスレッド変数が提供されています。これを用いると synchronize の呼出しが容易になります:
  
<syntaxhighlight>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight>
+
<syntaxhighlight lang=pascal>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight>
  
This will post an event on the main event queue and wait until the main thread has executed your method. Keep in mind that a bare fpc program does not have an event queue. A LCL or fpgui program has it.
+
これで、メインイヴェント待ち行列にイヴェントをポストし、メインスレッドが指定したメソッドを実行するまで待ちます。生の FPC プログラムはイヴェント待ち行列をもたないことをよく思い出してください。イヴェント待ち行列を持つのは、LCL あるいは fpGUI プログラムです。
  
If you create your own TThread descendants, you should set the variable in your Execute method. For example:
+
TThread の子孫を自前で生成する場合は、Execute メソッドの中でこの変数に代入する必要があります。たとえば:
  
<syntaxhighlight>
+
<syntaxhighlight lang=pascal>
 
procedure TYourThread.Execute;
 
procedure TYourThread.Execute;
 
begin
 
begin
 
   CurrentThread:=Self;
 
   CurrentThread:=Self;
   ...work...
+
   ...作業...
 
end;
 
end;
 
</syntaxhighlight>
 
</syntaxhighlight>
  
= Example: Parallel loop =
+
= : 並列ループ =
  
This example explains step by step how to convert a loop into a parallel procedure. The example computes the maximum number of an integer array ''BigArray''.
+
この例では、ループを並列手続きに変換する方法をステップを追って解説します。ある整数の配列 ''BigArray'' の最大値の計算例です。
  
== The original loop ==
+
== オリジナルのループ ==
  
<syntaxhighlight>type
+
<syntaxhighlight lang=pascal>
 +
type
 
   TArrayOfInteger = array of integer;
 
   TArrayOfInteger = array of integer;
  
Line 165: Line 168:
 
     if Result<BigArray[i] then Result:=BigArray[i];
 
     if Result<BigArray[i] then Result:=BigArray[i];
 
   end;
 
   end;
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
== Splitting the work ==
+
== 作業を分割する ==
  
The work should be equally distributed over n threads. For this the BigArray is split into equally sized blocks and an outer loop runs over every block. Typically n is the number of cpus/cores in the system. MTProcs has some utility functions to compute the block size and count:
+
作業は n 個のスレッドに均等に分配されるべきです。そこで、BigArray を大きさの等しい複数のブロックに分割し、外側のループで各ブロックを舐めるようにします。典型的には n はシステムの持つ CPU やコアの数です。MTProcs にはブロックサイズと数を決めるための便利な関数があります:
  
<syntaxhighlight>function FindMaximum(BigArray: TArrayOfInteger): integer;
+
<syntaxhighlight lang=pascal>
 +
function FindMaximum(BigArray: TArrayOfInteger): integer;
 
var
 
var
 
   BlockCount, BlockSize: PtrInt;
 
   BlockCount, BlockSize: PtrInt;
Line 186: Line 191:
 
     end;
 
     end;
 
   end;
 
   end;
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
The added lines can be used for any loop. Eventually a tool can be written to automate this.
+
追加の行はいかなるループにも使えます。ツールを使って自動化してもいいでしょう。これで作業は小さな部分に分割されました。ここからは各部分をより独立させなければなりません。
  
The work is now split into smaller pieces. Now the pieces must become more independent.
+
訳注:
 +
上述のように、スレッドの切り替えには時間がかかります(プロセスを起動する程ではありませんが)。そのため、
  
== Local and shared variables ==
+
DoParallel(なんとか, 1, 10000000, nil)
  
For each used variable in the loop you must decide whether it is shared variable used by all threads or if each thread uses its own local variable.
+
のような呼出しは非常に不利です。スレッドの切り替えが0.1ミリ秒ですむ場合でも、何もしなくても1000秒かかります。従って各スレッドの中で十分長い処理を行うべきですし、しかも、全てのスレッドが同時に処理を終えると効率がよくなります。そのため、比較的小数の、互いに等しい大きさのブロックに分割すると有利です。
The shared variables BlockCount and BlockSize are only read and do not change, so no work is needed for them.
 
But a shared variable like Result will be changed by all threads. This can either be achieved with synchronization (e.g. critical section), which is slow, or each thread use a local copy and these local variable are later combined.
 
  
Here is a solution replacing the ''Result'' variable with an array, which is combined in the end:
+
ProcThreadPool は MTProcs ユニット内で宣言された変数です。MTProcs ユニットの initialization 節で生成・初期化されます。コンピュータの中で共通に用いられるものなので自分でインスタンス化する必要はなく、この変数をそのまま使えばいいわけです。
  
<syntaxhighlight>function FindMaximum(BigArray: TArrayOfInteger): integer;
+
この変数が持つメソッド CalcBlockSize は次のように三つの引数をとり、それぞれデータの大きさ、分割数、分割後の各ブロックの大きさです。
 +
 
 +
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt);
 +
 
 +
コア数が 2(でハイパースレッディングのない)の機械で
 +
 
 +
ProcThreadPool.CalcBlockSize(1000, BlockCount, BlockSize);
 +
 
 +
のように呼び出すと、コア数を自動的に検出し、BlockCount には 2 が、BlockSize には 500 が代入されます。
 +
 
 +
CalcBlock は、
 +
 
 +
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;  out BlockStart, BlockEnd: PtrInt);
 +
 
 +
と定義されていて、Index 番目のブロックが元のデータの添字のいくつからいくつまでに当るかを計算します。LoopLength、BlockSize は CalcBlockSize に渡した/から得たものを入れてください。元データの長さ (LoopLength) が  BlockCount で割り切れなかった場合は、最後のブロックの長さが BlockSize より短くなりますが、CalcBlockSize はその調整も行います。CalcBlock は全ての添字が 0 から始まることを前提としています。
 +
 
 +
== 局所変数と共有変数 ==
 +
 
 +
ループ内の各変数について、全てのスレッドから使える共有変数にするか、各スレッド用の局所変数にするかを決めなければなりません。共有変数である BlockCount と BlockSize は読み出すだけで変更しないので、これ以上の作業は不要です。しかし、Result のような共有変数は全てのスレッドによって書き換えられるでしょう。同期(つまりはクリティカルセクション)を用いてこれを実現することができますが、低速です。別の方法として、各スレッドがローカルにコピーを持っていて、最後にそれらの局所変数を組み合わせることもできます。
 +
 
 +
ここでは変数 ''Result'' を配列にしてしまい、最後に組み合わせることにします:
 +
 
 +
<syntaxhighlight lang=pascal>
 +
function FindMaximum(BigArray: TArrayOfInteger): integer;
 
var
 
var
 
   // shared variables
 
   // shared variables
Line 211: Line 239:
 
begin
 
begin
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // allocate space for local variables
+
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // 局所変数用を割り当てるための空間を用意する
   // compute maximum for each block
+
   // 各ブロックの最大値を求める
 
   for Index:=0 to BlockCount-1 do begin
 
   for Index:=0 to BlockCount-1 do begin
 
     // compute maximum of block
 
     // compute maximum of block
Line 221: Line 249:
 
     end;
 
     end;
 
   end;
 
   end;
   // compute maximum of all blocks
+
   // ブロック間の最大値を求める
   // (if you have hundreds of threads there are better solutions)
+
   // (何百ものスレッドがある場合は、もっとよい解決法がある)
 
   Result:=BlockMax[0];
 
   Result:=BlockMax[0];
 
   for Index:=1 to BlockCount-1 do
 
   for Index:=1 to BlockCount-1 do
Line 228: Line 256:
  
 
   FreeMem(BlockMax);
 
   FreeMem(BlockMax);
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
This approach is straightforward and could be automated. The process will need some hints from the programmer though.
+
このアプローチは単刀直入であり、自動化が可能ですが、それでもなおプログラマの関与がある程度必要です。
  
 
== DoParallel ==
 
== DoParallel ==
  
The final step is to move the inner loop into a sub procedure and replace the loop with a call to DoParallelLocalProc.
+
最後のステップは内側のループを手続き内手続きにもちこんで、ループを DoParallelLocalProc 呼出しで置き換えることです。
  
'''Beware''': FPC does not yet allow to define local procedure types, so an unsafe type cast must be used for this.
+
'''注意''': FPC はいまもなお局所手続き型を定義することを認めていません。そのため、危険なタイプキャストを用いる必要があります。
  
<syntaxhighlight>function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
+
<syntaxhighlight lang=pascal>
 +
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
 
var
 
var
 
   BlockCount, BlockSize: PtrInt;
 
   BlockCount, BlockSize: PtrInt;
Line 249: Line 279:
 
     BlockStart, BlockEnd: PtrInt;
 
     BlockStart, BlockEnd: PtrInt;
 
   begin
 
   begin
     // compute maximum of block
+
     // ブロック内最大値を求める
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     BlockMax[Index]:=BigArray[BlockStart];
 
     BlockMax[Index]:=BigArray[BlockStart];
Line 258: Line 288:
 
   Index: PtrInt;
 
   Index: PtrInt;
 
begin
 
begin
   // split work into equally sized blocks
+
   // 作業を等しい大きさのブロックに分割する
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
   // allocate local/thread variables
+
   // 局所/スレッド変数を割り当てる
 
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
 
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
   // compute maximum for each block
+
   // 各ブロックについて最大値を求める
   ProcThreadPool.DoParallelLocalProc(@FindMaximumParallel,0,BlockCount-1);// Beware: unsafe type
+
   ProcThreadPool.DoParallelLocalProc(@FindMaximumParallel,0,BlockCount-1);// 注意: 安全ではないタイプキャスト
   // compute maximum of all blocks
+
   // 全ブロックの最大値を求める
 
   Result:=BlockMax[0];
 
   Result:=BlockMax[0];
 
   for Index:=1 to BlockCount-1 do
 
   for Index:=1 to BlockCount-1 do
 
     Result:=Max(Result,BlockMax[Index]);
 
     Result:=Max(Result,BlockMax[Index]);
 
   FreeMem(BlockMax);
 
   FreeMem(BlockMax);
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
This was mostly copy and paste, so again this could be automated.
+
ほとんどがコピペ作業であり、これまた自動化が可能でしょう。
  
=Example: parallel sort=
+
= : 並列ソート =
The unit mtputils contains the function '''ParallelSortFPList''' which uses mtprocs to sort a TFPList in parallel. A compare function must be given.
+
ユニット MTPutils には '''ParallelSortFPList''' 関数があります。これは MTProcs を用いて TFPList を並列にソートします。比較用の関数を与える必要があります。
  
<syntaxhighlight>procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);</syntaxhighlight>
+
<syntaxhighlight lang=pascal>
 +
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;  
 +
                            MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
 +
</syntaxhighlight>
  
This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel. A 0 means to use the system default.
+
この関数は並列マージソートのアルゴリズムを用いています。引数 MaxThreadCount DoParallel に渡されます。0 はシステム既定値を用いることを意味します。
  
Optionally you can provide your own sort function (OnSortPart) to sort the part of each single thread. For example you can sort the blocks via QuickSort, which are then merged. Then you have a '''parallel QuickSort'''. See TFPList.Sort for an example implementation of QuickSort.
+
オプションとして、各スレッドでソートを行うための自前のソート用関数 (OnSortPart) を与えることができます。例えば、各ブロックをクイックソートして、それらをマージすることもできます。そうすれば、'''並列クイックソート'''を得ることになります。クイックソートの実装例は TFPList.Sort をご覧下さい。
  
=Experimental=
+
= 実験的 =
  
Since 1.0.1 there is a new DoParallelLocalProc which allows to call nested procedures of procedures. Beware that calling nested procedures is not yet supported by the compiler and therefore one has to use unsafe type casts. For example:
+
1.0.1以降、入れ子になった手続きを呼び出せる新たな DoParallelLocalProc が用意されました。コンパイラはいまだに入れ子になった手続きの呼出しをサポートしていないので、危険なタイプキャストが必要なことに注意してください。例えば:
  
<syntaxhighlight>procedure DoSomething(Value: PtrInt);
+
<syntaxhighlight lang=pascal>procedure DoSomething(Value: PtrInt);
 
var
 
var
 
   p: array[1..2] of Pointer;
 
   p: array[1..2] of Pointer;
Line 292: Line 326:
 
   procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
   procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
   begin
 
   begin
     p[Index]:=Pointer(Value); // accessing local variables and parameters is possible!
+
     p[Index]:=Pointer(Value); // 局所変数と引数へのアクセスが可能に!
 
   end;
 
   end;
  
Line 298: Line 332:
 
   i: Integer;
 
   i: Integer;
 
begin
 
begin
   ProcThreadPool.DoParallelLocalProc(@SubProc,1,2); // BEWARE: no type checking is done for SubProc.
+
   ProcThreadPool.DoParallelLocalProc(@SubProc,1,2); // 注意: SubProcの型チェックは行われない
                 // DoSomething must be a procedure, not a method.
+
                 // DoSomething は手続き。メソッドは不可。
end;</syntaxhighlight>
+
end;
 +
</syntaxhighlight>
  
This can save a lot of refactoring and makes the parallel procedure much more readable.
 
  
=See also=
+
これで沢山のリファクタリングが不要となり、並列手続きの可読性が上がります。
 +
 
 +
= 関連項目 =
  
 
* [[Multithreaded Application Tutorial/ja]]
 
* [[Multithreaded Application Tutorial/ja]]
 
* [[OpenCL]]
 
* [[OpenCL]]
 
[[Category:Components]]
 
[[Category:Parallel programming]]
 
[[Category:Multitasking]]
 
[[Category:Lazarus-CCR]]
 

Latest revision as of 05:51, 23 February 2020

Deutsch (de) English (en) 日本語 (ja) русский (ru)

概略

このページでは、MTProcs ユニットを用いて一つの手続きを並列動作させる方法を記述します。これを用いると手続きの並列動作が簡単になり、並列処理アルゴリズムの実装が容易になります。

並列手続き/並列メソッドは並列処理アルゴリズムにしばしば見られ、これを最初からサポートしているプログラミング言語もあります(例えば gcc の OpenMP) 。FPC にこの種の言語機能を加えるプランについては、OpenMP support をご覧下さい。言語にこの種の機能を加えると、プログラムの長さが短くなり、オーヴァヘッドが減るようなコードをコンパイラが生成できるようにできるかもしれません。一方、ある単一スレッドプログラムの中の部分的なコードを並列処理コードに変換するためには沢山の方法があります。単純なアプローチでは速度低下をもたらすことがしばしばなのです。良い結果を得るには、コンパイラが伺い知れないいくつかのパラメタを指定する必要があります。例えば、OpenMP および OpenCL にある膨大な設定例と議論をご覧下さい。並列処理アルゴリズムが必要なら、MTProcs が実装の役に立ちます。

MTProcs を入手する

mtprocs.pas ユニットは、multithreadprocslaz.lpk パッケージの中にあります。このパッケージは svn から得られます:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

例によって: IDEで一度 multithreadprocslaz.lpk を開きます。これで IDE はパッケージのパスを覚えます。プロジェクトの中でパッケージを使うには: IDE Menu / Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project とするか、プロジェクトインスペクタから追加します。

簡単な例

短い例を出します。何も役に立つことはしませんが、並列手続きの見かけと呼出し方がわかります:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// 簡単な並列手続き
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // 何かする
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil) // アドレス、開始する添字、終了する添字、オプションのデータ
end.

出力はこんな感じになるでしょう:

2
3
1
4
5

短い注意書きをいくつか。詳細は後述します。

  • Multithreaded Application Tutorial/ja で述べられているように、UNIXでのマルチスレッディングには cthreads ユニットが必要です。
  • 実行速度の面から、cmem ヒープマネジャの利用を勧めます。ただしこの例では差はでません。
  • 並列手続き DoSomethingParallel がとる引数は個数が固定であり、予め定義されています。
  • Index で、この呼出しが受け持つべき作業部分が決まります。
  • Data はポインタで、ProcThreadPool.DoParallel の四番目の引数として与えられたものです。このパラメタはオプションであり、何に使ってもかまいませんし、利用しなくてもかまいません。
  • Item はスレッドプールの持つ、より高度な特徴にアクセスするために使います。
  • ProcThreadPool.DoParallel は通常の手続き呼出しのように働きます。最後まで実行したとき - つまり、全スレッドが作業を完了したら -

リターンします。

  • この出力は、マルチスレッドに典型的な振る舞いを示します: 呼出しの順序は決まっていません。順序は実行する毎に変わり得ます。

特徴

手続きの並列実行とはこういうことです:

  • ある手続きまたはメソッドを、任意に設定した StartIndex から EndIndex までの間の Index(添字)について実行します。
  • 一つ以上のスレッドがそれらの添字について並列に動作します。例えば Index が 1 から 10 までだとして、スレッドプールの中で利用できるスレッドが三つあったとすると、これら三つのスレッドがそれぞれ異なった添字について同時に動作します。一つのスレッドの呼出し(一つの添字に対する処理)が終わる毎に、そのスレッドには新たな添字が割り当てられ、スレッドはそれを実行します。結果はこんなふうになるでしょう: スレッド1が添字 3, 5, 7 を、スレッド2が 1, 6, 8, 9 を、スレッド3が 2, 4, 10 を。
  • スレッド数は実行中に変化し得、最小数は保証されていません。最悪の場合、全ての添字を一つのスレッドで実行することにもなり得ます - メインスレッド自体で。
  • スレッドの最大数は動作中のシステムに応じた適切な値に初期化されます。次のように手動で設定することもできます。
    ProcThreadPool.MaxThreadCount := 8;
    
  • スレッドの最大数は手続き毎に設定することができます。
  • 並列手続き/メソッドは、再帰的に並列手続き/メソッドを呼出すことが可能です。
  • スレッドは再利用されます。つまり、各添字に対し消滅と生成を繰り返すのではなく、プールされています。2コアのプロセッサでは、プールには二つのスレッド - メインスレッドと、追加のスレッド - があり、処理全体に投入されます。

オーヴァヘッド、速度低下

オーヴァヘッドは、システムに強く依存しています(コアの数と種類、共有メモリの種類、クリティカルセクションの実行速度、キャッシュの大きさ)。一般的なヒントとしては:

  • 作業の各チャンク(添字)には、少なくとも数ミリ秒かかります。
  • オーヴァヘッドは再帰の深さとは関係がありません。

マルチスレッディングのオーヴァヘッドは、単純にこんにちのコンピュータのアーキテクチャによるものであり、MTProcs ユニットとは無関係です:

  • スレッドを一つ生成してプログラムがマルチスレッドになった途端に、メモリマネジャはクリティカルセクションを使わざるをえなくなります。ここで実行速度が低下します。そのスレッドでなにもしなくても、プログラムの実行速度は低下するでしょう。
  • いくつかのシステムが持つ cmem ヒープマネジャはマルチスレッドでははるかに高速です。私が Intel CPU を用いた OS X で試みたベンチマークでは、速度は10倍も違いました。
  • 文字列とインターフェースはグローバルな参照カウンタを持っています(訳注: どこからも参照されなくなったら自動的に dispose できるように)。アクセスする毎にクリティカルセクションが必要になります。文字列を複数のスレッドで処理すると、大幅な速度低下を齎します。代わりに PChar を使ってください。
  • 作業の各チャンク(添字)はそれぞれメモリ上の離れた部分を処理するようにすべきです。そうしないとキャッシュのアップデートが交錯します。
  • 広過ぎるメモリ領域を扱わないでください。システムによっては、スレッド一つでメモリバスの速度を使い切ってしまいます。メモリバスが最大速度に達すると、それ以上スレッドを立ち上げても速度は向上せず、むしろ低下します。

手続きに用いるスレッドの最大数を設定する

一つの手続きに対するスレッドの最大数を、五番目の引数として指定することができます。

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // アドレス、添字の開始値、添字の終了値、
      // オプション: データ(ここでは無し)、オプション: スレッドの最大数(ここでは2)
end.

これは、複数のスレッドが同一のデータに対し作用するときに有用です。スレッド数が多過ぎると、キャッシュの衝突が増加し、各スレッドの実行速度が低下します。あるいは、アルゴリズムが WaitForIndex を多用する場合、実際に動作しているスレッドは少ないということもありえます。余ったスレッドは他のタスク向けにできるはずのものです。

添字を待つ / 順番に実行する

時折、ある添字の処理が、それより前の添字の処理の結果に依存する場合があります。例えば、添字 5 の計算を行い、その結果を添字 3 の結果と組み合わせて用いるような場合です。こういう時は WaitForIndex を使います:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... 添字 'Index' の処理を行う ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... 計算処理 ...
  end;
end;

WaitForIndex は添字を引数としてとります。その添字は、現在の添字あるいは範囲よりも小さい値です。実行が上手くいっている場合は true を返します。戻り値が false となった場合は、他のスレッドのどれかで例外が発生しています。

更に拡張された関数 WaitForIndexRange があります。これは添字の全域について待ちます:

if not Item.WaitForIndexRange(3,5) then exit; // 3, 4, 5を待つ

例外

あるスレッドで例外が発生すると、他のスレッドは正常に終了するでしょうが、新たな添字の処理は行われません。スレッドプールは全スレッドの実効終了を待ち、例外を惹起させます。このため、try..except 文を通常通り用いることができます:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

複数の例外が発生した場合は、最初のもののみが立ち上がります。全ての例外を処理するには、各並列手続きの中に try..except を加えてください。

Synchronize

GUI の更新など、メインスレッドで関数を呼び出したい場合は、クラスメソッドの TThread.Synchronize を用いることができます。このメソッドは現在の TThread とメソッドのアドレスを引数としてとります。MTProc の 1.2 版から、CurrentThread というスレッド変数が提供されています。これを用いると synchronize の呼出しが容易になります:

TThread.Synchronize(CurrentThread,@YourMethod);

これで、メインイヴェント待ち行列にイヴェントをポストし、メインスレッドが指定したメソッドを実行するまで待ちます。生の FPC プログラムはイヴェント待ち行列をもたないことをよく思い出してください。イヴェント待ち行列を持つのは、LCL あるいは fpGUI プログラムです。

TThread の子孫を自前で生成する場合は、Execute メソッドの中でこの変数に代入する必要があります。たとえば:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...作業...
end;

例: 並列ループ

この例では、ループを並列手続きに変換する方法をステップを追って解説します。ある整数の配列 BigArray の最大値の計算例です。

オリジナルのループ

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

作業を分割する

作業は n 個のスレッドに均等に分配されるべきです。そこで、BigArray を大きさの等しい複数のブロックに分割し、外側のループで各ブロックを舐めるようにします。典型的には n はシステムの持つ CPU やコアの数です。MTProcs にはブロックサイズと数を決めるための便利な関数があります:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

追加の行はいかなるループにも使えます。ツールを使って自動化してもいいでしょう。これで作業は小さな部分に分割されました。ここからは各部分をより独立させなければなりません。

訳注: 上述のように、スレッドの切り替えには時間がかかります(プロセスを起動する程ではありませんが)。そのため、

DoParallel(なんとか, 1, 10000000, nil)

のような呼出しは非常に不利です。スレッドの切り替えが0.1ミリ秒ですむ場合でも、何もしなくても1000秒かかります。従って各スレッドの中で十分長い処理を行うべきですし、しかも、全てのスレッドが同時に処理を終えると効率がよくなります。そのため、比較的小数の、互いに等しい大きさのブロックに分割すると有利です。

ProcThreadPool は MTProcs ユニット内で宣言された変数です。MTProcs ユニットの initialization 節で生成・初期化されます。コンピュータの中で共通に用いられるものなので自分でインスタンス化する必要はなく、この変数をそのまま使えばいいわけです。

この変数が持つメソッド CalcBlockSize は次のように三つの引数をとり、それぞれデータの大きさ、分割数、分割後の各ブロックの大きさです。

procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt);

コア数が 2(でハイパースレッディングのない)の機械で

ProcThreadPool.CalcBlockSize(1000, BlockCount, BlockSize);

のように呼び出すと、コア数を自動的に検出し、BlockCount には 2 が、BlockSize には 500 が代入されます。

CalcBlock は、

procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt; out BlockStart, BlockEnd: PtrInt);

と定義されていて、Index 番目のブロックが元のデータの添字のいくつからいくつまでに当るかを計算します。LoopLength、BlockSize は CalcBlockSize に渡した/から得たものを入れてください。元データの長さ (LoopLength) が BlockCount で割り切れなかった場合は、最後のブロックの長さが BlockSize より短くなりますが、CalcBlockSize はその調整も行います。CalcBlock は全ての添字が 0 から始まることを前提としています。

局所変数と共有変数

ループ内の各変数について、全てのスレッドから使える共有変数にするか、各スレッド用の局所変数にするかを決めなければなりません。共有変数である BlockCount と BlockSize は読み出すだけで変更しないので、これ以上の作業は不要です。しかし、Result のような共有変数は全てのスレッドによって書き換えられるでしょう。同期(つまりはクリティカルセクション)を用いてこれを実現することができますが、低速です。別の方法として、各スレッドがローカルにコピーを持っていて、最後にそれらの局所変数を組み合わせることもできます。

ここでは変数 Result を配列にしてしまい、最後に組み合わせることにします:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // shared variables
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // local variables
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // 局所変数用を割り当てるための空間を用意する
  // 各ブロックの最大値を求める
  for Index:=0 to BlockCount-1 do begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // ブロック間の最大値を求める
  // (何百ものスレッドがある場合は、もっとよい解決法がある)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

このアプローチは単刀直入であり、自動化が可能ですが、それでもなおプログラマの関与がある程度必要です。

DoParallel

最後のステップは内側のループを手続き内手続きにもちこんで、ループを DoParallelLocalProc 呼出しで置き換えることです。

注意: FPC はいまもなお局所手続き型を定義することを認めていません。そのため、危険なタイプキャストを用いる必要があります。

function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // ブロック内最大値を求める
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // 作業を等しい大きさのブロックに分割する
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // 局所/スレッド変数を割り当てる
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // 各ブロックについて最大値を求める
  ProcThreadPool.DoParallelLocalProc(@FindMaximumParallel,0,BlockCount-1);// 注意: 安全ではないタイプキャスト
  // 全ブロックの最大値を求める
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

ほとんどがコピペ作業であり、これまた自動化が可能でしょう。

例: 並列ソート

ユニット MTPutils には ParallelSortFPList 関数があります。これは MTProcs を用いて TFPList を並列にソートします。比較用の関数を与える必要があります。

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; 
                             MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

この関数は並列マージソートのアルゴリズムを用いています。引数 MaxThreadCount は DoParallel に渡されます。0 はシステム既定値を用いることを意味します。

オプションとして、各スレッドでソートを行うための自前のソート用関数 (OnSortPart) を与えることができます。例えば、各ブロックをクイックソートして、それらをマージすることもできます。そうすれば、並列クイックソートを得ることになります。クイックソートの実装例は TFPList.Sort をご覧下さい。

実験的

1.0.1以降、入れ子になった手続きを呼び出せる新たな DoParallelLocalProc が用意されました。コンパイラはいまだに入れ子になった手続きの呼出しをサポートしていないので、危険なタイプキャストが必要なことに注意してください。例えば:

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // 局所変数と引数へのアクセスが可能に!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelLocalProc(@SubProc,1,2); // 注意: SubProcの型チェックは行われない
                 // DoSomething は手続き。メソッドは不可。
end;


これで沢山のリファクタリングが不要となり、並列手続きの可読性が上がります。

関連項目