Difference between revisions of "Parallel procedures/ru"
Line 156: | Line 156: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
− | = | + | = Пример: Параллельный цикл = |
− | + | В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива ''BigArray''. | |
− | == | + | == Исходный цикл == |
<syntaxhighlight lang=pascal> | <syntaxhighlight lang=pascal> | ||
Line 176: | Line 176: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | == | + | == Разделение работы == |
− | + | Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. <code>MTProcs</code> имеет несколько служебных функций для вычисления размера и количества блоков: | |
<syntaxhighlight lang=pascal> | <syntaxhighlight lang=pascal> | ||
Line 198: | Line 198: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого. | |
− | + | Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми. | |
− | == | + | == Локальные и общие переменные == |
− | + | Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные <code>BlockCount</code> и <code>BlockSize</code> только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как <code>Result</code>, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются. | |
− | |||
− | |||
− | + | Вот решение, заменяющее переменную <code>Result</code> на массив, который в конце объединяется: | |
<syntaxhighlight lang=pascal> | <syntaxhighlight lang=pascal> | ||
function FindMaximum(BigArray: TArrayOfInteger): integer; | function FindMaximum(BigArray: TArrayOfInteger): integer; | ||
var | var | ||
− | // | + | // общие переменные |
BlockCount, BlockSize: PtrInt; | BlockCount, BlockSize: PtrInt; | ||
BlockMax: PPtrInt; | BlockMax: PPtrInt; | ||
− | // | + | // локальные переменные |
i: PtrInt; | i: PtrInt; | ||
Index: PtrInt; | Index: PtrInt; | ||
Line 222: | Line 220: | ||
begin | begin | ||
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ||
− | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // | + | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных |
− | // | + | // вычисляем максимум для каждого блока |
for Index:=0 to BlockCount-1 do begin | for Index:=0 to BlockCount-1 do begin | ||
− | // | + | // вычисляем максимум блока |
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | ||
BlockMax[Index]:=BigArray[BlockStart]; | BlockMax[Index]:=BigArray[BlockStart]; | ||
Line 232: | Line 230: | ||
end; | end; | ||
end; | end; | ||
− | // | + | // вычисляем максимум всех блоков |
− | // ( | + | // (это лучшее решение, если у вас сотни потоков) |
Result:=BlockMax[0]; | Result:=BlockMax[0]; | ||
for Index:=1 to BlockCount-1 do | for Index:=1 to BlockCount-1 do | ||
Line 241: | Line 239: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста. | |
== DoParallel == | == DoParallel == | ||
− | + | Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом <code>DoParallelNested</code>. | |
<syntaxhighlight lang=pascal> | <syntaxhighlight lang=pascal> | ||
Line 263: | Line 261: | ||
BlockStart, BlockEnd: PtrInt; | BlockStart, BlockEnd: PtrInt; | ||
begin | begin | ||
− | // | + | // вычисляем максимум блоков |
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | ||
BlockMax[Index]:=BigArray[BlockStart]; | BlockMax[Index]:=BigArray[BlockStart]; | ||
Line 272: | Line 270: | ||
Index: PtrInt; | Index: PtrInt; | ||
begin | begin | ||
− | // | + | // разделяем работу на блоки одинакового размера |
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ||
− | // | + | // выделяем память для локальных/потоковых переменных |
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); | ||
− | // | + | // вычисляем максимум для каждого блока |
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1); | ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1); | ||
− | // | + | // вычисляем максимум всех блоков |
Result:=BlockMax[0]; | Result:=BlockMax[0]; | ||
for Index:=1 to BlockCount-1 do | for Index:=1 to BlockCount-1 do | ||
Line 285: | Line 283: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | В основном это было копирование и вставка, так что снова это можно было автоматизировать. | |
=Example: parallel sort= | =Example: parallel sort= |
Revision as of 17:54, 1 September 2020
│
Deutsch (de) │
English (en) │
日本語 (ja) │
русский (ru) │
Это незавершённый перевод оригинальной статьи на английском языке!
Общие сведения
На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.
Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации.
Добавление MTProcs
Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.
Вы можете найти его исходники на sourceforge:
svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs
Или в Lazarus components/multithreadprocs.
Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project
Простой пример
Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:
program Test;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: Integer;
begin
writeln(Index);
for i:=1 to Index*1000000 do ; // делаем некую работу
end;
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.
На выходе будет что-то вроде этого:
2 3 1 4 5
Тут следует дать несколько коротких заметок. Подробности позже.
- Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
- Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
- Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
- Index определяет, какую часть работы должен сделать этот вызов.
- Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
- Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
- ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
- Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.
Особенности
Выполнение процедуры параллельно означает:
- процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
- Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
- Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
- Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
- Вы можете задать максимальное количество потоков для каждой процедуры.
- Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
- Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.
Накладные расходы, замедление
Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:
- Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
- Накладные расходы не зависят от рекурсивных уровней параллельных процедур.
Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:
- Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
- Менеджер кучи
cmem
в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз. - Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
- Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
- Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.
Установка максимального числа потоков для процедуры
Вы можете указать максимальное число потоков для процедуры в пятом параметре.
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс,
// необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.
Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.
Дождитесь индексации/выполнения по порядку
Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
... вычисляем номер блока 'Index' ...
if Index=5 then begin
if not Item.WaitForIndex(3) then exit;
... вычисляем ...
end;
end;
WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет true
, все работает, как ожидалось. Если он возвращает false
, то в одном из других потоков произошло исключение.
Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:
if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5
Исключения
Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать try..except
как всегда:
try
...
ProcThreadPool.DoParallel(...);
...
except
On E: Exception do ...
end;
Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду try..except
.
Синхронизация
Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса TThread.Synchronize
. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 mtprocs
предоставляет переменную потока CurrentThread
, которая упрощает синхронизацию:
TThread.Synchronize(CurrentThread,@YourMethod);
Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.
Если вы создаете своих собственных потомков TThread
, вы должны установить переменную в своем методе Execute
. Например:
procedure TYourThread.Execute;
begin
CurrentThread:=Self;
...работаем...
end;
Пример: Параллельный цикл
В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива BigArray.
Исходный цикл
type
TArrayOfInteger = array of integer;
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
i: PtrInt;
begin
Result:=BigArray[0];
for i:=1 to length(BigArray)-1 do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
Разделение работы
Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. MTProcs
имеет несколько служебных функций для вычисления размера и количества блоков:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
Result:=BigArray[0];
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
for Index:=0 to BlockCount-1 do begin
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
for i:=BlockStart to BlockEnd do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
end;
Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.
Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.
Локальные и общие переменные
Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные BlockCount
и BlockSize
только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как Result
, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.
Вот решение, заменяющее переменную Result
на массив, который в конце объединяется:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
// общие переменные
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
// локальные переменные
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
// вычисляем максимум для каждого блока
for Index:=0 to BlockCount-1 do begin
// вычисляем максимум блока
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do begin
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
end;
// вычисляем максимум всех блоков
// (это лучшее решение, если у вас сотни потоков)
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.
DoParallel
Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом DoParallelNested
.
...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: integer;
BlockStart, BlockEnd: PtrInt;
begin
// вычисляем максимум блоков
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
var
Index: PtrInt;
begin
// разделяем работу на блоки одинакового размера
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
// выделяем память для локальных/потоковых переменных
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
// вычисляем максимум для каждого блока
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
// вычисляем максимум всех блоков
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
В основном это было копирование и вставка, так что снова это можно было автоматизировать.
Example: parallel sort
The unit mtputils contains the function ParallelSortFPList which uses mtprocs to sort a TFPList in parallel. A compare function must be given.
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel. A 0 means to use the system default.
Optionally you can provide your own sort function (OnSortPart) to sort the part of each single thread. For example you can sort the blocks via QuickSort, which are then merged. Then you have a parallel QuickSort. See TFPList.Sort for an example implementation of QuickSort.
Использование вложенных процедур
procedure DoSomething(Value: PtrInt);
var
p: array[1..2] of Pointer;
procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
end;
var
i: Integer;
begin
ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;
Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.