Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];
ВнизПул потоков. Найти похожие ветки
← →
Суслик © (2004-10-15 12:33) [80]
> [79] Игорь Шевченко © (15.10.04 12:29)
Определение синхронизации в студию!
← →
Игорь Шевченко © (2004-10-15 12:36) [81]Суслик © (15.10.04 12:33) [80]
Синхронизация - (от греч. synchronos - одновременный), приведение двух или нескольких процессов к синхроннонсти, т. е. к такому их протеканию, когда одинаковые или соответствующие элементы процессов совершаются с неизменным сдвигом по фазе друг относительно друга или одновременно.
http://encycl.yandex.ru/cgi-bin/art.pl?art=bse/00071/31300.htm&encpage=bse&mrkp=/yandbtm7%3Fq%3D-811200565%26p%3D0%26g%3 D0%26d%3D0%26ag%3Denc_abc%26tg%3D1%26p0%3D0%26q0%3D1334144256%26d0%3D0%26script%3D/yandpage%253F
Мог бы и сам посмотреть (с)
← →
panov © (2004-10-15 12:42) [82]>Zelius © (15.10.04 11:36) [76]
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:
В данном примере это не имеет значения, так как создание очереди сообщений происходит в поточной функции, а не в конструкторе. Вот создания этой очереди и нужно дождаться.
← →
Суслик © (2004-10-15 12:43) [83]
> [81] Игорь Шевченко © (15.10.04 12:36)
не понял:(
> совершаются с неизменным сдвигом по фазе друг относительно
> друга или одновременно.
Т.е. это значит, что синхронизируемый кусок кода должен выполняться в каждый момент не более чем одним потоком? Или нет?
← →
Игорь Шевченко © (2004-10-15 12:50) [84]Суслик © (15.10.04 12:43) [83]
Синхронизация применительно к вычислительным процессам - это гарантированное предоставление монопольного доступа к ресурсу одному процессу в каждый момент времени непротиворечивого состояния ресурса.
Слово процесс для Windows заменяется на поток.
В коде, приведенном тобой, при помощи специального алгоритма, гарантируется непротиворечивое состояние ресурса для каждого потока в любой момент времени.
← →
Суслик © (2004-10-15 12:55) [85]
> [84] Игорь Шевченко © (15.10.04 12:50)
если учитывать такое определение
> Синхронизация применительно к вычислительным процессам -
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
то полность согласен с этим
> В коде, приведенном тобой, при помощи специального алгоритма,
> гарантируется непротиворечивое состояние ресурса для каждого
> потока в любой момент времени.
Признаю, что скорее всего был не прав в том, что называл приведенный мной пример синхронизацией - это всего лишь корректная многопоточность.
ЗЫ. Тонкое это дело. Есть какие-нибудь книги про всякие приемы многопоточного программирования?
← →
Игорь Шевченко © (2004-10-15 13:04) [86]Суслик © (15.10.04 12:55) [85]
Вот теперь и мы пришли к единому мнению :)
> Есть какие-нибудь книги про всякие приемы многопоточного
> программирования?
Рихтер, как ни странно, программирование для Win32.
У Таненбаума очень хорошо рассказано про концепции.
Я просто с понятием "
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
"
знаком с 1985 года, если не ошибаюсь, первый раз встретил у Шоу, в "Логическом проектировании операционных систем".
← →
Суслик © (2004-10-15 18:06) [87]Прошу прощения, за реанимирование темы.
Но я бы хотел высказать пожелание автору топика.
Помню когда-то в "Потрепаться" была борьба за фак. Я не помню, кто был инициатором этой работы, но идея была классная - коллективная работа над материалом. К сожалению, у меня тогда не было клиента, поэтому результатов под рукой нет. Но я помню, что в результате работы было проработано некоторое количество вопросов.
Я считал (и считаю), что фак, это хорошо, но не очень жизненно, т.к. вопросы и ответы в основном для ленивых: ответ всегда можно найти в документации.
Другой вопрос, что существуют проблемы, которые имеют определенное название. Например, пул потоков. Пул потоков нельзя отнести в фак, но в тоже время он является полезным и частоприменимым (в определенных областях) классом.
Думаю, что многие со мной согласятся, что коллективная разработка высокоскоростного пула потоков, являтется общественно полезным делом.
Поэтому предложение автору сего топика продолжить обсуждение кода пула потоков.
← →
Игорь Шевченко © (2004-10-15 18:34) [88]
> Пул потоков нельзя отнести в фак, но в тоже время он является
> полезным и частоприменимым (в определенных областях) классом
Например, в каких областях ?
Кстати, в Win2k он уже встроен в систему.
← →
Суслик © (2004-10-15 18:44) [89]
> [88] Игорь Шевченко © (15.10.04 18:34)
Сервер (компьютераная экономическая игра). Многопоточный. Но она реализована очень слабо - маленькая нагрузка, поэтому пулами не пользуемся. А можно было бы.
> Кстати, в Win2k он уже встроен в систему.
Тот же Фаулер пишет (арх. корп. прогр. прил.), что есть команды разработчков, которые предпочитают не пользоваться готовыми решениями. Я из их группы. Также он пишет, что по определенным соображениям такие разработчики имеют право на существование. Логика ясна?
← →
Суслик © (2004-10-15 18:45) [90]А впрочем, не хотите, как хотите. Полезная школа для неновичков была бы.
← →
panov © (2004-10-15 19:32) [91]>Суслик © (15.10.04 18:45) [90]
Так как желание продолжить обсуждение есть, то могу выложить здесь черноввые наброски своего класса(panov © (14.10.04 19:04) [62]), а также модуль сс своей реализацией потока от y-soft(если будет получено разрешение от него).
← →
Игорь Шевченко © (2004-10-15 21:56) [92]
> что есть команды разработчков, которые предпочитают не пользоваться
> готовыми решениями. Я из их группы. Также он пишет, что
> по определенным соображениям такие разработчики имеют право
> на существование. Логика ясна?
Безусловно, ясна. Для собственной ерундиции, разумеется, изобретение велосипедов есть полезное и достойное занятие. (Эт не шутка, а вполне серьезное заявление, не ставящее целью кого-либо обидеть).
Но для промышленной задачи я бы предпочел готовое и опробованное решение, хотя бы в силу его изобретенности (не придется продираться сквозь те грабли, сквозь которые продрались разработчики), отлаженности (в силу тех же обстоятельств) и документированности.
← →
panov © (2004-10-16 18:46) [93]>Игорь Шевченко © (15.10.04 21:56) [92]
Игорь, подскажи, где можно увидеть пример готового и апробированного решения для платформы Windows(версии не ниже Win95)?
← →
panov © (2004-10-18 10:44) [94]Вот почти готовая реализация 2-х классов - пул потоков(TThreadPool), и, собстувенно, поток в пуле(TThrExecuter).
TThrExecuter пока не реализован.
Это костяк, на который далее буду наращивать мясо.
Пока всё это полностью не реализовано, может есть каки-то замечания...
unit uTThreadPool;
interface
uses classes, windows, messages, sysutils;
var
WM_ThreadPoolChangeProc: DWORD; //Изменение поточной процедуры
// А надо ли?
WM_ThreadPoolEndExecuter: DWORD; //Сообщение пулу потоков об окончании дочернего
WM_ThreadPoolTerminate: DWORD; //Сообщение пулу потоков о завершении
WM_ThreadPoolTerminateExecuter: DWORD; //Запрос на удаление потока из пула
WM_ThreadPoolGetCountQueueJobs: DWORD; //Запрос количества заданий в очереди
WM_ThreadPoolDestroyQueue: DWORD; //Запрос пулу на уничтожение очереди заданий
WM_ThreadPoolCreateExecuter: DWORD; //Сообщение пулу потоков о создании нового потока
type
TExecuteProcedure=procedure(const aParam: Pointer);cdecl; //Процедура потока
TClientProcedure=procedure(const aParam: Pointer); //Клиентская процедура потока
TThrExecuter=class;
PParmJob=^TParmjob;
TParmJob=record
Proc: Pointer;
Parm: Pointer;
end;
TThrExecuter=class
private
FHandle: THandle; //Дескриптор потока
FThreadId: THandle; //Идентификатор потока
FEvent: THandle; //ОЯ для ожидания
FSuspended: Boolean;
FJob: TParmJob;
public
constructor Create;
destructor Destroy;override;
procedure StartProc(aThreadProc:TClientProcedure;aParm: pointer);
end;
TThreadPool=class //Пул потоков
private
FHandle: THandle; //Дескриптор потока
FThreadId: THandle; //Идентификатор потока
FEvent: THandle; //ОЯ для ожидания
FPool: array of TThrExecuter; //Пул потоков на исполнение
FListQueue: TList; //Очередь заданий на выполнение
FMaxThreads: Integer; //Максимальное число потоков в пуле
FreeOnTerminate: Boolean; //Автоматическое освобождение
FTerminated: Boolean; //Состояние завершения
FThreadProcedure: TExecuteProcedure; //Процедура потока
FClientProc: TClientProcedure; //Клиентская процедура потока
FClientParam: Pointer; //reserved
FTimeOutWaitReply: Integer; //Таймаут ожидания ответа от поточной функции
function CreateExecuter: TThrExecuter; //Создание потока в пуле
procedure DeleteExecuter(const aIndex: Integer); overload; //Удалить поток в пуле
procedure DeleteExecuter(const aThrExecuter: TThrExecuter); overload; //Удалить поток в пуле
procedure SetMaxThreads(const MaxThreads: Integer); //Изменить максимальное число потоков в пуле
function GetCountQueueJobs: Integer; //Текущее число потоков в пуле
function GetEvent: THandle; //Создать новый ОЯ "Event"
procedure CloseEvent(const aEvent: THandle); //Удалить ОЯ "Event"
procedure DestroyQueue; //Уничтожить очередь заданий
function PostAndWait(const Msg,Param: Integer): Boolean; //Передача сообщения
//в поточную функцию и ожидание обработки сообщения
procedure _ClearQueue; //Очистка очереди заданий и освобождение
public
constructor Create(const aMaxThreads: Integer);
destructor Destroy;override;
procedure AddJob(const aProc: TClientProcedure; const aParm: Pointer); //Добавить задание в очередь
function SetClientProc(const Proc: TClientProcedure): Boolean; //Изменить поточную функцию менеджера
function TerminateJob(aThreadId: THandle): Boolean; //Терминировать выполняющееся задание в пуле
procedure Release; //Закончить работу менеджера
procedure Terminate; //Выдать запрос на окончание работы менеджера
property Handle: THandle read FHandle; //Дескриптор потока менеджера
property ThreadId: THandle read FThreadId; //Идентификатор потока менеджера
property MaxThreads: Integer read FMaxThreads write SetMaxThreads;
property Terminated: Boolean read FTerminated;
property CountQueue: Integer read GetCountQueueJobs;
// property Priority: TThreadPrioroty read GetPriority write SetPriority;
end;
implementation
{ TThr }
procedure ThrExecuterProc(const aParam: Pointer);cdecl;
var
Thread: TThrExecuter;
Msg: TMsg;
begin
Thread := aParam;
PeekMessage(Msg,0,0,0,PM_NOREMOVE);
while GetMessage(Msg,0,0,0) do
begin
if Msg.message=WM_QUIT then
begin
if Assigned(Thread.FJob.Proc) then TExecuteProcedure(Thread.FJob.Proc)(Thread.FJob.Parm);
end;
end;
Dispose(aParam);
ExitThread(0);
end;
constructor TThrExecuter.Create;
begin
inherited;
FHandle := CreateThread(nil,0,@ThrExecuterProc,Self,0,FThreadId);
end;
destructor TThrExecuter.Destroy;
begin
end;
procedure TThrExecuter.StartProc(aThreadProc: TClientProcedure; aParm: Pointer);
begin
end;
← →
panov © (2004-10-18 10:45) [95]Удалено модератором
Примечание: дубль
← →
panov © (2004-10-18 10:45) [96]
{ TThreadPool }
{
Поточная функция менеджера потоков.
Построена на обменом сообщениями между потоком и классом-оберткой.
const aParm: Pointer - Ссылка на класс-обертку TThreadPool
}
procedure ThreadPoolProcedure(const aParm: Pointer);cdecl;
var
ThreadPool: TThreadPool;
Msg: TMsg;
begin
ThreadPool := TThreadPool(aParm); //Ссылка на класс-обертку TThreadPool
PeekMessage(Msg,0,0,0,PM_NOREMOVE); //Создаем очередь сообщений
SetEvent(ThreadPool.FEvent); //Извещаем о готовности к работе
while GetMessage(Msg,0,0,0) do //Выбираем сообщение из очереди
begin
//Запрос на количество ожидающих заданий в очереди
// wParam - ожидающий объект Event
// lParam - адрес для возврата результата (PInteger)
if Msg.message=WM_ThreadPoolGetCountQueueJobs then
begin
PInteger(Msg.lParam)^ := ThreadPool.FListQueue.Count;
SetEvent(Msg.wParam);
Continue;
end;
//Запрос на уничтожение очереди заданий
// wParam - ожидающий объект Event
if Msg.message=WM_ThreadPoolDestroyQueue then
begin
ThreadPool._ClearQueue;
SetEvent(Msg.wParam);
Continue;
end;
//Запрос на изменение поточной функции
// Сразу входим в новую поточную функцию,
// после окончания заканчиваем Выполнения потока
{ TODO -cЗаметка : Добавить возможность "Временного перехода" в клиентскую функцию) }
if Msg.message=WM_ThreadPoolChangeProc then
begin
if Assigned(ThreadPool.FclientProc) then
begin
ThreadPool.FClientProc(aParm);
ExitThread(0);
end;
end;
//Запрос на окончание потока
//
//
if (Msg.message=WM_QUIT) or (Msg.message=WM_ThreadPoolTerminate) then ExitThread(0);
end;
end;
//Добавить в очередь задание
procedure TThreadPool.AddJob(const aProc: TClientProcedure;
const aParm: Pointer);
begin
end;
constructor TThreadPool.Create(const aMaxThreads: Integer);
begin
inherited Create;
IsMultiThread := True; //Установить многопоточный режим
FreeOnTerminate := True; //Автоматически уничтожать объект
FMaxThreads := aMaxThreads; //Установить количество потоков в пуле
FTimeOutWaitReply := 5000; //Установить максимальное время ожидания
// ответа от поточной функции
FThreadProcedure := ThreadPoolProcedure; //Установить поточную функцию
FEvent := CreateEvent(nil,True,False,nil); //Создать ОЯ "Event" для ожидания готовности
if FEvent=0 then raise Exception.Create("Error create Event object"); //Не получилось создать Event
FHandle := CreateThread(nil,0,@FThreadProcedure,Self,0,FThreadId); //Создаем поток для выполнения поточной функции
if FHandle=0 then
begin
raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
end;
//Ждем, пока поточная функция не будет готова принимать запросы
if WaitForSingleObject(FEvent,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
begin
TerminateThread(FHandle,1);
CloseHandle(FHandle);
FHandle := 0;
raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
end;
FListQueue := TList.Create; //Создаем очередь заданий
end;
//Создать поток в пуле потоков
function TThreadPool.CreateExecuter: TThrExecuter;
var
Thread: TThrExecuter;
begin
Thread := nil;
Result := nil;
if FTerminated then Exit;
PostAndWait(WM_ThreadPoolDestroyQueue,Integer(@Thread));
end;
//Удалить поток в пуле потоков по индексу
procedure TThreadPool.DeleteExecuter(const aIndex: Integer);
begin
end;
//Удалить поток в пуле потоков по ссылке на поток TThreadExecuter
procedure TThreadPool.DeleteExecuter(const aThrExecuter: TThrExecuter);
begin
end;
//Уничтожение объекта TThreadPool
destructor TThreadPool.Destroy;
begin
if not FTerminated then Exit;
if FEvent<>0 then CloseHandle(FEvent);
if FHandle<>0 then CloseHandle(FHandle);
inherited;
end;
//Запрос на уничтожение очереди заданий
procedure TThreadPool.DestroyQueue;
begin
PostAndWait(WM_ThreadPoolDestroyQueue,0);
end;
//Запрос количества заданий в очереди
function TThreadPool.GetCountQueueJobs: Integer;
begin
Result := -1;
PostAndWait(WM_ThreadPoolGetCountQueueJobs,Integer(@Result));
end;
//Создать ОЯ "Event"
function TThreadPool.GetEvent: THandle;
begin
Result := CreateEvent(nil,True,False,nil);
end;
//Освободить ОЯ "Event"
procedure TThreadPool.CloseEvent(const aEvent: THandle);
begin
if aEvent<> 0 then CloseHandle(aEvent);
end;
//Запрос поточной функции и ожидание ответа.
function TThreadPool.PostAndWait(const Msg, Param: Integer): Boolean;
var
Event: THandle;
begin
Result := False;
Event := GetEvent;
if Event=0 then Exit;
try
PostThreadMessage(FThreadId,Msg,Event,Param);
WaitForSingleObject(Event,FTimeOutWaitReply);
finally
CloseEvent(Event);
end;
end;
//Окончание работы
procedure TThreadPool.Release;
begin
if FTerminated then Exit;
FTerminated := True;
DestroyQueue;
PostThreadMessage(FThreadId,WM_ThreadPoolTerminate,0,0);
if WaitForSingleObject(FHandle,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
begin
TerminateThread(FHandle,1);
end;
if FreeOnTerminate then Destroy;
end;
function TThreadPool.SetClientProc(const Proc: TClientProcedure): Boolean;
begin
Result := False;
if Terminated then Exit;
if Assigned(FClientProc) then Exit;
FClientProc := Proc;
PostThreadMessage(FThreadId,WM_ThreadPoolChangeProc,0,0);
Result := True;
end;
procedure TThreadPool.SetMaxThreads(const MaxThreads: Integer);
begin
end;
procedure TThreadPool.Terminate;
begin
if FTerminated then Exit;
Release;
end;
function TThreadPool.TerminateJob(aThreadId: THandle): Boolean;
begin
Result := False;
end;
procedure TThreadPool._ClearQueue;
var
i: Integer;
begin
if not FTerminated then Exit;
if not Assigned(FListQueue) then Exit;
for i := 0 to FListQueue.Count-1 do
begin
FListQueue.Delete(i);
end;
FListQueue.Free;
end;
initialization
WM_ThreadPoolChangeProc := RegisterWindowMessage("TThreadPoolWM_ThreadPoolChangeProc");
WM_ThreadPoolEndExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolEndExecuter");
WM_ThreadPoolTerminate := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminate");
WM_ThreadPoolTerminateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminateExecuter");
WM_ThreadPoolGetCountQueueJobs := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolGetCountQueueJobs");
WM_ThreadPoolDestroyQueue := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolDestroyQueue");
WM_ThreadPoolCreateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolCreateExecuter");
finalization
end.
← →
Игорь Шевченко © (2004-10-18 12:53) [97]panov © (16.10.04 18:46) [93]
> Игорь, подскажи, где можно увидеть пример готового и апробированного
> решения для платформы Windows(версии не ниже Win95)?
Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.
Готовое решение для Win2k есть в книжке Рихтера и Кларка "программирование серверных приложений для Windows 2000.
"
← →
panov © (2004-10-18 13:10) [98]>Игорь Шевченко © (18.10.04 12:53) [97]
Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.
С тем, что строить на Win98 серьезную многопоточную систему как сервер приложений не стоит. Тут я согласен.
А вот серьезную многопоточную клиентскую программу, в принципе, почему бы и нет-)
Понятно, что в этом случае надежность ОС не обеспечивает, но для многих клиентских программ достаточная надежность и устойчивость на порядки ниже серверных приложений.
Кроме этого, согласись, что та же WinNT4 дает достаточный уровень надежности, но не предоставляет таких встроенных в систему инструментов.
← →
Игорь Шевченко © (2004-10-18 13:22) [99]
> А вот серьезную многопоточную клиентскую программу, в принципе,
> почему бы и нет-)
>
> Понятно, что в этом случае надежность ОС не обеспечивает,
> но для многих клиентских программ достаточная надежность
> и устойчивость на порядки ниже серверных приложений.
А пример ? Именно такой клиентской задачи, где требуется пул ?
А насчет NT4 - у меня сейчас книги под рукой нету, но через какое-то время я таки-посмотрю, что за механизмы там используются и есть ли они в NT4
← →
panov © (2004-10-18 13:28) [100]>Игорь Шевченко © (18.10.04 13:22) [99]
А пример? Именно такой клиентской задачи, где требуется пул ?
Так нет таких задач, где без пула потоков обойтись нельзя.
Ведь это лишь способ организации и управления несколькими потоками.
Такую задачу я в начале топика привел, как пример - многопоточное копирование файлов.
Навскидку могу привести еще примеры - скачивание нескольких файлов одновременно с интернет-сайтов, сканирование сети и пр.
← →
Evgeny V © (2004-10-18 13:30) [101]
> panov © (18.10.04 13:10) [98]
> Кроме этого, согласись, что та же WinNT4 дает достаточный
> уровень надежности, но не предоставляет таких встроенных
> в систему инструментов.
GetQueuedCompletionStatus и PostQueuedCompletionStatus - позволяет удобно организовать пул потоков, в 9x их конечно нет, но по MSDN
Client: Included in Windows XP, Windows 2000 Professional, and Windows NT Workstation 3.5 and later.
Server: Included in Windows Server 2003, Windows 2000 Server, and Windows NT Server 3.5 and later.Header: Declared in Winbase.h; include Windows.h.
Library: Use Kernel32.lib.
В 2000 конечно появились более удобные функции для этой цели - QueueUserWorkItem например
← →
panov © (2004-10-18 13:38) [102]>Evgeny V © (18.10.04 13:30) [101]
Да, действительно.
Но все же у меня дома W98 и мне хочется, чтобы в этой системе тоже работало-)
← →
Игорь Шевченко © (2004-10-18 14:06) [103]panov © (18.10.04 13:28) [100]
Пул потоков есть средство ограничения обработки запросов фиксированным количеством потоков, при этом, остальные запросы ставятся в очередь до освобождения одного из потоков - типичная задача для архитерктуры супер-сервер (СУБД), например. Каким образом это поможет на клиенте, честно, не вижу.
Я могу сослаться только на пост [92] о поводе для написания подобного пула - общее самообразование.
← →
Владислав © (2004-10-19 12:56) [104]Понадобился пул по работе. Вот что я написал
unit ThreadPool;
interface
uses
Windows, SysUtils, asLists;
type
TJobProcedure = procedure(Param: Pointer);
TThreadPool = class(TObject)
private
FJobExistsEvent: DWORD;
FTerminatedEvent: DWORD;
private
FThreadHandles: array of DWORD;
FJobListCS: TRTLCriticalSection;
FJobList: TDoubleLinkedList;
FThreadCount: DWORD;
FStackSize: DWORD;
FTerminated: Boolean;
procedure SetTerminated(const Value: Boolean);
private
function Initialize: Boolean;
procedure Finalize;
procedure LockJobList;
procedure UnlockJobList;
function InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
procedure RemoveJob(AJob: Pointer);
function WaitForJobTimeout(Timeout: DWORD; var Proc: TJobProcedure;
var Param: Pointer): Boolean;
function WaitForJobInfinite(var Proc: TJobProcedure; var Param: Pointer): Boolean;
public
constructor Create(AThreadCount, AStackSize: DWORD);
destructor Destroy; override;
function AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
property ThreadCount: DWORD read FThreadCount;
property StackSize: DWORD read FStackSize;
property Terminated: Boolean read FTerminated write SetTerminated;
end;
implementation
type
PJob = ^TJob;
TJob = record
Proc: TJobProcedure;
Param: Pointer;
Link: TDoubleLinkedLink
end;
function ThreadFunc(Param: Pointer): Integer;
var
LPool: TThreadPool;
LProc: TJobProcedure;
LParam: Pointer;
begin
LPool := Param;
try
while not LPool.Terminated do
begin
if LPool.WaitForJobInfinite(LProc, LParam) then
begin
try
LProc(LParam)
except
end
end
end
except
end;
Result := 0
end;
{ TThreadPool }
constructor TThreadPool.Create(AThreadCount, AStackSize: DWORD);
begin
FThreadCount := AThreadCount;
FStackSize := AStackSize;
SetLength(FThreadHandles, FThreadCount);
if not Initialize then
RaiseLastOSError
end;
destructor TThreadPool.Destroy;
begin
Finalize;
inherited;
end;
function TThreadPool.Initialize: Boolean;
var
i: Integer;
LThreadID: DWORD;
begin
InitializeCriticalSection(FJobListCS);
FJobExistsEvent := CreateEvent(nil, False, False, nil);
Result := FJobExistsEvent <> 0;
if not Result then
Exit;
FTerminatedEvent := CreateEvent(nil, True, False, nil);
Result := FTerminatedEvent <> 0;
if not Result then
Exit;
for i := 0 to Integer(FThreadCount) - 1 do
begin
FThreadHandles[i] := BeginThread(nil, FStackSize, @ThreadFunc, Self,
CREATE_SUSPENDED, LThreadID);
Result := FThreadHandles[i] <> 0;
if not Result then
Break
end;
if not Result then
Terminated := True;
for i := 0 to Integer(FThreadCount) - 1 do
if FThreadHandles[i] <> 0 then
ResumeThread(FThreadHandles[i])
end;
procedure TThreadPool.Finalize;
var
i: Integer;
LWaitResult: DWORD;
LLink: PDoubleLinkedLink;
begin
Terminated := True;
for i := 0 to FThreadCount - 1 do
begin
if FThreadHandles[i] <> 0 then
begin
LWaitResult := WaitForSingleObject(FThreadHandles[i], INFINITE);
if LWaitResult <> WAIT_OBJECT_0 then
TerminateThread(FThreadHandles[i], 1);
CloseHandle(FThreadHandles[i]);
FThreadHandles[i] := 0
end
end;
while True do
begin
LLink := RemoveFromHead(@FJobList);
if LLink <> nil then
RemoveJob(LLink^.Data)
else
Break
end;
if FTerminatedEvent <> 0 then
begin
CloseHandle(FTerminatedEvent);
FTerminatedEvent := 0
end;
if FJobExistsEvent <> 0 then
begin
CloseHandle(FJobExistsEvent);
FJobExistsEvent := 0
end;
DeleteCriticalSection(FJobListCS);
end;
procedure TThreadPool.LockJobList;
begin
EnterCriticalSection(FJobListCS);
end;
procedure TThreadPool.UnlockJobList;
begin
LeaveCriticalSection(FJobListCS);
end;
function TThreadPool.AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
begin
Result := InsertJob(AProc, AParam)
end;
function TThreadPool.InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
var
LJob: PJob;
begin
try
New(LJob)
except
SetLastError(ERROR_NOT_ENOUGH_MEMORY);
Result := False;
Exit
end;
with LJob^ do
begin
Proc := AProc;
Param := AParam;
with Link do
begin
Next := nil;
Prev := nil;
Data := LJob
end
end;
LockJobList;
try
InsertIntoTail(@FJobList, @LJob^.Link);
SetEvent(FJobExistsEvent)
finally
UnlockJobList
end;
Result := True
end;
procedure TThreadPool.RemoveJob(AJob: Pointer);
begin
Dispose(AJob)
end;
function TThreadPool.WaitForJobTimeout(Timeout: DWORD;
var Proc: TJobProcedure; var Param: Pointer): Boolean;
var
LWaitResult: DWORD;
LLink: PDoubleLinkedLink;
LJob: PJob;
begin
Result := False;
Proc := nil;
Param := nil;
LWaitResult := WaitForMultipleObjects(2, @FJobExistsEvent, False, Timeout);
if LWaitResult = WAIT_OBJECT_0 then
begin
if Terminated then
begin
SetEvent(FJobExistsEvent);
Exit
end;
LockJobList;
try
if FJobList.Size = 0 then
Exit;
LLink := RemoveFromHead(@FJobList);
if FJobList.Size <> 0 then
SetEvent(FJobExistsEvent)
finally
UnlockJobList
end;
if LLink <> nil then
begin
LJob := LLink^.Data;
Proc := LJob^.Proc;
Param := LJob^.Param;
Result := True;
RemoveJob(LJob)
end
end
end;
function TThreadPool.WaitForJobInfinite(var Proc: TJobProcedure;
var Param: Pointer): Boolean;
begin
Result := WaitForJobTimeout(INFINITE, Proc, Param)
end;
procedure TThreadPool.SetTerminated(const Value: Boolean);
begin
if Value then
begin
FTerminated := Value;
SetEvent(FTerminatedEvent)
end
end;
end.
← →
Игорь Шевченко © (2004-10-19 13:59) [105]Владислав © (19.10.04 12:56) [104]
А TThreadList вместо DoubleLinkedList и его обслуги не проще будет применить ?
← →
Владислав © (2004-10-19 14:56) [106]В TList будет постоянный Move, а на кой он нужен?
← →
Игорь Шевченко © (2004-10-19 15:08) [107]Владислав © (19.10.04 12:56) [104]
> В TList будет постоянный Move
Это плохо ?
Зато в коде строчек будет меньше...
← →
Владислав © (2004-10-19 15:11) [108]Их и так минимум :)
← →
Игорь Шевченко © (2004-10-19 15:28) [109]Владислав © (19.10.04 15:11) [108]
> Их и так минимум :)
Это тебе кажется :)
Как минимум - метод WaitForJobTimeOut используется один раз в методе WaitForJobInfinite - смело можно выкидывать и переносить функциональность в WaitForJobInfinite (это первое, что бросилось в глаза).
Кроме того, я бы попробовал сделать с TThreadList - строчек уменьшится, по-моему.
← →
Владислав © (2004-10-19 16:27) [110]Конечно, можно и выкинуть и попробовать :)
Только не лежит у меня душа к TList, а оптимизацией я еще вообще не занимался. Будут узкие места, буду оптимизировать.
← →
panov © (2004-10-27 22:38) [111]<Offtopic>
Чтобы в архив не уехал, так как работа продолжается...
</Offtopic>
Страницы: 1 2 3 вся ветка
Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];
Память: 0.73 MB
Время: 0.039 c