Главная страница
    Top.Mail.Ru    Яндекс.Метрика
Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];

Вниз

Пул потоков.   Найти похожие ветки 

 
panov ©   (2004-10-14 14:34) [0]

Создал класс для работы с пулом потоков.
Прошу высказать замечания.
В следующих постингах задам вопросы к коду.


unit uTThreadPool;

interface

uses
 Classes, windows, messages;

var
 WM_EndThreadExecuter: DWORD;      //Сообщение об окончании обработки
 WM_AddThread: DWORD;              //Сообщение о добавлении элемента в очередь

type
//Тип процедуры для выполнения в отдельном потоке
 TThreadProc=procedure(aParm: pointer);

//Структура для передачи параметров
 PQueueRec=^TQueueRec;
 TQueueRec=record
   rProc: TThreadProc;
   rParm: Pointer;
 end;

//Элемент пула потоков
 TThreadExecuter=class(TThread)
 private
   FThreadIdOwner: THandle;        //Родительский поток TThreadPool
   FThreadFree: Boolean;           //Поток свободен/занят работой
   FExecProc: TThreadProc;         //Адрес процуедуры для выполнения
   FParm: Pointer;                 //Указатель на параметр,
                                   //Предаваемый в процедуру FExecProc
   FNumThread: Integer;            //Нмер потока в пуле
 protected
   procedure Execute; override;
 public
//В конструктор передается ThreadId родительского потока TThreadPool
   constructor Create(const aOwnerThread: THandle;const aNumThread: Integer);
 end;

//Менеджер пула потоков TThreadExecuter
 TThreadPool = class(TThread)
 private
{ TODO -cВажно :
Добавить код для проверки работоспособности потоков из пула
Зависшие убивать по таймауту }
   FSemaphore: THandle;            //Семафор-счетчик потоков
   FMaxThreads: Integer;           //Максимальное число потоков в пуле
   FListQueue: TList;              //Очередь заданий на выполнение
   FPool: array of TThreadExecuter; //Пул потоков
   procedure JobAdd(const aProc: TThreadProc;const aParm: Pointer); //Добавление
                                                                    //заданий в очередь
   function StartThreadFromQueue: Boolean; //Передача задания из очереди в пул
                                           //на выполнение
 protected
   procedure Execute; override;
 public
//В конструктор передается максимальное число потоков
   constructor Create(aMaxThreads: Integer);
   destructor Destroy;override;
//Добавление заданий из внешних потоков в очередь потоков
//Параметры:
//  aProc - адрес процедуры для выполнения
//  aParm - адрес памяти, по которуму нахлодятся параметры
//          для выполнения прцедуры
   procedure AddThread(aProc:TThreadProc;aParm:Pointer);
 end;

implementation

uses Unit1;

{ TThreadExecuter }

constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
 inherited Create(True);
 FreeOnTerminate := True;
 FThreadFree := True;
 FThreadIdOwner := aOwnerThread;
 FNumThread := aNumThread;
 Resume;
end;

procedure TThreadExecuter.Execute;
begin
 while not Terminated do
 begin
   Suspend;                        //Засыпаем сразу
   try
     if Assigned(FExecProc) then FExecProc(FParm); //Выполнение переданной
                                                   //процедуры
   except
   end;
//Передача в родительский поток сообщения о завершении обработки
   PostThreadMessage(FThreadIdOwner,WM_EndThreadExecuter,FNumThread,0);
   if Terminated then break;
 end;
end;



 
panov ©   (2004-10-14 14:35) [1]


{ TThreadPool }

constructor TThreadPool.Create(aMaxThreads: Integer);
begin
 inherited Create(True);
//Непонятно пока, как сделать с наименьшими потерями возможность
//полностью инициализироваться потоку(успеть создать очередь
//сообщений), так как высокий приоритет не гарантирует этого
 Priority := tpTimeCritical;
 FMaxThreads := aMaxThreads;       //Максимальное число потоков в пуле
 FreeOnTerminate := True;
 FListQueue := TList.Create;       //Очередь заданий на выполнение
 FSemaphore := CreateSemaphore(nil,FMaxThreads,FMaxThreads,nil);
 Resume;
end;

destructor TThreadPool.Destroy;
var
 i: Integer;
begin
{ TODO -cВажно : Добавить обработку ошибок }
//Чистим очередь заданий
 for i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));
 FListQueue.Free;
//Очистка пула потоков
 for i := 0 to Length(FPool)-1 do FPool[i].Terminate;
 CloseHandle(FSemaphore);
 inherited;
end;

procedure TThreadPool.Execute;
var
 Msg: TMsg;                        //Приемник сообщений
 p: Pointer;                       //Фиктивный указатель
 RC: DWORD;                        //Код возврата
begin
{ TODO -cВажно : Защитить код try..finally..end }
 PeekMessage(Msg,0,0,0,pm_NoRemove); //Создание очереди сообщений
//Возвращаем нормальный приоритет - очередь сообщений создана
 Priority := tpNormal;
 p := nil;
 while not Terminated do
 begin
//Ждем появления любого сообщения в очереди сообщений
   rc := MsgWaitForMultipleObjects(0,p,False,10,QS_ALLPOSTMESSAGE);
{ TODO -cВажно : Обработать ошибку }
   if rc=WAIT_FAILED then
   begin
     Terminate;
     Continue;
   end;

   if rc= WAIT_TIMEOUT then
   begin
     if FListQueue.Count>0 then    //В очереди есть задания
     begin
       while StartThreadFromQueue do; //Передать в пул на выполнение
     end;
   end;

   if (rc=WAIT_OBJECT_0) or (PeekMessage(Msg,0,0,0,pm_NoRemove)) then
   begin
     GetMessage(Msg,0,0,0);
     if Msg.message=WM_QUIT then
     begin
       Terminate;
       Continue;
     end;

     if Msg.message=WM_EndThreadExecuter then
     begin
       FPool[Msg.wParam].FThreadFree := True; //Закончилось выполнение
                                              //задания в пуле
       ReleaseSemaphore(FSemaphore,1,nil);    //Увеличить счетчик свободных
                                              //потоков в пуле
       Continue;
     end;

     if Msg.message=WM_AddThread then
     begin
       JobAdd(Pointer(Msg.wParam),Pointer(Msg.lParam)); //Добавить задание в очередь
       while StartThreadFromQueue do; //Передать в пул на выполнение
       Continue;
     end;
   end;
 end;

end;

procedure TThreadPool.JobAdd(const aProc: TThreadProc;const aParm: Pointer);
var
 JobRec: PQueueRec;
begin
 New(JobRec);
 JobRec^.rProc := aProc;
 JobRec^.rParm := aParm;
 FListQueue.Add(JobRec);
end;

function TThreadPool.StartThreadFromQueue: Boolean;
var
   RC: Integer;
   i: Integer;
begin
 Result := False;
 if FListQueue.Count=0 then Exit; //В очереди нет заданий - выход
 RC := WaitForSingleObject(FSemaphore,0); //Есть свободные потоки в пуле?
 case RC of
   WAIT_OBJECT_0:              //Есть свободные потоки в пуле
     begin
       for i :=0 to Length(FPool)-1 do
       begin
         if FPool[i].FThreadFree then
         begin
           FPool[i].FThreadFree := False; //Флажок "не свободен"
           FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
           FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
           Dispose(PQueueRec(FListQueue[0]));
           FListQueue.Delete(0);
           Result := True;
           FPool[i].Resume;    //Разбудить поток для выполнения
           Exit;
         end;
       end;
       if Length(FPool)<FMaxThreads then //В пуле еще не максимальное
       begin                             //количество потоков?
         i := Length(FPool);
         SetLength(FPool,i+1);           //Добавляем поток в пул
         FPool[i] := TThreadExecuter.Create(Self.ThreadID,i);
       end;
     end;
   end;
end;

procedure TThreadPool.AddThread(aProc: TThreadProc; aParm: Pointer);
begin
//Пересылаем в очередь сообщений сообщение о новом задании
 PostThreadMessage(Self.ThreadID,WM_AddThread,Integer(@aProc),Integer(aParm));
end;

initialization
//Регистрируем сообщение об окончании обработки задания
 WM_EndThreadExecuter := RegisterWindowMessage("TThreadPoolWM_EndThreadExecuter");
//Регистрируем сообщение о появлении нового задания на обработку
 WM_AddThread := RegisterWindowMessage("TThreadPoolWM_AddThread");
finalization

end.


Вызов происходит следующим образом(Создавал для тестирования):

type
 PParmRec=^TParmRec;
 TParmRec=record
   n: Integer;
   TimeOut: integer;
 end;

procedure Exec1(aParm: pointer);
var
   Parm: PParmRec;
begin
 Parm := aParm;
 Sleep(Parm.TimeOut);
 Form1.lb.Items.Add("End "+IntToStr(Parm^.n)+"/"+IntToStr(Parm^.TimeOut));
 Dispose(Parm);
end;

procedure TForm1.Button2Click(Sender: TObject);
var
   Parm: PParmRec;
   i: Integer;
begin
 Randomize;
 M := TThreadPool.Create(5);

 for i := 0 to 100 do
 begin
   New(Parm);
   Parm.n := i;
   Parm.TimeOut := (Random(10)+1)*30;
   M.AddThread(Exec1,Parm);
 end;
end;


 
Суслик ©   (2004-10-14 14:43) [2]

Замечание 1.


> constructor TThreadExecuter.Create(const aOwnerThread: THandle;const
> aNumThread: Integer);
> begin
>  inherited Create(True);
>  FreeOnTerminate := True;
>  FThreadFree := True;
>  FThreadIdOwner := aOwnerThread;
>  FNumThread := aNumThread;
>  Resume;
> end;


В зависимости от версии дельфи делать Resume в конструкторе не очень хорошо. Если поток отработает быстро, то будет ошибка в AfterContructrion, который в дельфи 6 выглядит так

procedure TThread.AfterConstruction;
begin
 if not FCreateSuspended then
   Resume;
end;


Я бы написал просто так

constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
inherited Create(False);
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
end;


 
panov ©   (2004-10-14 14:46) [3]

Вопрос:
После выполнения конструктора из TThreadPool проходит некоторое время, прежде чем будет создана очередь сообщений для потока.
После M := TThreadPool.Create(5) поток использовать нельзя до создания очереди сообщений.

Как лучше обработать эту ситуацию?
Сейчас временно в конструкторе присваивается высокий приоритет tpTimeCritical, а в методе Execute возвращается в исходное состояние.


 
Суслик ©   (2004-10-14 14:49) [4]

Замечание-вопрос 2
Почему в TThreadExecuter.Execute "глушится" exception? Не лучше ли бы было его как-то обработать?

Замечание 3.
Аналог первого замечания для TThreadPool.Create. Хотя, здесь эта маловероятна ошибка.

Замечание 4.
В destructor TThreadPool.Destroy почему пишешь

for i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));
а нет

if Assigned(FListQueue) then for i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));

?

пока все - дальше времени нет смотреть :)


 
panov ©   (2004-10-14 14:49) [5]

>Суслик ©   (14.10.04 14:43) [2]

constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
inherited Create(False);
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
end


Так делать нельзя.

После inherited Create(False); немедленно стартует метод Execute, соответственно, остаются непроинициализированными переменные

FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;


и метод Execute будет обращаться к еще неинициализированным переменным.


 
Суслик ©   (2004-10-14 14:53) [6]


> Вопрос:
> После выполнения конструктора из TThreadPool проходит некоторое
> время, прежде чем будет создана очередь сообщений для потока.
> После M := TThreadPool.Create(5) поток использовать нельзя
> до создания очереди сообщений.

Я бы сделал так.
1) Инстанцировал TThreadPool например, через метод класса TThreadPool. Например так

class function TThreadPool.Counsturct: TTHreadPool;
begin
   Result := TThreadPool.Create(...);
   SomeEvent.WaitFor;
end;

2) Событие SomeEvent устанавливал бы в signaled в TThreadPool.Execute после создания очереди потока.


 
panov ©   (2004-10-14 14:54) [7]

>Суслик ©   (14.10.04 14:49) [4]

Замечание-вопрос 2
Почему в TThreadExecuter.Execute "глушится" exception? Не лучше ли бы было его как-то обработать?

Замечание 3.


Скорее всего, обработка будет Exception будет добавлена.
К сожалению, для реализации стандартных потоков(TThread) в Delphi нельзя сделать Rerasing(хотя у меня есть сторонняя реализация от y-soft, которую я обязательно разберу).

Замечание 4.

Хорошее замечание, исправлю.


 
Суслик ©   (2004-10-14 14:54) [8]


>  [5] panov ©   (14.10.04 14:49)


> Так делать нельзя.

Ты не прав. Именно так и надо делать в дельфи 6. Не веришь? См. исходный код TThread.Create, AfterContruction и FCreateSuspended


 
Суслик ©   (2004-10-14 14:56) [9]


> К сожалению, для реализации стандартных потоков(TThread)
> в Delphi нельзя сделать Rerasing

Что значит нельзя?
Используешь AcquireExceptionObject и делаешь reraise как угодно.
См. исходный код classes для класса tthread.


 
panov ©   (2004-10-14 14:59) [10]

>Суслик ©   (14.10.04 14:54) [8]
Привожу код TThread.Create:

constructor TThread.Create(CreateSuspended: Boolean);
begin
 inherited Create;
 AddThread;
 FSuspended := CreateSuspended;
 FCreateSuspended := CreateSuspended;
 FHandle := BeginThread(nil, 0, @ThreadProc, Pointer(Self), CREATE_SUSPENDED, FThreadID);
 if FHandle = 0 then
   raise EThread.CreateResFmt(@SThreadCreateError, [SysErrorMessage(GetLastError)]);
end;


После выполнения выделенной строки выполнение поточной функции начнется немедленно.


 
Суслик ©   (2004-10-14 15:02) [11]


>  [10] panov ©   (14.10.04 14:59)


> После выполнения выделенной строки выполнение поточной функции
> начнется немедленно.

ну бог в помощь :)


> CREATE_SUSPENDED


а этааа что?


 
panov ©   (2004-10-14 15:07) [12]

>Суслик ©   (14.10.04 15:02) [11]

С AfterConstruction сейчас проверю


 
Суслик ©   (2004-10-14 15:09) [13]


>  [12] panov ©   (14.10.04 15:07)

из всего, что я сказал, скорее всего был не прав в этой фразе "Именно так и надо делать в дельфи 6." Тут надо еще подумать.

Но, то, что resume в коснтурторе делать нехорошо в Д6 это точно. Спасибо sha. Навел меня как-то на эту мысль.

---------------
Как решение обозначенной в вопросе проблемы с событием? Оно точно поможет, я всегда так делаю.


 
Суслик ©   (2004-10-14 15:14) [14]

Я бы сюда

    if Msg.message=WM_EndThreadExecuter then
    begin
      FPool[Msg.wParam].FThreadFree := True; //Закончилось выполнение
                                             //задания в пуле
      ReleaseSemaphore(FSemaphore,1,nil);    //Увеличить счетчик свободных
                                             //потоков в пуле
      Continue;
    end;


перед (или вместо) continue добавил бы вот этот кусочек

    if FListQueue.Count>0 then    //В очереди есть задания
    begin
      while StartThreadFromQueue do; //Передать в пул на выполнение
    end;


Это позволит не ждать 10 мс в случае если в очереди есть необработанные запросы.


 
panov ©   (2004-10-14 15:27) [15]

>Суслик ©   (14.10.04 15:14) [14]

Спасибо за совет, добавлю
while StartThreadFromQueue do;
вместо if FListQueue>0  while...


 
Суслик ©   (2004-10-14 15:28) [16]

Еще

destructor TThreadPool.Destroy;
var
  i: Integer;
begin
  ...
  //Очистка пула потоков
  for i := 0 to Length(FPool)-1 do FPool[i].Terminate;   ...
end;


О какой очистке пула может идти речь, если потоки в нерабочем состоянии находятся в режиме suspended.

Я бы лучше не делал потокам resume\suspend, а сажал их на ожидаение tevent


 
Суслик ©   (2004-10-14 15:31) [17]

Еще

в function TThreadPool.StartThreadFromQueue: Boolean;
в случае занятости делом всех потоков и недостижения макс. кол-ва потоков не верен будет результат: result будет равен false! Т.е. не все потоки будут запущены.


 
panov ©   (2004-10-14 15:33) [18]

Метод с inherited(False) не приводит к нужному результату.

Строка
 PostThreadMessage(Self.ThreadID,WM_AddThread,Integer(@aProc),Integer(aParm));
Выполняется раньше, чем код в Execute
PeekMessage(Msg,0,0,0,pm_NoRemove);

В результате выполнения

M := TThreadPool.Create(5);

for i := 0 to 100 do
begin
  New(Parm);
  Parm.n := i;
  Parm.TimeOut := (Random(10)+1)*30;
  M.AddThread(Exec1,Parm);
end;


т.е. обращение к методу AddThread начинается раньше, чем стартует метод Execute.


 
panov ©   (2004-10-14 15:35) [19]

>Суслик ©   (14.10.04 15:28) [16]
С терминированием - спасибо, упустил из вида.

 for i := 0 to Length(FPool)-1 do
 begin
   FPool[i].Terminate;
   FPool[i].Resume;
 end;


 
Суслик ©   (2004-10-14 15:37) [20]


>  [18] panov ©   (14.10.04 15:33)

Посмотри на ответ [6].
Там приведено общее решение: в главном потоке ждать TEvent, который будет установлен после создания очереди.


 
Суслик ©   (2004-10-14 15:39) [21]


> [20] Суслик ©   (14.10.04 15:37)

пояснения

var
  E: TEvent;

// Главный поток
pooler := TPool.Create();
E.WaitFor;

// Поток-элемент пулера
procedure TThreadExecutor.Execute;
begin
   Создание очереди
   E.SetEvent;
   ...
end;


Все.


 
Суслик ©   (2004-10-14 15:42) [22]


> [19] panov ©   (14.10.04 15:35)

Плохо.
А если поток уже делом занят?
Будет двойной вызов resume.

Лучше пользоваться event"ом.


 
panov ©   (2004-10-14 15:44) [23]

>Суслик ©   (14.10.04 15:39) [21]

Я понимаю, что можно использовать объекты ядра для ожидания.
Но нет ли более быстрого решения?
Создание и использование таких объектов занимает время и ресурсы.
Мне кажется, что для единовременного использования это неоправданно.

>Суслик ©   (14.10.04 15:31) [17]

Спасибо.
Действительно, не обрабатывался WAIT_TIMEOUT;
Изменил, теперь выглядит так:

function TThreadPool.StartThreadFromQueue: Boolean;
var
   RC: Integer;
   i: Integer;
begin
 Result := False;
 if FListQueue.Count=0 then Exit; //? ??????? ??? ??????? - ?????
 RC := WaitForSingleObject(FSemaphore,0);
 case RC of
   WAIT_OBJECT_0:              
     begin
       for i :=0 to Length(FPool)-1 do
       begin
         if FPool[i].FThreadFree then
         begin
           FPool[i].FThreadFree := False;
           FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
           FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
           Dispose(PQueueRec(FListQueue[0]));
           FListQueue.Delete(0);
           Result := True;
           FPool[i].Resume;    
           Exit;
         end;
       end;
       NewThread;
     end;
   WAIT_TIMEOUT:
     begin
       NewThread;
     end;
   end;
end;
procedure TThreadPool.NewThread;
var
 i: Integer;
begin
  if Length(FPool)<FMaxThreads then
  begin                            
    i := Length(FPool);
    SetLength(FPool,i+1);          
    FPool[i] := TThreadExecuter.Create(Self.ThreadID,i);
  end;
end;


 
panov ©   (2004-10-14 15:48) [24]

>Суслик ©   (14.10.04 15:42) [22]
А какая разница, сколько раз Resume вызывать?

По поводу [14] и [15] -
if FListQueue.Count>0 then    //В очереди есть задания
   begin
     while StartThreadFromQueue do; //Передать в пул на выполнение
   end;

Та же самая ситуация возникает, что и с TThreadPoool: поток не успевает начать выполнять метод TThreadExecuter.Execute


 
Суслик ©   (2004-10-14 15:55) [25]


>  [23] panov ©   (14.10.04 15:44)


> Действительно, не обрабатывался WAIT_TIMEOUT;

Я про это не говорил - пропустил.

Ошибка то осталась.
WAIT_OBJECT_0:      
 begin
      for i :=0 to Length(FPool)-1 do
      begin
        if FPool[i].FThreadFree then
        begin
          FPool[i].FThreadFree := False;
          FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
          FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
          Dispose(PQueueRec(FListQueue[0]));
          FListQueue.Delete(0);
          Result := True;
          FPool[i].Resume;    
          Exit;
        end;
      end;
      NewThread;
      здесь не задается Result в случае занятости всех потоков, НЕдостижения максимума потоков и наличия заданий, а должен т.к. иначе при определенных условиях не будет правильно отрабатывать while StartThreadFromQueue do;     end;


Про медленность объектов ядра:
извини уж, это полная как бы ерунда. Твой код в определенных случаях от потери производительности страдал (сейчас вроде исправил) много больше, чем он может пострадать от объектов ядра.

Все же, есть способы объходиться без ОЯ. Например см. класс TMultiReadExclusiveWriteSynchronizer. Там реализована очень скорострельная синхронизация БЕЗ объектов ядра. Если интересно смотри его.


 
Суслик ©   (2004-10-14 15:58) [26]


> Та же самая ситуация возникает, что и с TThreadPoool: поток
> не успевает начать выполнять метод TThreadExecuter.Execute

С чего это? TThreadExecuter уже создан. С чего ему не успевать что-то делать.


> А какая разница, сколько раз Resume вызывать?

Согласен, что в данном случае никакой.


 
panov ©   (2004-10-14 15:59) [27]

>Суслик ©   (14.10.04 15:55) [25]

Функция StartThreadFromQueue должна возвращать True в единственном случае - при передаче задания на выполнение.
В остальных случаях возвращается False.


 
panov ©   (2004-10-14 16:01) [28]

>Суслик ©   (14.10.04 15:58) [26]
С чего это? TThreadExecuter уже создан. С чего ему не успевать что-то делать.

Как это с чего?
Пул создается не весь сразу, а по необходимости до достижения максимального количества.


 
Суслик ©   (2004-10-14 16:01) [29]


>  [27] panov ©   (14.10.04 15:59)


> Функция StartThreadFromQueue должна возвращать True в единственном
> случае - при передаче задания на выполнение.
> В остальных случаях возвращается False.

рассмотрел :)) Виноват:)


 
Суслик ©   (2004-10-14 16:03) [30]


>  [28] panov ©   (14.10.04 16:01)

предлагаю следующее.

1. Учесть те замечания в которых я оказался прав.
2. Реализовать синхронизацию в целях обеспечения своевременности создания очереди.
3. Выложить все в полном виде.

Тогда продолжим разговор. Ок?


 
Суслик ©   (2004-10-14 16:07) [31]

И вообще - на фига тут FSemaphore: он же используется в контексте одного потока?


 
Суслик ©   (2004-10-14 16:12) [32]


> [27] panov ©   (14.10.04 15:59)
> >Суслик ©   (14.10.04 15:55) [25]
>
> Функция StartThreadFromQueue должна возвращать True в единственном
> случае - при передаче задания на выполнение.
> В остальных случаях возвращается False.

Странная какая-то эта функция StartThreadFromQueue.
Ей говорят - запусти задание из очереди. Она - ок! Все потоки заняты, но макс. количества потоков не достигнут. Она создает новый поток и возврщает false. Тем самым говоря, клиенту (вызывающей стороне), то все, что могла она запустила. Странно как-то: поток есть (только что создан), задание есть, а вот выполнять его никто ПОКА не собирается - ждем следующего цикла.


 
panov ©   (2004-10-14 16:20) [33]

>Суслик ©   (14.10.04 16:03) [30]
2. Реализовать синхронизацию в целях обеспечения своевременности создания очереди.

Как раз это и хотелось обсудить.
Может быть, у кого еще будут предложения по синхронизации без использования объектов ядра, подобных Event

>Суслик ©   (14.10.04 16:07) [31]

И вообще - на фига тут FSemaphore: он же используется в контексте одного потока?


Семафор удобен для счетчика потоков в пуле и сигнализирования о том, что есть свободные потоки.

>Суслик ©   (14.10.04 16:12) [32]
Странная какая-то эта функция StartThreadFromQueue.

Как раз из-за проблем с синхронизацией она такая-)


 
Суслик ©   (2004-10-14 16:24) [34]


> Может быть, у кого еще будут предложения по синхронизации
> без использования объектов ядра, подобных Event

Ну, Александр, я же привел пример TMultiReadExclusiveWriteSynchronizer. Там то, что нужно. Только разбираться долго надо. Там нет объектов ядра, но есть синхронизация. Там все сделано на interloced функциях.


> >Суслик ©   (14.10.04 16:07) [31]
> Семафор удобен для счетчика потоков в пуле и сигнализирования
> о том, что есть свободные потоки.

Не, ну вы гляньте :))) Не хочется использовать ОЯ (медленные), а вот семафор (тоже ОЯ) для целей, где он не нужен используется радостно. Не пойму. На фига он тут нужен - к нему обращается всего один поток!


> Как раз из-за проблем с синхронизацией она такая-)

Никаких проблем нет. Какая синхронизация? Потоками управляет ровно один поток - пуллер. Что хош, то и делай там.


 
Суслик ©   (2004-10-14 16:33) [35]

у рихтера есть про очередь потоков. Может там стоит посмотреть?


 
panov ©   (2004-10-14 16:56) [36]

Никаких проблем нет. Какая синхронизация? Потоками управляет ровно один поток - пуллер. Что хош, то и делай там.

Уже обсуждали, что Execute выполняется позде, чем происходит обюращение к прочим методам объекта.


 
Суслик ©   (2004-10-14 17:01) [37]


> Уже обсуждали, что Execute выполняется позде, чем происходит
> обюращение к прочим методам объекта.

ух... сложно как-то. Все-таки предложение увидеть модифицированный код, и продолжить обсуждение от новой отправной точки.


 
Суслик ©   (2004-10-14 17:07) [38]

Мне как-то не очень понятно, что эта за задача такая, где требуется такое титаническое быстродействие?


 
Суслик ©   (2004-10-14 17:09) [39]


> [36] panov ©   (14.10.04 16:56)
> Никаких проблем нет. Какая синхронизация? Потоками управляет
> ровно один поток - пуллер. Что хош, то и делай там.
>
> Уже обсуждали, что Execute выполняется позде, чем происходит
> обюращение к прочим методам объекта.

И вообще, никакие уловки, кроме как применять методы синхронизации, не спасут от проблемы, когда поток начинает использоваться раньше, чем начинает работать его поточная ф-я. Вот.


 
panov ©   (2004-10-14 17:24) [40]

Кстати, насчет TMultiReadExclusiveWriteSynchronize.

Как я понял, единственное, что мне даст, это то, что мне не надо несколько критических секций использовать, и объекты для ожидания создаются статически всего 2.


 
Суслик ©   (2004-10-14 17:27) [41]


>  [40] panov ©   (14.10.04 17:24)

я не говорил, что его надо исполозовать. Я говорил, что в его коде используется синхронизация без ОЯ. Вот и все. Причем такая синхронизация, которую не сразу и поймешь. Я вроде разобрался. Но вопротить вряд ли смогу.

Все же хотелось бы услышать ответ на вопрос [38]. Может устроить тесты и выяснить, что собственно ОЯ вполне удовлетворяют? Я бы начал с этого. Я когда-то делал такое сравнение. И скажу честно, что я не смог придумать задачу для своих целей, где бы скорострельности event"ов мне бы не хватало.


 
panov ©   (2004-10-14 17:29) [42]

>Суслик ©   (14.10.04 17:27) [41]

Смысл ведь имеет не конкретная задача, а наиболее отимальное универсальное решение.

Например, для моей задачи(а это копирование файлов в нескольких потоках) такое замедление не будет иметь последствия.

Если прикинуть, то такая задача "на скорострельность" может иметь применение при расчетах в графических динамичных приложениях...


 
Суслик ©   (2004-10-14 17:31) [43]


>  [42] panov ©   (14.10.04 17:29)

Оптимальность, понятие неоднозначное. Я уверен, что в текущей задаче путь с использование ОЯ будет наиболее оптимальный, т.к.:
1. Позволит доделать класс за 10 мин.
2. Точно не окажет негативного влияния на скорость.


 
Игорь Шевченко ©   (2004-10-14 18:07) [44]


> Все же, есть способы объходиться без ОЯ. Например см. класс
> TMultiReadExclusiveWriteSynchronizer. Там реализована очень
> скорострельная синхронизация БЕЗ объектов ядра. Если интересно
> смотри его.


constructor TMultiReadExclusiveWriteSynchronizer.Create;
begin
 inherited Create;
 InitializeCriticalSection(FLock);
 FReadExit := CreateEvent(nil, True, True, nil);  // manual reset, start signaled
 SetLength(FActiveThreads, 4);
end;

Типа того, что CreateEvent - это уже не создание объекта ядра.


 
Игорь Шевченко ©   (2004-10-14 18:08) [45]


> Создал класс для работы с пулом потоков.


Рихтер тоже создал, только на основе IOCompletion, если мне память не изменяет.


 
Суслик ©   (2004-10-14 18:11) [46]


>  [44] Игорь Шевченко ©   (14.10.04 18:07)

Игорь, хватит придираться, ты прекрасно понимаешь о чем я? Нет?

Тогда напомню, что я тебя спрашивал как работает этот класс, ты сказал, что там ничего сложного. Если так, то ты не можешь не знать, что многие действия, для которых можно было бы применить ОЯ для синхронизации обходятся без оных. В качесвте примера см. TThreadLocalCounter.

Мое утверждение было о том, что при синхронизации не обязательно использовать ОЯ. И класс был приведен в качестве пример такой реализации.

Возражения есть?


 
panov ©   (2004-10-14 18:16) [47]

>Игорь Шевченко ©   (14.10.04 18:08) [45]
Ну можно, конечно, и на основе портов ввода-вывода, только работать в w98 они не будут...


 
Игорь Шевченко ©   (2004-10-14 18:20) [48]

Суслик ©   (14.10.04 18:11) [46]


> Возражения есть?


Есть.

Посмотри, как реализованы критические секции и зачем там объект ядра LockSemaphore.


 
Суслик ©   (2004-10-14 18:30) [49]


> Посмотри, как реализованы критические секции и зачем там
> объект ядра LockSemaphore.

Где?

Есть не ошибаюсь, критические секции доходят до использования ОЯ только в случае коллизий. Источник: рихтер, win для профи, стр 202. Т.е. большинство операций выполняются в польз. режиме.
В случае же использования ОЯ переход в реж. ядра происходит сразу. Истоник: тот же. + опыт сранения быстродействия.


 
Verg ©   (2004-10-14 18:37) [50]


> Есть не ошибаюсь, критические секции доходят до использования
> ОЯ только в случае коллизий.


Не путаете ли вы понятия "обходиться без" и "использовать эффективно".
Если КС использует ОЯ эффективно, это не означает, что она их вообще не использует.


 
Суслик ©   (2004-10-14 18:38) [51]


>  [50] Verg ©   (14.10.04 18:37)

может и путаю. Но опыт сравнения быстродействия mutex и кс в свое время сказал мне о многом.


 
Игорь Шевченко ©   (2004-10-14 18:42) [52]

Суслик ©   (14.10.04 18:30) [49]

Что есть коллизия по Рихтеру ?


 
Суслик ©   (2004-10-14 18:45) [53]


> Что есть коллизия по Рихтеру ?

Сначала на вопрос ответь, где посмотреть реализацию КС?

------------
Microsoft повысила быстродействие критических секций, включив в них спин блокировку Теперь, когда Вы вызываете EnterCriticalSection, она выполняет заданное число циклов спин-блокировки, пытаясь получить доступ к ресурсу и лишь в том случае, когда все попытки закапчиваются неудачно, функция переводит поток в ре жим ядра, где он будет находиться в состоянии ожидания.


 
Суслик ©   (2004-10-14 18:47) [54]


> [52] Игорь Шевченко ©   (14.10.04 18:42)

Вместо того, чтобы ко мне придираться (:))) ответил бы Панову - на первый вопрос [23]. Все польза была бы.


 
Игорь Шевченко ©   (2004-10-14 18:48) [55]

Суслик ©   (14.10.04 18:30) [49]

Пример:

unit main;

interface
uses
 Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
 StdCtrls;

type
 TForm1 = class(TForm)
   Button1: TButton;
   procedure Button1Click(Sender: TObject);
 end;

var
 Form1: TForm1;

implementation
uses
 Thread;

{$R *.DFM}

procedure TForm1.Button1Click(Sender: TObject);
begin
 InitializeCriticalSection(CS);
 TMyThread.Create (false);
 TMyThread.Create (false);
end;

end.


и

unit Thread;

interface

uses
 Classes,
 Windows;

type
 TMyThread = class(TThread)
 private
 protected
   procedure Execute; override;
 end;

var
 CS: TRtlCriticalSection;

implementation

{ TMyThread }

procedure TMyThread.Execute;
begin
 EnterCriticalSection(CS);
end;

end.


Запусти и посмотри количество объектов ядра до нажатия кнопки и после. Предупреждаю, процесс лучше снимать из среды, по Ctrl+F2


 
Игорь Шевченко ©   (2004-10-14 18:49) [56]

Суслик ©   (14.10.04 18:45) [53]


> Сначала на вопрос ответь, где посмотреть реализацию КС?


windows.pas разумеется


 
Verg ©   (2004-10-14 18:51) [57]


> [51] Суслик ©   (14.10.04 18:38)
>
> >  [50] Verg ©   (14.10.04 18:37)
>
> может и путаю. Но опыт сравнения быстродействия mutex и
> кс в свое время сказал мне о многом.


Не, ну сравнивать производительнось при различных способах использования ОЯ - это уже отдельный вопрос.

Вроде шла речь об обеспечении синхронизации БЕЗ использования ОЯ вообще?

> Мое утверждение было о том, что при синхронизации не обязательно
> использовать ОЯ.


Вот и интересно как?

while lacked do sleep(1)
?

Кстати, Mutex хорош своим ObjectName, т.е. он именованый, а значит синхронизировать по "принципу КС" можно потоки из разных процессов. Вот и все, если этого не требуется, то используй КС. Конечно она эффективнее в таком случае


 
Суслик ©   (2004-10-14 18:53) [58]


> Запусти и посмотри количество объектов ядра до нажатия кнопки
> и после. Предупреждаю, процесс лучше снимать из среды, по
> Ctrl+F2

Как?
Никогда не делал.


 
ЮрийК ©   (2004-10-14 18:53) [59]

Просьба выложить окнчательный вариант ваших изысканий, как только все выскажут свои замечания.


 
Суслик ©   (2004-10-14 18:56) [60]


> [57] Verg ©   (14.10.04 18:51)

Interlocedфункции, они же не ОЯ.
И в тоже время позволяют делать синхнонизацию. Класс TThreadLocalCounter например.

Контрольный вопрос.
В классе TThreadLocalCounter используются методы синхронизации. Например, TThreadLocalCounter.Open. Есть ли здесь ОЯ?


 
Суслик ©   (2004-10-14 19:00) [61]


> [57] Verg ©   (14.10.04 18:51)


> Вот и интересно как?


> while lacked do sleep(1)

Например. interlocked функции еще есть.


 
panov ©   (2004-10-14 19:04) [62]

>ЮрийК ©   (14.10.04 18:53) [59]

Просьба выложить окнчательный вариант ваших изысканий, как только все выскажут свои замечания.

На самом деле, похоже, лучше эту задачу реализовать без использования TThread, что я сейчас и буду пробовать.

Конечно, вопрос с максимально работой и ожиданием без спин-блокировки все равно остается.

Но к нему придется вернуться после первых набросков.


 
panov ©   (2004-10-14 19:05) [63]

>Суслик ©   (14.10.04 19:00) [61]
Например. interlocked функции еще есть.

InterLocked-функции могут быть применены только для спин-блокировки(активного ожидания).


 
Суслик ©   (2004-10-14 19:08) [64]


> InterLocked-функции могут быть применены только для спин-блокировки(активного
> ожидания).

ну да, но это же лучше чем бегать из режима ядра и обратно, как бывает при использовании ОЯ?

ЗЫ
Скаже всем, что понятия не имею, что такое режим ядра. Знаю только, что в него надо переходить для использования ОЯ и это дорогостоящая операция (несколько 1000 тактов). Это я говорю для того, чтобы не приписвать себе знания, коих нет. Но думаю понимание того, что лучше режим ядра избегать являтеся достаточным основанием для продолжения моего участия в дискусе.
Спасибо за внимание.


 
panov ©   (2004-10-14 19:08) [65]

>ЮрийК ©   (14.10.04 18:53) [59]

При небольших изменениях в коде код в начале топика будет совершенно работоспособным, другое дело, что все-таки хочется сделать оптимальный код-)


 
panov ©   (2004-10-14 19:11) [66]

>Суслик ©   (14.10.04 19:08) [64]

ну да, но это же лучше чем бегать из режима ядра и обратно, как бывает при использовании ОЯ

Пример "почти -)" спин-блокировки привел Verg ©   (14.10.04 18:51) [57]
while lacked do sleep(1)
Вот классический пример:
while lacked do ;

Естественно, такой код загрузит на время ожидания процессор на 100%.

Такой метод применим только для гарантированно кратких по времени ожиданий.


 
Суслик ©   (2004-10-14 19:11) [67]


> При небольших изменениях в коде код в начале топика будет
> совершенно работоспособным, другое дело, что все-таки хочется
> сделать оптимальный код-)

Когда сделаем, хорошо было бы его здесь увидеть :)
Т.е. я поддерживаю ЮрияК


 
Суслик ©   (2004-10-14 19:13) [68]


> Такой метод применим только для гарантированно кратких по
> времени ожиданий.

Все в наших руках - можно гарантировать чего душе угодно. Я серьезно. Нужно внимательно посмотреть на задачу - может такая гарантия есть.

Или хочестся создать суперуниверсальный супероптимальный супервсеоблемющий пул для всех случае жизни? Это миф (имхо). Кадой задаче свое решение.


 
panov ©   (2004-10-14 19:17) [69]

>Суслик ©   (14.10.04 19:13) [68]
Суперуниверсальный не нужен, конечно.

Я выше приводил пример, в котором необходима максимальная скорость различных вычислений в потоках - динамичная графика.

Кстати, даже для задачи, которой сейчас я занимаюсь(резервное копирование файлов), желательна минимальная нагрузка на процессор-)


 
Суслик ©   (2004-10-14 19:20) [70]


>  [69] panov ©   (14.10.04 19:17)

Мое мнение такое (повторяюсь) - пока не будет тестов производительности, говорить об оптимальности или отсутствии таковой рано.

У меня нет сейчас времени на опыты :( Иначе подготовил бы статистический отчет. При этом надо тестировать не просто бомбардировку ОЯ (т.е. активное их использование), а имитацию реальной нагрузки на ОЯ, (т.е. попытаться смоделировать действительность - как часто будут ОЯ использоваться в зависимости от поребностей пользователя пуллера). Уверяю, что результат удивит - разница в общем производительности будет заметна слабо (конечно, если не брать каких-то экстремальных случаев).


 
Суслик ©   (2004-10-14 19:28) [71]

И самое мое последнее мнение о направленности разработки супероптимального сервера такое: если оный нужен, то нужно очень серьезно засаживаться за соответствующую литературу.
Я видел много кода, часть на сях, часть на дельфи. Я был поражен теми изощнениями, которыми пользуются люди для оптимальности.

Вот простой пример (не относится к теме, но показывает устремленность людей, как писать оптимально)


function TStringList.Find(const S: string; var Index: Integer): Boolean;
var
 L, H, I, C: Integer;
begin
 Result := False;
 L := 0;
 H := FCount - 1;
 while L <= H do
 begin
   I := (L + H) shr 1;
   C := CompareStrings(FList^[I].FString, S);
   if C < 0 then L := I + 1 else
   begin
     H := I - 1;
     if C = 0 then
     begin
       Result := True;
       if Duplicates <> dupAccept then L := I;
     end;
   end;
 end;
 Index := L;
end;

Как вы думаете, почему не написано так (мне какжется так наглядней)

   if C < 0 then L := I + 1 else
   if C > 0 then  H := I - 1 else
   if C = 0 then
   begin
     Result := True;
     if Duplicates <> dupAccept then L := I;
   end;


Все таже пресловутая оптимальность - экономия на одном сравнении (в первом случае нет if c > 0). Понимаю, что пример оффтоповый, но мне кажется, что он показывает, что приемы оптимального программирования так просто не изобретешь - нужно планомерное чтение разной лит-ры и просмотр кода.


 
VMcL ©   (2004-10-14 21:13) [72]

>Как вы думаете, почему не написано так (мне какжется так наглядней)

 if C < 0 then L := I + 1 else
 if C > 0 then  H := I - 1 else
 if C = 0 then
 begin
  Result := True;
  if Duplicates <> dupAccept then L := I;
 end;


В любом случае предпоследнее сравнение бесполезно - это видно невооруженным глазом: если C не меньше нуля и не больше, то тогда остается только один вариант (в области действительных чисел:) - нуль.

if C < 0 then L := I + 1 else
if C > 0 then  H := I - 1 else
//if C = 0 then
begin
  Result := True;
  if Duplicates <> dupAccept then L := I;
end;


 
Verg ©   (2004-10-14 22:43) [73]


> Как вы думаете, почему не написано так (мне какжется так
> наглядней)
>
>    if C < 0 then L := I + 1 else
>    if C > 0 then  H := I - 1 (* C>0 *)else
>    if C = 0 then
>    begin
>      Result := True;
>      if Duplicates <> dupAccept then L := I;
>    end;


Ой ли? Разве этот код эквивалентен


>    if C < 0 then L := I + 1 else
>    begin
>      H := I - 1; // C>=0
>      if C = 0 then
>      begin
>        Result := True;
>        if Duplicates <> dupAccept then L := I;
>      end;
>    end;


??


 
Суслик ©   (2004-10-15 11:12) [74]


> Ой ли? Разве этот код эквивалентен

нет, но делает тоже самое (по сути)


 
Игорь Шевченко ©   (2004-10-15 11:18) [75]

Суслик ©   (14.10.04 19:08) [64]

Синхронизация потоков невозможна без диспетчеризации, согласись.
То есть, как минимум, одни поток должен уметь сказать WaitForSingleObject (или какую-то разновидность), пока другой поток не освободит ресурс. После того, как ресурс освобожден, об этом должно стать известно диспетчеру, чтобы он смог запустить ожидавший поток.

Разумно ?

Спин-блокировка применима для синхронизации между потоками в многопроцессорной системе, когда поток на одном процессоре входит в цикл (крутится - spin), опрашивая общую переменную.


 
Zelius ©   (2004-10-15 11:36) [76]

Кстати, посмотрел код и решил сказать, может пригодится.
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:

constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
inherited Create(True);
end;


 
Erik1 ©   (2004-10-15 11:56) [77]

Мастера фигей маются, как новички. Видать и им ничто человеческое нечуждо. :)
 Вобщето надо проежде всего позаботится об понятности алгоритма, его эфективновность оченивается вовсе не в количестве обращений к ядру. Вобщее правило: если алгоритм красив, то он ненуждается в улучшениях!
Суслик [71]
 В 99% случаев я бы выбрал второй вариант, проше читать. А эфективность можно уличшить как нибудь по другому.
to panov
У меня все потоки используют WaitForMultipleObjects с двумя event как минимум. Первый говорит потоку начать работу, второй терминировать поток. Это неокажет некакого замедления на расчеты, поскольку они выполняются внутри процедувы. А если расчеты очень короткие но частые, то suspend resume замедлят их еще больше. К томуже в Dlephi выще 5 версии нельзя делать resume 2 раза подряд, будет exception.


 
Суслик ©   (2004-10-15 12:18) [78]

2Игорь Шевченко

> Синхронизация потоков невозможна без диспетчеризации, согласись.
> То есть, как минимум, одни поток должен уметь сказать WaitForSingleObject
> (или какую-то разновидность), пока другой поток не освободит
> ресурс. После того, как ресурс освобожден, об этом должно
> стать известно диспетчеру, чтобы он смог запустить ожидавший
> поток.
>
> Разумно ?

Да разумно. Но WaitForSingleObject не единственное решение.

Все предложение: для того, чтобы убедится в том, что я говорю о том, о чем имею предсталение и для того, чтобы не разводить лишние разговоры, предлагаю всем изучить класс TThreadLocalCounter.

Класс TThreadLocalCounter (модуль sysutils) реализует TLS (не полноценный, а для одного конкретного целочисленного значения: TThreadInfo.RecursionCount), средствами дельфи. Причем существенно быстрее, чем TLS windows.
Идея такая: есть список (fHashTable) голов однонапрасленных списков. Хэш индекс строится исходя из GetCurrentThreadId.
Приведенный метод занимается тем, что выделяет вызвавшему потоку структуру PThreadInfo. Поиск идет либо в списке свободных структур (если поток отказывается от использования структура помечается свободной, но из списка не удаляется), либо если не нашел, то создает новую. Заметь, тут нет никаких критических секций. Внимательно читай комментарий. Собственно в тех двух строчках, которые о коментриует, и есть вся суть метода и моего утвержднеия о возможности синхронизации без ОЯ и даже без КС.

procedure TThreadLocalCounter.Open(var Thread: PThreadInfo);
var
 P: PThreadInfo;
 H: Byte;
 CurThread: Cardinal;
begin
 InterlockedIncrement(FOpenCount);
 H := HashIndex;
 P := FHashTable[H];
 CurThread := GetCurrentThreadID;

 // КАЖДОМУ ПОТОКУ ВЫДЕЛЯТЕСЯ РОВНО ОДНА СТРУКТУРА (ЭТО ЕЩЕ ОДНО ОТЛИЧИЕ ОТ TLS WINDOWS)
 while (P <> nil) and (P.ThreadID <> CurThread) do
   P := P.Next;

 // ЕСЛИ НЕ НАШЛИ, ТО...
 if P = nil then
 begin
   // ПЫТАЕМСЯ НАЙТИ В НЕИСПОЛЬЗУЕМЫХ СТРУКТРУАХ СВОБОДНУЮ
   P := Recycle;

   // ЕСЛИ ТАКОВЫХ НЕТ, ТО СОЗДАЕМ НОВУЮ
   if P = nil then
     P := PThreadInfo(AllocMem(sizeof(TThreadInfo)));

   P.ThreadID := CurThread;

   // А ВОТ ТЕПЕРЬ ЕЕ НУЖНО ВСТАВИТЬ В СПИСОК. ЧИТАЙ КОМЕНТАРИЙ.

   // Another thread could start traversing the list between when we set the
   // head to P and when we assign to P.Next.  Initializing P.Next to point
   // to itself will make others loop until we assign the tail to P.Next.
   P.Next := P; ????? ВОТ ЭТО МЕСТО МЕНЯ ПОТРЯСЛО!
   P.Next := PThreadInfo(InterlockedExchange(Integer(FHashTable[H]), Integer(P)));
 end;
 Thread := P;
end;


После этого продолжим обсуждение синхронизацию доступа к общему ресурсу без ОЯ и КС.


 
Игорь Шевченко ©   (2004-10-15 12:29) [79]

Суслик ©   (15.10.04 12:18) [78]

В этом коде модификация списка, которая может затронуть другие потоки, выполняется одной функцией InterlockedExchange. Строчка, которая тебя поразила - это искусственное действие, направленное на то, чтобы модификацию можно было выполнить той самой единственной функцией.
InterlockedExchange реализуется единственной командой (не считая подготовки аргументов) lock cmpxchg, которая в данном случае успешно может использоваться для модификации, так как является атомарной (не прерываемой в середине исполнения) и атомарной же на многопроцессорной системе, так как префикс lock выдает сигнал на блокировку шины доступа к памяти.

Точно такие же приемы используются при работе с длинными строками.

Но синхронизацией это можно назвать с огромной натяжкой, так как осноная цель в этом коде - это реализовать потокобезопасную модификацию списка.


 
Суслик ©   (2004-10-15 12:33) [80]


>  [79] Игорь Шевченко ©   (15.10.04 12:29)

Определение синхронизации в студию!


 
Игорь Шевченко ©   (2004-10-15 12:36) [81]

Суслик ©   (15.10.04 12:33) [80]

Синхронизация - (от греч. synchronos - одновременный), приведение двух или нескольких процессов к синхроннонсти, т. е. к такому их протеканию, когда одинаковые или соответствующие элементы процессов совершаются с неизменным сдвигом по фазе друг относительно друга или одновременно.

http://encycl.yandex.ru/cgi-bin/art.pl?art=bse/00071/31300.htm&encpage=bse&mrkp=/yandbtm7%3Fq%3D-811200565%26p%3D0%26g%3 D0%26d%3D0%26ag%3Denc_abc%26tg%3D1%26p0%3D0%26q0%3D1334144256%26d0%3D0%26script%3D/yandpage%253F

Мог бы и сам посмотреть (с)


 
panov ©   (2004-10-15 12:42) [82]

>Zelius ©   (15.10.04 11:36) [76]
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:

В данном примере это не имеет значения, так как создание очереди сообщений происходит в поточной функции, а не в конструкторе. Вот создания этой очереди и нужно дождаться.


 
Суслик ©   (2004-10-15 12:43) [83]


>  [81] Игорь Шевченко ©   (15.10.04 12:36)

не понял:(


> совершаются с неизменным сдвигом по фазе друг относительно
> друга или одновременно.


Т.е. это значит, что синхронизируемый кусок кода должен выполняться в каждый момент не более чем одним потоком? Или нет?


 
Игорь Шевченко ©   (2004-10-15 12:50) [84]

Суслик ©   (15.10.04 12:43) [83]

Синхронизация применительно к вычислительным процессам - это гарантированное предоставление монопольного доступа к ресурсу одному процессу в каждый момент времени непротиворечивого состояния ресурса.

Слово процесс для Windows заменяется на поток.

В коде, приведенном тобой, при помощи специального алгоритма, гарантируется непротиворечивое состояние ресурса для каждого потока в любой момент времени.


 
Суслик ©   (2004-10-15 12:55) [85]


>  [84] Игорь Шевченко ©   (15.10.04 12:50)

если учитывать такое определение

> Синхронизация применительно к вычислительным процессам -
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.

то  полность согласен с этим

> В коде, приведенном тобой, при помощи специального алгоритма,
> гарантируется непротиворечивое состояние ресурса для каждого
> потока в любой момент времени.


Признаю, что скорее всего был не прав в том, что называл приведенный мной пример синхронизацией - это всего лишь корректная многопоточность.

ЗЫ. Тонкое это дело. Есть какие-нибудь книги про всякие приемы многопоточного программирования?


 
Игорь Шевченко ©   (2004-10-15 13:04) [86]

Суслик ©   (15.10.04 12:55) [85]

Вот теперь и мы пришли к единому мнению :)


> Есть какие-нибудь книги про всякие приемы многопоточного
> программирования?


Рихтер, как ни странно, программирование для Win32.
У Таненбаума очень хорошо рассказано про концепции.

Я просто с понятием "
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
"
знаком с 1985 года, если не ошибаюсь, первый раз встретил у Шоу, в "Логическом проектировании операционных систем".


 
Суслик ©   (2004-10-15 18:06) [87]

Прошу прощения, за реанимирование темы.

Но я бы хотел высказать пожелание автору топика.

Помню когда-то в "Потрепаться" была борьба за фак. Я не помню, кто был инициатором этой работы, но идея была классная - коллективная работа над материалом. К сожалению, у меня тогда не было клиента, поэтому результатов под рукой нет. Но я помню, что в результате работы было проработано некоторое количество вопросов.

Я считал (и считаю), что фак, это хорошо, но не очень жизненно, т.к. вопросы и ответы в основном для ленивых: ответ всегда можно найти в документации.

Другой вопрос, что существуют проблемы, которые имеют определенное название. Например, пул потоков. Пул потоков нельзя отнести в фак, но в тоже время он является полезным и частоприменимым (в определенных областях) классом.

Думаю, что многие со мной согласятся, что коллективная разработка высокоскоростного пула потоков, являтется общественно полезным делом.

Поэтому предложение автору сего топика продолжить обсуждение кода пула потоков.


 
Игорь Шевченко ©   (2004-10-15 18:34) [88]


> Пул потоков нельзя отнести в фак, но в тоже время он является
> полезным и частоприменимым (в определенных областях) классом


Например, в каких областях ?

Кстати, в Win2k он уже встроен в систему.


 
Суслик ©   (2004-10-15 18:44) [89]


>  [88] Игорь Шевченко ©   (15.10.04 18:34)

Сервер (компьютераная экономическая игра). Многопоточный. Но она реализована очень слабо - маленькая нагрузка, поэтому пулами не пользуемся. А можно было бы.


> Кстати, в Win2k он уже встроен в систему.

Тот же Фаулер пишет (арх. корп. прогр. прил.), что есть команды разработчков, которые предпочитают не пользоваться готовыми решениями. Я из их группы. Также он пишет, что по определенным соображениям такие разработчики имеют право на существование. Логика ясна?


 
Суслик ©   (2004-10-15 18:45) [90]

А впрочем, не хотите, как хотите. Полезная школа для неновичков была бы.


 
panov ©   (2004-10-15 19:32) [91]

>Суслик ©   (15.10.04 18:45) [90]

Так как желание продолжить обсуждение есть, то могу выложить здесь черноввые наброски своего класса(panov ©   (14.10.04 19:04) [62]), а также модуль сс своей реализацией потока от y-soft(если будет получено разрешение от него).


 
Игорь Шевченко ©   (2004-10-15 21:56) [92]


> что есть команды разработчков, которые предпочитают не пользоваться
> готовыми решениями. Я из их группы. Также он пишет, что
> по определенным соображениям такие разработчики имеют право
> на существование. Логика ясна?


Безусловно, ясна. Для собственной ерундиции, разумеется, изобретение велосипедов есть полезное и достойное занятие. (Эт не шутка, а вполне серьезное заявление, не ставящее целью кого-либо обидеть).

Но для промышленной задачи я бы предпочел готовое и опробованное решение, хотя бы в силу его изобретенности (не придется продираться сквозь те грабли, сквозь которые продрались разработчики), отлаженности (в силу тех же обстоятельств) и документированности.


 
panov ©   (2004-10-16 18:46) [93]

>Игорь Шевченко ©   (15.10.04 21:56) [92]

Игорь, подскажи, где можно увидеть пример готового и апробированного решения для платформы Windows(версии не ниже Win95)?


 
panov ©   (2004-10-18 10:44) [94]

Вот почти готовая реализация 2-х классов - пул потоков(TThreadPool), и, собстувенно, поток в пуле(TThrExecuter).

TThrExecuter пока не реализован.

Это костяк, на который далее буду наращивать мясо.

Пока всё это полностью не реализовано, может есть каки-то замечания...

unit uTThreadPool;

interface

uses classes, windows, messages, sysutils;

var
 WM_ThreadPoolChangeProc: DWORD;         //Изменение поточной процедуры
                                         //  А надо ли?
 WM_ThreadPoolEndExecuter: DWORD;        //Сообщение пулу потоков об окончании дочернего
 WM_ThreadPoolTerminate: DWORD;          //Сообщение пулу потоков о завершении
 WM_ThreadPoolTerminateExecuter: DWORD;  //Запрос на удаление потока из пула
 WM_ThreadPoolGetCountQueueJobs: DWORD;  //Запрос количества заданий в очереди
 WM_ThreadPoolDestroyQueue: DWORD;       //Запрос пулу на уничтожение очереди заданий
 WM_ThreadPoolCreateExecuter: DWORD;     //Сообщение пулу потоков о создании нового потока

type

 TExecuteProcedure=procedure(const aParam: Pointer);cdecl; //Процедура потока
 TClientProcedure=procedure(const aParam: Pointer);        //Клиентская процедура потока

 TThrExecuter=class;

 PParmJob=^TParmjob;
 TParmJob=record
   Proc: Pointer;
   Parm: Pointer;
 end;

 TThrExecuter=class
 private
   FHandle: THandle;                     //Дескриптор потока
   FThreadId: THandle;                   //Идентификатор потока
   FEvent: THandle;                      //ОЯ для ожидания
   FSuspended: Boolean;
   FJob: TParmJob;
 public
   constructor Create;
   destructor Destroy;override;
   procedure StartProc(aThreadProc:TClientProcedure;aParm: pointer);
 end;

 TThreadPool=class                       //Пул потоков
 private
   FHandle: THandle;                     //Дескриптор потока
   FThreadId: THandle;                   //Идентификатор потока
   FEvent: THandle;                      //ОЯ для ожидания
   FPool: array of TThrExecuter;         //Пул потоков на исполнение
   FListQueue: TList;                    //Очередь заданий на выполнение
   FMaxThreads: Integer;                 //Максимальное число потоков в пуле
   FreeOnTerminate: Boolean;             //Автоматическое освобождение
   FTerminated: Boolean;                 //Состояние завершения
   FThreadProcedure: TExecuteProcedure;  //Процедура потока
   FClientProc: TClientProcedure;        //Клиентская процедура потока
   FClientParam: Pointer; //reserved
   FTimeOutWaitReply: Integer;           //Таймаут ожидания ответа от поточной функции
   function CreateExecuter: TThrExecuter; //Создание потока в пуле
   procedure DeleteExecuter(const aIndex: Integer); overload; //Удалить поток в пуле
   procedure DeleteExecuter(const aThrExecuter: TThrExecuter); overload; //Удалить поток в пуле
   procedure SetMaxThreads(const MaxThreads: Integer); //Изменить максимальное число потоков в пуле
   function GetCountQueueJobs: Integer;  //Текущее число потоков в пуле
   function GetEvent: THandle;           //Создать новый ОЯ "Event"
   procedure CloseEvent(const aEvent: THandle); //Удалить ОЯ "Event"
   procedure DestroyQueue;               //Уничтожить очередь заданий
   function PostAndWait(const Msg,Param: Integer): Boolean; //Передача сообщения
                                         //в поточную функцию и ожидание обработки сообщения
   procedure _ClearQueue;                //Очистка очереди заданий и освобождение
 public
   constructor Create(const aMaxThreads: Integer);
   destructor Destroy;override;
   procedure AddJob(const aProc: TClientProcedure; const aParm: Pointer); //Добавить задание в очередь
   function SetClientProc(const Proc: TClientProcedure): Boolean;         //Изменить поточную функцию менеджера
   function TerminateJob(aThreadId: THandle): Boolean; //Терминировать выполняющееся задание в пуле
   procedure Release;                                  //Закончить работу менеджера
   procedure Terminate;                                //Выдать запрос на окончание работы менеджера

   property Handle: THandle read FHandle;              //Дескриптор потока менеджера
   property ThreadId: THandle read FThreadId;          //Идентификатор потока менеджера
   property MaxThreads: Integer read FMaxThreads write SetMaxThreads;
   property Terminated: Boolean read FTerminated;
   property CountQueue: Integer read GetCountQueueJobs;

//    property Priority: TThreadPrioroty read GetPriority write SetPriority;
 end;

implementation

{ TThr }

procedure ThrExecuterProc(const aParam: Pointer);cdecl;
var
 Thread: TThrExecuter;
 Msg: TMsg;
begin
 Thread := aParam;
 PeekMessage(Msg,0,0,0,PM_NOREMOVE);
 while GetMessage(Msg,0,0,0) do
 begin
   if Msg.message=WM_QUIT then
   begin
     if Assigned(Thread.FJob.Proc) then TExecuteProcedure(Thread.FJob.Proc)(Thread.FJob.Parm);
   end;
 end;
 Dispose(aParam);
 ExitThread(0);
end;

constructor TThrExecuter.Create;
begin
 inherited;
 FHandle := CreateThread(nil,0,@ThrExecuterProc,Self,0,FThreadId);
end;

destructor TThrExecuter.Destroy;
begin

end;

procedure TThrExecuter.StartProc(aThreadProc: TClientProcedure; aParm: Pointer);
begin

end;



 
panov ©   (2004-10-18 10:45) [95]

Удалено модератором
Примечание: дубль


 
panov ©   (2004-10-18 10:45) [96]

{ TThreadPool }

{
   Поточная функция менеджера потоков.
   Построена на обменом сообщениями между потоком и классом-оберткой.
   const aParm: Pointer - Ссылка на класс-обертку TThreadPool
}
procedure ThreadPoolProcedure(const aParm: Pointer);cdecl;
var
 ThreadPool: TThreadPool;
 Msg: TMsg;
begin
 ThreadPool := TThreadPool(aParm);       //Ссылка на класс-обертку TThreadPool
 PeekMessage(Msg,0,0,0,PM_NOREMOVE);     //Создаем очередь сообщений
 SetEvent(ThreadPool.FEvent);            //Извещаем о готовности к работе
 while GetMessage(Msg,0,0,0) do          //Выбираем сообщение из очереди
 begin

//Запрос на количество ожидающих заданий в очереди
//  wParam - ожидающий объект Event
//  lParam - адрес для возврата результата (PInteger)
   if Msg.message=WM_ThreadPoolGetCountQueueJobs then
   begin
     PInteger(Msg.lParam)^ := ThreadPool.FListQueue.Count;
     SetEvent(Msg.wParam);
     Continue;
   end;

//Запрос на уничтожение очереди заданий
//  wParam - ожидающий объект Event
   if Msg.message=WM_ThreadPoolDestroyQueue then
   begin
     ThreadPool._ClearQueue;
     SetEvent(Msg.wParam);
     Continue;
   end;
//Запрос на изменение поточной функции
//  Сразу входим в новую поточную функцию,
//  после окончания заканчиваем Выполнения потока
{ TODO -cЗаметка : Добавить возможность "Временного перехода" в клиентскую функцию) }
   if Msg.message=WM_ThreadPoolChangeProc then
   begin
     if Assigned(ThreadPool.FclientProc) then
     begin
       ThreadPool.FClientProc(aParm);
       ExitThread(0);
     end;
   end;

//Запрос на окончание потока
//
//
   if (Msg.message=WM_QUIT) or (Msg.message=WM_ThreadPoolTerminate) then ExitThread(0);
end;
end;

//Добавить в очередь задание
procedure TThreadPool.AddJob(const aProc: TClientProcedure;
 const aParm: Pointer);
begin

end;

constructor TThreadPool.Create(const aMaxThreads: Integer);
begin
 inherited Create;
 IsMultiThread := True;                  //Установить многопоточный режим
 FreeOnTerminate := True;                //Автоматически уничтожать объект
 FMaxThreads := aMaxThreads;             //Установить количество потоков в пуле
 FTimeOutWaitReply := 5000;              //Установить максимальное время ожидания
                                         //  ответа от поточной функции
 FThreadProcedure := ThreadPoolProcedure;  //Установить поточную функцию
 FEvent := CreateEvent(nil,True,False,nil); //Создать ОЯ "Event" для ожидания готовности
 if FEvent=0 then raise Exception.Create("Error create Event object"); //Не получилось создать Event
 FHandle := CreateThread(nil,0,@FThreadProcedure,Self,0,FThreadId); //Создаем поток для выполнения поточной функции
 if FHandle=0 then
 begin
   raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
 end;

//Ждем, пока поточная функция не будет готова принимать запросы
 if WaitForSingleObject(FEvent,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
 begin
   TerminateThread(FHandle,1);
   CloseHandle(FHandle);
   FHandle := 0;
   raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
 end;

 FListQueue := TList.Create;             //Создаем очередь заданий
end;

//Создать поток в пуле потоков
function TThreadPool.CreateExecuter: TThrExecuter;
var
 Thread: TThrExecuter;
begin
 Thread := nil;
 Result := nil;
 if FTerminated then Exit;
 PostAndWait(WM_ThreadPoolDestroyQueue,Integer(@Thread));
end;

//Удалить поток в пуле потоков по индексу
procedure TThreadPool.DeleteExecuter(const aIndex: Integer);
begin
end;

//Удалить поток в пуле потоков по ссылке на поток TThreadExecuter
procedure TThreadPool.DeleteExecuter(const aThrExecuter: TThrExecuter);
begin
end;

//Уничтожение объекта TThreadPool
destructor TThreadPool.Destroy;
begin
 if not FTerminated then Exit;
 if FEvent<>0 then CloseHandle(FEvent);
 if FHandle<>0 then CloseHandle(FHandle);
 inherited;
end;

//Запрос на уничтожение очереди заданий
procedure TThreadPool.DestroyQueue;
begin
 PostAndWait(WM_ThreadPoolDestroyQueue,0);
end;

//Запрос количества заданий в очереди
function TThreadPool.GetCountQueueJobs: Integer;
begin
 Result := -1;
 PostAndWait(WM_ThreadPoolGetCountQueueJobs,Integer(@Result));
end;

//Создать ОЯ "Event"
function TThreadPool.GetEvent: THandle;
begin
 Result := CreateEvent(nil,True,False,nil);
end;

//Освободить ОЯ "Event"
procedure TThreadPool.CloseEvent(const aEvent: THandle);
begin
 if aEvent<> 0 then CloseHandle(aEvent);
end;

//Запрос поточной функции и ожидание ответа.
function TThreadPool.PostAndWait(const Msg, Param: Integer): Boolean;
var
 Event: THandle;
begin
 Result := False;
 Event := GetEvent;
 if Event=0 then Exit;
 try
   PostThreadMessage(FThreadId,Msg,Event,Param);
   WaitForSingleObject(Event,FTimeOutWaitReply);
 finally
   CloseEvent(Event);
 end;
end;

//Окончание работы
procedure TThreadPool.Release;
begin
 if FTerminated then Exit;
 FTerminated := True;
 DestroyQueue;
 PostThreadMessage(FThreadId,WM_ThreadPoolTerminate,0,0);
 if WaitForSingleObject(FHandle,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
 begin
   TerminateThread(FHandle,1);
 end;
 if FreeOnTerminate then Destroy;
end;

function TThreadPool.SetClientProc(const Proc: TClientProcedure): Boolean;
begin
 Result := False;
 if Terminated then Exit;
 if Assigned(FClientProc) then Exit;
 FClientProc := Proc;
 PostThreadMessage(FThreadId,WM_ThreadPoolChangeProc,0,0);
 Result := True;
end;

procedure TThreadPool.SetMaxThreads(const MaxThreads: Integer);
begin

end;

procedure TThreadPool.Terminate;
begin
 if FTerminated then Exit;
 Release;
end;

function TThreadPool.TerminateJob(aThreadId: THandle): Boolean;
begin
 Result := False;
end;

procedure TThreadPool._ClearQueue;
var
 i: Integer;
begin
 if not FTerminated then Exit;
 if not Assigned(FListQueue) then Exit;
 for i := 0 to FListQueue.Count-1 do
 begin
   FListQueue.Delete(i);
 end;
 FListQueue.Free;
end;

initialization
 WM_ThreadPoolChangeProc := RegisterWindowMessage("TThreadPoolWM_ThreadPoolChangeProc");
 WM_ThreadPoolEndExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolEndExecuter");
 WM_ThreadPoolTerminate := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminate");
 WM_ThreadPoolTerminateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminateExecuter");
 WM_ThreadPoolGetCountQueueJobs := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolGetCountQueueJobs");
 WM_ThreadPoolDestroyQueue := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolDestroyQueue");
 WM_ThreadPoolCreateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolCreateExecuter");
finalization
end.


 
Игорь Шевченко ©   (2004-10-18 12:53) [97]

panov ©   (16.10.04 18:46) [93]


> Игорь, подскажи, где можно увидеть пример готового и апробированного
> решения для платформы Windows(версии не ниже Win95)?


Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.

Готовое решение для Win2k есть в книжке Рихтера и Кларка "программирование серверных приложений для Windows 2000.
"


 
panov ©   (2004-10-18 13:10) [98]

>Игорь Шевченко ©   (18.10.04 12:53) [97]

Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.


С тем, что строить на Win98 серьезную многопоточную систему как сервер приложений не стоит. Тут я согласен.
А вот серьезную многопоточную клиентскую программу, в принципе, почему бы и нет-)

Понятно, что в этом случае надежность ОС не обеспечивает, но для многих клиентских программ достаточная надежность и устойчивость на порядки ниже серверных приложений.

Кроме этого, согласись, что та же WinNT4 дает достаточный уровень надежности, но не предоставляет таких встроенных в систему инструментов.


 
Игорь Шевченко ©   (2004-10-18 13:22) [99]


> А вот серьезную многопоточную клиентскую программу, в принципе,
> почему бы и нет-)
>
> Понятно, что в этом случае надежность ОС не обеспечивает,
> но для многих клиентских программ достаточная надежность
> и устойчивость на порядки ниже серверных приложений.


А пример ? Именно такой клиентской задачи, где требуется пул ?

А насчет NT4 - у меня сейчас книги под рукой нету, но через какое-то время я таки-посмотрю, что за механизмы там используются и есть ли они в NT4


 
panov ©   (2004-10-18 13:28) [100]

>Игорь Шевченко ©   (18.10.04 13:22) [99]

А пример? Именно такой клиентской задачи, где требуется пул ?

Так нет таких задач, где без пула потоков обойтись нельзя.
Ведь это лишь способ организации и управления несколькими потоками.

Такую задачу я в начале топика привел, как пример - многопоточное копирование файлов.

Навскидку могу привести еще примеры - скачивание нескольких файлов одновременно с интернет-сайтов, сканирование сети и пр.


 
Evgeny V ©   (2004-10-18 13:30) [101]


> panov ©   (18.10.04 13:10) [98]


> Кроме этого, согласись, что та же WinNT4 дает достаточный
> уровень надежности, но не предоставляет таких встроенных
> в систему инструментов.


GetQueuedCompletionStatus и PostQueuedCompletionStatus - позволяет удобно организовать пул потоков, в 9x их конечно нет, но по MSDN
Client: Included in Windows XP, Windows 2000 Professional, and Windows NT Workstation 3.5 and later.
Server: Included in Windows Server 2003, Windows 2000 Server, and Windows NT Server 3.5 and later.Header: Declared in Winbase.h; include Windows.h.
Library: Use Kernel32.lib.

В 2000 конечно появились более удобные функции для этой цели - QueueUserWorkItem например


 
panov ©   (2004-10-18 13:38) [102]

>Evgeny V ©   (18.10.04 13:30) [101]

Да, действительно.
Но все же у меня дома W98 и мне хочется, чтобы в этой системе тоже работало-)


 
Игорь Шевченко ©   (2004-10-18 14:06) [103]

panov ©   (18.10.04 13:28) [100]

Пул потоков есть средство ограничения обработки запросов фиксированным количеством потоков, при этом, остальные запросы ставятся в очередь до освобождения одного из потоков - типичная задача для архитерктуры супер-сервер (СУБД), например. Каким образом это поможет на клиенте, честно, не вижу.

Я могу сослаться только на пост [92] о поводе для написания подобного пула - общее самообразование.


 
Владислав ©   (2004-10-19 12:56) [104]

Понадобился пул по работе. Вот что я написал

unit ThreadPool;

interface

uses
 Windows, SysUtils, asLists;

type

 TJobProcedure = procedure(Param: Pointer);

 TThreadPool = class(TObject)
 private
   FJobExistsEvent: DWORD;
   FTerminatedEvent: DWORD;
 private
   FThreadHandles: array of DWORD;
   FJobListCS: TRTLCriticalSection;
   FJobList: TDoubleLinkedList;
   FThreadCount: DWORD;
   FStackSize: DWORD;
   FTerminated: Boolean;
   procedure SetTerminated(const Value: Boolean);
 private
   function Initialize: Boolean;
   procedure Finalize;
   procedure LockJobList;
   procedure UnlockJobList;
   function InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
   procedure RemoveJob(AJob: Pointer);
   function WaitForJobTimeout(Timeout: DWORD; var Proc: TJobProcedure;
     var Param: Pointer): Boolean;
   function WaitForJobInfinite(var Proc: TJobProcedure; var Param: Pointer): Boolean;
 public
   constructor Create(AThreadCount, AStackSize: DWORD);
   destructor Destroy; override;
   function AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
   property ThreadCount: DWORD read FThreadCount;
   property StackSize: DWORD read FStackSize;
   property Terminated: Boolean read FTerminated write SetTerminated;
 end;

implementation

type

 PJob = ^TJob;
 TJob = record
   Proc: TJobProcedure;
   Param: Pointer;
   Link: TDoubleLinkedLink
 end;

function ThreadFunc(Param: Pointer): Integer;
var
 LPool: TThreadPool;
 LProc: TJobProcedure;
 LParam: Pointer;
begin
 LPool := Param;
 try
   while not LPool.Terminated do
   begin
     if LPool.WaitForJobInfinite(LProc, LParam) then
     begin
       try
         LProc(LParam)
       except
       end
     end
   end
 except
 end;
 Result := 0
end;

{ TThreadPool }

constructor TThreadPool.Create(AThreadCount, AStackSize: DWORD);
begin
 FThreadCount := AThreadCount;
 FStackSize := AStackSize;
 SetLength(FThreadHandles, FThreadCount);
 if not Initialize then
   RaiseLastOSError
end;

destructor TThreadPool.Destroy;
begin
 Finalize;
 inherited;
end;

function TThreadPool.Initialize: Boolean;
var
 i: Integer;
 LThreadID: DWORD;
begin
 InitializeCriticalSection(FJobListCS);
 FJobExistsEvent := CreateEvent(nil, False, False, nil);
 Result := FJobExistsEvent <> 0;
 if not Result then
   Exit;
 FTerminatedEvent := CreateEvent(nil, True, False, nil);
 Result := FTerminatedEvent <> 0;
 if not Result then
   Exit;
 for i := 0 to Integer(FThreadCount) - 1 do
 begin
   FThreadHandles[i] := BeginThread(nil, FStackSize, @ThreadFunc, Self,
     CREATE_SUSPENDED, LThreadID);
   Result := FThreadHandles[i] <> 0;
   if not Result then
     Break
 end;
 if not Result then
   Terminated := True;
 for i := 0 to Integer(FThreadCount) - 1 do
   if FThreadHandles[i] <> 0 then
     ResumeThread(FThreadHandles[i])
end;

procedure TThreadPool.Finalize;
var
 i: Integer;
 LWaitResult: DWORD;
 LLink: PDoubleLinkedLink;
begin
 Terminated := True;
 for i := 0 to FThreadCount - 1 do
 begin
   if FThreadHandles[i] <> 0 then
   begin
     LWaitResult := WaitForSingleObject(FThreadHandles[i], INFINITE);
     if LWaitResult <> WAIT_OBJECT_0 then
       TerminateThread(FThreadHandles[i], 1);
     CloseHandle(FThreadHandles[i]);
     FThreadHandles[i] := 0
   end
 end;
 while True do
 begin
   LLink := RemoveFromHead(@FJobList);
   if LLink <> nil then
     RemoveJob(LLink^.Data)
   else
     Break
 end;
 if FTerminatedEvent <> 0 then
 begin
   CloseHandle(FTerminatedEvent);
   FTerminatedEvent := 0
 end;
 if FJobExistsEvent <> 0 then
 begin
   CloseHandle(FJobExistsEvent);
   FJobExistsEvent := 0
 end;
 DeleteCriticalSection(FJobListCS);
end;

procedure TThreadPool.LockJobList;
begin
 EnterCriticalSection(FJobListCS);
end;

procedure TThreadPool.UnlockJobList;
begin
 LeaveCriticalSection(FJobListCS);
end;

function TThreadPool.AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
begin
 Result := InsertJob(AProc, AParam)
end;

function TThreadPool.InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
var
 LJob: PJob;
begin
 try
   New(LJob)
 except
   SetLastError(ERROR_NOT_ENOUGH_MEMORY);
   Result := False;
   Exit
 end;
 with LJob^ do
 begin
   Proc := AProc;
   Param := AParam;
   with Link do
   begin
     Next := nil;
     Prev := nil;
     Data := LJob
   end
 end;
 LockJobList;
 try
   InsertIntoTail(@FJobList, @LJob^.Link);
   SetEvent(FJobExistsEvent)
 finally
   UnlockJobList
 end;
 Result := True
end;

procedure TThreadPool.RemoveJob(AJob: Pointer);
begin
 Dispose(AJob)
end;

function TThreadPool.WaitForJobTimeout(Timeout: DWORD;
 var Proc: TJobProcedure; var Param: Pointer): Boolean;
var
 LWaitResult: DWORD;
 LLink: PDoubleLinkedLink;
 LJob: PJob;
begin
 Result := False;
 Proc := nil;
 Param := nil;
 LWaitResult := WaitForMultipleObjects(2, @FJobExistsEvent, False, Timeout);
 if LWaitResult = WAIT_OBJECT_0 then
 begin
   if Terminated then
   begin
     SetEvent(FJobExistsEvent);
     Exit
   end;
   LockJobList;
   try
     if FJobList.Size = 0 then
       Exit;
     LLink := RemoveFromHead(@FJobList);
     if FJobList.Size <> 0 then
       SetEvent(FJobExistsEvent)
   finally
     UnlockJobList
   end;
   if LLink <> nil then
   begin
     LJob := LLink^.Data;
     Proc := LJob^.Proc;
     Param := LJob^.Param;
     Result := True;
     RemoveJob(LJob)
   end
 end
end;

function TThreadPool.WaitForJobInfinite(var Proc: TJobProcedure;
 var Param: Pointer): Boolean;
begin
 Result := WaitForJobTimeout(INFINITE, Proc, Param)
end;

procedure TThreadPool.SetTerminated(const Value: Boolean);
begin
 if Value then
 begin
   FTerminated := Value;
   SetEvent(FTerminatedEvent)
 end
end;

end.


 
Игорь Шевченко ©   (2004-10-19 13:59) [105]

Владислав ©   (19.10.04 12:56) [104]

А TThreadList вместо DoubleLinkedList и его обслуги не проще будет применить ?


 
Владислав ©   (2004-10-19 14:56) [106]

В TList будет постоянный Move, а на кой он нужен?


 
Игорь Шевченко ©   (2004-10-19 15:08) [107]

Владислав ©   (19.10.04 12:56) [104]


> В TList будет постоянный Move


Это плохо ?

Зато в коде строчек будет меньше...


 
Владислав ©   (2004-10-19 15:11) [108]

Их и так минимум :)


 
Игорь Шевченко ©   (2004-10-19 15:28) [109]

Владислав ©   (19.10.04 15:11) [108]


> Их и так минимум :)


Это тебе кажется :)

Как минимум - метод WaitForJobTimeOut используется один раз в методе WaitForJobInfinite - смело можно выкидывать и переносить функциональность в WaitForJobInfinite (это первое, что бросилось в глаза).

Кроме того, я бы попробовал сделать с TThreadList - строчек уменьшится, по-моему.


 
Владислав ©   (2004-10-19 16:27) [110]

Конечно, можно и выкинуть и попробовать :)
Только не лежит у меня душа к TList, а оптимизацией я еще вообще не занимался. Будут узкие места, буду оптимизировать.


 
panov ©   (2004-10-27 22:38) [111]

<Offtopic>
Чтобы в архив не уехал, так как работа продолжается...
</Offtopic>



Страницы: 1 2 3 вся ветка

Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];

Наверх





Память: 0.9 MB
Время: 0.038 c
14-1098952123
_none_
2004-10-28 12:28
2004.11.14
Существуют ли аналоги стандартных контролов, позволяющие...


4-1096073306
tnx
2004-09-25 04:48
2004.11.14
IExplorer History


1-1099385564
}|{yk
2004-11-02 11:52
2004.11.14
При инсталляции TurboPower InternetPro получаю сообщение


4-1097041381
Суслик
2004-10-06 09:43
2004.11.14
Шрифт


4-1096915697
Klopan
2004-10-04 22:48
2004.11.14
Обновить часть экрана.





Afrikaans Albanian Arabic Armenian Azerbaijani Basque Belarusian Bulgarian Catalan Chinese (Simplified) Chinese (Traditional) Croatian Czech Danish Dutch English Estonian Filipino Finnish French
Galician Georgian German Greek Haitian Creole Hebrew Hindi Hungarian Icelandic Indonesian Irish Italian Japanese Korean Latvian Lithuanian Macedonian Malay Maltese Norwegian
Persian Polish Portuguese Romanian Russian Serbian Slovak Slovenian Spanish Swahili Swedish Thai Turkish Ukrainian Urdu Vietnamese Welsh Yiddish Bengali Bosnian
Cebuano Esperanto Gujarati Hausa Hmong Igbo Javanese Kannada Khmer Lao Latin Maori Marathi Mongolian Nepali Punjabi Somali Tamil Telugu Yoruba
Zulu
Английский Французский Немецкий Итальянский Португальский Русский Испанский