Главная страница
    Top.Mail.Ru    Яндекс.Метрика
Текущий архив: 2011.04.17;
Скачать: CL | DM;

Вниз

Организация многопоточности   Найти похожие ветки 

 
Vitaliy_____   (2009-07-03 08:24) [0]

Доброе время суток!
Озадачился вопросом распараллеливания алгоритма для повышения вычислительной производительности.
Есть модуль, реализующий искуственную нейросеть. В нем есть процедура DoTeach(), выполняющая обучение ИНС по модификации алгоритма обратного распространения ошибки.
В ней есть часто используемая процедура GetNetOut(), которая вычисляет значение выходов многослойной ИНС при заданных входах. Учитывая то, что коррекция весов происходит по завершении эпохи обучения, думаю будет достаточно распараллелить только GetNetOut, так как основная вычислительная нагрузка на ней.
Теперь ближе к вопросу.
Сама процедура сейчас выглядит так:

procedure TTeachNet.GetNetOut();
var
i,j,k,LN_1,LN_2: integer;
begin
 LN_1:=LayersNum-1; LN_2:=LayersNum-2;
 // Первый слой (входной), c активационной функции.
 for i:=0 to LayerSize[0]-1 do
  OutLayers[0,i]:=LayerActivateF(InpLayers[0,i],1);
 
 // Рассчитываем все остальные слои кроме последнего.
 for k:=1 to LN_2 do
  for i:=0 to LayerSize[k]-1 do begin
   InpLayers[k,i]:=0;
   for j:=0 to LayerSize[k-1]-1 do
    InpLayers[k,i]:=InpLayers[k,i]+OutLayers[k-1,j]*LayerWeights[k-1,i,j];
   // Учет сдвига
   InpLayers[k,i]:=InpLayers[k,i]+LayerWeights[k-1,i,LayerSize[k-1]]; // *1
   OutLayers[k,i]:=LayerActivateF(InpLayers[k,i],1); // 1 - сигмоид
  end;

  // Рассчитываем последний слой c акт. ф. на выходе
  for i:=0 to LayerSize[LN_1]-1 do begin
   InpLayers[LN_1,i]:=0;
   for j:=0 to LayerSize[LN_2]-1 do
    InpLayers[LN_1,i]:=InpLayers[LN_1,i]+OutLayers[LN_2,j]*LayerWeights[LN_2,i,j];
   // Учет сдвига
   InpLayers[LN_1,i]:=InpLayers[LN_1,i]+LayerWeights[LN_2,i,LayerSize[LN_2]]; // *1
   //OutLayers[LN_1,i]:=InpLayers[LN_1,i];
   OutLayers[LN_1,i]:=LayerActivateF(InpLayers[LN_1,i],1);
  end;
end;

Я вижу своей целью распараллелить циклы: каждый поток будет рассчитывать свой кусок цикла. По идее не сложно, но есть пара технических вопросов:
1) Нужно чтобы потоки не создавались при каждом вызове GetNetOut, а были созданы 1 раз при вызове DoTeach и по ее команде выполняли вычисление с новыми данными.
2) Массивы InpLayers, OutLayers, LayerWeights, LayerSize - это поля класса TTeachNet, и они необходимы для вычислений выхода ИНС. Как их лучше передавать, чтобы они были доступны, как глобальные переменные всем потокам? В принципе, есть экземпляр класса TTeachNet, и можно сделать эти переменные public (т.к. потоки планируется создавать и уничтожать при гарантированной жизни TTeachNet, его же методом, то ошибки не должно быть). Но, возможно, есть более хорошие идеи?
3) Также потребуется барьерная синхронизация между вычислениями различных слоев ИНС. Не силен я в этом, буклет или пример нужен...
4) Взаимодействия с VCL в этой части нет, с синхронизацией вроде проблем не будет по этому вопросу. Только распараллеливание вычислений, это должно облегчить задачу.
------------
Я понимаю, что мой вопрос в каком-то смысле звучит как "код давай", но что бы мне действительно хотелось:
- Ваше мнение, верны ли мои мысли на идейном уровне?
- пару примеров по техническим вопросам "а как это сделать", т.к. мне делфийских примеров на потоки показалось маловато, хотя бы барьерной синхронизации там нет, а она потребуется.
- подводные камни: тут нужен опыт, мало ли о чем не подумал, в многопоточности их много :)
------------
Прочитать если кто что напишет смогу не раньше вторника (7-го числа).


 
Сергей М. ©   (2009-07-03 09:58) [1]


> 1)


Нужно организовать тред-пул.
Как пример для изучения см. класс TIdSchedulerOfThreadPool в составе Indy10


> 2)


Доступ по записи к этим массивам следует обязательно синхронизировать.
Для этого можно использовать, например, TCriticalSection, TMultiReadExclusiveWriteSynchronizer.
Синхронизацию доступа к cписочныv ресурсам можно организовать, например, на базе TThreadList


> 3)


см. 2)


 
Loginov Dmitry ©   (2009-07-03 12:50) [2]


> В ней есть часто используемая процедура GetNetOut(), которая
> вычисляет значение выходов многослойной ИНС при заданных
> входах. Учитывая то, что коррекция весов происходит по завершении
> эпохи обучения, думаю будет достаточно распараллелить только
> GetNetOut, так как основная вычислительная нагрузка на ней.
>


Этот расчет - обычное матричное умножение, верно?
Тогда используй Blass.dll, будет раз в 50 быстрее.
Примеры можешь найти на моем сайте.


 
Вариант   (2009-07-03 14:16) [3]


> Vitaliy_____   (03.07.09 08:24)  
> - пару примеров по техническим вопросам "а как это сделать",
>  т.к. мне делфийских примеров на потоки показалось маловато,
>  хотя бы барьерной синхронизации там нет, а она потребуется.
>

В принципе сделать барьер можно множеством вариантов, сходу пришло в голову два из тех, что мне кажутся попроще...

Можно сделать на

DWORD WaitForMultipleObjects(
 DWORD nCount,
 const HANDLE* lpHandles,
 BOOL bWaitAll,
 DWORD dwMilliseconds
);

Можно сделать на семафоре,  правда код будет посложнее.

Синхронизация, многопоточность хорошо описана у Рихтера в
"Создание эффективных WIN32-приложений с учетом специфики 64-разрядной версии Windows".

Пул потоков организую обычно aeyrwbtq QueueUserWorkItem, но это дело вкуса, можно создать и свой. Кстати примеры тоже есть у Рихтера в той же книге.


 
Leonid Troyanovsky ©   (2009-07-04 13:55) [4]


> Vitaliy_____   (03.07.09 08:24)  

> Озадачился вопросом распараллеливания алгоритма для повышения
> вычислительной производительности.

Сколько физических процессоров в целевой системе?

--
Regards, LVT.


 
Vitaliy_____   (2009-07-07 11:39) [5]

Сергей М.
> Нужно организовать тред-пул.

Не согласен, я вижу реализацию в создании тредов 1 раз и убивании их по завершении вычислений. Эта функция будет вызываться 300 000 раз за 1 эпоху вычислений, зачем создавать поток каждый раз, пусть даже пулом?

> Доступ по записи к этим массивам следует обязательно синхронизировать.
И тут не согласен: если есть четкое разграничение по данным (а его можно сделать: передаем потоку перед началом число потоков и его номер, по этим параметрам определяем фрагмент массива для обработки), то синхронизация нужна только между слоями (т.к. нам потребуется полностью вычисленный предыдущий слой для рассчета следующего).

Loginov Dmitry
> Этот расчет - обычное матричное умножение, верно?

По сути да, но сложность в том что слои разной размерности.
У тебя на сайте есть раздел "нейронные сети" (правда пустой), думаю ты в курсе что делается при "прямом проходе". Но есть тонкости. Учет случайного сдвига, разный алгоритм для входного, выходного и скрытых слоев.
И вообще, как-то слабо верится, что умножение чисел возможно ускорить. На чем экономия? На вычислении адресов при умножении матриц? Так тут размерности у слоев разные, не выйдет... Откуда 50 раз к скорости?

Leonid Troyanovsky
> Сколько физических процессоров в целевой системе?

Более 1. В данный момент 2, возможно будет 4. А это важно? Размерности слоев достаточно большие, чтобы загрузить и 100 процессоров :)


 
Vitaliy_____   (2009-07-07 11:41) [6]


> Вариант

WaitForMultipleObjects - интересно возможно подойдет, посмотрю Рихтера, слышал про эту книгу... Будем разбираться дальше :)


 
Сергей М. ©   (2009-07-07 12:19) [7]


> Vitaliy_____   (07.07.09 11:39) [5]



> зачем создавать поток каждый раз, пусть даже пулом?


Ты неверно трактуешь функциональность пула.
http://en.wikipedia.org/wiki/Thread_pool


> если есть четкое разграничение по данным


Из твоего объяснения это совершенно не очевидно.


 
Сергей М. ©   (2009-07-07 12:31) [8]


> Сколько физических процессоров в целевой системе?


> А это важно?


Куда как важней !

Достичь повышения сквозной выч.производительности алгоритма увеличением кол-ва кодовых потоков при неизменном (фиксированном) числе процессоров невозможно.


 
Vitaliy_____   (2009-07-07 14:34) [9]

Сергей М.
> Ты неверно трактуешь функциональность пула.


Гм, да, согласен. По сути будет что-то подобное.


> Достичь повышения сквозной выч.производительности алгоритма
> увеличением кол-ва кодовых потоков при неизменном (фиксированном)
> числе процессоров невозможно.

Не понял фразы. Если у нас в системе 2 ядра и мы организуем 2 вычислительных потока, то производительность возрастет по сравнению с 1 потоком (не в 2 раза, конечно, но возрастет). Учитывая, что непрерывные вычисления на 1 поток значительны, синхронизация отъест не так много (сколько - уже покажет практика).
Что значит  "...при неизменном (фиксированном) числе процессоров невозможно"? У нас 1 ядро простаивает в принципе при 1 вычислительном потоке и 2 процессорах.


 
Leonid Troyanovsky ©   (2009-07-07 14:52) [10]


> Vitaliy_____   (07.07.09 14:34) [9]

http://dtf.ru/articles/read.php?id=39888

http://rsdn.ru/article/baseserv/threadpool.xml

--
Regards, LVT.


 
Сергей М. ©   (2009-07-07 15:14) [11]


> У нас 1 ядро простаивает


С чего бы вдруг оно простаивает ?
Кроме потоков твоего процесса в системе одновременно существует еще туева хуча процессов, каждый из которых породил минимум 1 поток.

Они, эти потоки, не святым духом же исполняются, им тоже нужны кванты проц.времени .. а процессоров-то всего два .. а потоков - хуча .. да ты еще пару-тройку добавил)..


 
Loginov Dmitry ©   (2009-07-07 22:54) [12]


> По сути да, но сложность в том что слои разной размерности.
>
> У тебя на сайте есть раздел "нейронные сети" (правда пустой),
>  думаю ты в курсе что делается при "прямом проходе". Но
> есть тонкости. Учет случайного сдвига, разный алгоритм для
> входного, выходного и скрытых слоев.
> И вообще, как-то слабо верится, что умножение чисел возможно
> ускорить. На чем экономия? На вычислении адресов при умножении
> матриц? Так тут размерности у слоев разные, не выйдет...
>  Откуда 50 раз к скорости?



> По сути да, но сложность в том что слои разной размерности.


Слои-то разной размерности, верно, но насколько помню, выходы слоя расчитываются как вектор входов умноженный на матрицу весов слоя.


> И вообще, как-то слабо верится, что умножение чисел возможно
> ускорить. На чем экономия? На вычислении адресов при умножении
> матриц? Так тут размерности у слоев разные, не выйдет...
>  Откуда 50 раз к скорости?


в составе замечательной библиотеки Lapack есть замечательная библиотека Blas. Все в них реализовано на фортране, это язык с математическим уклоном, и компилятор фортрана донильзя оптимизирует операции по обработке матриц, причем может учитывать особенности того или иного процессора, оттуда и ускорение в десятки раз. Благодаря этому и Lapack и Blas используются в большинстве известных математических пакетах. Магия.


 
Vitaliy_____   (2009-07-08 07:12) [13]


> Leonid Troyanovsky ©

...1. На однопроцессорных системах нет смысла использовать многопоточные приложения. Это не приводит к повышению производительности, а наоборот, усложняет систему из-за необходимости выполнять синхронизацию потоков.

- ежу понятно, потому и написал, что более чем одно ядро. 2 или 4 в данном случае не важно.
У нас 2 чисто расчетные задачи, примерно равные по объему. Статья интересная, прочитал с удовольствием, но пока не вижу причин для беспокойства. В нашем случае должно работать.
Про пул: мы заранее знаем, а точнее планируем число потоков, есть ли смысл так усложнять? По-моему для серверных решений это действительно важно (хотя сталкивался с этим лишь на лабораторных в универе и там обходились отдельным потоком на запрос пользователя - идейно это все достаточно понятно и в статье подробно разжевано).

Сергей М. ©
> Кроме потоков твоего процесса в системе одновременно существует еще туева хуча процессов

Трудно поспорить, но большинство их (по крайней мере у меня) простаивает и запуская 2 варианта моей однопоточной программы мы получаем почти двукратную прибавку к скорости. На мой взгляд достаточный аргумент, чтобы задуматься о потоках.

Loginov Dmitry ©  
> Слои-то разной размерности, верно, но насколько помню, выходы
> слоя расчитываются как вектор входов умноженный на матрицу
> весов слоя.

Да, все верно. На выходе получим входной вектор для след. слоя.

> в составе замечательной библиотеки Lapack есть замечательная
> библиотека Blas. Все в них реализовано на фортране, это
> язык с математическим уклоном, и компилятор фортрана донильзя
> оптимизирует операции по обработке матриц

Слово "замечательная" напоминает рекламу БАДов. Есть тесты, которые это подтверждают? В магию не верю. Кстати, спасибо что напомнили выпускнику прикладной математики о существовании фортрана, а то я уже совсем его забыл. Помнится сравнивали решение одной и той же задачи, реализованной на фортране и на билдере 5-м тогда еще, так вот... Магии нет :) А неудобство писать пользовательский интерфейс на фортране есть.
Про оптимизацию: дык это пересобирать каждый раз надо под архитектуру системы, чтобы получить выигрыш. Нет, мы будем ручками делать. Хотя бы потому, что давно пора пересматривать наши последовательные алгоритмы.


 
Vitaliy_____   (2009-08-11 06:10) [14]

Не прошло и месяца как отпуск закончился :)
Решил поднять старую ветку: как ни наберешь в поиске "барьерная синхронизация на семафорах", никак не получаешь примера программы. Вроде все просто, но примера нет.
И так, пара комментариев по своему же вопросу:
Функция

DWORD WaitForMultipleObjects(
 DWORD nCount,
 const HANDLE* lpHandles,
 BOOL bWaitAll,
 DWORD dwMilliseconds
);

имеет один существенный на мой взгляд недостаток: есть константа, ограничивающая число объектов, которое она может ожидать.

function WaitForMultipleObjects(nCount: DWORD; lpHandles: PWOHandleArray;
 bWaitAll: BOOL; dwMilliseconds: DWORD): DWORD; stdcall;
{$EXTERNALSYM WaitForMultipleObjects}

type
 TWOHandleArray = array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle;
 PWOHandleArray = ^TWOHandleArray;

MAXIMUM_WAIT_OBJECTS = 64;
{$EXTERNALSYM MAXIMUM_WAIT_OBJECTS}


То есть при такой синхронизации мы можем ждать не больше 64 потоков. Это достаточно много, но "недальновидно", учитывая что уже сейчас есть видеокарты с большим числов процессоров (кто сказал, что обязательно проводить вычисления на ЦП, в дальнейшем возможно будут еще идеи по усовершенствованию, но это уже не для этой ветки).

Для облегчения работы с барьерной синхронизацией написал модуль, помогающий ее организовать. Хотелось бы услышать критику по нему:


unit BarrierSinchronization;

interface

type TMyBarrierSinchronization=class(TObject)
private
 NP:integer;                            // Число потоков для синхронизации
 MBSSemaphore1,MBSSemaphore2:THandle;   // 2 семафора для реализации барьера
 SemName1,SemName2:String;              // Имена семафоров
 state:boolean;                         // Ключ, с каким семафором работаем
public
 procedure WaitBarrier();
 procedure Initialize(NumProcesses:integer);

 constructor Create;
 destructor destroy; override;
end;

var MyBarrierSinchronization:TMyBarrierSinchronization;

resourcestring
WinAPIErrMsg1="Can not create CriticalSection for barrier synchronization!";
WinAPIErrMsg2="Can not create semaphore 1 for synchronization!";
WinAPIErrMsg3="Can not create semaphore 2 for synchronization!";
implementation

uses Windows, SysUtils, StrUtils;

constructor TMyBarrierSinchronization.Create;
begin
SemName1:=DateTimeToStr(Now());
SemName1:=AnsiReplaceStr(SemName1,":","_");
SemName1:=AnsiReplaceStr(SemName1,"\","_");
SemName1:=AnsiReplaceStr(SemName1,".","_");
SemName1:=AnsiReplaceStr(SemName1," ","_");
SemName1:=SemName1+"MBSMySemaphore";
SemName2:=SemName1+"2";
SemName1:=SemName1+"1";
np:=0;
if GetLastError<>0 then Exception.Create(WinAPIErrMsg1);
end;

destructor TMyBarrierSinchronization.destroy;
begin
CloseHandle(MBSSemaphore1);
CloseHandle(MBSSemaphore2);
np:=0;
inherited;
end;

procedure TMyBarrierSinchronization.Initialize(NumProcesses:integer);
begin
NP:=NumProcesses;
MBSSemaphore1:=CreateSemaphore(nil,0,np,PChar(SemName1)); // Занят
if GetLastError<>0 then Exception.Create(WinAPIErrMsg2);
MBSSemaphore2:=CreateSemaphore(nil,0,np,PChar(SemName2)); // Занят
if GetLastError<>0 then Exception.Create(WinAPIErrMsg3);
state:=true; // работаем с первым семафором, ждем второй
end;

procedure TMyBarrierSinchronization.WaitBarrier();
var MSemPrevCount:longint;
begin
if state then begin // На нечетных барьерах ждем второй семафор, работаем с первым
 ReleaseSemaphore(MBSSemaphore1,1,@MSemPrevCount);
 if MSemPrevCount=np-1 then begin // Это последний процесс, можно стартовать след. цикл
  CloseHandle(MBSSemaphore1);
  MBSSemaphore1:=CreateSemaphore(nil,0,np,PChar(SemName1)); // Занят
  if GetLastError<>0 then Exception.Create(WinAPIErrMsg2);
  State:=(State=false);  // Обратили состояние
  ReleaseSemaphore(MBSSemaphore2,np-1,nil); // Освободили семафор ожидания - все начали работу (np-1 - т.к. этот {последний} поток не будет ждать)
 end else
  WaitForSingleObject(MBSSemaphore2,INFINITE);
end else begin      // На четных барьерах наоборот
 ReleaseSemaphore(MBSSemaphore2,1,@MSemPrevCount);
 if MSemPrevCount=np-1 then begin // Это последний процесс, можно стартовать след. цикл
  CloseHandle(MBSSemaphore2);
  MBSSemaphore2:=CreateSemaphore(nil,0,np,PChar(SemName2)); // Занят
  if GetLastError<>0 then Exception.Create(WinAPIErrMsg3);
  State:=(State=false); // Обратили состояние
  ReleaseSemaphore(MBSSemaphore1,np-1,nil); // Освободили семафор ожидания - все начали работу (np-1 - т.к. этот {последний} поток не будет ждать)
 end else
  WaitForSingleObject(MBSSemaphore1,INFINITE);
end;
end;

initialization
MyBarrierSinchronization:=TMyBarrierSinchronization.Create;
end.


 
Вариант   (2009-08-11 10:10) [15]

Мне показалось сложновато несколько. Два семафора для того, что бы можно было барьер постоянно в цикле использовать без переинициализации (если правильно понял идею), постоянное пересоздание семафоров

Предлагаю еще вариант(естественно упрощенный, без проверок, без деструктора и т.п.):-)

Пусть класс содержит счетчик ожидаемых процессов (переменная типа integer), переменная для хранения начального значения счетчика  и объект синхронизации типа Event(THandle)

procedure TMyBarrierSinchronization.Create(NP:integer);
begin
 TP:=NP;//хранилище стартового значения счетчика
 NumProc:=NP-1;// счетчик = число процессов для ожидания-1
 WaitEvent:=CreateEvent(nil,true,false,nil);//Объект синхронизации
end;

procedure TMyBarrierSinchronization.Initialize(NumProcesses:integer);
begin
 NumProc:=NumProcesses-1;
end;

procedure TMyBarrierSinchronization.WaitBarrier();
begin
   ResetEvent(WaitEvent);
    if InterlockedDecrement(NumProc) < 0 then
     begin
       Initialize(TP);
       SetEvent(WaitEvent);
     end;
    WaitForSingleObject(WaitEvent,INFINITE);
end;

PS: Не проверял работает код или нет


 
Вариант   (2009-08-11 10:15) [16]

Не учел, что мой вариант можно использовать только в одном процессе. Для разных процессов надо дать имя Event, а вместо переменной-счетчика надо таки будет использовать то же именованный семафор. Но суть алгоритма остается та же.


 
Вариант   (2009-08-11 10:29) [17]

Посмотрел внимательней свой код, есть ошибки,автоинициализация не пойдет - какой-то ожидающий поток  может не сразу получить управление по установке события, а другой уже вошел в следующий цикл ожидания и сбросил событие... это нехорошо. Поспешил однако.


 
Vitaliy_____   (2009-08-11 13:00) [18]


> Вариант   (11.08.09 10:29) [17]
>
> Посмотрел внимательней свой код, есть ошибки,автоинициализация
> не пойдет - какой-то ожидающий поток  может не сразу получить
> управление по установке события, а другой уже вошел в следующий
> цикл ожидания и сбросил событие... это нехорошо. Поспешил
> однако.

Только хотел это написать, но ты меня опередил :)
Именно поэтому семафоров 2 а не 1 (для четной и нечетной итерации). в принципе, у меня был вариант с событиями. Но их все равно надо не меньше 2-х. А вот по поводу пересоздания семафора - тут у меня вопрос, может его совсем не обязательно "убивать". Надо еще раз заглянуть в рихтера, мне вообще надо его "сбросить". может CloseHandle не обязателен, проконсультируйте кто знает: может достаточно вызвать Create или Open и оно перепишет как надо?
У меня тоже есть ошибки, точнее неточности: если 2-ды вызвать TMyBarrierSinchronization.Initialize, то семафоры будут созданы второй раз. Правда имя тоже. Знатоки, подскажите, что при этом произойдет? Будут просто переписаны параметры объекта ядра или еще и увеличен счетчик пользователей этого объекта? (в последнем случае мы его тогда не освободим, как я понимаю).


 
Vitaliy_____   (2009-08-11 13:16) [19]

Уточнил у рихтера, при создании или открытии у нас НЕ будут переписаны параметры объекта ядра, если он существует, так что уничтожать его надо. Кстати, в инициализации, думаю, можно будет перед созданием семафоров тоже CloseHandle добавить. Согласно рихтеру

BOOL CloseHandle(HANDLE hobj);
Эта функция сначала проверяет таблицу описателей, принадлежащую вызываю
щему процессу, чтобы убедиться, идентифицирует ли переданный ей индекс (описатель) объект, к которому этот процесс действительно имеет доступ. Если переданный индекс правилен, система получает адрес структуры данных объекта и уменьшает в этой структуре счетчик числа пользователей; как только счетчик обнулится, ядро удалит объект из памяти.
Если же описатель неверен, происходит одно из двух. В нормальном режиме вы
полнения процесса CloseHandle возвращает FALSE, а GetLastError код ERROR_INVALID_HANDLE. Но при выполнении процесса в режиме отладки система просто уведомляет отладчик об ошибке.

Конечно, можно еще и в Create Прописать создание семафоров и тогда в Initialize все будет честно.


 
Вариант   (2009-08-11 13:26) [20]

А зачем уничтожать? Сбросить можно без пересоздания. Уменьшить счетчик можно WaitForSingleObject функцией.


 
Vitaliy_____   (2009-08-11 13:55) [21]

Нет, а как вы себе представляете, в цикле вызывать ожидание до числа процессов? Может проще уничтожить?


 
Вариант   (2009-08-11 14:28) [22]


> Vitaliy_____   (11.08.09 13:55) [21]


Например
while(true)
do
sig:=WaitForSingleObject(Sem,0);
if sig <> WAIT_OBJECT_0 then
break;///объект в не сигнальном состоянии, то есть дошли до нуля или была ошибка
end;

насчет проще  - не скажу. MSDN советует WaitFor в цикле.
Но не знаю, если процессов более 1000 или еще больше например, что больше займет времени - сброс на ноль  или уничтожение/создание... Не знаю.


 
Vitaliy_____   (2009-08-11 14:38) [23]

Ну не знаю, мне по большому счету пока без разницы, все равно планируется 2 потока с большой загрузкой между барьерами. Еще бы время найти доделать :)
Хотя на будущее - спасибо за совет: MSDN для меня достаточный аргумент.


 
Vitaliy_____   (2009-08-14 07:58) [24]

Честно, не очень понимаю в чем дело - вроде идейно все правильно, но потоки каким-то магическим образом проскакивают барьер. В итоге попадаем на бесконечное ожидание разных семафоров. (число потоков выставлено правильно во всех тредах). Если кто укажет ошибку - здорово, хотя слабо в это верю. Попробую пока сделать для каждого участка свой барьер из обычного одного семафора, интересно что там будет.

unit BarrierSinchronization;

interface

uses Windows, Utils, SysUtils, SyncObjs;

type TMyBarrierSinchronization=class(TObject)
private
 NP:integer;                            // Число потоков для синхронизации
 MBSSemaphore1,MBSSemaphore2:THandle;   // 2 семафора для реализации барьера
 SemName1,SemName2:String;              // Имена семафоров
 state:boolean;                         // Ключ, с каким семафором работаем
public
 procedure Initialize(NumProcesses:integer);
 procedure WaitBarrier(ThreadState:string);

 constructor Create;
 destructor destroy; override;
end;

var MyBarrierSinchronization:TMyBarrierSinchronization;

resourcestring
WinAPIErrMsg1="Can not create CriticalSection for barrier synchronization!";
WinAPIErrMsg2="Can not create semaphore 1 for synchronization!";
WinAPIErrMsg3="Can not create semaphore 2 for synchronization!";
implementation

constructor TMyBarrierSinchronization.Create;
begin
SemName1:=MyUtils.GetUnicalName+"MBSMySemaphore";
SemName2:=SemName1+"2";
SemName1:=SemName1+"1";
np:=0;
// Создаем "пустые" семафоры, чтобы корректно работала Initialize
MBSSemaphore1:=0;
MBSSemaphore2:=0;
end;

destructor TMyBarrierSinchronization.destroy;
begin
if MBSSemaphore1<>0 then
 CloseHandle(MBSSemaphore1);
if MBSSemaphore2<>0 then
 CloseHandle(MBSSemaphore2);
np:=0;
inherited;
end;

procedure TMyBarrierSinchronization.Initialize(NumProcesses:integer);
begin
NP:=NumProcesses;
if MBSSemaphore1<>0 then
 CloseHandle(MBSSemaphore1);
MBSSemaphore1:=CreateSemaphore(nil,0,np,PChar(SemName1)); // Занят
if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg2);
if MBSSemaphore2<>0 then
 CloseHandle(MBSSemaphore2);
MBSSemaphore2:=CreateSemaphore(nil,0,np,PChar(SemName2)); // Занят
if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg3);
state:=true; // работаем с первым семафором, ждем второй
end;

procedure TMyBarrierSinchronization.WaitBarrier(ThreadState:string);
var MSemPrevCount:longint;
   key:cardinal;

procedure ReSetSemaphore(Sem:THandle);
var sig:cardinal;
begin
 repeat
  sig:=WaitForSingleObject(Sem,0);
 until sig <> WAIT_OBJECT_0;
end;

begin
if state then begin // На нечетных барьерах ждем второй семафор, работаем с первым
 ReleaseSemaphore(MBSSemaphore1,1,@MSemPrevCount);
 if MSemPrevCount=np-1 then begin // Это последний процесс, можно стартовать след. цикл
  ReSetSemaphore(MBSSemaphore1);
  State:=false;
  ReleaseSemaphore(MBSSemaphore2,np-1,nil); // Освободили семафор ожидания - все начали работу (np-1 - т.к. этот {последний} поток не будет ждать)
 end else begin
  repeat
   key:=WaitForSingleObject(MBSSemaphore2,3000);
   if key=WAIT_TIMEOUT then begin
    MessageBox(0,PChar("WaitTimeOut on semaphore 2"+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
    DebugBreak;
   end;
  until key=WAIT_OBJECT_0;
 end;
end else begin      // На четных барьерах наоборот
 ReleaseSemaphore(MBSSemaphore2,1,@MSemPrevCount);
 if MSemPrevCount=np-1 then begin // Это последний процесс, можно стартовать след. цикл
  ReSetSemaphore(MBSSemaphore2);
  if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg3);
  State:=true;
  ReleaseSemaphore(MBSSemaphore1,np-1,nil); // Освободили семафор ожидания - все начали работу (np-1 - т.к. этот {последний} поток не будет ждать)
 end else begin
  repeat
   key:=WaitForSingleObject(MBSSemaphore1,3000);
   if key=WAIT_TIMEOUT then begin
    MessageBox(0,PChar("WaitTimeOut on semaphore 1"+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
    DebugBreak;
   end;
  until key=WAIT_OBJECT_0;
 end;
end;
end;

initialization
MyBarrierSinchronization:=TMyBarrierSinchronization.Create;

end.


 
Vitaliy_____   (2009-08-14 12:07) [25]

Похоже, все же где-то в этом модуле есть ошибка. При создании n семафоров (где n - число барьеров) - все работает, а при таком "автосбросе" где-то пролетаем на красный свет.
> Вариант  Похоже эта тема особо интересна нам двоим :) Если будет желание, подумай где может быть косяк: у меня явный дедлок - проблема есть, а с какого боку ее решать не знаю :)


 
Leonid Troyanovsky ©   (2009-08-14 14:11) [26]


> Vitaliy_____   (14.08.09 07:58) [24]

Много буков, не осилил.
А чего требуется, в чем сакральный смысл проектируемого?

Могу заметить, чтобы написать и отладить не очень сложный составной объект синхронизации нужна масса времени и терпения.

Какие объекты (системные) синхронизации, кроме семафоров,
изучены и почему они неприменимы для данной задачи?
И как, собс-но, формулируется сама задача?

--
Regards, LVT.


 
Leonid Troyanovsky ©   (2009-08-14 14:16) [27]


> Vitaliy_____   (11.08.09 06:10) [14]

> То есть при такой синхронизации мы можем ждать не больше
> 64 потоков.

Для двух ядер - 64 потока - как до звезды.

Ну, а если очень нужно сделай 2 потока, каждый из которых
будет ждать по 64. Но, все это не для двухпроцессорной машины.

--
Regards, LVT.


 
Leonid Troyanovsky ©   (2009-08-14 14:23) [28]


> Vitaliy_____   (08.07.09 07:12) [13]

> простаивает и запуская 2 варианта моей однопоточной программы
> мы получаем почти двукратную прибавку к скорости. На мой

И почему бы не распараллелить задачу столь естественным образом?

--
Regards, LVT.


 
Leonid Troyanovsky ©   (2009-08-14 14:50) [29]


> Vitaliy_____   (03.07.09 08:24)  

>  т.к. мне делфийских примеров на потоки показалось маловато,
>  хотя бы барьерной синхронизации там нет, а она потребуется.

Не понял, зачем здесь потребуется барьерная синхронизация.
Для эффективных вычислений требуется (консольное) приложение
с двумя, четырьмя и т.д. (по кол-ву процессоров)
дополнительными потоками.
Т.е., про гуи приложение на двух ядрах лучше забыть.

А для управления одним потоком хватит одного-пары Event
и одной-пары критических секций.
Ну, и, конечно, основная рабочая лошадь - WaitForMultipleObjects[Ex]

--
Regards, LVT.


 
Leonid Troyanovsky ©   (2009-08-14 14:53) [30]


> Leonid Troyanovsky ©   (14.08.09 14:50) [29]

> Для эффективных вычислений требуется (консольное) приложение
> с двумя, четырьмя и т.д. (по кол-ву процессоров)
> дополнительными потоками.

Слово "дополнительными" здесь лишнее, sorry.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-17 07:36) [31]


> Leonid Troyanovsky ©

Много букв, но осилил.
1) В данный момент задача решена, находится в стадии тестирования. Остался вопрос, касаемый "барьера с автоустановкой" - подробнее ниже.

В качестве синхронизации сейчас используется n семафоров, где n - количество барьеров. И вроде все работает. Конечно, слово "вроде" нехорошее, но пока проблем не выявлено, а вот в [24] они есть, хотя лично мне не понятно где.

2) Если уж продолжать тему, то вопрос сейчас можно сформулировать так:
Существуют итерационные вычисления, между частями итераций нужно синхронизировать потоки (т.е. барьер, нам на след. части нужны данные с предыдущей, значит она должна быть полностью досчитана).
Для того чтобы "кто-то" (читать, управляющий всем этим барахлом поток) не брал на себя тяжкий труд по организации необходимых барьеров (ему достаточно того, что надо синхронизировать сами итерации и выполнять действия, необходимые между ними) - требуется "барьер с автоустановкой".

3) Идея [24] заключается в следующем: имеется 2 семафора, как только последний поток достиг барьера, он СНАЧАЛА выставляет барьер для следующей итерации (четная/нечетная определяется переменной state), затем сбрасывает текущий барьер. Вроде логически корректно, пока мы не выставим следующий барьер, все будут ждать на текущем, но на практике получается что потоки "магическим" образом оказываются на разных барьеров и, как следствие, впадают в бесконечное ожидание (т.к. ни на один не придут ВСЕ потоки и, значит, барьер не "рухнет").
Я в магию не верю, значит в коде есть ошибка.

4) Гуй. Если "о нем можно забыть" - это не значит что его нет :)
Он есть, написан на паре timer + Application.ProcessMessages и переписывать смысла не вижу. Единственное, что важно, так это то что в потоках у нас никаких Application.ProcessMessages быть не должно, т.к. это VCL.
Потоки никакого гуя не знают и знать не должны - они просто должны досчитать итерацию (внутри которой еще и барьеры нужны) а затем последний досчитавший ставит событие EndCalcEvent. После этого все ждут команды на след. итерацию или умирание (вот тут как раз WaitForMultipleObjects для 2-х объектов - семафора старта и события умирания)


 
Leonid Troyanovsky ©   (2009-08-17 09:15) [32]


> Vitaliy_____   (17.08.09 07:36) [31]

> В качестве синхронизации сейчас используется n семафоров,
>  где n - количество барьеров.

Для двух ядер достаточно 1 критической секции.
КС даже предпочтительней объектов ядра, т.к., переключает поток
в kernel mode лишь по необходимости.

> Существуют итерационные вычисления, между частями итераций
> нужно синхронизировать потоки (т.е. барьер, нам на след.
>  части нужны данные с предыдущей, значит она должна быть
> полностью досчитана).

С этим вполне справляется даже 1 КС.
Можно найти и более "легкое" решение.

> Я в магию не верю, значит в коде есть ошибка.

[26]

> 4) Гуй. Если "о нем можно забыть" - это не значит что его
> нет :)

Если шашечки важней, чем ехать, то, конечно, можно и с гуем.

Если вычисления распараллеливаются между двумя вторичными
потоками, то всего в процессе 3 потока, и, значит, за удовольствие
смотреть на мелькающие цифирки/картинки придется заплатить
увеличением числа переключений контекстов на где-то на треть.

И, во-ще, для данной формулировки важно не "работает/не работает",
а эффективность работы - загрузка процессоров полезной
(вычислительной) работой, а не переключением контекстов и мод.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-17 11:06) [33]


> Если вычисления распараллеливаются между двумя вторичными
> потоками, то всего в процессе 3 потока, и, значит, за удовольствие
> смотреть на мелькающие цифирки/картинки придется заплатить
> увеличением числа переключений контекстов на где-то на треть.
>

А вот это сейчас и проверяю. Тест подтверждает увеличение производительности на 1.9 - что на мой взгляд опровергает ваше утверждение. Накладные расходы на барьеры, + (что намного важнее) _частичное_ распараллеливание задачи (только та часть, где мало логики и много вычислений), и, в конце концов, другие процессы в системе - результат  на мой взгляд чуть ли не идеален.
Что-то ну в упор не понимаю, при чем здесь критические секции - поясните, исходная функция, которую надо распараллелить, дана в [0] посте.

> Для двух ядер...

Сегодня 2, завтра 4 - зачем писать код, привязанный к железу - чтобы его переписывать потом? Если уж делать программу параллельной, так уж не ограничивать по возможности тем, что имеешь в данный момент! Не понимаю я подхода "исходя из числа ядер" - слишком уж часто меняются технологии в наше время.

> И, во-ще, для данной формулировки важно не "работает/не
> работает",
> а эффективность работы - загрузка процессоров полезной
> (вычислительной) работой, а не переключением контекстов
> и мод.
>

Категорически не согласен. В качестве понимания "почему" работает медленно - все это знать нужно, но в качестве КРИТЕРИЯ верю только в засечение времени, за которое совершается одинаковая (достаточно большая) работа. У нас много оптимизаций, связанных с памятью и т.д. - "работа" может быть и полезной, с точки зрения процессора, но "лишней", "не запланированной" с точки зрения алгоритма.


 
Дмитрий Белькевич   (2009-08-17 11:30) [34]


>  Не понимаю я подхода "исходя из числа ядер" - слишком уж
> часто меняются технологии в наше время.


Для разного количества ядер оптимальны разные алгоритмы. Так что если хочется оптимально под несколько разных вариантов, то, по хорошему, нужно делать  несколько разных алгоритмов.


 
Anatoly Podgoretsky ©   (2009-08-17 11:45) [35]

> Vitaliy_____  (17.08.2009 11:06:33)  [33]

> Сегодня 2, завтра 4 - зачем писать код, привязанный к железу - чтобы его переписывать потом?

Не обязательно переписывать код, можно написать код, который при старте будет узнавать количество логических процессоров, мы же все же делаем software, а не hardwarе


 
Vitaliy_____   (2009-08-17 12:38) [36]

Конечно, так и сделано, на форме спинедит: количество процессоров, дефолтом в нем число реальных...
Единственное отличие в моем алгоритме - так это если 1 процессор и несколько: для одного лишние потоки не создаются, синхронизаций нет, для нескольких (без разницы сколько) - 1 управляющий поток (на нем труднораспараллеливаемая часть и создание/запуск расчетных) и n (=числу реальных ядер) расчетных.
Основное время работают расчетные, и немного основной (когда этап расчета завершен) - он проверяет логическую часть, дает обновиться гую и запускает следующий этап.
Это все как раз работает - вопрос который остался я попытался сформулировать в [31] (2)


 
Leonid Troyanovsky ©   (2009-08-17 13:01) [37]


> Vitaliy_____   (17.08.09 11:06) [33]

> Что-то ну в упор не понимаю, при чем здесь критические секции
> - поясните, исходная функция, которую надо распараллелить,
>  дана в [0] посте.

Вот простенький пример для гуи и вторичного потока (хотя он
здесь, IMHO, вовсе не нужен):

uses
 Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
 Dialogs, StdCtrls;

type
 TForm1 = class(TForm)
   Button1: TButton;
   procedure Button1Click(Sender: TObject);
   procedure FormCreate(Sender: TObject);
   procedure FormDestroy(Sender: TObject);
 private
   { Private declarations }
 public
   { Public declarations }
   critsect: TRTLCriticalSection;
   s: Longint;
 end;

var
 Form1: TForm1;

implementation

{$R *.dfm}

type
 TGlobalData = record
   ID: Longint;
   Data: Longint;
 end;

 TCalcThread = class(TThread)
 public
   cs : TRTLCriticalSection;
   calcid: Longint;
   global: TGlobalData;
   procedure Execute; override;
 end;

procedure TCalcThread.Execute;
var
 i: Longint;
begin
 while not Terminated do
   begin
     EnterCriticalSection(cs);
       if calcid <> global.ID then
         begin
           for i := 0 to 100000 do
             global.Data := global.Data + 500 - Random(1001); { здесь считаем половину цикла}
           global.ID := calcid;
         end;
     LeaveCriticalSection(cs);
   end;
end;

var
 ct: TCalcThread;

procedure TForm1.Button1Click(Sender: TObject);
var
 i: Longint;
begin
 ct.Resume;
 s := 0;
 while not Application.Terminated do
   begin
     EnterCriticalSection(CritSect);
       ct.Global.Data := ct.Global.Data + s;
       Caption := Format("%d %d", [ct.Global.Id, ct.Global.Data ]);
       ct.calcid := ct.calcid + 1;
     LeaveCriticalSection(CritSect);
     Application.ProcessMessages;
     for i := 0 to 100000 do
       s := s + 500 - Random(1001); { здесь считаем другую половину }
   end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
 InitializeCriticalSection(CritSect);
 ct := TCalcThread.Create(True);
 ct.cs := CritSect;
 ct.calcid := 0;
 ct.global.ID := 0;
 ct.global.Data := 0;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
 DeleteCriticalSection(critsect);
end;

Вполне возможно, что делить цикл надо не точно пополам,
а с учетом накладных расходов первичного потока.
И несмотря на простоту и изученность объекта синхронизации
здесь еще таится куча тонкостей.

> КРИТЕРИЯ верю только в засечение времени, за которое совершается
> одинаковая (достаточно большая) работа.

Для анализа работы потребуется не только общее время, но и все его
составляющие, включая количество переключений контекстов,
переключений между режимами, частота сброса кеша(ей),  реакция на
появление высокоприоритетных (в т.ч. системных) потоков и др.

--
Regards, LVT.


 
Leonid Troyanovsky ©   (2009-08-17 13:12) [38]


> Leonid Troyanovsky ©   (17.08.09 13:01) [37]

> здесь, IMHO, вовсе не нужен):

Гуи не нужен, консольный поток менее ресурсоемок.

--
Regards, LVT.


 
Kolan ©   (2009-08-17 13:15) [39]

Что такое TRTLCriticalSection? В справке нет.


 
Leonid Troyanovsky ©   (2009-08-17 13:29) [40]


> Kolan ©   (17.08.09 13:15) [39]

> Что такое TRTLCriticalSection? В справке нет.

From Turbo Delphi for Win32 help:

TRTLCriticalSection stores information about a Windows critical section.

From Windows.pas (find declaration):
TRTLCriticalSection = _RTL_CRITICAL_SECTION; // &etc.

--
Regards, LVT.


 
Kolan ©   (2009-08-17 16:15) [41]

Благодарю.

А чем принципиально исользование TRTLCriticalSection отличается от использования TCriticalSection?

Кстати в 2009 Делфи для этого типа есть два хелпера:

 TCriticalSectionHelper = record helper for TRTLCriticalSection
   procedure Initialize; inline;
   procedure Destroy; inline;
   procedure Free; inline;
   procedure Enter; inline;
   procedure Leave; inline;
   function TryEnter: Boolean; inline;
 end;

 TConditionVariableHelper = record helper for TRTLConditionVariable
 public
   class function Create: TRTLConditionVariable; static;
   procedure Free; inline;
   function SleepCS(var CriticalSection: TRTLCriticalSection; dwMilliseconds: DWORD): Boolean;
   procedure Wake;
   procedure WakeAll;
 end;


 
Дмитрий Белькевич   (2009-08-17 16:26) [42]


> record helper for


Тоже увидел. Вопрос, кстати, образовался. Чем эти хэлперы отличаются от обычного наследования?


 
Kolan ©   (2009-08-17 16:29) [43]

Ну наследования структур нет. А кроме того, как наследованием добавить методы в середину бибилиотечной иерархии? — Только исправив библиотеку. А бибилиотека может быть и без исходников, кроме того VCL какой-нибудь править — не самое безопасное занятие.

А тут раз: helper for TComponent написал и все.


 
Дмитрий Белькевич   (2009-08-17 18:03) [44]

Ясно, буду разбираться.


 
Leonid Troyanovsky ©   (2009-08-17 21:12) [45]


> Kolan ©   (17.08.09 16:15) [41]

> А чем принципиально исользование TRTLCriticalSection отличается
> от использования TCriticalSection?

Принципиально, наверное, не отличается.
Как-то обвык без объектов и Enter|Leave :(

--
Regards, LVT.


 
Vitaliy_____   (2009-08-18 08:29) [46]


> Leonid Troyanovsky ©

Идею твоей программы понял, там кстати целые числа - вполне подошел бы InterlockedExchange, твоя идея для вещественных лучше. Идея плохая :) Где гарантия, что порожденный поток не уснет надолго? (скажем, на час :) ) - тогда "главный" просчитает часть цикла, отпишется через критическую секцию и начнет считать дальше, ведь так? Просчитает ЕЩЕ ОДНУ часть цикла и снова увеличит ct.calcid := ct.calcid + 1;
При мелких вычислениях - очень реальная ситуация. Что при этом будет - а также как у меня где-то проскок через контрольную точку. Хотя, если еще одну контрольную ID-шку добавить, подход сработает. Только по сути это будет спин-блокировка, что крайне не рекомендуется тем же Рихтером.
Поправьте, если не прав...


 
Leonid Troyanovsky ©   (2009-08-18 10:12) [47]


> Vitaliy_____   (18.08.09 08:29) [46]

> Идею твоей программы понял, там кстати целые числа - вполне
> подошел бы InterlockedExchange, твоя идея для вещественных

Да хоть для комплексных ;)

> Где гарантия, что порожденный поток не уснет надолго? (скажем,
>  на час :) ) -

Эту гарантию должен дать разработчик.

> тогда "главный" просчитает часть цикла, отпишется через
> критическую секцию и начнет считать дальше, ведь так?

Если вторичный поток будет, например, вытеснен не осводив КС,
то первичный, досчитав свой цикл, будет ожидать ее освобождения.
Для гуева потока это может и не очень, но я на гуи и не настаивал.

> Только по сути это будет спин-блокировка, что крайне не
> рекомендуется тем же Рихтером.

Чего-то я такой рекомендации не помню.
Спин-блокировка - самый быстрый механизм синхронизации.
При использовании же КС потоки, все же, попадают в режим ядра.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-18 12:12) [48]


> Если вторичный поток будет, например, вытеснен не осводив
> КС,
> то первичный, досчитав свой цикл, будет ожидать ее освобождения.
>
> Для гуева потока это может и не очень, но я на гуи и не
> настаивал.

А если все же освободит и уснет в момент проверки while not Application.Terminated do?
Согласен, редкий случай, но возможный. В моем примере тоже по 10-20 итераций делает (то есть по 30 000 образов, в них по 4 барьера) и это * число итераций, а потом вдруг умирает. К счастью, стабильно :)
Нет, должно работать 100%.
А по поводу спин-блокировки, у рихтера это "Худшее что можно сделать"... стр. 195 (215 в моей электронной версии). Сам ее "придумал", потом прочитал, проникся и больше так не делаю :)
Пусть попадают в режим ядра, у меня при большой задаче не такие частые синхронизации, и можно пренебречь 1000 тактов ради того чтобы прога не мешала другим. А вот о "надежности" барьеров - придется самому разбираться, как время найду - хочу добить задачку, может потом статью накатаю :-[c]


 
Leonid Troyanovsky ©   (2009-08-18 13:35) [49]


> Vitaliy_____   (18.08.09 12:12) [48]

> А если все же освободит и уснет в момент проверки while
> not Application.Terminated do?

Если вторичный поток вытеснен, то первичный узнает об этом
сравнив ID & calcid, ну и вторую часть ему придется считать самому.


> А по поводу спин-блокировки, у рихтера это "Худшее что можно
> сделать"...

Контекст непонятен. Ладно, посмотрим на досуге.

> Пусть попадают в режим ядра, у меня при большой задаче не
> такие частые синхронизации

Два Event с автосбросом, первичный поток сигналит одним для старта очередной итерации и ждет сигнал готовности от вторичного.

Ну и, конечно, внутри процесса имена объектам не нужны.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-18 13:51) [50]

А если у нас появляется новый проц с 4 ядрами, то 4 event? Нет, не разубедил относительно семафоров.


 
Leonid Troyanovsky ©   (2009-08-18 14:45) [51]


> Vitaliy_____   (18.08.09 13:51) [50]

> А если у нас появляется новый проц с 4 ядрами, то 4 event?

Я ж ссылочкой снабжал, насчет эффективной многопоточности.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-19 06:09) [52]

А я даже все статьи прочитал... Это было интересно - но если я не ошибаюсь, для корных процов проблем нет. У меня Core 2 duo e6750 @ 2.66GHz. Да и личный замер, как я писал в [33] более чем убеждает в этом. Хотя у меня в руках теперь хороший тестер, могу сам проверить на других :) Только ждать долго - хорошее разделение нагрузки только на реальной (читать большой) задаче - а ее потом для сравнения надо еще и на однопоточной версии проги прогнать.


 
Vitaliy_____   (2009-08-26 12:59) [53]

Переписал модуль, да обрадуется Leonid Troyanovsky, 2 события для синхронизации достаточно (ну еще счетчик с InterlockedIncrement) :) Пока работает, кому интересно можно потестировать.

Повторюсь, суть этой штуки - барьер с автосбросом - то есть несколько потоков могут "легко и непринужденно" собираться в контрольной точке (вызывая WaitBarrier), при этом когда они дождутся следующий барьер будет автоматически поставлен. Изначально необходимо только проинициализировать число потоков, на которые выставляется барьер.
Если кто протестирует - отпишитесь.


unit BarrierSinchronization;

interface

uses Windows, Utils, SysUtils, SyncObjs;

type TMyBarrierSinchronization=class(TObject)
private
 NP:integer;                            // Число потоков для синхронизации
 MBSEvent1,MBSEvent2:THandle;   // 2 события для реализации барьера
 EvName1,EvName2:String;                // Имена событий
 state:boolean;                         // Ключ, с каким событием работаем
 Counter: integer;                      // Внутренний счетчик выполненности заданий
public
 procedure Initialize(NumProcesses:integer);
 procedure WaitBarrier(const ThreadState:string); overload; // Отладочная версия
 procedure WaitBarrier(); overload;

 constructor Create;
 destructor destroy; override;
end;

var MyBarrierSinchronization:TMyBarrierSinchronization;

resourcestring
WinAPIErrMsg2="Can not create event 1 for synchronization!";
WinAPIErrMsg3="Can not create event 2 for synchronization!";
WinAPIErrMsg4="WaitTimeOut on Event 1!";
WinAPIErrMsg5="WaitTimeOut on Event 2!";
implementation

constructor TMyBarrierSinchronization.Create;
begin
EvName1:=MyUtils.GetUnicalName+"MBSMyEvent";
EvName2:=EvName1+"2";
EvName1:=EvName1+"1";
// Создаем события
MBSEvent1:=CreateEvent(nil,true,false,PChar(EvName1)); // Событие с ручным сбросом, изначально занято
if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg2);
MBSEvent2:=CreateEvent(nil,true,false,PChar(EvName2)); // Событие с ручным сбросом, изначально занято
if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg3);
end;

destructor TMyBarrierSinchronization.destroy;
begin
if MBSEvent1<>0 then
 CloseHandle(MBSEvent1);
if MBSEvent2<>0 then
 CloseHandle(MBSEvent2);
inherited;
end;

procedure TMyBarrierSinchronization.Initialize(NumProcesses:integer);
begin
NP:=NumProcesses;
Counter:=0;
ResetEvent(MBSEvent1);
ResetEvent(MBSEvent2);
State:=true;
end;

procedure TMyBarrierSinchronization.WaitBarrier(const ThreadState:string);
var key:cardinal;
begin
if state then begin // На нечетных барьерах ждем первое событие, выставляем второе
 InterlockedIncrement(Counter);
 if Counter=NP then begin
  ResetEvent(MBSEvent2);
  Counter:=0;
  State:=false;
  SetEvent(MBSEvent1); // ПОСЛЕДНИМ шагом запустили все потоки
 end else begin
  repeat
  key:=WaitForSingleObject(MBSEvent1,3000);
   if key=WAIT_TIMEOUT then begin
    MessageBox(0,PChar(WinAPIErrMsg4+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
    DebugBreak;
   end;
  until key=WAIT_OBJECT_0;
 end;
end else begin      // На четных барьерах наоборот
 InterlockedIncrement(Counter);
 if Counter=NP then begin
  ResetEvent(MBSEvent1);
  Counter:=0;
  State:=true;
  SetEvent(MBSEvent2);
 end else begin
  repeat
   key:=WaitForSingleObject(MBSEvent2,3000);
   if key=WAIT_TIMEOUT then begin
    MessageBox(0,PChar(WinAPIErrMsg5+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
    DebugBreak;
   end;
  until key=WAIT_OBJECT_0;
 end;
end;
end;

procedure TMyBarrierSinchronization.WaitBarrier();
var key:cardinal;
begin
if state then begin // На нечетных барьерах ждем первое событие, выставляем второе
 InterlockedIncrement(Counter);
 if Counter=NP then begin
  ResetEvent(MBSEvent2);
  Counter:=0;
  State:=false;
  SetEvent(MBSEvent1); // ПОСЛЕДНИМ шагом запустили все потоки
 end else begin
  key:=WaitForSingleObject(MBSEvent1,INFINITE);
 end;
end else begin      // На четных барьерах наоборот
 InterlockedIncrement(Counter);
 if Counter=NP then begin
  ResetEvent(MBSEvent1);
  Counter:=0;
  State:=true;
  SetEvent(MBSEvent2);
 end else begin
   key:=WaitForSingleObject(MBSEvent2,INFINITE);
 end;
end;
end;

initialization
MyBarrierSinchronization:=TMyBarrierSinchronization.Create;

end.


// Utils - модуль с утилитами, из него дергается только GetUnicalName - на основе времени создает уникальную текстовую прибавку к имени - можно убрать, т.к. в рамках 1 процесса можно и неименованные события использовать.


 
Leonid Troyanovsky ©   (2009-08-26 14:37) [54]


> Vitaliy_____   (26.08.09 12:59) [53]

> if GetLastError<>0 then raise Exception.Create(WinAPIErrMsg2);

Win32Check(MBSEvent1<>0);

MessageBox я б заменил на OutputDebugString, смотреть можно
в дельфийском EventLog.

У WaitForSingleObject есть еще WAIT_FAILED.

> т.к. в рамках 1 процесса можно и неименованные события

А для 2 процессов и InterlockedIncrement не годится.

--
Regards, LVT.


 
Vitaliy_____   (2009-08-27 09:05) [55]


> У WaitForSingleObject есть еще WAIT_FAILED.


Да. Обработки ошибки ожидания тут нет... Можно добавить.


> А для 2 процессов и InterlockedIncrement не годится.


Да точно, мне в принципе для разных не надо. Хотя чуток напильником поработать и можно будет для разных использовать (взять тот же оптекс у Рихтера за основу).


> MessageBox я б заменил на OutputDebugString


Вот тут не согласен - MessageBox удобен тем, что блокирует поток на время - я вижу на экране несколько сообщений от разных потоков, в нужном давлю ок и через DebugBreak попадаю в отладку именного этого потока (то есть хотя сообщения возникнуть могут не все сразу, я могу подождать и отлаживать именно интересующий меня поток).


 
Суслик_   (2009-08-28 14:17) [56]

Если физический проц один, то только потеряешь производительность, а не получишь ее.


 
Vitaliy_____   (2009-09-09 13:23) [57]

Leonid Troyanovsky, [26]
> Могу заметить, чтобы написать и отладить не очень сложный
> составной объект синхронизации нужна масса времени и терпения.

Кажется придется согласиться :)
Код в [53] все еще не рабочий :) Как я понимаю проблему, у нас "защищен" Counter, но недостаточно защищена переменная state. Редко, но все-таки возникает ситуация "прорывания барьера". Долго не мог понять почему, т.к. возникает крайне редко - тяжело найти. Как раз та самая паршивая ситуация, когда 1 поток "засыпает на долго". Решил потестировать что будет при 4 потоках на двух ядрах, конкуренция возросла, стало появляться чаще. Ох уж ента штука... :)
Попробовал извернуться так:

...
MBSCrSect:TCriticalSection;
...

procedure TMyBarrierSinchronization.WaitBarrier(const ThreadState:string);
var key:cardinal;
   WaitintEvent,ResetingEvent:THandle;
begin
MBSCrSect.Enter;
 if state then begin  // На нечетных барьерах ждем первое событие, выставляем второе
  WaitintEvent:=MBSEvent1;
  ResetingEvent:=MBSEvent2;
 end else begin       // На четных барьерах наоборот
  WaitintEvent:=MBSEvent2;
  ResetingEvent:=MBSEvent1;
 end;

 inc(Counter);
 if Counter=NP then begin
  if (ResetEvent(ResetingEvent)=false) then
   raise Exception.Create(WinAPIErrMsg6);
  Counter:=0;
  State:=(state=false);
  SetEvent(WaitintEvent); // запустили все остальные потоки
 end;
MBSCrSect.Leave; // Все критичные данные в локальных переменных, гонка не возможна.

repeat
 key:=WaitForSingleObject(WaitintEvent,7000);
 if key=WAIT_TIMEOUT then begin
  MessageBox(0,PChar(WinAPIErrMsg4+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
  DebugBreak;
 end;
until key=WAIT_OBJECT_0;
end;



 
Vitaliy_____   (2009-09-10 05:41) [58]

Я кажется наконец понял, что там могло быть:
InterlockedIncrement(Counter);
if Counter=NP then begin
 ResetEvent(MBSEvent2);
 Counter:=0;
 State:=false;
 SetEvent(MBSEvent1); // ПОСЛЕДНИМ шагом запустили все потоки
end else begin
 repeat
 key:=WaitForSingleObject(MBSEvent1,3000);
  if key=WAIT_TIMEOUT then begin
   MessageBox(0,PChar(WinAPIErrMsg4+#13+ThreadState),PChar(IntToStr(GetCurrentThreadID)),0);
   DebugBreak;
  end;
 until key=WAIT_OBJECT_0;
end;

Похоже, ситуация такая: InterlockedIncrement(Counter); нам гарантирует, что Counter будет "честно" увеличен. А вот дальнейшее сравнение if Counter=NP небезопасно. Похоже получалось так: поток увеличивал счетчик и засыпал. После этого последний поток увеличивал счетчик и делал "свои дела", но до Counter:=0 не успевал еще дойти. Просыпался "уснувший" и, опа, он тоже считал себя последним, т.к. Counter=NP все еще истина. Вот и получаем ДВА последних потока. Уже ошибка, хотя событиям все равно сколько через них пройдет (кстати, когда делал на семафорах - там не все равно, потому и вылетало стабильно). Однако может быть так (хотя не спорю, достаточно редко):
У нас два "последних" потока. Один из них запускает все остальные, а второй спит. И если вдруг какой-то поток проснувшись дойдет до след. барьера (т.е. увеличит счетчик) и лишь после этого проснется "лишний последний поток" и выполнит counter:=0 - вот тогда и возникает ошибка. Похоже так. Естественно, вероятность ситуации повышается с увеличением числа потоков.
----------
Чувствую глубокое удовлетворения от познания дебрей ВинАПИ :)



Страницы: 1 2 вся ветка

Текущий архив: 2011.04.17;
Скачать: CL | DM;

Наверх




Память: 0.74 MB
Время: 0.006 c
15-1294059774
KilkennyCat
2011-01-03 16:02
2011.04.17
TGesture


2-1295201878
IPranker
2011-01-16 21:17
2011.04.17
Определить, является ли многоугольник выпуклым?


2-1295279025
SamBrown
2011-01-17 18:43
2011.04.17
Как убрать колонки у VCL ListView (vsReport)


2-1295269594
Евгений07
2011-01-17 16:06
2011.04.17
дельфи игнорирует файл источник


15-1292840260
ocean
2010-12-20 13:17
2011.04.17
К вопросу о нобелевских премиях





Afrikaans Albanian Arabic Armenian Azerbaijani Basque Belarusian Bulgarian Catalan Chinese (Simplified) Chinese (Traditional) Croatian Czech Danish Dutch English Estonian Filipino Finnish French
Galician Georgian German Greek Haitian Creole Hebrew Hindi Hungarian Icelandic Indonesian Irish Italian Japanese Korean Latvian Lithuanian Macedonian Malay Maltese Norwegian
Persian Polish Portuguese Romanian Russian Serbian Slovak Slovenian Spanish Swahili Swedish Thai Turkish Ukrainian Urdu Vietnamese Welsh Yiddish Bengali Bosnian
Cebuano Esperanto Gujarati Hausa Hmong Igbo Javanese Kannada Khmer Lao Latin Maori Marathi Mongolian Nepali Punjabi Somali Tamil Telugu Yoruba
Zulu
Английский Французский Немецкий Итальянский Португальский Русский Испанский