Example of multi-threaded application: array of threads/ja

From Free Pascal wiki
Jump to: navigation, search

English (en) español (es) 日本語 (ja)

ここで、私は多数のスレッドを生成して、それらが作業を終えるのを待つ例を示そうと思います(同期は必要としていません)。これを書く気になったのは、Multithreaded Application Tutorial/jaを読んでも、その種のプログラムを書く方法がはっきり判らなかったからです。私が書いているのはOS X用のアプリケーションですが、ここに出てきたコードはどんなシステムでも動作するでしょう。

次のようなループを考えてみましょう:

...
for i := 1 to n do power(i, 0.5)
...

このループは、冪乗を一つづつ n 回計算しています。スレッドを使って同じことを並列に処理させてみましょう。

マルチスレッドは必要なの?

現代のマルチコアコンピュータでは、マルチスレッドを利用すればパフォーマンスが劇的に改善する場合があります。しかしながら、現代のコンピュータは極めて高速であり、スレッド化したコードは逐次処理に比べ、しばしばデバグと保守が困難です。処理時間の短縮とマルチスレッドプログラミングの複雑さとを天秤に掛け、またアルゴリズムが並列処理に適しているかよく考えてみる必要があります。最後に、並列処理の中にも、CPUスレッドの利用が適している場合(この例)とGPUの利用が適している場合があることを知っていてください(後者の場合、OpenCLのようなツールが最適です)。

メモリのマネジメント

複数のスレッドが同時に動作するため、それらがメモリを獲り合わないようにする必要があります。このメモリの競合問題は複数のスレッドがメモリの同じ場所に書き込もうとする場合に発生します。ある種のアルゴリズムは各計算処理がそれまでの結果に依存するため、そもそもマルチスレッディングに適していません。一方、各計算処理が独立かつ並列に行える問題に対しては、マルチスレッディングは極めて有効です。今回の例では、各処理が完全に独立しており、故にマルチスレッドで容易に攻略できる課題を解こうと思います。更に進んだアルゴリズムでは、メモリのロック機能を用いて、競合を避ける必要が出てくるでしょう。

今回の例では、各スレッドがそれぞれ異なるメモリの場所に書き込みます。もっと具体的に言えば、1..n の配列を作り、その範囲の i に対して power(i, 0.5) の値を計算します。各スレッドは計算処理の独立した部分を受け持ちます。n = 1000 だとします。一つのスレッドだけを使うなら、1..1000 の全体を扱うことになります。スレッドが二つなら、一つが 1..500 を、もう一つが 501..1000 に取り組むようにできます。このようにして、各スレッドが異なったメモリ部分を埋めていけるわけです。

1. 利用できるコアの数を検出する

コアを一つしか持たないコンピュータではスレッド化の効用はありませんが、物理コアが四つのコンピュータは、ハイパースレッディング(二つのタスクを同時実行可能にする)によって最大八個のタスクを同時に処理することができます。次の cpucount ユニットは利用可能なコア数を返します。あるコンピュータに於いて走らせるべきスレッドの数を決めるのにこれが使えます。四つ以上のコアを持つコンピュータでは、コア数を n とすれば、n-1 個のスレッドを走らせたくなるでしょう。残りの一つは GUI など他のタスク用にとっておきます。これだけコアがあれば、n と n-1 では処理速度に大差がありませんから。

unit cpucount;
{$mode objfpc}
 
interface
//コンピュータのコア数を返す。ハイパースレッドを使った2コアのコンピュータでは4が返るだろう。
function GetLogicalCpuCount: Integer;
 
implementation
 
{$IF defined(windows)}
uses windows;
{$endif}
 
{$IF defined(darwin)}
uses ctypes, sysctl;
{$endif} 
 
{$IFDEF Linux}
uses ctypes;
 
const _SC_NPROCESSORS_ONLN = 83;
function sysconf(i: cint): clong; cdecl; external name 'sysconf';
{$ENDIF}
 
 
function GetLogicalCpuCount: integer;
// 当該システム上で適切なスレッドの数のデフォルト値を返す
{$IF defined(windows)}
// ハイパースレッドを含んで、システム上で利用可能なプロセッサの数を返す。
var
  i: Integer;
  ProcessAffinityMask, SystemAffinityMask: DWORD_PTR;
  Mask: DWORD;
  SystemInfo: SYSTEM_INFO;
begin
  if GetProcessAffinityMask(GetCurrentProcess, ProcessAffinityMask, SystemAffinityMask)
  then begin
    Result := 0;
    for i := 0 to 31 do begin
      Mask := DWord(1) shl i;
      if (ProcessAffinityMask and Mask)<>0 then
        inc(Result);
    end;
  end else begin
    //affinity mask が得られなかったので、単にプロセッサの数を報告する
    GetSystemInfo(SystemInfo);
    Result := SystemInfo.dwNumberOfProcessors;
  end;
end;
{$ELSEIF defined(UNTESTEDsolaris)}
  begin
    t = sysconf(_SC_NPROC_ONLN);
  end;
{$ELSEIF defined(freebsd) or defined(darwin)}
var
  mib: array[0..1] of cint;
  len: cint;
  t: cint;
begin
  mib[0] := CTL_HW;
  mib[1] := HW_NCPU;
  len := sizeof(t);
  fpsysctl(pchar(@mib), 2, @t, @len, Nil, 0);
  Result:=t;
end;
{$ELSEIF defined(linux)}
  begin
    Result:=sysconf(_SC_NPROCESSORS_ONLN);
  end;
 
{$ELSE}
  begin
    Result:=1;
  end;
{$ENDIF}
end.


ユニット MTPCPU 内の GetSystemThreadCount 関数は、同様のロジックで利用可能なコアの数を報告します。このユニットは lazarus/components/multithreadprocs/mtpcpu.pas にあります。

Program CPUCountTest;
uses MTPCPU;
begin
  writeln(GetSystemThreadCount)
end.

例えば Intel Core2Duo を一個だけ持っている旧式 MacBook では結果は 2 となります。

2. カスタムスレッドクラスの生成

複数のスレッドの動作を定義するために専用のユニットを一つ作成します。FreeOnTerminate を false にしていることに注意してください。そのため、各スレッドが動作終了した後でそれらを dispose する必要があります。これによって、複数のスレッドをうまく扱えます(FreeOnTerminate を true にして、高速なジョブを複数立ち上げると、あるスレッドの動作完了をプログラム側が検査する前にそのスレッドが解放されてしまう可能性があり、存在しないスレッドを検査しようとすると例外を発生するかもしれません)。FreeOnTerminate を false にすれば、全てのスレッドが正常に終了したことを確認できます。

unit mythreads;
{$mode objfpc}{$H+}
interface
uses Classes;
type
  TData = array of double;
  PData = ^TData;
 Type
    TMyThread = class(TThread)
    private
    protected
      tPtr: PData;
      tstart,tfinish: integer;
      procedure Execute; override;
    public
      property Terminated;
      Constructor Create(lstart, lfinish: integer; lPtr: PData);
    end;
 
implementation
uses SysUtils, Math;
 
  constructor TMyThread.Create(lstart, lfinish: integer; lPtr: PData);
  begin
    FreeOnTerminate := False;
    tstart := lstart;
    tfinish := lfinish;
    tPtr := lPtr;
    inherited Create(false)
  end;
 
  procedure TMyThread.Execute;
  var i: integer;
  begin
    for i := tstart to tfinish do tPtr^[i] := power(i, 0.5)
  end;
 
end.

3. メインプログラムを書く

メインユニットに cthread を入れる必要があります。スレッドのユニットにではありませんよ。

全スレッドの終了を調べるために二つの方法があることにご注意ください。一つは組み込みの WaitFor 関数です。これは大変うまく動きますが、私のOS X機では、100 ms毎にしか更新されないようです。これはリアルワールドプログラミングでは完璧であり(計算機的には低速な課題にのみスレッディングを用います)、スレッドオーヴァヘッドが減りますが、短時間で済んでしまうようなベンチマークの例としては、スレッド化の効能を隠してしまうかもしれません(いくつスレッドを動かしても、最低でも100 msかかるので)。そこで、この例ではスレッドの終了状態を 2 msおきに調べています。これで更に正確な実行時間のベンチマークが得られます。

スレッドの利用が終わった時にそれを解放するのを忘れないでください。FreeOnTerminate を false にしてあるので、明示的にこれを行う必要があります。

Tips: 私の Lazarus IDE では、マルチスレッドアプリケーションのデバグに pthreads が必要です。cmem を使うとプログラムの実行速度が上がるとも聞きますが、各自の環境でチェックされるよう強く勧めます(cmem を使うと私のプログラムはハングします)。

uses //    cmem,pthreads,
  cthreads, Classes, SysUtils, CustApp, MyThreads, Math, cpucount;
 
procedure DoUnThreaded (nValues: integer);
var
 dataArray: TData;
 i: integer;
 StartMS: double;
begin
     if (nValues < 1) then exit;
     StartMS:=timestamptomsecs(datetimetotimestamp(now));
     setlength(dataArray, nValues+1);//0ベースなので +1する
     for i:=1 to nValues do
         dataArray[i] := power(i,0.5);  ;
     Writeln('Serially processed '+inttostr(nValues)+' values in '+floattostr(timestamptomsecs(datetimetotimestamp(now))-StartMS)+'ms, with '+inttostr(nValues)+'^0.5 = '+floattostr(dataArray[nValues]));
end;
 
procedure DoThreading (nThreadsIn, nValues: integer);
var
 threadArray: array  of TMyThread;
 dataArray: TData;
 lData : PData;
 nThreads, i,lStart,lFinish: integer;
 StartMS: double;
begin
     if (nThreadsIn < 1) or (nValues < 1) then exit;
     nThreads := nThreadsIn;
     if  nThreads > nValues then nThreads := nValues;
     StartMS:=timestamptomsecs(datetimetotimestamp(now));
     setlength(threadArray,nThreads+1);//0ベースなので +1する
     setlength(dataArray, nValues+1);//0ベースなので +1する
     lData := @dataArray;
     lStart := 1;
     for i:=1 to nThreads do begin
         if i < nThreads then
            lFinish:=i*(nValues div nThreads)
         else
             lFinish:=  nValues;
         threadArray[i]:= TMyThread.Create(lStart, lFinish, lData);
         //Writeln('Thread '+inttostr(i)+' processing '+inttostr(lStart)+'..'+inttostr(lFinish));
         lStart := lFinish+1;
     end;
     for i:=1 to nThreads do if not ThreadArray[i].Terminated then Sleep(2);
     //for i:=1 to nThreads do threadArray[i].waitFor;  //OS Xでは100 msスリープするようだ
     for i:=1 to nThreads do threadArray[i].Free;
     Writeln(inttostr(nThreads)+' Threads processed '+inttostr(nValues)+' values in '+floattostr(timestamptomsecs(datetimetotimestamp(now))-StartMS)+'ms, with '+inttostr(nValues)+'^0.5 = '+floattostr(dataArray[nValues]));
end;
 
begin
  Writeln('コンピュータの報告によるとコア数は '+inttostr(GetLogicalCpuCount)+' : 最適と思われるスレッド数 ');
  DoUnthreaded(10);
  DoThreading(1,10);
  DoThreading(2,10);
  DoThreading(4,10);
  DoThreading(8,10);
  DoUnthreaded(100000000);
  DoThreading(1,100000000);
  DoThreading(2,100000000);
  DoThreading(4,100000000);
  DoThreading(8,100000000);
end.

結果

以下の結果が示す通り、スレッド作成の際に遅延が認められますが、タスクが大きくなると、マルチスレッドによる並列処理が順次処理を上回る性能を見せます。スレッド数がコア数(このコンピュータでは4)を超えると、スレッド数を増やす効能はほとんどなくなることにご注意ください。スレッド数と実行速度が単純に比例すると期待してはいけません: スレッド化にはいくらかのオーヴァヘッドがつきまといますし、最近のCPUは一つのタスクに集中して処理を行う場合、複数のコアを利用する場合よりも若干高速に動作します(「ターボブースト」)。

  • Computer reports 4 cores: probably optimal number of threads
  • Serially processed 10 values in 0ms, with 10^0.5 = 3.16227766016838
  • 1 Threads processed 10 values in 3ms, with 10^0.5 = 3.16227766016838
  • 2 Threads processed 10 values in 5ms, with 10^0.5 = 3.16227766016838
  • 4 Threads processed 10 values in 9ms, with 10^0.5 = 3.16227766016838
  • 8 Threads processed 10 values in 19ms, with 10^0.5 = 3.16227766016838
  • Serially processed 100000000 values in 10214ms, with 100000000^0.5 = 10000
  • 1 Threads processed 100000000 values in 10320ms, with 100000000^0.5 = 10000
  • 2 Threads processed 100000000 values in 5894ms, with 100000000^0.5 = 10000
  • 4 Threads processed 100000000 values in 3801ms, with 100000000^0.5 = 10000
  • 8 Threads processed 100000000 values in 3733ms, with 100000000^0.5 = 10000