Difference between revisions of "Parallel procedures/ru"

From Free Pascal wiki
Jump to navigationJump to search
 
(9 intermediate revisions by 3 users not shown)
Line 1: Line 1:
 
{{Parallel_procedures}}
 
{{Parallel_procedures}}
 
'''Это незавершённый перевод оригинальной статьи на английском языке!'''
 
  
 
= Общие сведения =
 
= Общие сведения =
Line 74: Line 72:
 
* Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
 
* Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
 
* Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
 
* Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
<syntaxhighlight>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight>
+
<syntaxhighlight lang=pascal>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight>
 
* Вы можете задать максимальное количество потоков для каждой процедуры.
 
* Вы можете задать максимальное количество потоков для каждой процедуры.
 
* Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
 
* Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
Line 85: Line 83:
 
*Накладные расходы не зависят от рекурсивных уровней параллельных процедур.
 
*Накладные расходы не зависят от рекурсивных уровней параллельных процедур.
  
Multi threading overhead, which is independent of the MTProcs units, but simply results from todays computer architectures:
+
Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:
*As soon as one thread is created your program becomes multi threaded and the memory managers must use critical sections, which slows down. So even if you do nothing with the thread your program might become slower.  
+
*Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
*The cmem heap manager is on some systems much faster for multi threading. In my benchmarks especially on intel systems and especially under OS X the speed difference can be more than 10 times.
+
*Менеджер кучи <code>cmem</code> в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз.
*Strings and interfaces are globally reference counted. Each access needs a critical section. Processing strings in multiple threads will therefore hardly give any speed up. Use PChars instead.
+
*Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
*Each chunk of work (index) should work on a disjunctive part of memory to avoid cross cache updates.
+
*Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
*Do not work on vast amounts of memory. On some systems one thread alone is fast enough to fill the memory bus speed. When the memory bus maximum speed is reached, any further thread will slow down instead of making it faster.
+
*Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.
  
 
= Установка максимального числа потоков для процедуры =
 
= Установка максимального числа потоков для процедуры =
Line 104: Line 102:
 
Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.
 
Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.
  
= Wait for index / executing in order =
+
= Дождитесь индексации/выполнения по порядку =
  
Sometimes an Index depends on the result of a former Index. For example a common task is to first compute chunk 5 and then combine the result with the result of chunk 3. Use the WaitForIndex method for that:
+
Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
 
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
 
begin
 
begin
   ... compute chunk number 'Index' ...
+
   ... вычисляем номер блока 'Index' ...
 
   if Index=5 then begin
 
   if Index=5 then begin
 
     if not Item.WaitForIndex(3) then exit;
 
     if not Item.WaitForIndex(3) then exit;
     ... compute ...
+
     ... вычисляем ...
 
   end;
 
   end;
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
WaitForIndex takes as an argument an Index that is lower than the current Index or a range. If it returns true, everything worked as expected. If it returns false, then an exception happened in one of the other threads.
+
WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет <code>true</code>, все работает, как ожидалось. Если он возвращает <code>false</code>, то в одном из других потоков произошло исключение.
  
There is an extended function WaitForIndexRange that waits for whole range of Index:
+
Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:
<syntaxhighlight lang=pascal>if not Item.WaitForIndexRange(3,5) then exit; // wait for 3,4 and 5</syntaxhighlight>
+
<syntaxhighlight lang=pascal>if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5</syntaxhighlight>
  
= Exceptions =
+
= Исключения =
  
If an exception occur in one of the threads, the other threads will finish normally, but will not start a new Index. The pool waits for all threads to finish and will then raise the exception. That's why you can use ''try..except'' like always:
+
Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать <code>try..except</code> как всегда:
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
Line 136: Line 134:
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
If there are multiple exceptions, only the first exception will be raised. To handle all exceptions, add a try..except inside your parallel method.
+
Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду <code>try..except</code>.
  
= Synchronize =
+
= Синхронизация =
  
When you want to call a function in the main thread, for example to update some gui element, you can use the class method ''TThread.Synchronize''. It takes as arguments the current TThread and the address of a method. Since 1.2 mtprocs provides a threadvar '''CurrentThread''', which makes synchronizing simple:
+
Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса <code>TThread.Synchronize</code>. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 <code>mtprocs</code> предоставляет переменную потока <code>CurrentThread</code>, которая упрощает синхронизацию:
  
 
<syntaxhighlight lang=pascal>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight>
 
<syntaxhighlight lang=pascal>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight>
  
This will post an event on the main event queue and wait until the main thread has executed your method. Keep in mind that a bare fpc program does not have an event queue. A LCL or fpgui program has it.
+
Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.
  
If you create your own TThread descendants, you should set the variable in your Execute method. For example:
+
Если вы создаете своих собственных потомков <code>TThread</code>, вы должны установить переменную в своем методе <code>Execute</code>. Например:
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
Line 152: Line 150:
 
begin
 
begin
 
   CurrentThread:=Self;
 
   CurrentThread:=Self;
   ...work...
+
   ...работаем...
 
end;
 
end;
 
</syntaxhighlight>
 
</syntaxhighlight>
  
= Example: Parallel loop =
+
= Пример: Параллельный цикл =
  
This example explains step by step how to convert a loop into a parallel procedure. The example computes the maximum number of an integer array ''BigArray''.
+
В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива ''BigArray''.
  
== The original loop ==
+
== Исходный цикл ==
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
Line 176: Line 174:
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
== Splitting the work ==
+
== Разделение работы ==
  
The work should be equally distributed over n threads. For this the BigArray is split into equally sized blocks and an outer loop runs over every block. Typically n is the number of cpus/cores in the system. MTProcs has some utility functions to compute the block size and count:
+
Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. <code>MTProcs</code> имеет несколько служебных функций для вычисления размера и количества блоков:
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
Line 198: Line 196:
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
The added lines can be used for any loop. Eventually a tool can be written to automate this.
+
Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.
  
The work is now split into smaller pieces. Now the pieces must become more independent.
+
Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.
  
== Local and shared variables ==
+
== Локальные и общие переменные ==
  
For each used variable in the loop you must decide whether it is shared variable used by all threads or if each thread uses its own local variable.  
+
Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные <code>BlockCount</code> и <code>BlockSize</code> только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как <code>Result</code>, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.
The shared variables BlockCount and BlockSize are only read and do not change, so no work is needed for them.  
 
But a shared variable like Result will be changed by all threads. This can either be achieved with synchronization (e.g. critical section), which is slow, or each thread use a local copy and these local variable are later combined.
 
  
Here is a solution replacing the ''Result'' variable with an array, which is combined in the end:
+
Вот решение, заменяющее переменную <code>Result</code> на массив, который в конце объединяется:
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
 
function FindMaximum(BigArray: TArrayOfInteger): integer;
 
function FindMaximum(BigArray: TArrayOfInteger): integer;
 
var
 
var
   // shared variables
+
   // общие переменные
 
   BlockCount, BlockSize: PtrInt;
 
   BlockCount, BlockSize: PtrInt;
 
   BlockMax: PPtrInt;
 
   BlockMax: PPtrInt;
   // local variables
+
   // локальные переменные
 
   i: PtrInt;
 
   i: PtrInt;
 
   Index: PtrInt;
 
   Index: PtrInt;
Line 222: Line 218:
 
begin
 
begin
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // allocate space for local variables
+
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
   // compute maximum for each block
+
   // вычисляем максимум для каждого блока
 
   for Index:=0 to BlockCount-1 do begin
 
   for Index:=0 to BlockCount-1 do begin
     // compute maximum of block
+
     // вычисляем максимум блока
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     BlockMax[Index]:=BigArray[BlockStart];
 
     BlockMax[Index]:=BigArray[BlockStart];
Line 232: Line 228:
 
     end;
 
     end;
 
   end;
 
   end;
   // compute maximum of all blocks
+
   // вычисляем максимум всех блоков
   // (if you have hundreds of threads there are better solutions)
+
   // (это лучшее решение, если у вас сотни потоков)
 
   Result:=BlockMax[0];
 
   Result:=BlockMax[0];
 
   for Index:=1 to BlockCount-1 do
 
   for Index:=1 to BlockCount-1 do
Line 241: Line 237:
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
This approach is straightforward and could be automated. The process will need some hints from the programmer though.
+
Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.
  
 
== DoParallel ==
 
== DoParallel ==
  
The final step is to move the inner loop into a sub procedure and replace the loop with a call to DoParallelNested.
+
Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом <code>DoParallelNested</code>.
  
 
<syntaxhighlight lang=pascal>
 
<syntaxhighlight lang=pascal>
Line 263: Line 259:
 
     BlockStart, BlockEnd: PtrInt;
 
     BlockStart, BlockEnd: PtrInt;
 
   begin
 
   begin
     // compute maximum of block
+
     // вычисляем максимум блоков
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
 
     BlockMax[Index]:=BigArray[BlockStart];
 
     BlockMax[Index]:=BigArray[BlockStart];
Line 272: Line 268:
 
   Index: PtrInt;
 
   Index: PtrInt;
 
begin
 
begin
   // split work into equally sized blocks
+
   // разделяем работу на блоки одинакового размера
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
 
   ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
   // allocate local/thread variables
+
   // выделяем память для локальных/потоковых переменных
 
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
 
   BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
   // compute maximum for each block
+
   // вычисляем максимум для каждого блока
 
   ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
 
   ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
   // compute maximum of all blocks
+
   // вычисляем максимум всех блоков
 
   Result:=BlockMax[0];
 
   Result:=BlockMax[0];
 
   for Index:=1 to BlockCount-1 do
 
   for Index:=1 to BlockCount-1 do
Line 285: Line 281:
 
end;</syntaxhighlight>
 
end;</syntaxhighlight>
  
This was mostly copy and paste, so again this could be automated.
+
В основном это было копирование и вставка, так что снова это можно было автоматизировать.
  
=Example: parallel sort=
+
=Пример: параллельная сортировка=
The unit mtputils contains the function '''ParallelSortFPList''' which uses mtprocs to sort a TFPList in parallel. A compare function must be given.
 
  
<syntaxhighlight>procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);</syntaxhighlight>
+
Модуль <code>mtputils</code> содержит функцию <code>ParallelSortFPList</code>, которая использует <code>mtprocs</code> для параллельной сортировки <code>TFPList</code>. Должна быть указана функция сравнения.
  
This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel. A 0 means to use the system default.
+
<syntaxhighlight lang=pascal>procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);</syntaxhighlight>
  
Optionally you can provide your own sort function (OnSortPart) to sort the part of each single thread. For example you can sort the blocks via QuickSort, which are then merged. Then you have a '''parallel QuickSort'''. See TFPList.Sort for an example implementation of QuickSort.
+
Эта функция использует параллельный алгоритм <code>MergeSort</code>. Параметр <code>MaxThreadCount</code> передается <code>DoParallel</code>. 0 означает использование системы по умолчанию.
 +
 
 +
При желании вы можете предоставить свою собственную функцию сортировки (<code>OnSortPart</code>) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через <code>QuickSort</code>, которые затем объединяются. Тогда у вас будет '''параллельный QuickSort'''. См. <code>TFPList.Sort</code> для примера реализации <code>QuickSort</code>.
  
 
= Использование вложенных процедур =
 
= Использование вложенных процедур =
  
<syntaxhighlight>procedure DoSomething(Value: PtrInt);
+
<syntaxhighlight lang=pascal>
 +
procedure DoSomething(Value: PtrInt);
 
var
 
var
 
   p: array[1..2] of Pointer;
 
   p: array[1..2] of Pointer;

Latest revision as of 16:58, 1 September 2020

Deutsch (de) English (en) 日本語 (ja) русский (ru)

Общие сведения

На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.

Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации.

Добавление MTProcs

Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.

Вы можете найти его исходники на sourceforge:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

Или в Lazarus components/multithreadprocs.

Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project

Простой пример

Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // делаем некую работу
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.

На выходе будет что-то вроде этого:

2
3
1
4
5

Тут следует дать несколько коротких заметок. Подробности позже.

  • Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
  • Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
  • Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
  • Index определяет, какую часть работы должен сделать этот вызов.
  • Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
  • Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
  • ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
  • Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.

Особенности

Выполнение процедуры параллельно означает:

  • процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
  • Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
  • Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
  • Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
  • Вы можете задать максимальное количество потоков для каждой процедуры.
  • Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
  • Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.

Накладные расходы, замедление

Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:

  • Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
  • Накладные расходы не зависят от рекурсивных уровней параллельных процедур.

Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:

  • Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
  • Менеджер кучи cmem в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз.
  • Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
  • Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
  • Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.

Установка максимального числа потоков для процедуры

Вы можете указать максимальное число потоков для процедуры в пятом параметре.

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс, 
      // необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.

Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.

Дождитесь индексации/выполнения по порядку

Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... вычисляем номер блока 'Index' ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... вычисляем ...
  end;
end;

WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет true, все работает, как ожидалось. Если он возвращает false, то в одном из других потоков произошло исключение.

Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:

if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5

Исключения

Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать try..except как всегда:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду try..except.

Синхронизация

Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса TThread.Synchronize. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 mtprocs предоставляет переменную потока CurrentThread, которая упрощает синхронизацию:

TThread.Synchronize(CurrentThread,@YourMethod);

Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.

Если вы создаете своих собственных потомков TThread, вы должны установить переменную в своем методе Execute. Например:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...работаем...
end;

Пример: Параллельный цикл

В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива BigArray.

Исходный цикл

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

Разделение работы

Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. MTProcs имеет несколько служебных функций для вычисления размера и количества блоков:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.

Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.

Локальные и общие переменные

Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные BlockCount и BlockSize только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как Result, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.

Вот решение, заменяющее переменную Result на массив, который в конце объединяется:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // общие переменные
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // локальные переменные
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
  // вычисляем максимум для каждого блока
  for Index:=0 to BlockCount-1 do begin
    // вычисляем максимум блока
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // вычисляем максимум всех блоков
  // (это лучшее решение, если у вас сотни потоков)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.

DoParallel

Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом DoParallelNested.

...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // вычисляем максимум блоков
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // разделяем работу на блоки одинакового размера
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // выделяем память для локальных/потоковых переменных
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // вычисляем максимум для каждого блока
  ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
  // вычисляем максимум всех блоков
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

В основном это было копирование и вставка, так что снова это можно было автоматизировать.

Пример: параллельная сортировка

Модуль mtputils содержит функцию ParallelSortFPList, которая использует mtprocs для параллельной сортировки TFPList. Должна быть указана функция сравнения.

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

Эта функция использует параллельный алгоритм MergeSort. Параметр MaxThreadCount передается DoParallel. 0 означает использование системы по умолчанию.

При желании вы можете предоставить свою собственную функцию сортировки (OnSortPart) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через QuickSort, которые затем объединяются. Тогда у вас будет параллельный QuickSort. См. TFPList.Sort для примера реализации QuickSort.

Использование вложенных процедур

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;

Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.

См. также