Difference between revisions of "Parallel procedures/ru"
(Deleted English categories) |
|||
(23 intermediate revisions by 4 users not shown) | |||
Line 1: | Line 1: | ||
{{Parallel_procedures}} | {{Parallel_procedures}} | ||
− | |||
− | |||
= Общие сведения = | = Общие сведения = | ||
Line 7: | Line 5: | ||
На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов. | На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов. | ||
− | Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "[[OpenMP support|поддержка OpenMP]]" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить | + | Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "[[OpenMP support|поддержка OpenMP]]" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: [http://openmp.org/wp / OpenMP] и [http://www.khronos.org/opencl / OpenCL]. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации. |
= Добавление MTProcs = | = Добавление MTProcs = | ||
Line 15: | Line 13: | ||
Вы можете найти его исходники на sourceforge: | Вы можете найти его исходники на sourceforge: | ||
− | <pre>https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs</pre> | + | <pre>svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs</pre> |
Или в Lazarus ''components/multithreadprocs''. | Или в Lazarus ''components/multithreadprocs''. | ||
Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. | Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. | ||
− | Для использования пакета в вашем проекте, выберите: ''Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project'' | + | Для использования пакета в вашем проекте, выберите: ''Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project'' |
− | |||
= Простой пример = | = Простой пример = | ||
Line 27: | Line 24: | ||
Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается: | Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается: | ||
− | <syntaxhighlight>program Test; | + | <syntaxhighlight lang=pascal> |
+ | program Test; | ||
{$mode objfpc}{$H+} | {$mode objfpc}{$H+} | ||
Line 74: | Line 72: | ||
* Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком. | * Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком. | ||
* Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через: | * Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через: | ||
− | <syntaxhighlight>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight> | + | <syntaxhighlight lang=pascal>ProcThreadPool.MaxThreadCount := 8;</syntaxhighlight> |
* Вы можете задать максимальное количество потоков для каждой процедуры. | * Вы можете задать максимальное количество потоков для каждой процедуры. | ||
* Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы). | * Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы). | ||
Line 85: | Line 83: | ||
*Накладные расходы не зависят от рекурсивных уровней параллельных процедур. | *Накладные расходы не зависят от рекурсивных уровней параллельных процедур. | ||
− | + | Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур: | |
− | * | + | *Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее. |
− | * | + | *Менеджер кучи <code>cmem</code> в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз. |
− | * | + | *Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars. |
− | * | + | *Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша. |
− | * | + | *Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться. |
− | = | + | = Установка максимального числа потоков для процедуры = |
− | + | Вы можете указать максимальное число потоков для процедуры в пятом параметре. | |
− | <syntaxhighlight>begin | + | <syntaxhighlight lang=pascal> |
− | ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // | + | begin |
− | // | + | ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс, |
+ | // необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2) | ||
end.</syntaxhighlight> | end.</syntaxhighlight> | ||
− | + | Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач. | |
− | = | + | = Дождитесь индексации/выполнения по порядку = |
− | + | Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex: | |
− | <syntaxhighlight>procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); | + | <syntaxhighlight lang=pascal> |
+ | procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); | ||
begin | begin | ||
− | ... | + | ... вычисляем номер блока 'Index' ... |
if Index=5 then begin | if Index=5 then begin | ||
if not Item.WaitForIndex(3) then exit; | if not Item.WaitForIndex(3) then exit; | ||
− | ... | + | ... вычисляем ... |
end; | end; | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | WaitForIndex | + | WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет <code>true</code>, все работает, как ожидалось. Если он возвращает <code>false</code>, то в одном из других потоков произошло исключение. |
− | + | Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса: | |
− | <syntaxhighlight>if not Item.WaitForIndexRange(3,5) then exit; // | + | <syntaxhighlight lang=pascal>if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5</syntaxhighlight> |
− | = | + | = Исключения = |
− | + | Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать <code>try..except</code> как всегда: | |
− | <syntaxhighlight>try | + | <syntaxhighlight lang=pascal> |
+ | try | ||
... | ... | ||
ProcThreadPool.DoParallel(...); | ProcThreadPool.DoParallel(...); | ||
Line 133: | Line 134: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду <code>try..except</code>. | |
− | = | + | = Синхронизация = |
− | + | Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса <code>TThread.Synchronize</code>. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 <code>mtprocs</code> предоставляет переменную потока <code>CurrentThread</code>, которая упрощает синхронизацию: | |
− | <syntaxhighlight>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight> | + | <syntaxhighlight lang=pascal>TThread.Synchronize(CurrentThread,@YourMethod);</syntaxhighlight> |
− | + | Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui. | |
− | + | Если вы создаете своих собственных потомков <code>TThread</code>, вы должны установить переменную в своем методе <code>Execute</code>. Например: | |
− | <syntaxhighlight> | + | <syntaxhighlight lang=pascal> |
procedure TYourThread.Execute; | procedure TYourThread.Execute; | ||
begin | begin | ||
CurrentThread:=Self; | CurrentThread:=Self; | ||
− | ... | + | ...работаем... |
end; | end; | ||
</syntaxhighlight> | </syntaxhighlight> | ||
− | = | + | = Пример: Параллельный цикл = |
− | + | В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива ''BigArray''. | |
− | == | + | == Исходный цикл == |
− | <syntaxhighlight>type | + | <syntaxhighlight lang=pascal> |
+ | type | ||
TArrayOfInteger = array of integer; | TArrayOfInteger = array of integer; | ||
Line 172: | Line 174: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | == | + | == Разделение работы == |
− | + | Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. <code>MTProcs</code> имеет несколько служебных функций для вычисления размера и количества блоков: | |
− | <syntaxhighlight>function FindMaximum(BigArray: TArrayOfInteger): integer; | + | <syntaxhighlight lang=pascal> |
+ | function FindMaximum(BigArray: TArrayOfInteger): integer; | ||
var | var | ||
BlockCount, BlockSize: PtrInt; | BlockCount, BlockSize: PtrInt; | ||
Line 193: | Line 196: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого. | |
− | + | Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми. | |
− | == | + | == Локальные и общие переменные == |
− | + | Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные <code>BlockCount</code> и <code>BlockSize</code> только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как <code>Result</code>, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются. | |
− | |||
− | |||
− | + | Вот решение, заменяющее переменную <code>Result</code> на массив, который в конце объединяется: | |
− | <syntaxhighlight>function FindMaximum(BigArray: TArrayOfInteger): integer; | + | <syntaxhighlight lang=pascal> |
+ | function FindMaximum(BigArray: TArrayOfInteger): integer; | ||
var | var | ||
− | // | + | // общие переменные |
BlockCount, BlockSize: PtrInt; | BlockCount, BlockSize: PtrInt; | ||
BlockMax: PPtrInt; | BlockMax: PPtrInt; | ||
− | // | + | // локальные переменные |
i: PtrInt; | i: PtrInt; | ||
Index: PtrInt; | Index: PtrInt; | ||
Line 216: | Line 218: | ||
begin | begin | ||
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ||
− | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // | + | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных |
− | // | + | // вычисляем максимум для каждого блока |
for Index:=0 to BlockCount-1 do begin | for Index:=0 to BlockCount-1 do begin | ||
− | // | + | // вычисляем максимум блока |
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | ||
BlockMax[Index]:=BigArray[BlockStart]; | BlockMax[Index]:=BigArray[BlockStart]; | ||
Line 226: | Line 228: | ||
end; | end; | ||
end; | end; | ||
− | // | + | // вычисляем максимум всех блоков |
− | // ( | + | // (это лучшее решение, если у вас сотни потоков) |
Result:=BlockMax[0]; | Result:=BlockMax[0]; | ||
for Index:=1 to BlockCount-1 do | for Index:=1 to BlockCount-1 do | ||
Line 235: | Line 237: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста. | |
== DoParallel == | == DoParallel == | ||
− | + | Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом <code>DoParallelNested</code>. | |
− | <syntaxhighlight> | + | <syntaxhighlight lang=pascal> |
... | ... | ||
{$ModeSwitch nestedprocvars} | {$ModeSwitch nestedprocvars} | ||
Line 257: | Line 259: | ||
BlockStart, BlockEnd: PtrInt; | BlockStart, BlockEnd: PtrInt; | ||
begin | begin | ||
− | // | + | // вычисляем максимум блоков |
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd); | ||
BlockMax[Index]:=BigArray[BlockStart]; | BlockMax[Index]:=BigArray[BlockStart]; | ||
Line 266: | Line 268: | ||
Index: PtrInt; | Index: PtrInt; | ||
begin | begin | ||
− | // | + | // разделяем работу на блоки одинакового размера |
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize); | ||
− | // | + | // выделяем память для локальных/потоковых переменных |
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); | BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); | ||
− | // | + | // вычисляем максимум для каждого блока |
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1); | ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1); | ||
− | // | + | // вычисляем максимум всех блоков |
Result:=BlockMax[0]; | Result:=BlockMax[0]; | ||
for Index:=1 to BlockCount-1 do | for Index:=1 to BlockCount-1 do | ||
Line 279: | Line 281: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | В основном это было копирование и вставка, так что снова это можно было автоматизировать. | |
+ | |||
+ | =Пример: параллельная сортировка= | ||
− | + | Модуль <code>mtputils</code> содержит функцию <code>ParallelSortFPList</code>, которая использует <code>mtprocs</code> для параллельной сортировки <code>TFPList</code>. Должна быть указана функция сравнения. | |
− | |||
− | <syntaxhighlight>procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);</syntaxhighlight> | + | <syntaxhighlight lang=pascal>procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);</syntaxhighlight> |
− | + | Эта функция использует параллельный алгоритм <code>MergeSort</code>. Параметр <code>MaxThreadCount</code> передается <code>DoParallel</code>. 0 означает использование системы по умолчанию. | |
− | + | При желании вы можете предоставить свою собственную функцию сортировки (<code>OnSortPart</code>) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через <code>QuickSort</code>, которые затем объединяются. Тогда у вас будет '''параллельный QuickSort'''. См. <code>TFPList.Sort</code> для примера реализации <code>QuickSort</code>. | |
− | = | + | = Использование вложенных процедур = |
− | <syntaxhighlight>procedure DoSomething(Value: PtrInt); | + | <syntaxhighlight lang=pascal> |
+ | procedure DoSomething(Value: PtrInt); | ||
var | var | ||
p: array[1..2] of Pointer; | p: array[1..2] of Pointer; | ||
Line 298: | Line 302: | ||
procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); | procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); | ||
begin | begin | ||
− | p[Index]:=Pointer(Value); // | + | p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам! |
end; | end; | ||
Line 307: | Line 311: | ||
end;</syntaxhighlight> | end;</syntaxhighlight> | ||
− | + | Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым. | |
− | = | + | = См. также = |
− | * [[Multithreaded Application Tutorial]] | + | * [[Multithreaded Application Tutorial/ru|Руководство по разработке многопоточных приложений]] |
− | * [[OpenCL]] | + | * [[OpenCL/ru|OpenCL]] |
Latest revision as of 17:58, 1 September 2020
│
Deutsch (de) │
English (en) │
日本語 (ja) │
русский (ru) │
Общие сведения
На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.
Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации.
Добавление MTProcs
Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.
Вы можете найти его исходники на sourceforge:
svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs
Или в Lazarus components/multithreadprocs.
Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project
Простой пример
Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:
program Test;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: Integer;
begin
writeln(Index);
for i:=1 to Index*1000000 do ; // делаем некую работу
end;
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.
На выходе будет что-то вроде этого:
2 3 1 4 5
Тут следует дать несколько коротких заметок. Подробности позже.
- Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
- Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
- Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
- Index определяет, какую часть работы должен сделать этот вызов.
- Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
- Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
- ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
- Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.
Особенности
Выполнение процедуры параллельно означает:
- процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
- Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
- Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
- Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
- Вы можете задать максимальное количество потоков для каждой процедуры.
- Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
- Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.
Накладные расходы, замедление
Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:
- Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
- Накладные расходы не зависят от рекурсивных уровней параллельных процедур.
Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:
- Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
- Менеджер кучи
cmem
в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз. - Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
- Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
- Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.
Установка максимального числа потоков для процедуры
Вы можете указать максимальное число потоков для процедуры в пятом параметре.
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс,
// необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.
Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.
Дождитесь индексации/выполнения по порядку
Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
... вычисляем номер блока 'Index' ...
if Index=5 then begin
if not Item.WaitForIndex(3) then exit;
... вычисляем ...
end;
end;
WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет true
, все работает, как ожидалось. Если он возвращает false
, то в одном из других потоков произошло исключение.
Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:
if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5
Исключения
Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать try..except
как всегда:
try
...
ProcThreadPool.DoParallel(...);
...
except
On E: Exception do ...
end;
Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду try..except
.
Синхронизация
Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса TThread.Synchronize
. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 mtprocs
предоставляет переменную потока CurrentThread
, которая упрощает синхронизацию:
TThread.Synchronize(CurrentThread,@YourMethod);
Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.
Если вы создаете своих собственных потомков TThread
, вы должны установить переменную в своем методе Execute
. Например:
procedure TYourThread.Execute;
begin
CurrentThread:=Self;
...работаем...
end;
Пример: Параллельный цикл
В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива BigArray.
Исходный цикл
type
TArrayOfInteger = array of integer;
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
i: PtrInt;
begin
Result:=BigArray[0];
for i:=1 to length(BigArray)-1 do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
Разделение работы
Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. MTProcs
имеет несколько служебных функций для вычисления размера и количества блоков:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
Result:=BigArray[0];
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
for Index:=0 to BlockCount-1 do begin
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
for i:=BlockStart to BlockEnd do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
end;
Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.
Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.
Локальные и общие переменные
Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные BlockCount
и BlockSize
только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как Result
, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.
Вот решение, заменяющее переменную Result
на массив, который в конце объединяется:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
// общие переменные
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
// локальные переменные
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
// вычисляем максимум для каждого блока
for Index:=0 to BlockCount-1 do begin
// вычисляем максимум блока
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do begin
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
end;
// вычисляем максимум всех блоков
// (это лучшее решение, если у вас сотни потоков)
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.
DoParallel
Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом DoParallelNested
.
...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: integer;
BlockStart, BlockEnd: PtrInt;
begin
// вычисляем максимум блоков
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
var
Index: PtrInt;
begin
// разделяем работу на блоки одинакового размера
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
// выделяем память для локальных/потоковых переменных
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
// вычисляем максимум для каждого блока
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
// вычисляем максимум всех блоков
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
В основном это было копирование и вставка, так что снова это можно было автоматизировать.
Пример: параллельная сортировка
Модуль mtputils
содержит функцию ParallelSortFPList
, которая использует mtprocs
для параллельной сортировки TFPList
. Должна быть указана функция сравнения.
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
Эта функция использует параллельный алгоритм MergeSort
. Параметр MaxThreadCount
передается DoParallel
. 0 означает использование системы по умолчанию.
При желании вы можете предоставить свою собственную функцию сортировки (OnSortPart
) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через QuickSort
, которые затем объединяются. Тогда у вас будет параллельный QuickSort. См. TFPList.Sort
для примера реализации QuickSort
.
Использование вложенных процедур
procedure DoSomething(Value: PtrInt);
var
p: array[1..2] of Pointer;
procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
end;
var
i: Integer;
begin
ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;
Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.