Parallel procedures/ru

From Free Pascal wiki

Deutsch (de) English (en) français (fr) 日本語 (ja) русский (ru)

Это незавершённый перевод оригинальной статьи на английском языке!

Общие сведения

На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.

Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить некоторое количество ввода кода и позволяет компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный код. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет реализовать это.

Добавление MTProcs

Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.

Вы можете найти его исходники на sourceforge:

https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

Или в Lazarus components/multithreadprocs.

Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project


Простой пример

Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // делаем некую работу
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.

На выходе будет что-то вроде этого:

2
3
1
4
5

Тут следует дать несколько коротких заметок. Подробности позже.

  • Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
  • Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
  • Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
  • Index определяет, какую часть работы должен сделать этот вызов.
  • Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
  • Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
  • ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
  • Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.

Особенности

Выполнение процедуры параллельно означает:

  • процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
  • Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
  • Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
  • Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
  • Вы можете задать максимальное количество потоков для каждой процедуры.
  • Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
  • Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.

Накладные расходы, замедление

Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:

  • Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
  • Накладные расходы не зависят от рекурсивных уровней параллельных процедур.

Multi threading overhead, which is independent of the MTProcs units, but simply results from todays computer architectures:

  • As soon as one thread is created your program becomes multi threaded and the memory managers must use critical sections, which slows down. So even if you do nothing with the thread your program might become slower.
  • The cmem heap manager is on some systems much faster for multi threading. In my benchmarks especially on intel systems and especially under OS X the speed difference can be more than 10 times.
  • Strings and interfaces are globally reference counted. Each access needs a critical section. Processing strings in multiple threads will therefore hardly give any speed up. Use PChars instead.
  • Each chunk of work (index) should work on a disjunctive part of memory to avoid cross cache updates.
  • Do not work on vast amounts of memory. On some systems one thread alone is fast enough to fill the memory bus speed. When the memory bus maximum speed is reached, any further thread will slow down instead of making it faster.

Установка максимального числа потоков для процедуры

Вы можете указать максимальное число потоков для процедуры в пятом параметре.

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс, 
      // необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.

Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.

Wait for index / executing in order

Sometimes an Index depends on the result of a former Index. For example a common task is to first compute chunk 5 and then combine the result with the result of chunk 3. Use the WaitForIndex method for that:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... compute chunk number 'Index' ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... compute ...
  end;
end;

WaitForIndex takes as an argument an Index that is lower than the current Index or a range. If it returns true, everything worked as expected. If it returns false, then an exception happened in one of the other threads.

There is an extended function WaitForIndexRange that waits for whole range of Index:

if not Item.WaitForIndexRange(3,5) then exit; // wait for 3,4 and 5

Exceptions

If an exception occur in one of the threads, the other threads will finish normally, but will not start a new Index. The pool waits for all threads to finish and will then raise the exception. That's why you can use try..except like always:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

If there are multiple exceptions, only the first exception will be raised. To handle all exceptions, add a try..except inside your parallel method.

Synchronize

When you want to call a function in the main thread, for example to update some gui element, you can use the class method TThread.Synchronize. It takes as arguments the current TThread and the address of a method. Since 1.2 mtprocs provides a threadvar CurrentThread, which makes synchronizing simple:

TThread.Synchronize(CurrentThread,@YourMethod);

This will post an event on the main event queue and wait until the main thread has executed your method. Keep in mind that a bare fpc program does not have an event queue. A LCL or fpgui program has it.

If you create your own TThread descendants, you should set the variable in your Execute method. For example:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...work...
end;

Example: Parallel loop

This example explains step by step how to convert a loop into a parallel procedure. The example computes the maximum number of an integer array BigArray.

The original loop

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

Splitting the work

The work should be equally distributed over n threads. For this the BigArray is split into equally sized blocks and an outer loop runs over every block. Typically n is the number of cpus/cores in the system. MTProcs has some utility functions to compute the block size and count:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

The added lines can be used for any loop. Eventually a tool can be written to automate this.

The work is now split into smaller pieces. Now the pieces must become more independent.

Local and shared variables

For each used variable in the loop you must decide whether it is shared variable used by all threads or if each thread uses its own local variable. The shared variables BlockCount and BlockSize are only read and do not change, so no work is needed for them. But a shared variable like Result will be changed by all threads. This can either be achieved with synchronization (e.g. critical section), which is slow, or each thread use a local copy and these local variable are later combined.

Here is a solution replacing the Result variable with an array, which is combined in the end:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // shared variables
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // local variables
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // allocate space for local variables
  // compute maximum for each block
  for Index:=0 to BlockCount-1 do begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // compute maximum of all blocks
  // (if you have hundreds of threads there are better solutions)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

This approach is straightforward and could be automated. The process will need some hints from the programmer though.

DoParallel

The final step is to move the inner loop into a sub procedure and replace the loop with a call to DoParallelNested.

...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // split work into equally sized blocks
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // allocate local/thread variables
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // compute maximum for each block
  ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
  // compute maximum of all blocks
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

This was mostly copy and paste, so again this could be automated.

Example: parallel sort

The unit mtputils contains the function ParallelSortFPList which uses mtprocs to sort a TFPList in parallel. A compare function must be given.

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel. A 0 means to use the system default.

Optionally you can provide your own sort function (OnSortPart) to sort the part of each single thread. For example you can sort the blocks via QuickSort, which are then merged. Then you have a parallel QuickSort. See TFPList.Sort for an example implementation of QuickSort.

Использование вложенных процедур

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;

Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.

См. также