Главная страница
    Top.Mail.Ru    Яндекс.Метрика
Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];

Вниз

Пул потоков.   Найти похожие ветки 

 
Суслик ©   (2004-10-15 12:33) [80]


>  [79] Игорь Шевченко ©   (15.10.04 12:29)

Определение синхронизации в студию!


 
Игорь Шевченко ©   (2004-10-15 12:36) [81]

Суслик ©   (15.10.04 12:33) [80]

Синхронизация - (от греч. synchronos - одновременный), приведение двух или нескольких процессов к синхроннонсти, т. е. к такому их протеканию, когда одинаковые или соответствующие элементы процессов совершаются с неизменным сдвигом по фазе друг относительно друга или одновременно.

http://encycl.yandex.ru/cgi-bin/art.pl?art=bse/00071/31300.htm&encpage=bse&mrkp=/yandbtm7%3Fq%3D-811200565%26p%3D0%26g%3 D0%26d%3D0%26ag%3Denc_abc%26tg%3D1%26p0%3D0%26q0%3D1334144256%26d0%3D0%26script%3D/yandpage%253F

Мог бы и сам посмотреть (с)


 
panov ©   (2004-10-15 12:42) [82]

>Zelius ©   (15.10.04 11:36) [76]
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:

В данном примере это не имеет значения, так как создание очереди сообщений происходит в поточной функции, а не в конструкторе. Вот создания этой очереди и нужно дождаться.


 
Суслик ©   (2004-10-15 12:43) [83]


>  [81] Игорь Шевченко ©   (15.10.04 12:36)

не понял:(


> совершаются с неизменным сдвигом по фазе друг относительно
> друга или одновременно.


Т.е. это значит, что синхронизируемый кусок кода должен выполняться в каждый момент не более чем одним потоком? Или нет?


 
Игорь Шевченко ©   (2004-10-15 12:50) [84]

Суслик ©   (15.10.04 12:43) [83]

Синхронизация применительно к вычислительным процессам - это гарантированное предоставление монопольного доступа к ресурсу одному процессу в каждый момент времени непротиворечивого состояния ресурса.

Слово процесс для Windows заменяется на поток.

В коде, приведенном тобой, при помощи специального алгоритма, гарантируется непротиворечивое состояние ресурса для каждого потока в любой момент времени.


 
Суслик ©   (2004-10-15 12:55) [85]


>  [84] Игорь Шевченко ©   (15.10.04 12:50)

если учитывать такое определение

> Синхронизация применительно к вычислительным процессам -
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.

то  полность согласен с этим

> В коде, приведенном тобой, при помощи специального алгоритма,
> гарантируется непротиворечивое состояние ресурса для каждого
> потока в любой момент времени.


Признаю, что скорее всего был не прав в том, что называл приведенный мной пример синхронизацией - это всего лишь корректная многопоточность.

ЗЫ. Тонкое это дело. Есть какие-нибудь книги про всякие приемы многопоточного программирования?


 
Игорь Шевченко ©   (2004-10-15 13:04) [86]

Суслик ©   (15.10.04 12:55) [85]

Вот теперь и мы пришли к единому мнению :)


> Есть какие-нибудь книги про всякие приемы многопоточного
> программирования?


Рихтер, как ни странно, программирование для Win32.
У Таненбаума очень хорошо рассказано про концепции.

Я просто с понятием "
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
"
знаком с 1985 года, если не ошибаюсь, первый раз встретил у Шоу, в "Логическом проектировании операционных систем".


 
Суслик ©   (2004-10-15 18:06) [87]

Прошу прощения, за реанимирование темы.

Но я бы хотел высказать пожелание автору топика.

Помню когда-то в "Потрепаться" была борьба за фак. Я не помню, кто был инициатором этой работы, но идея была классная - коллективная работа над материалом. К сожалению, у меня тогда не было клиента, поэтому результатов под рукой нет. Но я помню, что в результате работы было проработано некоторое количество вопросов.

Я считал (и считаю), что фак, это хорошо, но не очень жизненно, т.к. вопросы и ответы в основном для ленивых: ответ всегда можно найти в документации.

Другой вопрос, что существуют проблемы, которые имеют определенное название. Например, пул потоков. Пул потоков нельзя отнести в фак, но в тоже время он является полезным и частоприменимым (в определенных областях) классом.

Думаю, что многие со мной согласятся, что коллективная разработка высокоскоростного пула потоков, являтется общественно полезным делом.

Поэтому предложение автору сего топика продолжить обсуждение кода пула потоков.


 
Игорь Шевченко ©   (2004-10-15 18:34) [88]


> Пул потоков нельзя отнести в фак, но в тоже время он является
> полезным и частоприменимым (в определенных областях) классом


Например, в каких областях ?

Кстати, в Win2k он уже встроен в систему.


 
Суслик ©   (2004-10-15 18:44) [89]


>  [88] Игорь Шевченко ©   (15.10.04 18:34)

Сервер (компьютераная экономическая игра). Многопоточный. Но она реализована очень слабо - маленькая нагрузка, поэтому пулами не пользуемся. А можно было бы.


> Кстати, в Win2k он уже встроен в систему.

Тот же Фаулер пишет (арх. корп. прогр. прил.), что есть команды разработчков, которые предпочитают не пользоваться готовыми решениями. Я из их группы. Также он пишет, что по определенным соображениям такие разработчики имеют право на существование. Логика ясна?


 
Суслик ©   (2004-10-15 18:45) [90]

А впрочем, не хотите, как хотите. Полезная школа для неновичков была бы.


 
panov ©   (2004-10-15 19:32) [91]

>Суслик ©   (15.10.04 18:45) [90]

Так как желание продолжить обсуждение есть, то могу выложить здесь черноввые наброски своего класса(panov ©   (14.10.04 19:04) [62]), а также модуль сс своей реализацией потока от y-soft(если будет получено разрешение от него).


 
Игорь Шевченко ©   (2004-10-15 21:56) [92]


> что есть команды разработчков, которые предпочитают не пользоваться
> готовыми решениями. Я из их группы. Также он пишет, что
> по определенным соображениям такие разработчики имеют право
> на существование. Логика ясна?


Безусловно, ясна. Для собственной ерундиции, разумеется, изобретение велосипедов есть полезное и достойное занятие. (Эт не шутка, а вполне серьезное заявление, не ставящее целью кого-либо обидеть).

Но для промышленной задачи я бы предпочел готовое и опробованное решение, хотя бы в силу его изобретенности (не придется продираться сквозь те грабли, сквозь которые продрались разработчики), отлаженности (в силу тех же обстоятельств) и документированности.


 
panov ©   (2004-10-16 18:46) [93]

>Игорь Шевченко ©   (15.10.04 21:56) [92]

Игорь, подскажи, где можно увидеть пример готового и апробированного решения для платформы Windows(версии не ниже Win95)?


 
panov ©   (2004-10-18 10:44) [94]

Вот почти готовая реализация 2-х классов - пул потоков(TThreadPool), и, собстувенно, поток в пуле(TThrExecuter).

TThrExecuter пока не реализован.

Это костяк, на который далее буду наращивать мясо.

Пока всё это полностью не реализовано, может есть каки-то замечания...

unit uTThreadPool;

interface

uses classes, windows, messages, sysutils;

var
 WM_ThreadPoolChangeProc: DWORD;         //Изменение поточной процедуры
                                         //  А надо ли?
 WM_ThreadPoolEndExecuter: DWORD;        //Сообщение пулу потоков об окончании дочернего
 WM_ThreadPoolTerminate: DWORD;          //Сообщение пулу потоков о завершении
 WM_ThreadPoolTerminateExecuter: DWORD;  //Запрос на удаление потока из пула
 WM_ThreadPoolGetCountQueueJobs: DWORD;  //Запрос количества заданий в очереди
 WM_ThreadPoolDestroyQueue: DWORD;       //Запрос пулу на уничтожение очереди заданий
 WM_ThreadPoolCreateExecuter: DWORD;     //Сообщение пулу потоков о создании нового потока

type

 TExecuteProcedure=procedure(const aParam: Pointer);cdecl; //Процедура потока
 TClientProcedure=procedure(const aParam: Pointer);        //Клиентская процедура потока

 TThrExecuter=class;

 PParmJob=^TParmjob;
 TParmJob=record
   Proc: Pointer;
   Parm: Pointer;
 end;

 TThrExecuter=class
 private
   FHandle: THandle;                     //Дескриптор потока
   FThreadId: THandle;                   //Идентификатор потока
   FEvent: THandle;                      //ОЯ для ожидания
   FSuspended: Boolean;
   FJob: TParmJob;
 public
   constructor Create;
   destructor Destroy;override;
   procedure StartProc(aThreadProc:TClientProcedure;aParm: pointer);
 end;

 TThreadPool=class                       //Пул потоков
 private
   FHandle: THandle;                     //Дескриптор потока
   FThreadId: THandle;                   //Идентификатор потока
   FEvent: THandle;                      //ОЯ для ожидания
   FPool: array of TThrExecuter;         //Пул потоков на исполнение
   FListQueue: TList;                    //Очередь заданий на выполнение
   FMaxThreads: Integer;                 //Максимальное число потоков в пуле
   FreeOnTerminate: Boolean;             //Автоматическое освобождение
   FTerminated: Boolean;                 //Состояние завершения
   FThreadProcedure: TExecuteProcedure;  //Процедура потока
   FClientProc: TClientProcedure;        //Клиентская процедура потока
   FClientParam: Pointer; //reserved
   FTimeOutWaitReply: Integer;           //Таймаут ожидания ответа от поточной функции
   function CreateExecuter: TThrExecuter; //Создание потока в пуле
   procedure DeleteExecuter(const aIndex: Integer); overload; //Удалить поток в пуле
   procedure DeleteExecuter(const aThrExecuter: TThrExecuter); overload; //Удалить поток в пуле
   procedure SetMaxThreads(const MaxThreads: Integer); //Изменить максимальное число потоков в пуле
   function GetCountQueueJobs: Integer;  //Текущее число потоков в пуле
   function GetEvent: THandle;           //Создать новый ОЯ "Event"
   procedure CloseEvent(const aEvent: THandle); //Удалить ОЯ "Event"
   procedure DestroyQueue;               //Уничтожить очередь заданий
   function PostAndWait(const Msg,Param: Integer): Boolean; //Передача сообщения
                                         //в поточную функцию и ожидание обработки сообщения
   procedure _ClearQueue;                //Очистка очереди заданий и освобождение
 public
   constructor Create(const aMaxThreads: Integer);
   destructor Destroy;override;
   procedure AddJob(const aProc: TClientProcedure; const aParm: Pointer); //Добавить задание в очередь
   function SetClientProc(const Proc: TClientProcedure): Boolean;         //Изменить поточную функцию менеджера
   function TerminateJob(aThreadId: THandle): Boolean; //Терминировать выполняющееся задание в пуле
   procedure Release;                                  //Закончить работу менеджера
   procedure Terminate;                                //Выдать запрос на окончание работы менеджера

   property Handle: THandle read FHandle;              //Дескриптор потока менеджера
   property ThreadId: THandle read FThreadId;          //Идентификатор потока менеджера
   property MaxThreads: Integer read FMaxThreads write SetMaxThreads;
   property Terminated: Boolean read FTerminated;
   property CountQueue: Integer read GetCountQueueJobs;

//    property Priority: TThreadPrioroty read GetPriority write SetPriority;
 end;

implementation

{ TThr }

procedure ThrExecuterProc(const aParam: Pointer);cdecl;
var
 Thread: TThrExecuter;
 Msg: TMsg;
begin
 Thread := aParam;
 PeekMessage(Msg,0,0,0,PM_NOREMOVE);
 while GetMessage(Msg,0,0,0) do
 begin
   if Msg.message=WM_QUIT then
   begin
     if Assigned(Thread.FJob.Proc) then TExecuteProcedure(Thread.FJob.Proc)(Thread.FJob.Parm);
   end;
 end;
 Dispose(aParam);
 ExitThread(0);
end;

constructor TThrExecuter.Create;
begin
 inherited;
 FHandle := CreateThread(nil,0,@ThrExecuterProc,Self,0,FThreadId);
end;

destructor TThrExecuter.Destroy;
begin

end;

procedure TThrExecuter.StartProc(aThreadProc: TClientProcedure; aParm: Pointer);
begin

end;



 
panov ©   (2004-10-18 10:45) [95]

Удалено модератором
Примечание: дубль


 
panov ©   (2004-10-18 10:45) [96]

{ TThreadPool }

{
   Поточная функция менеджера потоков.
   Построена на обменом сообщениями между потоком и классом-оберткой.
   const aParm: Pointer - Ссылка на класс-обертку TThreadPool
}
procedure ThreadPoolProcedure(const aParm: Pointer);cdecl;
var
 ThreadPool: TThreadPool;
 Msg: TMsg;
begin
 ThreadPool := TThreadPool(aParm);       //Ссылка на класс-обертку TThreadPool
 PeekMessage(Msg,0,0,0,PM_NOREMOVE);     //Создаем очередь сообщений
 SetEvent(ThreadPool.FEvent);            //Извещаем о готовности к работе
 while GetMessage(Msg,0,0,0) do          //Выбираем сообщение из очереди
 begin

//Запрос на количество ожидающих заданий в очереди
//  wParam - ожидающий объект Event
//  lParam - адрес для возврата результата (PInteger)
   if Msg.message=WM_ThreadPoolGetCountQueueJobs then
   begin
     PInteger(Msg.lParam)^ := ThreadPool.FListQueue.Count;
     SetEvent(Msg.wParam);
     Continue;
   end;

//Запрос на уничтожение очереди заданий
//  wParam - ожидающий объект Event
   if Msg.message=WM_ThreadPoolDestroyQueue then
   begin
     ThreadPool._ClearQueue;
     SetEvent(Msg.wParam);
     Continue;
   end;
//Запрос на изменение поточной функции
//  Сразу входим в новую поточную функцию,
//  после окончания заканчиваем Выполнения потока
{ TODO -cЗаметка : Добавить возможность "Временного перехода" в клиентскую функцию) }
   if Msg.message=WM_ThreadPoolChangeProc then
   begin
     if Assigned(ThreadPool.FclientProc) then
     begin
       ThreadPool.FClientProc(aParm);
       ExitThread(0);
     end;
   end;

//Запрос на окончание потока
//
//
   if (Msg.message=WM_QUIT) or (Msg.message=WM_ThreadPoolTerminate) then ExitThread(0);
end;
end;

//Добавить в очередь задание
procedure TThreadPool.AddJob(const aProc: TClientProcedure;
 const aParm: Pointer);
begin

end;

constructor TThreadPool.Create(const aMaxThreads: Integer);
begin
 inherited Create;
 IsMultiThread := True;                  //Установить многопоточный режим
 FreeOnTerminate := True;                //Автоматически уничтожать объект
 FMaxThreads := aMaxThreads;             //Установить количество потоков в пуле
 FTimeOutWaitReply := 5000;              //Установить максимальное время ожидания
                                         //  ответа от поточной функции
 FThreadProcedure := ThreadPoolProcedure;  //Установить поточную функцию
 FEvent := CreateEvent(nil,True,False,nil); //Создать ОЯ "Event" для ожидания готовности
 if FEvent=0 then raise Exception.Create("Error create Event object"); //Не получилось создать Event
 FHandle := CreateThread(nil,0,@FThreadProcedure,Self,0,FThreadId); //Создаем поток для выполнения поточной функции
 if FHandle=0 then
 begin
   raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
 end;

//Ждем, пока поточная функция не будет готова принимать запросы
 if WaitForSingleObject(FEvent,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
 begin
   TerminateThread(FHandle,1);
   CloseHandle(FHandle);
   FHandle := 0;
   raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
 end;

 FListQueue := TList.Create;             //Создаем очередь заданий
end;

//Создать поток в пуле потоков
function TThreadPool.CreateExecuter: TThrExecuter;
var
 Thread: TThrExecuter;
begin
 Thread := nil;
 Result := nil;
 if FTerminated then Exit;
 PostAndWait(WM_ThreadPoolDestroyQueue,Integer(@Thread));
end;

//Удалить поток в пуле потоков по индексу
procedure TThreadPool.DeleteExecuter(const aIndex: Integer);
begin
end;

//Удалить поток в пуле потоков по ссылке на поток TThreadExecuter
procedure TThreadPool.DeleteExecuter(const aThrExecuter: TThrExecuter);
begin
end;

//Уничтожение объекта TThreadPool
destructor TThreadPool.Destroy;
begin
 if not FTerminated then Exit;
 if FEvent<>0 then CloseHandle(FEvent);
 if FHandle<>0 then CloseHandle(FHandle);
 inherited;
end;

//Запрос на уничтожение очереди заданий
procedure TThreadPool.DestroyQueue;
begin
 PostAndWait(WM_ThreadPoolDestroyQueue,0);
end;

//Запрос количества заданий в очереди
function TThreadPool.GetCountQueueJobs: Integer;
begin
 Result := -1;
 PostAndWait(WM_ThreadPoolGetCountQueueJobs,Integer(@Result));
end;

//Создать ОЯ "Event"
function TThreadPool.GetEvent: THandle;
begin
 Result := CreateEvent(nil,True,False,nil);
end;

//Освободить ОЯ "Event"
procedure TThreadPool.CloseEvent(const aEvent: THandle);
begin
 if aEvent<> 0 then CloseHandle(aEvent);
end;

//Запрос поточной функции и ожидание ответа.
function TThreadPool.PostAndWait(const Msg, Param: Integer): Boolean;
var
 Event: THandle;
begin
 Result := False;
 Event := GetEvent;
 if Event=0 then Exit;
 try
   PostThreadMessage(FThreadId,Msg,Event,Param);
   WaitForSingleObject(Event,FTimeOutWaitReply);
 finally
   CloseEvent(Event);
 end;
end;

//Окончание работы
procedure TThreadPool.Release;
begin
 if FTerminated then Exit;
 FTerminated := True;
 DestroyQueue;
 PostThreadMessage(FThreadId,WM_ThreadPoolTerminate,0,0);
 if WaitForSingleObject(FHandle,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
 begin
   TerminateThread(FHandle,1);
 end;
 if FreeOnTerminate then Destroy;
end;

function TThreadPool.SetClientProc(const Proc: TClientProcedure): Boolean;
begin
 Result := False;
 if Terminated then Exit;
 if Assigned(FClientProc) then Exit;
 FClientProc := Proc;
 PostThreadMessage(FThreadId,WM_ThreadPoolChangeProc,0,0);
 Result := True;
end;

procedure TThreadPool.SetMaxThreads(const MaxThreads: Integer);
begin

end;

procedure TThreadPool.Terminate;
begin
 if FTerminated then Exit;
 Release;
end;

function TThreadPool.TerminateJob(aThreadId: THandle): Boolean;
begin
 Result := False;
end;

procedure TThreadPool._ClearQueue;
var
 i: Integer;
begin
 if not FTerminated then Exit;
 if not Assigned(FListQueue) then Exit;
 for i := 0 to FListQueue.Count-1 do
 begin
   FListQueue.Delete(i);
 end;
 FListQueue.Free;
end;

initialization
 WM_ThreadPoolChangeProc := RegisterWindowMessage("TThreadPoolWM_ThreadPoolChangeProc");
 WM_ThreadPoolEndExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolEndExecuter");
 WM_ThreadPoolTerminate := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminate");
 WM_ThreadPoolTerminateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminateExecuter");
 WM_ThreadPoolGetCountQueueJobs := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolGetCountQueueJobs");
 WM_ThreadPoolDestroyQueue := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolDestroyQueue");
 WM_ThreadPoolCreateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolCreateExecuter");
finalization
end.


 
Игорь Шевченко ©   (2004-10-18 12:53) [97]

panov ©   (16.10.04 18:46) [93]


> Игорь, подскажи, где можно увидеть пример готового и апробированного
> решения для платформы Windows(версии не ниже Win95)?


Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.

Готовое решение для Win2k есть в книжке Рихтера и Кларка "программирование серверных приложений для Windows 2000.
"


 
panov ©   (2004-10-18 13:10) [98]

>Игорь Шевченко ©   (18.10.04 12:53) [97]

Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.


С тем, что строить на Win98 серьезную многопоточную систему как сервер приложений не стоит. Тут я согласен.
А вот серьезную многопоточную клиентскую программу, в принципе, почему бы и нет-)

Понятно, что в этом случае надежность ОС не обеспечивает, но для многих клиентских программ достаточная надежность и устойчивость на порядки ниже серверных приложений.

Кроме этого, согласись, что та же WinNT4 дает достаточный уровень надежности, но не предоставляет таких встроенных в систему инструментов.


 
Игорь Шевченко ©   (2004-10-18 13:22) [99]


> А вот серьезную многопоточную клиентскую программу, в принципе,
> почему бы и нет-)
>
> Понятно, что в этом случае надежность ОС не обеспечивает,
> но для многих клиентских программ достаточная надежность
> и устойчивость на порядки ниже серверных приложений.


А пример ? Именно такой клиентской задачи, где требуется пул ?

А насчет NT4 - у меня сейчас книги под рукой нету, но через какое-то время я таки-посмотрю, что за механизмы там используются и есть ли они в NT4


 
panov ©   (2004-10-18 13:28) [100]

>Игорь Шевченко ©   (18.10.04 13:22) [99]

А пример? Именно такой клиентской задачи, где требуется пул ?

Так нет таких задач, где без пула потоков обойтись нельзя.
Ведь это лишь способ организации и управления несколькими потоками.

Такую задачу я в начале топика привел, как пример - многопоточное копирование файлов.

Навскидку могу привести еще примеры - скачивание нескольких файлов одновременно с интернет-сайтов, сканирование сети и пр.


 
Evgeny V ©   (2004-10-18 13:30) [101]


> panov ©   (18.10.04 13:10) [98]


> Кроме этого, согласись, что та же WinNT4 дает достаточный
> уровень надежности, но не предоставляет таких встроенных
> в систему инструментов.


GetQueuedCompletionStatus и PostQueuedCompletionStatus - позволяет удобно организовать пул потоков, в 9x их конечно нет, но по MSDN
Client: Included in Windows XP, Windows 2000 Professional, and Windows NT Workstation 3.5 and later.
Server: Included in Windows Server 2003, Windows 2000 Server, and Windows NT Server 3.5 and later.Header: Declared in Winbase.h; include Windows.h.
Library: Use Kernel32.lib.

В 2000 конечно появились более удобные функции для этой цели - QueueUserWorkItem например


 
panov ©   (2004-10-18 13:38) [102]

>Evgeny V ©   (18.10.04 13:30) [101]

Да, действительно.
Но все же у меня дома W98 и мне хочется, чтобы в этой системе тоже работало-)


 
Игорь Шевченко ©   (2004-10-18 14:06) [103]

panov ©   (18.10.04 13:28) [100]

Пул потоков есть средство ограничения обработки запросов фиксированным количеством потоков, при этом, остальные запросы ставятся в очередь до освобождения одного из потоков - типичная задача для архитерктуры супер-сервер (СУБД), например. Каким образом это поможет на клиенте, честно, не вижу.

Я могу сослаться только на пост [92] о поводе для написания подобного пула - общее самообразование.


 
Владислав ©   (2004-10-19 12:56) [104]

Понадобился пул по работе. Вот что я написал

unit ThreadPool;

interface

uses
 Windows, SysUtils, asLists;

type

 TJobProcedure = procedure(Param: Pointer);

 TThreadPool = class(TObject)
 private
   FJobExistsEvent: DWORD;
   FTerminatedEvent: DWORD;
 private
   FThreadHandles: array of DWORD;
   FJobListCS: TRTLCriticalSection;
   FJobList: TDoubleLinkedList;
   FThreadCount: DWORD;
   FStackSize: DWORD;
   FTerminated: Boolean;
   procedure SetTerminated(const Value: Boolean);
 private
   function Initialize: Boolean;
   procedure Finalize;
   procedure LockJobList;
   procedure UnlockJobList;
   function InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
   procedure RemoveJob(AJob: Pointer);
   function WaitForJobTimeout(Timeout: DWORD; var Proc: TJobProcedure;
     var Param: Pointer): Boolean;
   function WaitForJobInfinite(var Proc: TJobProcedure; var Param: Pointer): Boolean;
 public
   constructor Create(AThreadCount, AStackSize: DWORD);
   destructor Destroy; override;
   function AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
   property ThreadCount: DWORD read FThreadCount;
   property StackSize: DWORD read FStackSize;
   property Terminated: Boolean read FTerminated write SetTerminated;
 end;

implementation

type

 PJob = ^TJob;
 TJob = record
   Proc: TJobProcedure;
   Param: Pointer;
   Link: TDoubleLinkedLink
 end;

function ThreadFunc(Param: Pointer): Integer;
var
 LPool: TThreadPool;
 LProc: TJobProcedure;
 LParam: Pointer;
begin
 LPool := Param;
 try
   while not LPool.Terminated do
   begin
     if LPool.WaitForJobInfinite(LProc, LParam) then
     begin
       try
         LProc(LParam)
       except
       end
     end
   end
 except
 end;
 Result := 0
end;

{ TThreadPool }

constructor TThreadPool.Create(AThreadCount, AStackSize: DWORD);
begin
 FThreadCount := AThreadCount;
 FStackSize := AStackSize;
 SetLength(FThreadHandles, FThreadCount);
 if not Initialize then
   RaiseLastOSError
end;

destructor TThreadPool.Destroy;
begin
 Finalize;
 inherited;
end;

function TThreadPool.Initialize: Boolean;
var
 i: Integer;
 LThreadID: DWORD;
begin
 InitializeCriticalSection(FJobListCS);
 FJobExistsEvent := CreateEvent(nil, False, False, nil);
 Result := FJobExistsEvent <> 0;
 if not Result then
   Exit;
 FTerminatedEvent := CreateEvent(nil, True, False, nil);
 Result := FTerminatedEvent <> 0;
 if not Result then
   Exit;
 for i := 0 to Integer(FThreadCount) - 1 do
 begin
   FThreadHandles[i] := BeginThread(nil, FStackSize, @ThreadFunc, Self,
     CREATE_SUSPENDED, LThreadID);
   Result := FThreadHandles[i] <> 0;
   if not Result then
     Break
 end;
 if not Result then
   Terminated := True;
 for i := 0 to Integer(FThreadCount) - 1 do
   if FThreadHandles[i] <> 0 then
     ResumeThread(FThreadHandles[i])
end;

procedure TThreadPool.Finalize;
var
 i: Integer;
 LWaitResult: DWORD;
 LLink: PDoubleLinkedLink;
begin
 Terminated := True;
 for i := 0 to FThreadCount - 1 do
 begin
   if FThreadHandles[i] <> 0 then
   begin
     LWaitResult := WaitForSingleObject(FThreadHandles[i], INFINITE);
     if LWaitResult <> WAIT_OBJECT_0 then
       TerminateThread(FThreadHandles[i], 1);
     CloseHandle(FThreadHandles[i]);
     FThreadHandles[i] := 0
   end
 end;
 while True do
 begin
   LLink := RemoveFromHead(@FJobList);
   if LLink <> nil then
     RemoveJob(LLink^.Data)
   else
     Break
 end;
 if FTerminatedEvent <> 0 then
 begin
   CloseHandle(FTerminatedEvent);
   FTerminatedEvent := 0
 end;
 if FJobExistsEvent <> 0 then
 begin
   CloseHandle(FJobExistsEvent);
   FJobExistsEvent := 0
 end;
 DeleteCriticalSection(FJobListCS);
end;

procedure TThreadPool.LockJobList;
begin
 EnterCriticalSection(FJobListCS);
end;

procedure TThreadPool.UnlockJobList;
begin
 LeaveCriticalSection(FJobListCS);
end;

function TThreadPool.AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
begin
 Result := InsertJob(AProc, AParam)
end;

function TThreadPool.InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
var
 LJob: PJob;
begin
 try
   New(LJob)
 except
   SetLastError(ERROR_NOT_ENOUGH_MEMORY);
   Result := False;
   Exit
 end;
 with LJob^ do
 begin
   Proc := AProc;
   Param := AParam;
   with Link do
   begin
     Next := nil;
     Prev := nil;
     Data := LJob
   end
 end;
 LockJobList;
 try
   InsertIntoTail(@FJobList, @LJob^.Link);
   SetEvent(FJobExistsEvent)
 finally
   UnlockJobList
 end;
 Result := True
end;

procedure TThreadPool.RemoveJob(AJob: Pointer);
begin
 Dispose(AJob)
end;

function TThreadPool.WaitForJobTimeout(Timeout: DWORD;
 var Proc: TJobProcedure; var Param: Pointer): Boolean;
var
 LWaitResult: DWORD;
 LLink: PDoubleLinkedLink;
 LJob: PJob;
begin
 Result := False;
 Proc := nil;
 Param := nil;
 LWaitResult := WaitForMultipleObjects(2, @FJobExistsEvent, False, Timeout);
 if LWaitResult = WAIT_OBJECT_0 then
 begin
   if Terminated then
   begin
     SetEvent(FJobExistsEvent);
     Exit
   end;
   LockJobList;
   try
     if FJobList.Size = 0 then
       Exit;
     LLink := RemoveFromHead(@FJobList);
     if FJobList.Size <> 0 then
       SetEvent(FJobExistsEvent)
   finally
     UnlockJobList
   end;
   if LLink <> nil then
   begin
     LJob := LLink^.Data;
     Proc := LJob^.Proc;
     Param := LJob^.Param;
     Result := True;
     RemoveJob(LJob)
   end
 end
end;

function TThreadPool.WaitForJobInfinite(var Proc: TJobProcedure;
 var Param: Pointer): Boolean;
begin
 Result := WaitForJobTimeout(INFINITE, Proc, Param)
end;

procedure TThreadPool.SetTerminated(const Value: Boolean);
begin
 if Value then
 begin
   FTerminated := Value;
   SetEvent(FTerminatedEvent)
 end
end;

end.


 
Игорь Шевченко ©   (2004-10-19 13:59) [105]

Владислав ©   (19.10.04 12:56) [104]

А TThreadList вместо DoubleLinkedList и его обслуги не проще будет применить ?


 
Владислав ©   (2004-10-19 14:56) [106]

В TList будет постоянный Move, а на кой он нужен?


 
Игорь Шевченко ©   (2004-10-19 15:08) [107]

Владислав ©   (19.10.04 12:56) [104]


> В TList будет постоянный Move


Это плохо ?

Зато в коде строчек будет меньше...


 
Владислав ©   (2004-10-19 15:11) [108]

Их и так минимум :)


 
Игорь Шевченко ©   (2004-10-19 15:28) [109]

Владислав ©   (19.10.04 15:11) [108]


> Их и так минимум :)


Это тебе кажется :)

Как минимум - метод WaitForJobTimeOut используется один раз в методе WaitForJobInfinite - смело можно выкидывать и переносить функциональность в WaitForJobInfinite (это первое, что бросилось в глаза).

Кроме того, я бы попробовал сделать с TThreadList - строчек уменьшится, по-моему.


 
Владислав ©   (2004-10-19 16:27) [110]

Конечно, можно и выкинуть и попробовать :)
Только не лежит у меня душа к TList, а оптимизацией я еще вообще не занимался. Будут узкие места, буду оптимизировать.


 
panov ©   (2004-10-27 22:38) [111]

<Offtopic>
Чтобы в архив не уехал, так как работа продолжается...
</Offtopic>



Страницы: 1 2 3 вся ветка

Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];

Наверх




Память: 0.73 MB
Время: 0.039 c
14-1097962722
Кто---то
2004-10-17 01:38
2004.11.14
Есть ли стандартный Диалог с КомбоБоксом ?


1-1098994543
Луарвик
2004-10-29 00:15
2004.11.14
Как юзать memo1.ScrollBy?


3-1098125982
MaxN
2004-10-18 22:59
2004.11.14
Соединение с БД


14-1098458364
Gero
2004-10-22 19:19
2004.11.14
Программы для тестирования железа


1-1099056568
Сергей Г
2004-10-29 17:29
2004.11.14
Доступ к файлу Excel





Afrikaans Albanian Arabic Armenian Azerbaijani Basque Belarusian Bulgarian Catalan Chinese (Simplified) Chinese (Traditional) Croatian Czech Danish Dutch English Estonian Filipino Finnish French
Galician Georgian German Greek Haitian Creole Hebrew Hindi Hungarian Icelandic Indonesian Irish Italian Japanese Korean Latvian Lithuanian Macedonian Malay Maltese Norwegian
Persian Polish Portuguese Romanian Russian Serbian Slovak Slovenian Spanish Swahili Swedish Thai Turkish Ukrainian Urdu Vietnamese Welsh Yiddish Bengali Bosnian
Cebuano Esperanto Gujarati Hausa Hmong Igbo Javanese Kannada Khmer Lao Latin Maori Marathi Mongolian Nepali Punjabi Somali Tamil Telugu Yoruba
Zulu
Английский Французский Немецкий Итальянский Португальский Русский Испанский