Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];
ВнизПул потоков. Найти похожие ветки
← →
panov © (2004-10-14 14:34) [0]Создал класс для работы с пулом потоков.
Прошу высказать замечания.
В следующих постингах задам вопросы к коду.
unit uTThreadPool;
interface
uses
Classes, windows, messages;
var
WM_EndThreadExecuter: DWORD; //Сообщение об окончании обработки
WM_AddThread: DWORD; //Сообщение о добавлении элемента в очередь
type
//Тип процедуры для выполнения в отдельном потоке
TThreadProc=procedure(aParm: pointer);
//Структура для передачи параметров
PQueueRec=^TQueueRec;
TQueueRec=record
rProc: TThreadProc;
rParm: Pointer;
end;
//Элемент пула потоков
TThreadExecuter=class(TThread)
private
FThreadIdOwner: THandle; //Родительский поток TThreadPool
FThreadFree: Boolean; //Поток свободен/занят работой
FExecProc: TThreadProc; //Адрес процуедуры для выполнения
FParm: Pointer; //Указатель на параметр,
//Предаваемый в процедуру FExecProc
FNumThread: Integer; //Нмер потока в пуле
protected
procedure Execute; override;
public
//В конструктор передается ThreadId родительского потока TThreadPool
constructor Create(const aOwnerThread: THandle;const aNumThread: Integer);
end;
//Менеджер пула потоков TThreadExecuter
TThreadPool = class(TThread)
private
{ TODO -cВажно :
Добавить код для проверки работоспособности потоков из пула
Зависшие убивать по таймауту }
FSemaphore: THandle; //Семафор-счетчик потоков
FMaxThreads: Integer; //Максимальное число потоков в пуле
FListQueue: TList; //Очередь заданий на выполнение
FPool: array of TThreadExecuter; //Пул потоков
procedure JobAdd(const aProc: TThreadProc;const aParm: Pointer); //Добавление
//заданий в очередь
function StartThreadFromQueue: Boolean; //Передача задания из очереди в пул
//на выполнение
protected
procedure Execute; override;
public
//В конструктор передается максимальное число потоков
constructor Create(aMaxThreads: Integer);
destructor Destroy;override;
//Добавление заданий из внешних потоков в очередь потоков
//Параметры:
// aProc - адрес процедуры для выполнения
// aParm - адрес памяти, по которуму нахлодятся параметры
// для выполнения прцедуры
procedure AddThread(aProc:TThreadProc;aParm:Pointer);
end;
implementation
uses Unit1;
{ TThreadExecuter }
constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
inherited Create(True);
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
Resume;
end;
procedure TThreadExecuter.Execute;
begin
while not Terminated do
begin
Suspend; //Засыпаем сразу
try
if Assigned(FExecProc) then FExecProc(FParm); //Выполнение переданной
//процедуры
except
end;
//Передача в родительский поток сообщения о завершении обработки
PostThreadMessage(FThreadIdOwner,WM_EndThreadExecuter,FNumThread,0);
if Terminated then break;
end;
end;
← →
panov © (2004-10-14 14:35) [1]
{ TThreadPool }
constructor TThreadPool.Create(aMaxThreads: Integer);
begin
inherited Create(True);
//Непонятно пока, как сделать с наименьшими потерями возможность
//полностью инициализироваться потоку(успеть создать очередь
//сообщений), так как высокий приоритет не гарантирует этого
Priority := tpTimeCritical;
FMaxThreads := aMaxThreads; //Максимальное число потоков в пуле
FreeOnTerminate := True;
FListQueue := TList.Create; //Очередь заданий на выполнение
FSemaphore := CreateSemaphore(nil,FMaxThreads,FMaxThreads,nil);
Resume;
end;
destructor TThreadPool.Destroy;
var
i: Integer;
begin
{ TODO -cВажно : Добавить обработку ошибок }
//Чистим очередь заданий
for i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));
FListQueue.Free;
//Очистка пула потоков
for i := 0 to Length(FPool)-1 do FPool[i].Terminate;
CloseHandle(FSemaphore);
inherited;
end;
procedure TThreadPool.Execute;
var
Msg: TMsg; //Приемник сообщений
p: Pointer; //Фиктивный указатель
RC: DWORD; //Код возврата
begin
{ TODO -cВажно : Защитить код try..finally..end }
PeekMessage(Msg,0,0,0,pm_NoRemove); //Создание очереди сообщений
//Возвращаем нормальный приоритет - очередь сообщений создана
Priority := tpNormal;
p := nil;
while not Terminated do
begin
//Ждем появления любого сообщения в очереди сообщений
rc := MsgWaitForMultipleObjects(0,p,False,10,QS_ALLPOSTMESSAGE);
{ TODO -cВажно : Обработать ошибку }
if rc=WAIT_FAILED then
begin
Terminate;
Continue;
end;
if rc= WAIT_TIMEOUT then
begin
if FListQueue.Count>0 then //В очереди есть задания
begin
while StartThreadFromQueue do; //Передать в пул на выполнение
end;
end;
if (rc=WAIT_OBJECT_0) or (PeekMessage(Msg,0,0,0,pm_NoRemove)) then
begin
GetMessage(Msg,0,0,0);
if Msg.message=WM_QUIT then
begin
Terminate;
Continue;
end;
if Msg.message=WM_EndThreadExecuter then
begin
FPool[Msg.wParam].FThreadFree := True; //Закончилось выполнение
//задания в пуле
ReleaseSemaphore(FSemaphore,1,nil); //Увеличить счетчик свободных
//потоков в пуле
Continue;
end;
if Msg.message=WM_AddThread then
begin
JobAdd(Pointer(Msg.wParam),Pointer(Msg.lParam)); //Добавить задание в очередь
while StartThreadFromQueue do; //Передать в пул на выполнение
Continue;
end;
end;
end;
end;
procedure TThreadPool.JobAdd(const aProc: TThreadProc;const aParm: Pointer);
var
JobRec: PQueueRec;
begin
New(JobRec);
JobRec^.rProc := aProc;
JobRec^.rParm := aParm;
FListQueue.Add(JobRec);
end;
function TThreadPool.StartThreadFromQueue: Boolean;
var
RC: Integer;
i: Integer;
begin
Result := False;
if FListQueue.Count=0 then Exit; //В очереди нет заданий - выход
RC := WaitForSingleObject(FSemaphore,0); //Есть свободные потоки в пуле?
case RC of
WAIT_OBJECT_0: //Есть свободные потоки в пуле
begin
for i :=0 to Length(FPool)-1 do
begin
if FPool[i].FThreadFree then
begin
FPool[i].FThreadFree := False; //Флажок "не свободен"
FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
Dispose(PQueueRec(FListQueue[0]));
FListQueue.Delete(0);
Result := True;
FPool[i].Resume; //Разбудить поток для выполнения
Exit;
end;
end;
if Length(FPool)<FMaxThreads then //В пуле еще не максимальное
begin //количество потоков?
i := Length(FPool);
SetLength(FPool,i+1); //Добавляем поток в пул
FPool[i] := TThreadExecuter.Create(Self.ThreadID,i);
end;
end;
end;
end;
procedure TThreadPool.AddThread(aProc: TThreadProc; aParm: Pointer);
begin
//Пересылаем в очередь сообщений сообщение о новом задании
PostThreadMessage(Self.ThreadID,WM_AddThread,Integer(@aProc),Integer(aParm));
end;
initialization
//Регистрируем сообщение об окончании обработки задания
WM_EndThreadExecuter := RegisterWindowMessage("TThreadPoolWM_EndThreadExecuter");
//Регистрируем сообщение о появлении нового задания на обработку
WM_AddThread := RegisterWindowMessage("TThreadPoolWM_AddThread");
finalization
end.
Вызов происходит следующим образом(Создавал для тестирования):
type
PParmRec=^TParmRec;
TParmRec=record
n: Integer;
TimeOut: integer;
end;
procedure Exec1(aParm: pointer);
var
Parm: PParmRec;
begin
Parm := aParm;
Sleep(Parm.TimeOut);
Form1.lb.Items.Add("End "+IntToStr(Parm^.n)+"/"+IntToStr(Parm^.TimeOut));
Dispose(Parm);
end;
procedure TForm1.Button2Click(Sender: TObject);
var
Parm: PParmRec;
i: Integer;
begin
Randomize;
M := TThreadPool.Create(5);
for i := 0 to 100 do
begin
New(Parm);
Parm.n := i;
Parm.TimeOut := (Random(10)+1)*30;
M.AddThread(Exec1,Parm);
end;
end;
← →
Суслик © (2004-10-14 14:43) [2]Замечание 1.
> constructor TThreadExecuter.Create(const aOwnerThread: THandle;const
> aNumThread: Integer);
> begin
> inherited Create(True);
> FreeOnTerminate := True;
> FThreadFree := True;
> FThreadIdOwner := aOwnerThread;
> FNumThread := aNumThread;
> Resume;
> end;
В зависимости от версии дельфи делать Resume в конструкторе не очень хорошо. Если поток отработает быстро, то будет ошибка в AfterContructrion, который в дельфи 6 выглядит такprocedure TThread.AfterConstruction;
begin
if not FCreateSuspended then
Resume;
end;
Я бы написал просто такconstructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
inherited Create(False);
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
end;
← →
panov © (2004-10-14 14:46) [3]Вопрос:
После выполнения конструктора из TThreadPool проходит некоторое время, прежде чем будет создана очередь сообщений для потока.
ПослеM := TThreadPool.Create(5)
поток использовать нельзя до создания очереди сообщений.
Как лучше обработать эту ситуацию?
Сейчас временно в конструкторе присваивается высокий приоритет tpTimeCritical, а в методе Execute возвращается в исходное состояние.
← →
Суслик © (2004-10-14 14:49) [4]Замечание-вопрос 2
Почему в TThreadExecuter.Execute "глушится" exception? Не лучше ли бы было его как-то обработать?
Замечание 3.
Аналог первого замечания для TThreadPool.Create. Хотя, здесь эта маловероятна ошибка.
Замечание 4.
В destructor TThreadPool.Destroy почему пишешьfor i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));
а нетif Assigned(FListQueue) then for i := 0 to FListQueue.Count-1 do Dispose(PQueueRec(FListQueue[0]));
?
пока все - дальше времени нет смотреть :)
← →
panov © (2004-10-14 14:49) [5]>Суслик © (14.10.04 14:43) [2]
constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
inherited Create(False);
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
end
Так делать нельзя.
Послеinherited Create(False);
немедленно стартует методExecute
, соответственно, остаются непроинициализированными переменные
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
и метод Execute будет обращаться к еще неинициализированным переменным.
← →
Суслик © (2004-10-14 14:53) [6]
> Вопрос:
> После выполнения конструктора из TThreadPool проходит некоторое
> время, прежде чем будет создана очередь сообщений для потока.
> После M := TThreadPool.Create(5) поток использовать нельзя
> до создания очереди сообщений.
Я бы сделал так.
1) Инстанцировал TThreadPool например, через метод класса TThreadPool. Например так
class function TThreadPool.Counsturct: TTHreadPool;
begin
Result := TThreadPool.Create(...);
SomeEvent.WaitFor;
end;
2) Событие SomeEvent устанавливал бы в signaled в TThreadPool.Execute после создания очереди потока.
← →
panov © (2004-10-14 14:54) [7]>Суслик © (14.10.04 14:49) [4]
Замечание-вопрос 2
Почему в TThreadExecuter.Execute "глушится" exception? Не лучше ли бы было его как-то обработать?
Замечание 3.
Скорее всего, обработка будет Exception будет добавлена.
К сожалению, для реализации стандартных потоков(TThread) в Delphi нельзя сделать Rerasing(хотя у меня есть сторонняя реализация от y-soft, которую я обязательно разберу).
Замечание 4.
Хорошее замечание, исправлю.
← →
Суслик © (2004-10-14 14:54) [8]
> [5] panov © (14.10.04 14:49)
> Так делать нельзя.
Ты не прав. Именно так и надо делать в дельфи 6. Не веришь? См. исходный код TThread.Create, AfterContruction и FCreateSuspended
← →
Суслик © (2004-10-14 14:56) [9]
> К сожалению, для реализации стандартных потоков(TThread)
> в Delphi нельзя сделать Rerasing
Что значит нельзя?
Используешь AcquireExceptionObject и делаешь reraise как угодно.
См. исходный код classes для класса tthread.
← →
panov © (2004-10-14 14:59) [10]>Суслик © (14.10.04 14:54) [8]
Привожу код TThread.Create:
constructor TThread.Create(CreateSuspended: Boolean);
begin
inherited Create;
AddThread;
FSuspended := CreateSuspended;
FCreateSuspended := CreateSuspended;
FHandle := BeginThread(nil, 0, @ThreadProc, Pointer(Self), CREATE_SUSPENDED, FThreadID);
if FHandle = 0 then
raise EThread.CreateResFmt(@SThreadCreateError, [SysErrorMessage(GetLastError)]);
end;
После выполнения выделенной строки выполнение поточной функции начнется немедленно.
← →
Суслик © (2004-10-14 15:02) [11]
> [10] panov © (14.10.04 14:59)
> После выполнения выделенной строки выполнение поточной функции
> начнется немедленно.
ну бог в помощь :)
> CREATE_SUSPENDED
а этааа что?
← →
panov © (2004-10-14 15:07) [12]>Суслик © (14.10.04 15:02) [11]
С AfterConstruction сейчас проверю
← →
Суслик © (2004-10-14 15:09) [13]
> [12] panov © (14.10.04 15:07)
из всего, что я сказал, скорее всего был не прав в этой фразе "Именно так и надо делать в дельфи 6." Тут надо еще подумать.
Но, то, что resume в коснтурторе делать нехорошо в Д6 это точно. Спасибо sha. Навел меня как-то на эту мысль.
---------------
Как решение обозначенной в вопросе проблемы с событием? Оно точно поможет, я всегда так делаю.
← →
Суслик © (2004-10-14 15:14) [14]Я бы сюда
if Msg.message=WM_EndThreadExecuter then
begin
FPool[Msg.wParam].FThreadFree := True; //Закончилось выполнение
//задания в пуле
ReleaseSemaphore(FSemaphore,1,nil); //Увеличить счетчик свободных
//потоков в пуле
Continue;
end;
перед (или вместо) continue добавил бы вот этот кусочек
if FListQueue.Count>0 then //В очереди есть задания
begin
while StartThreadFromQueue do; //Передать в пул на выполнение
end;
Это позволит не ждать 10 мс в случае если в очереди есть необработанные запросы.
← →
panov © (2004-10-14 15:27) [15]>Суслик © (14.10.04 15:14) [14]
Спасибо за совет, добавлюwhile StartThreadFromQueue do;
вместо if FListQueue>0 while...
← →
Суслик © (2004-10-14 15:28) [16]Еще
destructor TThreadPool.Destroy;
var
i: Integer;
begin
...
//Очистка пула потоков
for i := 0 to Length(FPool)-1 do FPool[i].Terminate; ...
end;
О какой очистке пула может идти речь, если потоки в нерабочем состоянии находятся в режиме suspended.
Я бы лучше не делал потокам resume\suspend, а сажал их на ожидаение tevent
← →
Суслик © (2004-10-14 15:31) [17]Еще
в function TThreadPool.StartThreadFromQueue: Boolean;
в случае занятости делом всех потоков и недостижения макс. кол-ва потоков не верен будет результат: result будет равен false! Т.е. не все потоки будут запущены.
← →
panov © (2004-10-14 15:33) [18]Метод с inherited(False) не приводит к нужному результату.
СтрокаPostThreadMessage(Self.ThreadID,WM_AddThread,Integer(@aProc),Integer(aParm));
Выполняется раньше, чем код в ExecutePeekMessage(Msg,0,0,0,pm_NoRemove);
В результате выполнения
M := TThreadPool.Create(5);
for i := 0 to 100 do
begin
New(Parm);
Parm.n := i;
Parm.TimeOut := (Random(10)+1)*30;
M.AddThread(Exec1,Parm);
end;
т.е. обращение к методу AddThread начинается раньше, чем стартует метод Execute.
← →
panov © (2004-10-14 15:35) [19]>Суслик © (14.10.04 15:28) [16]
С терминированием - спасибо, упустил из вида.
for i := 0 to Length(FPool)-1 do
begin
FPool[i].Terminate;
FPool[i].Resume;
end;
← →
Суслик © (2004-10-14 15:37) [20]
> [18] panov © (14.10.04 15:33)
Посмотри на ответ [6].
Там приведено общее решение: в главном потоке ждать TEvent, который будет установлен после создания очереди.
← →
Суслик © (2004-10-14 15:39) [21]
> [20] Суслик © (14.10.04 15:37)
поясненияvar
E: TEvent;
// Главный поток
pooler := TPool.Create();
E.WaitFor;
// Поток-элемент пулера
procedure TThreadExecutor.Execute;
begin
Создание очереди
E.SetEvent;
...
end;
Все.
← →
Суслик © (2004-10-14 15:42) [22]
> [19] panov © (14.10.04 15:35)
Плохо.
А если поток уже делом занят?
Будет двойной вызов resume.
Лучше пользоваться event"ом.
← →
panov © (2004-10-14 15:44) [23]>Суслик © (14.10.04 15:39) [21]
Я понимаю, что можно использовать объекты ядра для ожидания.
Но нет ли более быстрого решения?
Создание и использование таких объектов занимает время и ресурсы.
Мне кажется, что для единовременного использования это неоправданно.
>Суслик © (14.10.04 15:31) [17]
Спасибо.
Действительно, не обрабатывался WAIT_TIMEOUT;
Изменил, теперь выглядит так:function TThreadPool.StartThreadFromQueue: Boolean;
var
RC: Integer;
i: Integer;
begin
Result := False;
if FListQueue.Count=0 then Exit; //? ??????? ??? ??????? - ?????
RC := WaitForSingleObject(FSemaphore,0);
case RC of
WAIT_OBJECT_0:
begin
for i :=0 to Length(FPool)-1 do
begin
if FPool[i].FThreadFree then
begin
FPool[i].FThreadFree := False;
FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
Dispose(PQueueRec(FListQueue[0]));
FListQueue.Delete(0);
Result := True;
FPool[i].Resume;
Exit;
end;
end;
NewThread;
end;
WAIT_TIMEOUT:
begin
NewThread;
end;
end;
end;
procedure TThreadPool.NewThread;
var
i: Integer;
begin
if Length(FPool)<FMaxThreads then
begin
i := Length(FPool);
SetLength(FPool,i+1);
FPool[i] := TThreadExecuter.Create(Self.ThreadID,i);
end;
end;
← →
panov © (2004-10-14 15:48) [24]>Суслик © (14.10.04 15:42) [22]
А какая разница, сколько раз Resume вызывать?
По поводу [14] и [15] -
if FListQueue.Count>0 then //В очереди есть задания
begin
while StartThreadFromQueue do; //Передать в пул на выполнение
end;
Та же самая ситуация возникает, что и с TThreadPoool: поток не успевает начать выполнять метод TThreadExecuter.Execute
← →
Суслик © (2004-10-14 15:55) [25]
> [23] panov © (14.10.04 15:44)
> Действительно, не обрабатывался WAIT_TIMEOUT;
Я про это не говорил - пропустил.
Ошибка то осталась.WAIT_OBJECT_0:
begin
for i :=0 to Length(FPool)-1 do
begin
if FPool[i].FThreadFree then
begin
FPool[i].FThreadFree := False;
FPool[i].FExecProc := PQueueRec(FListQueue[0]).rProc;
FPool[i].FParm := PQueueRec(FListQueue[0]).rParm;
Dispose(PQueueRec(FListQueue[0]));
FListQueue.Delete(0);
Result := True;
FPool[i].Resume;
Exit;
end;
end;
NewThread;
здесь не задается Result в случае занятости всех потоков, НЕдостижения максимума потоков и наличия заданий, а должен т.к. иначе при определенных условиях не будет правильно отрабатывать while StartThreadFromQueue do; end;
Про медленность объектов ядра:
извини уж, это полная как бы ерунда. Твой код в определенных случаях от потери производительности страдал (сейчас вроде исправил) много больше, чем он может пострадать от объектов ядра.
Все же, есть способы объходиться без ОЯ. Например см. класс TMultiReadExclusiveWriteSynchronizer. Там реализована очень скорострельная синхронизация БЕЗ объектов ядра. Если интересно смотри его.
← →
Суслик © (2004-10-14 15:58) [26]
> Та же самая ситуация возникает, что и с TThreadPoool: поток
> не успевает начать выполнять метод TThreadExecuter.Execute
С чего это? TThreadExecuter уже создан. С чего ему не успевать что-то делать.
> А какая разница, сколько раз Resume вызывать?
Согласен, что в данном случае никакой.
← →
panov © (2004-10-14 15:59) [27]>Суслик © (14.10.04 15:55) [25]
Функция StartThreadFromQueue должна возвращать True в единственном случае - при передаче задания на выполнение.
В остальных случаях возвращается False.
← →
panov © (2004-10-14 16:01) [28]>Суслик © (14.10.04 15:58) [26]
С чего это? TThreadExecuter уже создан. С чего ему не успевать что-то делать.
Как это с чего?
Пул создается не весь сразу, а по необходимости до достижения максимального количества.
← →
Суслик © (2004-10-14 16:01) [29]
> [27] panov © (14.10.04 15:59)
> Функция StartThreadFromQueue должна возвращать True в единственном
> случае - при передаче задания на выполнение.
> В остальных случаях возвращается False.
рассмотрел :)) Виноват:)
← →
Суслик © (2004-10-14 16:03) [30]
> [28] panov © (14.10.04 16:01)
предлагаю следующее.
1. Учесть те замечания в которых я оказался прав.
2. Реализовать синхронизацию в целях обеспечения своевременности создания очереди.
3. Выложить все в полном виде.
Тогда продолжим разговор. Ок?
← →
Суслик © (2004-10-14 16:07) [31]И вообще - на фига тут FSemaphore: он же используется в контексте одного потока?
← →
Суслик © (2004-10-14 16:12) [32]
> [27] panov © (14.10.04 15:59)
> >Суслик © (14.10.04 15:55) [25]
>
> Функция StartThreadFromQueue должна возвращать True в единственном
> случае - при передаче задания на выполнение.
> В остальных случаях возвращается False.
Странная какая-то эта функция StartThreadFromQueue.
Ей говорят - запусти задание из очереди. Она - ок! Все потоки заняты, но макс. количества потоков не достигнут. Она создает новый поток и возврщает false. Тем самым говоря, клиенту (вызывающей стороне), то все, что могла она запустила. Странно как-то: поток есть (только что создан), задание есть, а вот выполнять его никто ПОКА не собирается - ждем следующего цикла.
← →
panov © (2004-10-14 16:20) [33]>Суслик © (14.10.04 16:03) [30]
2. Реализовать синхронизацию в целях обеспечения своевременности создания очереди.
Как раз это и хотелось обсудить.
Может быть, у кого еще будут предложения по синхронизации без использования объектов ядра, подобных Event
>Суслик © (14.10.04 16:07) [31]
И вообще - на фига тут FSemaphore: он же используется в контексте одного потока?
Семафор удобен для счетчика потоков в пуле и сигнализирования о том, что есть свободные потоки.
>Суслик © (14.10.04 16:12) [32]
Странная какая-то эта функция StartThreadFromQueue.
Как раз из-за проблем с синхронизацией она такая-)
← →
Суслик © (2004-10-14 16:24) [34]
> Может быть, у кого еще будут предложения по синхронизации
> без использования объектов ядра, подобных Event
Ну, Александр, я же привел пример TMultiReadExclusiveWriteSynchronizer. Там то, что нужно. Только разбираться долго надо. Там нет объектов ядра, но есть синхронизация. Там все сделано на interloced функциях.
> >Суслик © (14.10.04 16:07) [31]
> Семафор удобен для счетчика потоков в пуле и сигнализирования
> о том, что есть свободные потоки.
Не, ну вы гляньте :))) Не хочется использовать ОЯ (медленные), а вот семафор (тоже ОЯ) для целей, где он не нужен используется радостно. Не пойму. На фига он тут нужен - к нему обращается всего один поток!
> Как раз из-за проблем с синхронизацией она такая-)
Никаких проблем нет. Какая синхронизация? Потоками управляет ровно один поток - пуллер. Что хош, то и делай там.
← →
Суслик © (2004-10-14 16:33) [35]у рихтера есть про очередь потоков. Может там стоит посмотреть?
← →
panov © (2004-10-14 16:56) [36]Никаких проблем нет. Какая синхронизация? Потоками управляет ровно один поток - пуллер. Что хош, то и делай там.
Уже обсуждали, что Execute выполняется позде, чем происходит обюращение к прочим методам объекта.
← →
Суслик © (2004-10-14 17:01) [37]
> Уже обсуждали, что Execute выполняется позде, чем происходит
> обюращение к прочим методам объекта.
ух... сложно как-то. Все-таки предложение увидеть модифицированный код, и продолжить обсуждение от новой отправной точки.
← →
Суслик © (2004-10-14 17:07) [38]Мне как-то не очень понятно, что эта за задача такая, где требуется такое титаническое быстродействие?
← →
Суслик © (2004-10-14 17:09) [39]
> [36] panov © (14.10.04 16:56)
> Никаких проблем нет. Какая синхронизация? Потоками управляет
> ровно один поток - пуллер. Что хош, то и делай там.
>
> Уже обсуждали, что Execute выполняется позде, чем происходит
> обюращение к прочим методам объекта.
И вообще, никакие уловки, кроме как применять методы синхронизации, не спасут от проблемы, когда поток начинает использоваться раньше, чем начинает работать его поточная ф-я. Вот.
← →
panov © (2004-10-14 17:24) [40]Кстати, насчет TMultiReadExclusiveWriteSynchronize.
Как я понял, единственное, что мне даст, это то, что мне не надо несколько критических секций использовать, и объекты для ожидания создаются статически всего 2.
← →
Суслик © (2004-10-14 17:27) [41]
> [40] panov © (14.10.04 17:24)
я не говорил, что его надо исполозовать. Я говорил, что в его коде используется синхронизация без ОЯ. Вот и все. Причем такая синхронизация, которую не сразу и поймешь. Я вроде разобрался. Но вопротить вряд ли смогу.
Все же хотелось бы услышать ответ на вопрос [38]. Может устроить тесты и выяснить, что собственно ОЯ вполне удовлетворяют? Я бы начал с этого. Я когда-то делал такое сравнение. И скажу честно, что я не смог придумать задачу для своих целей, где бы скорострельности event"ов мне бы не хватало.
← →
panov © (2004-10-14 17:29) [42]>Суслик © (14.10.04 17:27) [41]
Смысл ведь имеет не конкретная задача, а наиболее отимальное универсальное решение.
Например, для моей задачи(а это копирование файлов в нескольких потоках) такое замедление не будет иметь последствия.
Если прикинуть, то такая задача "на скорострельность" может иметь применение при расчетах в графических динамичных приложениях...
← →
Суслик © (2004-10-14 17:31) [43]
> [42] panov © (14.10.04 17:29)
Оптимальность, понятие неоднозначное. Я уверен, что в текущей задаче путь с использование ОЯ будет наиболее оптимальный, т.к.:
1. Позволит доделать класс за 10 мин.
2. Точно не окажет негативного влияния на скорость.
← →
Игорь Шевченко © (2004-10-14 18:07) [44]
> Все же, есть способы объходиться без ОЯ. Например см. класс
> TMultiReadExclusiveWriteSynchronizer. Там реализована очень
> скорострельная синхронизация БЕЗ объектов ядра. Если интересно
> смотри его.
constructor TMultiReadExclusiveWriteSynchronizer.Create;
begin
inherited Create;
InitializeCriticalSection(FLock);
FReadExit := CreateEvent(nil, True, True, nil); // manual reset, start signaled
SetLength(FActiveThreads, 4);
end;
Типа того, что CreateEvent - это уже не создание объекта ядра.
← →
Игорь Шевченко © (2004-10-14 18:08) [45]
> Создал класс для работы с пулом потоков.
Рихтер тоже создал, только на основе IOCompletion, если мне память не изменяет.
← →
Суслик © (2004-10-14 18:11) [46]
> [44] Игорь Шевченко © (14.10.04 18:07)
Игорь, хватит придираться, ты прекрасно понимаешь о чем я? Нет?
Тогда напомню, что я тебя спрашивал как работает этот класс, ты сказал, что там ничего сложного. Если так, то ты не можешь не знать, что многие действия, для которых можно было бы применить ОЯ для синхронизации обходятся без оных. В качесвте примера см. TThreadLocalCounter.
Мое утверждение было о том, что при синхронизации не обязательно использовать ОЯ. И класс был приведен в качестве пример такой реализации.
Возражения есть?
← →
panov © (2004-10-14 18:16) [47]>Игорь Шевченко © (14.10.04 18:08) [45]
Ну можно, конечно, и на основе портов ввода-вывода, только работать в w98 они не будут...
← →
Игорь Шевченко © (2004-10-14 18:20) [48]Суслик © (14.10.04 18:11) [46]
> Возражения есть?
Есть.
Посмотри, как реализованы критические секции и зачем там объект ядра LockSemaphore.
← →
Суслик © (2004-10-14 18:30) [49]
> Посмотри, как реализованы критические секции и зачем там
> объект ядра LockSemaphore.
Где?
Есть не ошибаюсь, критические секции доходят до использования ОЯ только в случае коллизий. Источник: рихтер, win для профи, стр 202. Т.е. большинство операций выполняются в польз. режиме.
В случае же использования ОЯ переход в реж. ядра происходит сразу. Истоник: тот же. + опыт сранения быстродействия.
← →
Verg © (2004-10-14 18:37) [50]
> Есть не ошибаюсь, критические секции доходят до использования
> ОЯ только в случае коллизий.
Не путаете ли вы понятия "обходиться без" и "использовать эффективно".
Если КС использует ОЯ эффективно, это не означает, что она их вообще не использует.
← →
Суслик © (2004-10-14 18:38) [51]
> [50] Verg © (14.10.04 18:37)
может и путаю. Но опыт сравнения быстродействия mutex и кс в свое время сказал мне о многом.
← →
Игорь Шевченко © (2004-10-14 18:42) [52]Суслик © (14.10.04 18:30) [49]
Что есть коллизия по Рихтеру ?
← →
Суслик © (2004-10-14 18:45) [53]
> Что есть коллизия по Рихтеру ?
Сначала на вопрос ответь, где посмотреть реализацию КС?
------------
Microsoft повысила быстродействие критических секций, включив в них спин блокировку Теперь, когда Вы вызываете EnterCriticalSection, она выполняет заданное число циклов спин-блокировки, пытаясь получить доступ к ресурсу и лишь в том случае, когда все попытки закапчиваются неудачно, функция переводит поток в ре жим ядра, где он будет находиться в состоянии ожидания.
← →
Суслик © (2004-10-14 18:47) [54]
> [52] Игорь Шевченко © (14.10.04 18:42)
Вместо того, чтобы ко мне придираться (:))) ответил бы Панову - на первый вопрос [23]. Все польза была бы.
← →
Игорь Шевченко © (2004-10-14 18:48) [55]Суслик © (14.10.04 18:30) [49]
Пример:unit main;
interface
uses
Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
StdCtrls;
type
TForm1 = class(TForm)
Button1: TButton;
procedure Button1Click(Sender: TObject);
end;
var
Form1: TForm1;
implementation
uses
Thread;
{$R *.DFM}
procedure TForm1.Button1Click(Sender: TObject);
begin
InitializeCriticalSection(CS);
TMyThread.Create (false);
TMyThread.Create (false);
end;
end.
иunit Thread;
interface
uses
Classes,
Windows;
type
TMyThread = class(TThread)
private
protected
procedure Execute; override;
end;
var
CS: TRtlCriticalSection;
implementation
{ TMyThread }
procedure TMyThread.Execute;
begin
EnterCriticalSection(CS);
end;
end.
Запусти и посмотри количество объектов ядра до нажатия кнопки и после. Предупреждаю, процесс лучше снимать из среды, по Ctrl+F2
← →
Игорь Шевченко © (2004-10-14 18:49) [56]Суслик © (14.10.04 18:45) [53]
> Сначала на вопрос ответь, где посмотреть реализацию КС?
windows.pas разумеется
← →
Verg © (2004-10-14 18:51) [57]
> [51] Суслик © (14.10.04 18:38)
>
> > [50] Verg © (14.10.04 18:37)
>
> может и путаю. Но опыт сравнения быстродействия mutex и
> кс в свое время сказал мне о многом.
Не, ну сравнивать производительнось при различных способах использования ОЯ - это уже отдельный вопрос.
Вроде шла речь об обеспечении синхронизации БЕЗ использования ОЯ вообще?
> Мое утверждение было о том, что при синхронизации не обязательно
> использовать ОЯ.
Вот и интересно как?
while lacked do sleep(1)
?
Кстати, Mutex хорош своим ObjectName, т.е. он именованый, а значит синхронизировать по "принципу КС" можно потоки из разных процессов. Вот и все, если этого не требуется, то используй КС. Конечно она эффективнее в таком случае
← →
Суслик © (2004-10-14 18:53) [58]
> Запусти и посмотри количество объектов ядра до нажатия кнопки
> и после. Предупреждаю, процесс лучше снимать из среды, по
> Ctrl+F2
Как?
Никогда не делал.
← →
ЮрийК © (2004-10-14 18:53) [59]Просьба выложить окнчательный вариант ваших изысканий, как только все выскажут свои замечания.
← →
Суслик © (2004-10-14 18:56) [60]
> [57] Verg © (14.10.04 18:51)
Interlocedфункции, они же не ОЯ.
И в тоже время позволяют делать синхнонизацию. Класс TThreadLocalCounter например.
Контрольный вопрос.
В классе TThreadLocalCounter используются методы синхронизации. Например, TThreadLocalCounter.Open. Есть ли здесь ОЯ?
← →
Суслик © (2004-10-14 19:00) [61]
> [57] Verg © (14.10.04 18:51)
> Вот и интересно как?
> while lacked do sleep(1)
Например. interlocked функции еще есть.
← →
panov © (2004-10-14 19:04) [62]>ЮрийК © (14.10.04 18:53) [59]
Просьба выложить окнчательный вариант ваших изысканий, как только все выскажут свои замечания.
На самом деле, похоже, лучше эту задачу реализовать без использования TThread, что я сейчас и буду пробовать.
Конечно, вопрос с максимально работой и ожиданием без спин-блокировки все равно остается.
Но к нему придется вернуться после первых набросков.
← →
panov © (2004-10-14 19:05) [63]>Суслик © (14.10.04 19:00) [61]
Например. interlocked функции еще есть.
InterLocked-функции могут быть применены только для спин-блокировки(активного ожидания).
← →
Суслик © (2004-10-14 19:08) [64]
> InterLocked-функции могут быть применены только для спин-блокировки(активного
> ожидания).
ну да, но это же лучше чем бегать из режима ядра и обратно, как бывает при использовании ОЯ?
ЗЫ
Скаже всем, что понятия не имею, что такое режим ядра. Знаю только, что в него надо переходить для использования ОЯ и это дорогостоящая операция (несколько 1000 тактов). Это я говорю для того, чтобы не приписвать себе знания, коих нет. Но думаю понимание того, что лучше режим ядра избегать являтеся достаточным основанием для продолжения моего участия в дискусе.
Спасибо за внимание.
← →
panov © (2004-10-14 19:08) [65]>ЮрийК © (14.10.04 18:53) [59]
При небольших изменениях в коде код в начале топика будет совершенно работоспособным, другое дело, что все-таки хочется сделать оптимальный код-)
← →
panov © (2004-10-14 19:11) [66]>Суслик © (14.10.04 19:08) [64]
ну да, но это же лучше чем бегать из режима ядра и обратно, как бывает при использовании ОЯ
Пример "почти -)" спин-блокировки привел Verg © (14.10.04 18:51) [57]while lacked do sleep(1)
Вот классический пример:while lacked do ;
Естественно, такой код загрузит на время ожидания процессор на 100%.
Такой метод применим только для гарантированно кратких по времени ожиданий.
← →
Суслик © (2004-10-14 19:11) [67]
> При небольших изменениях в коде код в начале топика будет
> совершенно работоспособным, другое дело, что все-таки хочется
> сделать оптимальный код-)
Когда сделаем, хорошо было бы его здесь увидеть :)
Т.е. я поддерживаю ЮрияК
← →
Суслик © (2004-10-14 19:13) [68]
> Такой метод применим только для гарантированно кратких по
> времени ожиданий.
Все в наших руках - можно гарантировать чего душе угодно. Я серьезно. Нужно внимательно посмотреть на задачу - может такая гарантия есть.
Или хочестся создать суперуниверсальный супероптимальный супервсеоблемющий пул для всех случае жизни? Это миф (имхо). Кадой задаче свое решение.
← →
panov © (2004-10-14 19:17) [69]>Суслик © (14.10.04 19:13) [68]
Суперуниверсальный не нужен, конечно.
Я выше приводил пример, в котором необходима максимальная скорость различных вычислений в потоках - динамичная графика.
Кстати, даже для задачи, которой сейчас я занимаюсь(резервное копирование файлов), желательна минимальная нагрузка на процессор-)
← →
Суслик © (2004-10-14 19:20) [70]
> [69] panov © (14.10.04 19:17)
Мое мнение такое (повторяюсь) - пока не будет тестов производительности, говорить об оптимальности или отсутствии таковой рано.
У меня нет сейчас времени на опыты :( Иначе подготовил бы статистический отчет. При этом надо тестировать не просто бомбардировку ОЯ (т.е. активное их использование), а имитацию реальной нагрузки на ОЯ, (т.е. попытаться смоделировать действительность - как часто будут ОЯ использоваться в зависимости от поребностей пользователя пуллера). Уверяю, что результат удивит - разница в общем производительности будет заметна слабо (конечно, если не брать каких-то экстремальных случаев).
← →
Суслик © (2004-10-14 19:28) [71]И самое мое последнее мнение о направленности разработки супероптимального сервера такое: если оный нужен, то нужно очень серьезно засаживаться за соответствующую литературу.
Я видел много кода, часть на сях, часть на дельфи. Я был поражен теми изощнениями, которыми пользуются люди для оптимальности.
Вот простой пример (не относится к теме, но показывает устремленность людей, как писать оптимально)
function TStringList.Find(const S: string; var Index: Integer): Boolean;
var
L, H, I, C: Integer;
begin
Result := False;
L := 0;
H := FCount - 1;
while L <= H do
begin
I := (L + H) shr 1;
C := CompareStrings(FList^[I].FString, S);
if C < 0 then L := I + 1 else
begin
H := I - 1;
if C = 0 then
begin
Result := True;
if Duplicates <> dupAccept then L := I;
end;
end;
end;
Index := L;
end;
Как вы думаете, почему не написано так (мне какжется так наглядней)if C < 0 then L := I + 1 else
if C > 0 then H := I - 1 else
if C = 0 then
begin
Result := True;
if Duplicates <> dupAccept then L := I;
end;
Все таже пресловутая оптимальность - экономия на одном сравнении (в первом случае нет if c > 0). Понимаю, что пример оффтоповый, но мне кажется, что он показывает, что приемы оптимального программирования так просто не изобретешь - нужно планомерное чтение разной лит-ры и просмотр кода.
← →
VMcL © (2004-10-14 21:13) [72]>Как вы думаете, почему не написано так (мне какжется так наглядней)
if C < 0 then L := I + 1 else
if C > 0 then H := I - 1 else
if C = 0 then
begin
Result := True;
if Duplicates <> dupAccept then L := I;
end;
В любом случае предпоследнее сравнение бесполезно - это видно невооруженным глазом: если C не меньше нуля и не больше, то тогда остается только один вариант (в области действительных чисел:) - нуль.if C < 0 then L := I + 1 else
if C > 0 then H := I - 1 else
//if C = 0 then
begin
Result := True;
if Duplicates <> dupAccept then L := I;
end;
← →
Verg © (2004-10-14 22:43) [73]
> Как вы думаете, почему не написано так (мне какжется так
> наглядней)
>
> if C < 0 then L := I + 1 else
> if C > 0 then H := I - 1 (* C>0 *)else
> if C = 0 then
> begin
> Result := True;
> if Duplicates <> dupAccept then L := I;
> end;
Ой ли? Разве этот код эквивалентен
> if C < 0 then L := I + 1 else
> begin
> H := I - 1; // C>=0
> if C = 0 then
> begin
> Result := True;
> if Duplicates <> dupAccept then L := I;
> end;
> end;
??
← →
Суслик © (2004-10-15 11:12) [74]
> Ой ли? Разве этот код эквивалентен
нет, но делает тоже самое (по сути)
← →
Игорь Шевченко © (2004-10-15 11:18) [75]Суслик © (14.10.04 19:08) [64]
Синхронизация потоков невозможна без диспетчеризации, согласись.
То есть, как минимум, одни поток должен уметь сказать WaitForSingleObject (или какую-то разновидность), пока другой поток не освободит ресурс. После того, как ресурс освобожден, об этом должно стать известно диспетчеру, чтобы он смог запустить ожидавший поток.
Разумно ?
Спин-блокировка применима для синхронизации между потоками в многопроцессорной системе, когда поток на одном процессоре входит в цикл (крутится - spin), опрашивая общую переменную.
← →
Zelius © (2004-10-15 11:36) [76]Кстати, посмотрел код и решил сказать, может пригодится.
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:
constructor TThreadExecuter.Create(const aOwnerThread: THandle;const aNumThread: Integer);
begin
FreeOnTerminate := True;
FThreadFree := True;
FThreadIdOwner := aOwnerThread;
FNumThread := aNumThread;
inherited Create(True);
end;
← →
Erik1 © (2004-10-15 11:56) [77]Мастера фигей маются, как новички. Видать и им ничто человеческое нечуждо. :)
Вобщето надо проежде всего позаботится об понятности алгоритма, его эфективновность оченивается вовсе не в количестве обращений к ядру. Вобщее правило: если алгоритм красив, то он ненуждается в улучшениях!
Суслик [71]
В 99% случаев я бы выбрал второй вариант, проше читать. А эфективность можно уличшить как нибудь по другому.
to panov
У меня все потоки используют WaitForMultipleObjects с двумя event как минимум. Первый говорит потоку начать работу, второй терминировать поток. Это неокажет некакого замедления на расчеты, поскольку они выполняются внутри процедувы. А если расчеты очень короткие но частые, то suspend resume замедлят их еще больше. К томуже в Dlephi выще 5 версии нельзя делать resume 2 раза подряд, будет exception.
← →
Суслик © (2004-10-15 12:18) [78]2Игорь Шевченко
> Синхронизация потоков невозможна без диспетчеризации, согласись.
> То есть, как минимум, одни поток должен уметь сказать WaitForSingleObject
> (или какую-то разновидность), пока другой поток не освободит
> ресурс. После того, как ресурс освобожден, об этом должно
> стать известно диспетчеру, чтобы он смог запустить ожидавший
> поток.
>
> Разумно ?
Да разумно. Но WaitForSingleObject не единственное решение.
Все предложение: для того, чтобы убедится в том, что я говорю о том, о чем имею предсталение и для того, чтобы не разводить лишние разговоры, предлагаю всем изучить класс TThreadLocalCounter.
Класс TThreadLocalCounter (модуль sysutils) реализует TLS (не полноценный, а для одного конкретного целочисленного значения: TThreadInfo.RecursionCount), средствами дельфи. Причем существенно быстрее, чем TLS windows.
Идея такая: есть список (fHashTable) голов однонапрасленных списков. Хэш индекс строится исходя из GetCurrentThreadId.
Приведенный метод занимается тем, что выделяет вызвавшему потоку структуру PThreadInfo. Поиск идет либо в списке свободных структур (если поток отказывается от использования структура помечается свободной, но из списка не удаляется), либо если не нашел, то создает новую. Заметь, тут нет никаких критических секций. Внимательно читай комментарий. Собственно в тех двух строчках, которые о коментриует, и есть вся суть метода и моего утвержднеия о возможности синхронизации без ОЯ и даже без КС.procedure TThreadLocalCounter.Open(var Thread: PThreadInfo);
var
P: PThreadInfo;
H: Byte;
CurThread: Cardinal;
begin
InterlockedIncrement(FOpenCount);
H := HashIndex;
P := FHashTable[H];
CurThread := GetCurrentThreadID;
// КАЖДОМУ ПОТОКУ ВЫДЕЛЯТЕСЯ РОВНО ОДНА СТРУКТУРА (ЭТО ЕЩЕ ОДНО ОТЛИЧИЕ ОТ TLS WINDOWS)
while (P <> nil) and (P.ThreadID <> CurThread) do
P := P.Next;
// ЕСЛИ НЕ НАШЛИ, ТО...
if P = nil then
begin
// ПЫТАЕМСЯ НАЙТИ В НЕИСПОЛЬЗУЕМЫХ СТРУКТРУАХ СВОБОДНУЮ
P := Recycle;
// ЕСЛИ ТАКОВЫХ НЕТ, ТО СОЗДАЕМ НОВУЮ
if P = nil then
P := PThreadInfo(AllocMem(sizeof(TThreadInfo)));
P.ThreadID := CurThread;
// А ВОТ ТЕПЕРЬ ЕЕ НУЖНО ВСТАВИТЬ В СПИСОК. ЧИТАЙ КОМЕНТАРИЙ.
// Another thread could start traversing the list between when we set the
// head to P and when we assign to P.Next. Initializing P.Next to point
// to itself will make others loop until we assign the tail to P.Next.
P.Next := P; ????? ВОТ ЭТО МЕСТО МЕНЯ ПОТРЯСЛО!
P.Next := PThreadInfo(InterlockedExchange(Integer(FHashTable[H]), Integer(P)));
end;
Thread := P;
end;
После этого продолжим обсуждение синхронизацию доступа к общему ресурсу без ОЯ и КС.
← →
Игорь Шевченко © (2004-10-15 12:29) [79]Суслик © (15.10.04 12:18) [78]
В этом коде модификация списка, которая может затронуть другие потоки, выполняется одной функцией InterlockedExchange. Строчка, которая тебя поразила - это искусственное действие, направленное на то, чтобы модификацию можно было выполнить той самой единственной функцией.
InterlockedExchange реализуется единственной командой (не считая подготовки аргументов) lock cmpxchg, которая в данном случае успешно может использоваться для модификации, так как является атомарной (не прерываемой в середине исполнения) и атомарной же на многопроцессорной системе, так как префикс lock выдает сигнал на блокировку шины доступа к памяти.
Точно такие же приемы используются при работе с длинными строками.
Но синхронизацией это можно назвать с огромной натяжкой, так как осноная цель в этом коде - это реализовать потокобезопасную модификацию списка.
← →
Суслик © (2004-10-15 12:33) [80]
> [79] Игорь Шевченко © (15.10.04 12:29)
Определение синхронизации в студию!
← →
Игорь Шевченко © (2004-10-15 12:36) [81]Суслик © (15.10.04 12:33) [80]
Синхронизация - (от греч. synchronos - одновременный), приведение двух или нескольких процессов к синхроннонсти, т. е. к такому их протеканию, когда одинаковые или соответствующие элементы процессов совершаются с неизменным сдвигом по фазе друг относительно друга или одновременно.
http://encycl.yandex.ru/cgi-bin/art.pl?art=bse/00071/31300.htm&encpage=bse&mrkp=/yandbtm7%3Fq%3D-811200565%26p%3D0%26g%3 D0%26d%3D0%26ag%3Denc_abc%26tg%3D1%26p0%3D0%26q0%3D1334144256%26d0%3D0%26script%3D/yandpage%253F
Мог бы и сам посмотреть (с)
← →
panov © (2004-10-15 12:42) [82]>Zelius © (15.10.04 11:36) [76]
Что бы не было вопрос при создании потомков TThread о том что раньше отработает, конструктор с инициализацией всех ресурсов или начнет Execute выполняться, я всегда сначала инициализирую ресурсы и только потом вызываю наследуемый конструктор:
В данном примере это не имеет значения, так как создание очереди сообщений происходит в поточной функции, а не в конструкторе. Вот создания этой очереди и нужно дождаться.
← →
Суслик © (2004-10-15 12:43) [83]
> [81] Игорь Шевченко © (15.10.04 12:36)
не понял:(
> совершаются с неизменным сдвигом по фазе друг относительно
> друга или одновременно.
Т.е. это значит, что синхронизируемый кусок кода должен выполняться в каждый момент не более чем одним потоком? Или нет?
← →
Игорь Шевченко © (2004-10-15 12:50) [84]Суслик © (15.10.04 12:43) [83]
Синхронизация применительно к вычислительным процессам - это гарантированное предоставление монопольного доступа к ресурсу одному процессу в каждый момент времени непротиворечивого состояния ресурса.
Слово процесс для Windows заменяется на поток.
В коде, приведенном тобой, при помощи специального алгоритма, гарантируется непротиворечивое состояние ресурса для каждого потока в любой момент времени.
← →
Суслик © (2004-10-15 12:55) [85]
> [84] Игорь Шевченко © (15.10.04 12:50)
если учитывать такое определение
> Синхронизация применительно к вычислительным процессам -
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
то полность согласен с этим
> В коде, приведенном тобой, при помощи специального алгоритма,
> гарантируется непротиворечивое состояние ресурса для каждого
> потока в любой момент времени.
Признаю, что скорее всего был не прав в том, что называл приведенный мной пример синхронизацией - это всего лишь корректная многопоточность.
ЗЫ. Тонкое это дело. Есть какие-нибудь книги про всякие приемы многопоточного программирования?
← →
Игорь Шевченко © (2004-10-15 13:04) [86]Суслик © (15.10.04 12:55) [85]
Вот теперь и мы пришли к единому мнению :)
> Есть какие-нибудь книги про всякие приемы многопоточного
> программирования?
Рихтер, как ни странно, программирование для Win32.
У Таненбаума очень хорошо рассказано про концепции.
Я просто с понятием "
> это гарантированное предоставление монопольного доступа
> к ресурсу одному процессу в каждый момент времени непротиворечивого
> состояния ресурса.
"
знаком с 1985 года, если не ошибаюсь, первый раз встретил у Шоу, в "Логическом проектировании операционных систем".
← →
Суслик © (2004-10-15 18:06) [87]Прошу прощения, за реанимирование темы.
Но я бы хотел высказать пожелание автору топика.
Помню когда-то в "Потрепаться" была борьба за фак. Я не помню, кто был инициатором этой работы, но идея была классная - коллективная работа над материалом. К сожалению, у меня тогда не было клиента, поэтому результатов под рукой нет. Но я помню, что в результате работы было проработано некоторое количество вопросов.
Я считал (и считаю), что фак, это хорошо, но не очень жизненно, т.к. вопросы и ответы в основном для ленивых: ответ всегда можно найти в документации.
Другой вопрос, что существуют проблемы, которые имеют определенное название. Например, пул потоков. Пул потоков нельзя отнести в фак, но в тоже время он является полезным и частоприменимым (в определенных областях) классом.
Думаю, что многие со мной согласятся, что коллективная разработка высокоскоростного пула потоков, являтется общественно полезным делом.
Поэтому предложение автору сего топика продолжить обсуждение кода пула потоков.
← →
Игорь Шевченко © (2004-10-15 18:34) [88]
> Пул потоков нельзя отнести в фак, но в тоже время он является
> полезным и частоприменимым (в определенных областях) классом
Например, в каких областях ?
Кстати, в Win2k он уже встроен в систему.
← →
Суслик © (2004-10-15 18:44) [89]
> [88] Игорь Шевченко © (15.10.04 18:34)
Сервер (компьютераная экономическая игра). Многопоточный. Но она реализована очень слабо - маленькая нагрузка, поэтому пулами не пользуемся. А можно было бы.
> Кстати, в Win2k он уже встроен в систему.
Тот же Фаулер пишет (арх. корп. прогр. прил.), что есть команды разработчков, которые предпочитают не пользоваться готовыми решениями. Я из их группы. Также он пишет, что по определенным соображениям такие разработчики имеют право на существование. Логика ясна?
← →
Суслик © (2004-10-15 18:45) [90]А впрочем, не хотите, как хотите. Полезная школа для неновичков была бы.
← →
panov © (2004-10-15 19:32) [91]>Суслик © (15.10.04 18:45) [90]
Так как желание продолжить обсуждение есть, то могу выложить здесь черноввые наброски своего класса(panov © (14.10.04 19:04) [62]), а также модуль сс своей реализацией потока от y-soft(если будет получено разрешение от него).
← →
Игорь Шевченко © (2004-10-15 21:56) [92]
> что есть команды разработчков, которые предпочитают не пользоваться
> готовыми решениями. Я из их группы. Также он пишет, что
> по определенным соображениям такие разработчики имеют право
> на существование. Логика ясна?
Безусловно, ясна. Для собственной ерундиции, разумеется, изобретение велосипедов есть полезное и достойное занятие. (Эт не шутка, а вполне серьезное заявление, не ставящее целью кого-либо обидеть).
Но для промышленной задачи я бы предпочел готовое и опробованное решение, хотя бы в силу его изобретенности (не придется продираться сквозь те грабли, сквозь которые продрались разработчики), отлаженности (в силу тех же обстоятельств) и документированности.
← →
panov © (2004-10-16 18:46) [93]>Игорь Шевченко © (15.10.04 21:56) [92]
Игорь, подскажи, где можно увидеть пример готового и апробированного решения для платформы Windows(версии не ниже Win95)?
← →
panov © (2004-10-18 10:44) [94]Вот почти готовая реализация 2-х классов - пул потоков(TThreadPool), и, собстувенно, поток в пуле(TThrExecuter).
TThrExecuter пока не реализован.
Это костяк, на который далее буду наращивать мясо.
Пока всё это полностью не реализовано, может есть каки-то замечания...
unit uTThreadPool;
interface
uses classes, windows, messages, sysutils;
var
WM_ThreadPoolChangeProc: DWORD; //Изменение поточной процедуры
// А надо ли?
WM_ThreadPoolEndExecuter: DWORD; //Сообщение пулу потоков об окончании дочернего
WM_ThreadPoolTerminate: DWORD; //Сообщение пулу потоков о завершении
WM_ThreadPoolTerminateExecuter: DWORD; //Запрос на удаление потока из пула
WM_ThreadPoolGetCountQueueJobs: DWORD; //Запрос количества заданий в очереди
WM_ThreadPoolDestroyQueue: DWORD; //Запрос пулу на уничтожение очереди заданий
WM_ThreadPoolCreateExecuter: DWORD; //Сообщение пулу потоков о создании нового потока
type
TExecuteProcedure=procedure(const aParam: Pointer);cdecl; //Процедура потока
TClientProcedure=procedure(const aParam: Pointer); //Клиентская процедура потока
TThrExecuter=class;
PParmJob=^TParmjob;
TParmJob=record
Proc: Pointer;
Parm: Pointer;
end;
TThrExecuter=class
private
FHandle: THandle; //Дескриптор потока
FThreadId: THandle; //Идентификатор потока
FEvent: THandle; //ОЯ для ожидания
FSuspended: Boolean;
FJob: TParmJob;
public
constructor Create;
destructor Destroy;override;
procedure StartProc(aThreadProc:TClientProcedure;aParm: pointer);
end;
TThreadPool=class //Пул потоков
private
FHandle: THandle; //Дескриптор потока
FThreadId: THandle; //Идентификатор потока
FEvent: THandle; //ОЯ для ожидания
FPool: array of TThrExecuter; //Пул потоков на исполнение
FListQueue: TList; //Очередь заданий на выполнение
FMaxThreads: Integer; //Максимальное число потоков в пуле
FreeOnTerminate: Boolean; //Автоматическое освобождение
FTerminated: Boolean; //Состояние завершения
FThreadProcedure: TExecuteProcedure; //Процедура потока
FClientProc: TClientProcedure; //Клиентская процедура потока
FClientParam: Pointer; //reserved
FTimeOutWaitReply: Integer; //Таймаут ожидания ответа от поточной функции
function CreateExecuter: TThrExecuter; //Создание потока в пуле
procedure DeleteExecuter(const aIndex: Integer); overload; //Удалить поток в пуле
procedure DeleteExecuter(const aThrExecuter: TThrExecuter); overload; //Удалить поток в пуле
procedure SetMaxThreads(const MaxThreads: Integer); //Изменить максимальное число потоков в пуле
function GetCountQueueJobs: Integer; //Текущее число потоков в пуле
function GetEvent: THandle; //Создать новый ОЯ "Event"
procedure CloseEvent(const aEvent: THandle); //Удалить ОЯ "Event"
procedure DestroyQueue; //Уничтожить очередь заданий
function PostAndWait(const Msg,Param: Integer): Boolean; //Передача сообщения
//в поточную функцию и ожидание обработки сообщения
procedure _ClearQueue; //Очистка очереди заданий и освобождение
public
constructor Create(const aMaxThreads: Integer);
destructor Destroy;override;
procedure AddJob(const aProc: TClientProcedure; const aParm: Pointer); //Добавить задание в очередь
function SetClientProc(const Proc: TClientProcedure): Boolean; //Изменить поточную функцию менеджера
function TerminateJob(aThreadId: THandle): Boolean; //Терминировать выполняющееся задание в пуле
procedure Release; //Закончить работу менеджера
procedure Terminate; //Выдать запрос на окончание работы менеджера
property Handle: THandle read FHandle; //Дескриптор потока менеджера
property ThreadId: THandle read FThreadId; //Идентификатор потока менеджера
property MaxThreads: Integer read FMaxThreads write SetMaxThreads;
property Terminated: Boolean read FTerminated;
property CountQueue: Integer read GetCountQueueJobs;
// property Priority: TThreadPrioroty read GetPriority write SetPriority;
end;
implementation
{ TThr }
procedure ThrExecuterProc(const aParam: Pointer);cdecl;
var
Thread: TThrExecuter;
Msg: TMsg;
begin
Thread := aParam;
PeekMessage(Msg,0,0,0,PM_NOREMOVE);
while GetMessage(Msg,0,0,0) do
begin
if Msg.message=WM_QUIT then
begin
if Assigned(Thread.FJob.Proc) then TExecuteProcedure(Thread.FJob.Proc)(Thread.FJob.Parm);
end;
end;
Dispose(aParam);
ExitThread(0);
end;
constructor TThrExecuter.Create;
begin
inherited;
FHandle := CreateThread(nil,0,@ThrExecuterProc,Self,0,FThreadId);
end;
destructor TThrExecuter.Destroy;
begin
end;
procedure TThrExecuter.StartProc(aThreadProc: TClientProcedure; aParm: Pointer);
begin
end;
← →
panov © (2004-10-18 10:45) [95]Удалено модератором
Примечание: дубль
← →
panov © (2004-10-18 10:45) [96]
{ TThreadPool }
{
Поточная функция менеджера потоков.
Построена на обменом сообщениями между потоком и классом-оберткой.
const aParm: Pointer - Ссылка на класс-обертку TThreadPool
}
procedure ThreadPoolProcedure(const aParm: Pointer);cdecl;
var
ThreadPool: TThreadPool;
Msg: TMsg;
begin
ThreadPool := TThreadPool(aParm); //Ссылка на класс-обертку TThreadPool
PeekMessage(Msg,0,0,0,PM_NOREMOVE); //Создаем очередь сообщений
SetEvent(ThreadPool.FEvent); //Извещаем о готовности к работе
while GetMessage(Msg,0,0,0) do //Выбираем сообщение из очереди
begin
//Запрос на количество ожидающих заданий в очереди
// wParam - ожидающий объект Event
// lParam - адрес для возврата результата (PInteger)
if Msg.message=WM_ThreadPoolGetCountQueueJobs then
begin
PInteger(Msg.lParam)^ := ThreadPool.FListQueue.Count;
SetEvent(Msg.wParam);
Continue;
end;
//Запрос на уничтожение очереди заданий
// wParam - ожидающий объект Event
if Msg.message=WM_ThreadPoolDestroyQueue then
begin
ThreadPool._ClearQueue;
SetEvent(Msg.wParam);
Continue;
end;
//Запрос на изменение поточной функции
// Сразу входим в новую поточную функцию,
// после окончания заканчиваем Выполнения потока
{ TODO -cЗаметка : Добавить возможность "Временного перехода" в клиентскую функцию) }
if Msg.message=WM_ThreadPoolChangeProc then
begin
if Assigned(ThreadPool.FclientProc) then
begin
ThreadPool.FClientProc(aParm);
ExitThread(0);
end;
end;
//Запрос на окончание потока
//
//
if (Msg.message=WM_QUIT) or (Msg.message=WM_ThreadPoolTerminate) then ExitThread(0);
end;
end;
//Добавить в очередь задание
procedure TThreadPool.AddJob(const aProc: TClientProcedure;
const aParm: Pointer);
begin
end;
constructor TThreadPool.Create(const aMaxThreads: Integer);
begin
inherited Create;
IsMultiThread := True; //Установить многопоточный режим
FreeOnTerminate := True; //Автоматически уничтожать объект
FMaxThreads := aMaxThreads; //Установить количество потоков в пуле
FTimeOutWaitReply := 5000; //Установить максимальное время ожидания
// ответа от поточной функции
FThreadProcedure := ThreadPoolProcedure; //Установить поточную функцию
FEvent := CreateEvent(nil,True,False,nil); //Создать ОЯ "Event" для ожидания готовности
if FEvent=0 then raise Exception.Create("Error create Event object"); //Не получилось создать Event
FHandle := CreateThread(nil,0,@FThreadProcedure,Self,0,FThreadId); //Создаем поток для выполнения поточной функции
if FHandle=0 then
begin
raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
end;
//Ждем, пока поточная функция не будет готова принимать запросы
if WaitForSingleObject(FEvent,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
begin
TerminateThread(FHandle,1);
CloseHandle(FHandle);
FHandle := 0;
raise Exception.Create("TThreadPool:Error CreateThread:"+SysErrorMessage(GetLastError));
end;
FListQueue := TList.Create; //Создаем очередь заданий
end;
//Создать поток в пуле потоков
function TThreadPool.CreateExecuter: TThrExecuter;
var
Thread: TThrExecuter;
begin
Thread := nil;
Result := nil;
if FTerminated then Exit;
PostAndWait(WM_ThreadPoolDestroyQueue,Integer(@Thread));
end;
//Удалить поток в пуле потоков по индексу
procedure TThreadPool.DeleteExecuter(const aIndex: Integer);
begin
end;
//Удалить поток в пуле потоков по ссылке на поток TThreadExecuter
procedure TThreadPool.DeleteExecuter(const aThrExecuter: TThrExecuter);
begin
end;
//Уничтожение объекта TThreadPool
destructor TThreadPool.Destroy;
begin
if not FTerminated then Exit;
if FEvent<>0 then CloseHandle(FEvent);
if FHandle<>0 then CloseHandle(FHandle);
inherited;
end;
//Запрос на уничтожение очереди заданий
procedure TThreadPool.DestroyQueue;
begin
PostAndWait(WM_ThreadPoolDestroyQueue,0);
end;
//Запрос количества заданий в очереди
function TThreadPool.GetCountQueueJobs: Integer;
begin
Result := -1;
PostAndWait(WM_ThreadPoolGetCountQueueJobs,Integer(@Result));
end;
//Создать ОЯ "Event"
function TThreadPool.GetEvent: THandle;
begin
Result := CreateEvent(nil,True,False,nil);
end;
//Освободить ОЯ "Event"
procedure TThreadPool.CloseEvent(const aEvent: THandle);
begin
if aEvent<> 0 then CloseHandle(aEvent);
end;
//Запрос поточной функции и ожидание ответа.
function TThreadPool.PostAndWait(const Msg, Param: Integer): Boolean;
var
Event: THandle;
begin
Result := False;
Event := GetEvent;
if Event=0 then Exit;
try
PostThreadMessage(FThreadId,Msg,Event,Param);
WaitForSingleObject(Event,FTimeOutWaitReply);
finally
CloseEvent(Event);
end;
end;
//Окончание работы
procedure TThreadPool.Release;
begin
if FTerminated then Exit;
FTerminated := True;
DestroyQueue;
PostThreadMessage(FThreadId,WM_ThreadPoolTerminate,0,0);
if WaitForSingleObject(FHandle,FTimeOutWaitReply)<>WAIT_OBJECT_0 then
begin
TerminateThread(FHandle,1);
end;
if FreeOnTerminate then Destroy;
end;
function TThreadPool.SetClientProc(const Proc: TClientProcedure): Boolean;
begin
Result := False;
if Terminated then Exit;
if Assigned(FClientProc) then Exit;
FClientProc := Proc;
PostThreadMessage(FThreadId,WM_ThreadPoolChangeProc,0,0);
Result := True;
end;
procedure TThreadPool.SetMaxThreads(const MaxThreads: Integer);
begin
end;
procedure TThreadPool.Terminate;
begin
if FTerminated then Exit;
Release;
end;
function TThreadPool.TerminateJob(aThreadId: THandle): Boolean;
begin
Result := False;
end;
procedure TThreadPool._ClearQueue;
var
i: Integer;
begin
if not FTerminated then Exit;
if not Assigned(FListQueue) then Exit;
for i := 0 to FListQueue.Count-1 do
begin
FListQueue.Delete(i);
end;
FListQueue.Free;
end;
initialization
WM_ThreadPoolChangeProc := RegisterWindowMessage("TThreadPoolWM_ThreadPoolChangeProc");
WM_ThreadPoolEndExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolEndExecuter");
WM_ThreadPoolTerminate := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminate");
WM_ThreadPoolTerminateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolTerminateExecuter");
WM_ThreadPoolGetCountQueueJobs := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolGetCountQueueJobs");
WM_ThreadPoolDestroyQueue := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolDestroyQueue");
WM_ThreadPoolCreateExecuter := RegisterWindowMessage("TThreadPoolWM_ThreadPoolWM_ThreadPoolCreateExecuter");
finalization
end.
← →
Игорь Шевченко © (2004-10-18 12:53) [97]panov © (16.10.04 18:46) [93]
> Игорь, подскажи, где можно увидеть пример готового и апробированного
> решения для платформы Windows(версии не ниже Win95)?
Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.
Готовое решение для Win2k есть в книжке Рихтера и Кларка "программирование серверных приложений для Windows 2000.
"
← →
panov © (2004-10-18 13:10) [98]>Игорь Шевченко © (18.10.04 12:53) [97]
Я рискну высказать крамольную мысль, но строить серьезную многопоточную систему на потребительских версиях Windows (Win95,Win98,WinME) - это чистой воды буратинизм.
С тем, что строить на Win98 серьезную многопоточную систему как сервер приложений не стоит. Тут я согласен.
А вот серьезную многопоточную клиентскую программу, в принципе, почему бы и нет-)
Понятно, что в этом случае надежность ОС не обеспечивает, но для многих клиентских программ достаточная надежность и устойчивость на порядки ниже серверных приложений.
Кроме этого, согласись, что та же WinNT4 дает достаточный уровень надежности, но не предоставляет таких встроенных в систему инструментов.
← →
Игорь Шевченко © (2004-10-18 13:22) [99]
> А вот серьезную многопоточную клиентскую программу, в принципе,
> почему бы и нет-)
>
> Понятно, что в этом случае надежность ОС не обеспечивает,
> но для многих клиентских программ достаточная надежность
> и устойчивость на порядки ниже серверных приложений.
А пример ? Именно такой клиентской задачи, где требуется пул ?
А насчет NT4 - у меня сейчас книги под рукой нету, но через какое-то время я таки-посмотрю, что за механизмы там используются и есть ли они в NT4
← →
panov © (2004-10-18 13:28) [100]>Игорь Шевченко © (18.10.04 13:22) [99]
А пример? Именно такой клиентской задачи, где требуется пул ?
Так нет таких задач, где без пула потоков обойтись нельзя.
Ведь это лишь способ организации и управления несколькими потоками.
Такую задачу я в начале топика привел, как пример - многопоточное копирование файлов.
Навскидку могу привести еще примеры - скачивание нескольких файлов одновременно с интернет-сайтов, сканирование сети и пр.
← →
Evgeny V © (2004-10-18 13:30) [101]
> panov © (18.10.04 13:10) [98]
> Кроме этого, согласись, что та же WinNT4 дает достаточный
> уровень надежности, но не предоставляет таких встроенных
> в систему инструментов.
GetQueuedCompletionStatus и PostQueuedCompletionStatus - позволяет удобно организовать пул потоков, в 9x их конечно нет, но по MSDN
Client: Included in Windows XP, Windows 2000 Professional, and Windows NT Workstation 3.5 and later.
Server: Included in Windows Server 2003, Windows 2000 Server, and Windows NT Server 3.5 and later.Header: Declared in Winbase.h; include Windows.h.
Library: Use Kernel32.lib.
В 2000 конечно появились более удобные функции для этой цели - QueueUserWorkItem например
← →
panov © (2004-10-18 13:38) [102]>Evgeny V © (18.10.04 13:30) [101]
Да, действительно.
Но все же у меня дома W98 и мне хочется, чтобы в этой системе тоже работало-)
← →
Игорь Шевченко © (2004-10-18 14:06) [103]panov © (18.10.04 13:28) [100]
Пул потоков есть средство ограничения обработки запросов фиксированным количеством потоков, при этом, остальные запросы ставятся в очередь до освобождения одного из потоков - типичная задача для архитерктуры супер-сервер (СУБД), например. Каким образом это поможет на клиенте, честно, не вижу.
Я могу сослаться только на пост [92] о поводе для написания подобного пула - общее самообразование.
← →
Владислав © (2004-10-19 12:56) [104]Понадобился пул по работе. Вот что я написал
unit ThreadPool;
interface
uses
Windows, SysUtils, asLists;
type
TJobProcedure = procedure(Param: Pointer);
TThreadPool = class(TObject)
private
FJobExistsEvent: DWORD;
FTerminatedEvent: DWORD;
private
FThreadHandles: array of DWORD;
FJobListCS: TRTLCriticalSection;
FJobList: TDoubleLinkedList;
FThreadCount: DWORD;
FStackSize: DWORD;
FTerminated: Boolean;
procedure SetTerminated(const Value: Boolean);
private
function Initialize: Boolean;
procedure Finalize;
procedure LockJobList;
procedure UnlockJobList;
function InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
procedure RemoveJob(AJob: Pointer);
function WaitForJobTimeout(Timeout: DWORD; var Proc: TJobProcedure;
var Param: Pointer): Boolean;
function WaitForJobInfinite(var Proc: TJobProcedure; var Param: Pointer): Boolean;
public
constructor Create(AThreadCount, AStackSize: DWORD);
destructor Destroy; override;
function AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
property ThreadCount: DWORD read FThreadCount;
property StackSize: DWORD read FStackSize;
property Terminated: Boolean read FTerminated write SetTerminated;
end;
implementation
type
PJob = ^TJob;
TJob = record
Proc: TJobProcedure;
Param: Pointer;
Link: TDoubleLinkedLink
end;
function ThreadFunc(Param: Pointer): Integer;
var
LPool: TThreadPool;
LProc: TJobProcedure;
LParam: Pointer;
begin
LPool := Param;
try
while not LPool.Terminated do
begin
if LPool.WaitForJobInfinite(LProc, LParam) then
begin
try
LProc(LParam)
except
end
end
end
except
end;
Result := 0
end;
{ TThreadPool }
constructor TThreadPool.Create(AThreadCount, AStackSize: DWORD);
begin
FThreadCount := AThreadCount;
FStackSize := AStackSize;
SetLength(FThreadHandles, FThreadCount);
if not Initialize then
RaiseLastOSError
end;
destructor TThreadPool.Destroy;
begin
Finalize;
inherited;
end;
function TThreadPool.Initialize: Boolean;
var
i: Integer;
LThreadID: DWORD;
begin
InitializeCriticalSection(FJobListCS);
FJobExistsEvent := CreateEvent(nil, False, False, nil);
Result := FJobExistsEvent <> 0;
if not Result then
Exit;
FTerminatedEvent := CreateEvent(nil, True, False, nil);
Result := FTerminatedEvent <> 0;
if not Result then
Exit;
for i := 0 to Integer(FThreadCount) - 1 do
begin
FThreadHandles[i] := BeginThread(nil, FStackSize, @ThreadFunc, Self,
CREATE_SUSPENDED, LThreadID);
Result := FThreadHandles[i] <> 0;
if not Result then
Break
end;
if not Result then
Terminated := True;
for i := 0 to Integer(FThreadCount) - 1 do
if FThreadHandles[i] <> 0 then
ResumeThread(FThreadHandles[i])
end;
procedure TThreadPool.Finalize;
var
i: Integer;
LWaitResult: DWORD;
LLink: PDoubleLinkedLink;
begin
Terminated := True;
for i := 0 to FThreadCount - 1 do
begin
if FThreadHandles[i] <> 0 then
begin
LWaitResult := WaitForSingleObject(FThreadHandles[i], INFINITE);
if LWaitResult <> WAIT_OBJECT_0 then
TerminateThread(FThreadHandles[i], 1);
CloseHandle(FThreadHandles[i]);
FThreadHandles[i] := 0
end
end;
while True do
begin
LLink := RemoveFromHead(@FJobList);
if LLink <> nil then
RemoveJob(LLink^.Data)
else
Break
end;
if FTerminatedEvent <> 0 then
begin
CloseHandle(FTerminatedEvent);
FTerminatedEvent := 0
end;
if FJobExistsEvent <> 0 then
begin
CloseHandle(FJobExistsEvent);
FJobExistsEvent := 0
end;
DeleteCriticalSection(FJobListCS);
end;
procedure TThreadPool.LockJobList;
begin
EnterCriticalSection(FJobListCS);
end;
procedure TThreadPool.UnlockJobList;
begin
LeaveCriticalSection(FJobListCS);
end;
function TThreadPool.AddJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
begin
Result := InsertJob(AProc, AParam)
end;
function TThreadPool.InsertJob(AProc: TJobProcedure; AParam: Pointer): Boolean;
var
LJob: PJob;
begin
try
New(LJob)
except
SetLastError(ERROR_NOT_ENOUGH_MEMORY);
Result := False;
Exit
end;
with LJob^ do
begin
Proc := AProc;
Param := AParam;
with Link do
begin
Next := nil;
Prev := nil;
Data := LJob
end
end;
LockJobList;
try
InsertIntoTail(@FJobList, @LJob^.Link);
SetEvent(FJobExistsEvent)
finally
UnlockJobList
end;
Result := True
end;
procedure TThreadPool.RemoveJob(AJob: Pointer);
begin
Dispose(AJob)
end;
function TThreadPool.WaitForJobTimeout(Timeout: DWORD;
var Proc: TJobProcedure; var Param: Pointer): Boolean;
var
LWaitResult: DWORD;
LLink: PDoubleLinkedLink;
LJob: PJob;
begin
Result := False;
Proc := nil;
Param := nil;
LWaitResult := WaitForMultipleObjects(2, @FJobExistsEvent, False, Timeout);
if LWaitResult = WAIT_OBJECT_0 then
begin
if Terminated then
begin
SetEvent(FJobExistsEvent);
Exit
end;
LockJobList;
try
if FJobList.Size = 0 then
Exit;
LLink := RemoveFromHead(@FJobList);
if FJobList.Size <> 0 then
SetEvent(FJobExistsEvent)
finally
UnlockJobList
end;
if LLink <> nil then
begin
LJob := LLink^.Data;
Proc := LJob^.Proc;
Param := LJob^.Param;
Result := True;
RemoveJob(LJob)
end
end
end;
function TThreadPool.WaitForJobInfinite(var Proc: TJobProcedure;
var Param: Pointer): Boolean;
begin
Result := WaitForJobTimeout(INFINITE, Proc, Param)
end;
procedure TThreadPool.SetTerminated(const Value: Boolean);
begin
if Value then
begin
FTerminated := Value;
SetEvent(FTerminatedEvent)
end
end;
end.
← →
Игорь Шевченко © (2004-10-19 13:59) [105]Владислав © (19.10.04 12:56) [104]
А TThreadList вместо DoubleLinkedList и его обслуги не проще будет применить ?
← →
Владислав © (2004-10-19 14:56) [106]В TList будет постоянный Move, а на кой он нужен?
← →
Игорь Шевченко © (2004-10-19 15:08) [107]Владислав © (19.10.04 12:56) [104]
> В TList будет постоянный Move
Это плохо ?
Зато в коде строчек будет меньше...
← →
Владислав © (2004-10-19 15:11) [108]Их и так минимум :)
← →
Игорь Шевченко © (2004-10-19 15:28) [109]Владислав © (19.10.04 15:11) [108]
> Их и так минимум :)
Это тебе кажется :)
Как минимум - метод WaitForJobTimeOut используется один раз в методе WaitForJobInfinite - смело можно выкидывать и переносить функциональность в WaitForJobInfinite (это первое, что бросилось в глаза).
Кроме того, я бы попробовал сделать с TThreadList - строчек уменьшится, по-моему.
← →
Владислав © (2004-10-19 16:27) [110]Конечно, можно и выкинуть и попробовать :)
Только не лежит у меня душа к TList, а оптимизацией я еще вообще не занимался. Будут узкие места, буду оптимизировать.
← →
panov © (2004-10-27 22:38) [111]<Offtopic>
Чтобы в архив не уехал, так как работа продолжается...
</Offtopic>
Страницы: 1 2 3 вся ветка
Форум: "Основная";
Текущий архив: 2004.11.14;
Скачать: [xml.tar.bz2];
Память: 0.9 MB
Время: 0.041 c