Текущий архив: 2006.03.19;
Скачать: CL | DM;
Вниз
Как усыпить поток. Найти похожие ветки
← →
Kolan © (2006-02-06 21:50) [0]Здравствуйте,
Много раз слышал при обсуждении работы с потоками
"если то-то то поток засыпает, если еще что-то то просыпается..."
А как это. Можно пример усыпления.
Я просто проверяю какое-нибудь условие и если потоку делать нечего делаю например Sleep(1). А иначе загрузка процессора на 97% - 99%
← →
Eraser © (2006-02-06 22:41) [1]
> Kolan © (06.02.06 21:50)
чаще всего под "усыплением" потока подразумевают ожидание в этом потоке какого-либо объекта синхронизации, какай-либо ф-ей, предназначеной для этого, например WaitForSingleObject или EnterCriticalSection.
Sleep это всё таки немного иное...
← →
Kolan © (2006-02-06 23:54) [2]Понятно.
Я димал мож есть функция какая. Что-то вродеThread.Sleep
иThread.Awake
...
← →
Gero © (2006-02-07 00:01) [3]> Kolan © (06.02.06 23:54)
Suspend, Resume
← →
evvcom © (2006-02-07 11:36) [4]
> Я димал мож есть функция какая.
Думал? Есть, но не функция, а метод, если речь идет о TThread. См. [3]
← →
Kolan © (2006-02-07 12:59) [5]Интересует вот что: Допустим поток будет выполнять работу если какой-то флаг = True.
В случае если прошло какое-то время и флаг всевремя был равен False( Те поток бездействовал), поток засыпал бы....
← →
Digitman © (2006-02-07 13:05) [6]
> В случае если прошло какое-то время и флаг всевремя был
> равен False( Те поток бездействовал), поток засыпал бы
Вряд ли такая затея чем-то оправдана
← →
Kolan © (2006-02-07 13:09) [7]Вряд ли такая затея чем-то оправдана
Наверно потому что...
http://delphimaster.net/view/4-1138784983/
Digitman © (01.02.06 12:41) [3]
Ведь создание и старт трэда сродни пуску холодного автодвигателя !
Да?
Я просто попробовать такое сделать хотел...
← →
Digitman © (2006-02-07 13:09) [8]Если потоку нечего делать, лучшее чем он может заняться в это время - "спать".
"Пустая" циклическая же проверка же флага (если имеется ввиду флаг как результат user-time-вычисления некоего условия) ничем не оправдана.
← →
Digitman © (2006-02-07 13:13) [9]
> Наверно потому что
> ... автодвигателя
Нет, вовсе не поэтому.
← →
wal © (2006-02-07 13:14) [10]Можно в качестве такого "флага" использовать какой-либо объект синхронизации, например крит.секцию, только продумать все надо, чтобы дед-локов не натворить.
← →
Kolan © (2006-02-07 13:20) [11]"спать"
Те надо сделать Suspend?
Вот часть моей программы:procedure TPackageExtractThread.Execute;
var
State: TState;
CurrentIndex: Longint;
BufferEnd: Longint;
begin
FreeOnTerminate := True;
while not Terminated do
begin
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
begin
{Working}
end
else
begin
Sleep(2);
end;
end;
end;
Если я ВместоSleep
буду вызоватьSuspend
, тогода при частой смене условия(FState <> sIdle) and (FWriteIndex <> FReadIndex)
(а оно так и происходит когда идет работа), получится что поток постоянно просыпается и завсыпает(я думаю, что это гораздо хуже будет).
Поэтому я хочу наложить условиеЕсли ничего не делал 1мин то засапаю...
← →
Digitman © (2006-02-07 13:27) [12]
> получится что поток постоянно просыпается и завсыпает(я
> думаю, что это гораздо хуже будет).
В этом-то как раз нет ничего страшного.
Но вcе те 2 мс, пока поток спит на строчке Sleep(2), он не способен ни на что реагировать, вплоть до своего "пробуждения"... Если это не критично, то логика вида
while not Terminated do
if Нечего_делать then
sleep(..)
else
делать_дело
в принципе имеет право на жизнь.
Хотя гораздо резонней будет подход, когда поток "спит" и "слышит" сигналы "пробуждения" (см. Wait-ф-ции)
← →
Kolan © (2006-02-07 13:35) [13]Ага нашел... Читаю :)
← →
Kolan © (2006-02-07 13:48) [14]Допустим я создам
FEvent := CreateEvent(nil, False, False, nil);
И буду ждать егоWaitForSingleObject
Вопрос: а как управлятьFEvent
. Те как установить его в положение "случилось/неслучилось"?
И дальше я жду это событие сINFINITE
Таймаутом. И пока оно не наступит поток будет спать? Так?
← →
wal © (2006-02-07 14:03) [15]
> Те как установить его в положение "случилось/неслучилось"?
Set/Reset/PulseEvent
> И пока оно не наступит поток будет спать? Так?
Так
← →
Fay © (2006-02-07 14:03) [16]SetEvent/ResetEvent
← →
Digitman © (2006-02-07 14:03) [17]
> как установить его в положение "случилось/неслучилось"?
SetEvent() = "случилось
> я жду это событие с INFINITE Таймаутом. И пока оно не наступит
> поток будет спать? Так?
Да.
← →
Kolan © (2006-02-07 14:05) [18]О. Шас попробую...
← →
Fay © (2006-02-07 14:10) [19]> И пока оно не наступит поток будет спать?
или пока event не сдохнет
← →
Kolan © (2006-02-07 14:19) [20]Вопрос такой: Если я уже сделал
SetEvent
и сделаю ео повторно - что будет?
И еще а посмотреть состояниеEvent"а
можно как-нибудб(для отладки).
← →
Fay © (2006-02-07 14:22) [21]> и сделаю ео повторно - что будет?
Event будет в сигнальном состоянии. А что ещё может быть? 8)
← →
Digitman © (2006-02-07 14:23) [22]
> Если я уже сделал SetEvent и сделаю ео повторно - что будет?
Тоже самое что установка в True булевой переменной, уже содержащей на этот момент True.
> посмотреть состояние Event"а можно как-нибудб(для отладки).
Не нужно его "смотреть".
Просто поставь брейкпойнт на следующей строчке за вызовом Wait-ф-ции и поймай его.
← →
Kolan © (2006-02-07 14:32) [23]Fay © (07.02.06 14:22) [21]
Просто с ходу не получилось...
Вот что получилось.
FreeOnTerminate := True;
while not Terminated do
begin
WaitForSingleObject(FEvent, INFINITE);
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
begin
{Packge parsing}
begin
ResetEvent(FEvent);
end;
end;
И еще две процедуры
procedure TPackageExtractThread.AddArray(Arr: array of Byte);
var
I, J: Longint;
begin
{Adding}
SetEvent(FEvent);
end;procedure TPackageExtractThread.MakeAndSendPackage;
var
I, J: Longint;
Answer: array of Byte;
begin
ResetEvent(FEvent);
{Converting...}
FState := sIdle;
FReadIndex := 0;
FWriteIndex := 0;
FPackageReadyEvent(Self, Answer);
end;
Почемуто затыкается на 3 вызове Wait функции. Немогу отладить...
ДоMakeAndSendPackage
дело не доходит..
Fay © (07.02.06 14:22) [21]
Вот я и подумал, что может повторный вызов SetEvent причина...
← →
Kolan © (2006-02-07 14:37) [24]Поясню суть работы потока.
Он ананлизирует циклический массив на предмет нахождения пакета данных. Добавляет элементы в цикл массив другой поток...
Если нашел пакет тоSynchronize(MakeAndSendPackage);
Спать должен если нашел пакет или дошел до конца цикл массива и ждет когда добавят продолжение.
← →
Digitman © (2006-02-07 14:41) [25]
> циклический массив
Это еще что за зверь ?!
← →
Kolan © (2006-02-07 14:44) [26]Да этопросто массив фиксированный длинны. И два индекса по которому читаю и по которуму добовляю элементы.
FWriteIndex <> FReadIndex
Вот они. FReadIndex как бы догоняет FWriteIndex если они сравнялись, то жду пока еще добавят байты...
Как только пакет найден я его передаю, а индексы обнуляю(начинаю заполнять массив занаво).
Тут все работает проверял....
← →
Digitman © (2006-02-07 14:56) [27]
> Вот они. FReadIndex как бы догоняет FWriteIndex если они
> сравнялись, то жду пока еще добавят байты
Ты, вероятно, хотел сказать "кольцевой буфер" ..
Кстати, а какой поток формирует значения FReadIndex и FWriteIndex ?
← →
Kolan © (2006-02-07 15:00) [28]формирует
Не понял, что это занчит
Но изменяют их главный и доп поток...
Но повторюсь что все проверял на больших покетах м все работало... 100%
"кольцевой буфер"
циклический массив
Да уж оба слова перепутал...
:)
← →
Fay © (2006-02-07 15:03) [29]Pocket M - это кто?
← →
Kolan © (2006-02-07 15:06) [30]Извените, отвечу позже :)
← →
evvcom © (2006-02-07 15:07) [31]
> Но изменяют их главный и доп поток...
Что-то в твоем коде синхронизации доступа к ним не видно... :( Очень подозрительна фраза "все работает"
← →
evvcom © (2006-02-07 15:09) [32]
> FReadIndex как бы догоняет FWriteIndex если они сравнялись,
> то жду пока еще добавят байты
А если Write на круг обогнал Read?
← →
Digitman © (2006-02-07 15:33) [33]
> Kolan © (07.02.06 15:00) [28]
Да уж .. где синхронизация-то ? доступа к этим данным ?
← →
Kolan © (2006-02-07 16:13) [34]
Да уж .. где синхронизация-то ? доступа к этим данным ?
Нету
Я думая она ненужна... Ведь записываю и читаю всегда по разным индексам...
Может я и неправ.
Вообщето она была, но я убрал ее...А если Write на круг обогнал Read?
Нет такое невозможно... Если по идее до конца массива они не дойдут( если дойдут массив увеличу...)
← →
Digitman © (2006-02-07 16:16) [35]
> Может я и неправ
с учетом
> изменяют их главный и доп поток
оч даже неправ !
← →
evvcom © (2006-02-07 16:28) [36]
> Ведь записываю и читаю всегда по разным индексам
Ты сами переменные читаешь и пишешь в разных потоках, уже это надо синхронизировать.
← →
Kolan © (2006-02-07 16:32) [37]Доп поток изменяет только индекс чтения. Гл поток изменяет индекс записи.
Причем если состояние Idle(те доп поток не работает) обнуляет индексы.
Когда поток нашел покет, он вызовет процедуру передач и пакета и обнулит индексы. Не пойму где они пересекутся.
Или если один поток пишет по индексу A а второй по индексу B одного и тогоже массива(причем точно индексы разные) - будет ошибка?
Скажите - это может повлиять на правильность работы с Event. Те из - за чего межет засыпать...
← →
Digitman © (2006-02-07 16:36) [38]
> Когда поток нашел покет, он вызовет процедуру передач и
> пакета и обнулит индексы. Не пойму где они пересекутся
> Доп поток изменяет только индекс чтения
"Как тебя понимать, Саид ?" (с)
← →
Kolan © (2006-02-07 16:40) [39]Kolan © (07.02.06 14:19) [20]
Вопрос такой: Если я уже сделал SetEvent и сделаю ео повторно - что будет?
Я это не просто так спрашивал. Я сначала все позашишал кр. секциями...
Возникала сит когда я дважды делалRelease
и потомAcquire
уже не вызывался. Как проверить тек сост кр. секции я ненашел...
Поэтому переделал без кс.
А что будет ошибка если один поток сделаетFWriteIndex := J;
А второй в этоже время проверитif (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
?
← →
Digitman © (2006-02-07 16:42) [40]
> А что будет ошибка если один поток сделает
> FWriteIndex := J;
> А второй в этоже время проверит
> if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
> ?
Бардак будет.
← →
Kolan © (2006-02-07 16:46) [41]Индыксы обнулятся только если состояние
sIdle
и доп поток гарантированно не использует эти переменные
Вообщем вот полный код:procedure TPackageExtractThread.AddArray(Arr: array of Byte);
var
I, J: Longint;
begin
if FState = sIdle then
begin
FWriteIndex := 0;
FReadIndex := 0;
end;
J := FWriteIndex;
for I := Low(Arr) to High(Arr) do
begin
FBuffer[J] := Arr[I];
J := J + 1;
end;
FWriteIndex := J;
if FState = sIdle then
begin
FState := sWaitForStart;
end;
end;procedure TPackageExtractThread.Execute;
begin
FreeOnTerminate := True;
while not Terminated do
begin
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
begin
case FState of
sWaitForStart:
begin
if (FBuffer[FReadIndex] = StartByte) then
begin
FState := sWaitForEnd;
FStartIndex := FReadIndex;
end;
FReadIndex := FReadIndex + 1;
end;
sWaitForEnd:
begin
if (FBuffer[FReadIndex] = EndByte) then
begin
FState := sEndFound;
end;
FReadIndex := FReadIndex + 1;
end;
sEndFound:
begin
if (FBuffer[FReadIndex] <> EndByte) then
begin
FState := sPackegeReady;
FReadIndex := FReadIndex - 1;
end
else
begin
FState := sWaitForEnd;
end;
FReadIndex := FReadIndex + 1;
end;
sPackegeReady:
begin
FEndIndex := FReadIndex;
FState := sIdle;
Synchronize(MakeAndSendPackage);
end;
end;
end
else
begin
Sleep(1);
end;
end;
end;
procedure TPackageExtractThread.MakeAndSendPackage;
var
I, J: Longint;
Answer: array of Byte;
begin
SetLength(Answer, (FEndIndex - FStartIndex) + 1);
J := Low(Answer);
for I := FStartIndex to FEndIndex do
begin
Answer[J] := FBuffer[I];
J := J + 1;
end;
FState := sIdle;
FReadIndex := 0;
FWriteIndex := 0;
FPackageReadyEvent(Self, Answer);
end;
← →
Digitman © (2006-02-07 17:15) [42]
> Kolan © (07.02.06 16:46) [41]
Эта логика ведет к непредсказуемым (печальным) последствиям...
Доступ к массиву и тем самым индексам в нем должен быть защищен крит.секцией.
← →
Kolan © (2006-02-07 17:44) [43]Я непонял
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
Это же тоже доступ?
Те весь этот if надо зашитить?
← →
Defunct © (2006-02-07 19:30) [44]> Kolan
Почитайте последний пост, в вашей предыдущей ветке.
← →
Kolan © (2006-02-07 20:58) [45]Спасибо за ответ. Завтра разберусь....
:)
← →
evvcom © (2006-02-08 08:39) [46]Это же не весь код. AddArray только описан, но где вызывается не ясно. В коде доступ к FWriteIndex, FReadIndex осуществляется из всех этих 3-х методов. Execute и MakeAndSendPackage синхронизированы, а AddArray не ясно.
FWriteIndex <> FReadIndex - это тоже доступ.
> Те весь этот if надо зашитить?
Все зависит от того, синхронизированы ли уже доступы из разных потоков.
← →
Digitman © (2006-02-08 09:20) [47]
> Kolan © (07.02.06 20:58) [45]
Насколько я понял, тебе требуется реализовать следующее :
Есть некая очередь данных, требующих обработки.
Один код.поток (1) помещает данные по мере их формирования в хвост очереди, другой код.поток (2) извлекает данные из головы очереди и обрабатывает их (либо "спит" если очередь пуста).
Если так, то использование массива в кач-ве очереди вряд ли оправдано.
Например, есть готовый класс TQueue, реализующий логику очереди. Можно защитить доступ к методам/св-вам его экз-ра крит.секцией, при добавлении в хвост очереди устанавливать событие (SetEvent), при опустошении очереди сбрасывать событие (ResetEvent).
Поток 1, добавляя очередной элемент (или несколько очередных элементов) в хвост, "сигналит" об этом установкой события. Поток 2 "просыпается" по этому сигналу и начинает циклически выбирать элементы из головы и обрабатывать их, до тех пор пока не обнаружит опустошение очереди, после чего вновь "засыпает", сбросив непосредственно перед этим событие в несигналящее состояние.
← →
Defunct © (2006-02-08 09:43) [48]
> Digitman © (08.02.06 09:20) [47]
> Если так, то использование массива в кач-ве очереди вряд ли оправдано.
нет, у него там немного иная задача.
Один поток обслуживает канальный уровень по приему пакетов от некоего устройства через COM порт. Второй поток должен обрабатывать уже готовые пакеты. Очередь (TQueue) можно использовать в качестве согласующего звена между первым и вторым потоком, но вот отказываться от кольцевого буфера в принимающем потоке - не стоит.
← →
Digitman © (2006-02-08 09:49) [49]
> отказываться от кольцевого буфера в принимающем потоке -
> не стоит
Тогда потери пакетов не исключены.
← →
Digitman © (2006-02-08 09:57) [50]Да и не кольцевой это буфер при ближайшем рассмотрении..
Для принимающего трэда это обычный аккумулирующий поток.
А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream
← →
Defunct © (2006-02-08 10:00) [51]Digitman © (08.02.06 09:49) [49]
не вижу причин для этого.
Учитывая что протокол предполагает байт-стаффинг, а кольцевой буфер в два или более раз превышает максимально допустимый объем пакета, то можно гарантировать работу без потерь если придерживаться следующих условий:
1. выделение пакета осуществлять "на лету" (т.е. прием вести по одному байту и каждый принятый байт анализировать на предмет старта/стопа пакета)
2. Распознанный пакет сразу же сбрасывать в очередь и оповещать получателя сл. уровеня.
← →
Defunct © (2006-02-08 10:07) [52]> Для принимающего трэда это обычный аккумулирующий поток.
> А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream
а смысл? добавить еще одну заботу очистки/контроля размера потока?
← →
Digitman © (2006-02-08 10:21) [53]
> Defunct
> а смысл? добавить еще одну заботу очистки/контроля размера
> потока?
Забота треда транспортного уровня - писать в стрим все что принято (с уведомлением обрабатывающего треда о факте очередной записи в стрим).
Забота обрабатывающего треда - читать из стрима, распознавать пакеты, извлекать их из стрима (с соотв. коррекцией его размера и позиции записи) и обрабатывать.
Все !
И никаких кольц.буферов.
По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.
← →
Defunct © (2006-02-08 10:38) [54]Digitman © (08.02.06 10:21) [53]
По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.
Да Вы правы в том, что так как эту задачу пытается решить автор (обращаться к одному и тому же массиву на канальном и транспортном уроване) возможны проблемы.
А так как я предлагаю решить эту задачу, проблемы ислючены.
Описываю алгоритм работы потока обслуживающего канальный уровнь:
1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.
При этом в качестве буфера на канальном уровне используется как положено - кольцевой буфер, на транспортном - очередь, все гладко. И events можно прикрутить, с помощью events канальный уровень должен взаимодействовать с транспортом.
← →
Digitman © (2006-02-08 10:47) [55]
> Defunct © (08.02.06 10:38) [54]
Что ж ...
Вполне нормальное решение.
← →
Kolan © (2006-02-08 11:39) [56]evvcom © (08.02.06 08:39) [46]
Кроме выше описанных потоков есть поток, который проверяет очередь порта, если они есть вызывает событие:procedure TForm1.ReadFromComm(Sender: TObject; ReadBytes: array of Byte);
begin
FPackageExtractor.AddBayteArray(ReadBytes);
StatusBar1.Panels[2].Text := "Ïîëó÷åííî áàéò: "
+ IntToStr(Length(ReadBytes)) + ".";
end;
1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.
2. Чтение по 1 байту критично ка я понял?
Итак у меня 3 потока
1. Тот что читает из порта. Он вызовет событие при получении 1 байта.
Тут ничего не меняю
2. По возникновении этого события, основной поток этот байт кладет в кольцевой буффер.
И устанавливаетEvent
.
3. В свою очередь поток, выделяющий пакеты, просыпается поEvent
и выполняет проверку(она у меня есть, тут тоже не меняю ничего?)
4. Прочитав байт он сравнивает индексы(они равны) и сбрасываетEvent
.
5. Обнаружив пакет он копирует его из кольц. буффера, обнуляет индексы, сбрасываетEvent
и через событие вызываемое через гл поток(как в п.2) кладет этот массив в очередь потока №3. Устанавливаепт егоEvent2
.
6. Тот поток прсыпается и пока очередь непуста парсит пакеты вызывая соотв событие, когда пакет распарсен...
Господи помоги мне понять где тут что надо зашитить и как... :)
Благодарю за помощь. :)
Правильно ли я понял идею?
Больше всего меня беспокоет то, что потоки как бы общаются через главный,
вызавая соотв. события сSynchronize
. Может на прямую можно. Как?
← →
Digitman © (2006-02-08 12:09) [57]
> Итак у меня 3 потока
Один из 3-х, imho, явно лишний.. "дармоед" он)
Я все же настаиваю на реализации со стримом, а не с кольц.буфером.
Поток 1 (транспортный) работает с портом :
- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции
Поток 2 (обрабатывающий) в цикле с проверкой на Terninated:
1 Спит, ожидая сигнал event"а
2. по сигналу event"а просыпается, входит в крит.секцию,
3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.
4. извлекает из стрима очередной обнаруженный пакет, корректирует (уменьшает) размер стрима и св-во Position на величину извлеченного пакета (оставшиеся в стриме данные сдвигаются в начало стрима)
5. обрабатывает пакет нужным образом и идет на 3.
7. Выходит из крит секции, идет на 1.
← →
Digitman © (2006-02-08 12:16) [58]
> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize
Если беспокоит, то есть повод для предметного и детального анализа такой необходимости.
← →
Leonid Troyanovsky © (2006-02-08 12:17) [59]
> Digitman © (08.02.06 12:09) [57]
> 4. извлекает из стрима очередной обнаруженный пакет, корректирует
> (уменьшает) размер стрима и св-во Position на величину извлеченного
> пакета (оставшиеся в стриме данные сдвигаются в начало стрима)
IMHO, "уплотнять" данные лучше тогда, когда очередь пуста.
Ну, и делать это так, чтобы пришедший запрос мог сразу
прервать оное занятие.
--
Regards, LVT.
← →
Digitman © (2006-02-08 12:25) [60]
> Leonid Troyanovsky © (08.02.06 12:17) [59]
> "уплотнять" данные лучше тогда, когда очередь пуста
Можно и так.
← →
Kolan © (2006-02-08 13:22) [61]
> - по событию приема принимает из порта данные (сколько бы
> их ни было принято - хоть один хоть косой десяток)
Вот так сейчас и есть.
Попробую сделать ваш вариант.
Стирим это -TMemoryStream
?
Просто с этим не работал. Надо изучить...
С крит секциями у меня проблеммы т.к. я неточно понимая логику их работы.
Что будет если обрабатывающий поток занял её. А транспортный получил байты и пытается тоже занять её. Байты не пропадут?
А если эта ситуация неск раз подрад будет. Те приходит допустим 3 массива из порта. Все патаются заити в кс, а она занята.
Когда она освободится что будет?..
Буду пытатся сделать :)
Еще раз благодарю за подсказки. Оч. интересно и позновательно :)
← →
Eraser © (2006-02-08 13:37) [62]
> Kolan ©
скажу и своё имхо.
Думаю поток данных с COM порта не на чтолько велик, чтобы надо было создавать какие-то дуфера для его обработки.
А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся" критическими секциями (или же соотв. объектами ядра). Примерно так.
Схаматично.
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs1);
впринципе к такой схеме можно и двойную-тройную и т.д. буфферизацию не сложно приделать.
← →
Eraser © (2006-02-08 13:38) [63]пардон.
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);
← →
Eraser © (2006-02-08 13:56) [64]ещё одно уточнение!
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
чтобы код работал эффективно считываем данные из портма
здесь! т.к. перед крит. секциями!
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);
← →
Digitman © (2006-02-08 14:47) [65]
> Стрим это -TMemoryStream?
Да.
> Что будет если обрабатывающий поток занял её. А транспортный
> получил байты и пытается тоже занять её. Байты не пропадут?
>
Нет, не пропадут.
Когда обрабатывающий тред закончит обработку очередных данных в стриме и выйдет из крит.секции, в нее тут же войдет транспортный тред (все это время покорно ждавший освобождения занятой трансп.потоком крит.секции) и наполнит стрим новыми данными ..
← →
evvcom © (2006-02-08 15:40) [66]
> А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся"
> критическими секциями
> EnterCriticalSection(cs2);
> LeaveCriticalSection(cs1);
Интересное решение. И что? Думаешь будет работать?
← →
Digitman © (2006-02-08 15:49) [67]
> Eraser © (08.02.06 13:56) [64]
Ну и куда ты их, крит.секций, наплодил столько ?
Ты хоть сам в состоянии кратко и внятно сказать, какая из трех крит.секций у тебя какой ресурс защищает ?
← →
Eraser © (2006-02-08 17:25) [68]
> evvcom © (08.02.06 15:40) [66]
насчёт крит. секций - не пробовал, в вот точно такое решение, но на мьютексах работает.
> Digitman © (08.02.06 15:49) [67]
> Ты хоть сам в состоянии кратко и внятно сказать, какая из
> трех крит.секций у тебя какой ресурс защищает ?
1 поток - считывание данных
EnterCriticalSection(cs1); // пробуем "войти" в первую крит. секцию, если не удаётся - значит ещё не обработан предыдущий буфер (пакет данных) - ждём
EnterCriticalSection(cs3); // т.к. буфер у нас один - защищаем его на момент записи в него данных
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2); // второй поток может приступать к считыванию буфера
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1); // буфер обработан (или сохранён для послудующей обработки за критической секцией)! разреаем первому потоку писать слеудющую порцию данных
На базе семафоров можно смастерить таким образом n-ю буфферизацию.
← →
Digitman © (2006-02-08 17:47) [69]
> Eraser © (08.02.06 17:25) [68]
Общий ресурс для транспортного и обрабатывающего тредов в случае автора - один единственный буфер (или поток). Зачем ему куча КС, когда речь пока идет об одном-единственном ресурсе ?
← →
Eraser © (2006-02-08 18:02) [70]
> Digitman © (08.02.06 17:47) [69]
ага.. понятно.
Однако тут почти всё зависит от того какая скорость "распарсивания" и какая скорость "считывания"... надо чтобы автор прояснил этот момент.
Если скорость распарсивания намного выше чем скорость чтения (а я подозреваю, что оно так и есть), то тут вопрос - стоит ли вообще плодить кучу потоков? не проще чтение/распарсивание сделать в одном - последовательно. Т.к. большую часть времени распарсевающий поток будут ждать или проверять "не готов ли буффер/стрим?"
← →
Defunct © (2006-02-08 23:02) [71]
> Kolan © (08.02.06 11:39) [56]
> 1. Ждать активности COM порта.
> 2. Считать байт, если не управляющий символ - то записать
> в буфер и сместить индекс.
> 3. Если символ "старта" - запонить позицию старта пакета.
>
> 4. Если символ "стопа" - (для начала можно расчитать CRC)
> начиная с запомненной позиции старта скопировать данные
> по текущую позицию и положить в очередь принятых пакетов,
> оповестить транспортный уровень о том, что был получен
> очередной пакет очередной пакет.
> 5. Goto 1.
>
> 2. Чтение по 1 байту критично ка я понял?
>
Да - критично.
> Итак у меня 3 потока
>
> 1. Тот что читает из порта. Он вызовет событие при получении
> 1 байта.
> Тут ничего не меняю
Не надо никакого события вызывать при приеме байта. Событие надо вызывать только тогда, когда пакет целиком выделен.
> 2. По возникновении этого события, основной поток этот байт
> кладет в кольцевой буффер.
> И устанавливает Event.
Как следствие - основной поток даже не знает о наличии кольцевого буфера.
> 3. В свою очередь поток, выделяющий пакеты, просыпается
> по Event и выполняет проверку(она у меня есть, тут тоже
> не меняю ничего?)
Поток который принимает - он же и пакеты выделяет. Т.е. не надо разделять задачу приема байт с выделением пакета на две задачи.
> 4. Прочитав байт он сравнивает индексы(они равны) и сбрасывает
> Event.
Как следствие вышесказанного никто кроме принимающего потока даже и не знает о наличии кольцевого буфера и об индексах.
> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize. Может на прямую можно.
Зачем там Synchronize, зачем 3 потока?
зззз. мне проще написать пример программы чем объяснить вам как должно работать.
Еще раз внимательно прочитайте [54].
> Digitman
Почему Вы предлагаете совместить канальный уровень с транспортным?
реализация со Stream/Queue хороша для транспорта, там где данные уже лежат в виде пакетов. Для канального же уровня (где ведется сборка пакетов) Stream добавит лишь дополнительную проблему - по его очистке.
← →
Defunct © (2006-02-08 23:05) [72]упс, забыл поставить закрывающий тэг..
← →
Германн © (2006-02-09 02:35) [73]Очередное моё ФИ насчёт применения доп-потоков для чтения информации с ком-порта. Да, забыл добавить - для "циклического" чтения информации с кои-порта. Ззз, как говаривала ...
← →
Kolan © (2006-02-09 10:19) [74]Германн © (09.02.06 02:35) [73]
А шо если данные не пришли то гл поток должен повиснуть?
← →
Digitman © (2006-02-09 10:23) [75]
> если данные не пришли то гл поток должен повиснуть?
"Вис" любого потока "лечится" использованием асинхронного режима ввода/вывода.
← →
Kolan © (2006-02-09 10:46) [76]3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.
Непонял как анализировать. И что делать если его нет. Получается что анализировать стрим надо каждый раз зананово.
Или надо запоминать тек позицию?....
← →
Digitman © (2006-02-09 10:50) [77]
> Непонял как анализировать
Тебе видней. Протокол мне неизвестен.
> что делать если его нет
Ничего. Просто спать ..
> Или надо запоминать тек позицию?
Разве в этом проблема ?
← →
Kolan © (2006-02-09 10:58) [78]Вот я зашел в КС и хочу анализировать. Я просматриваю состояние, пусть они сейчас
sWaitForStart
Раньше я проверял тек эл кольцевого буфера на равенствоStartByte
Вот пример:case FState of
sWaitForStart:
begin
if (FBuffer[FReadIndex] = StartByte) then
Непойму ка это будет выглядеть в случае стрима?
← →
Digitman © (2006-02-09 11:07) [79]Смотря что пишешь в стрим.
Да и о конкретном протоколе в этой ветке ты ни словом не обмолвился.
← →
Kolan © (2006-02-09 11:15) [80]Поток 1 (транспортный) работает с портом :
- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции
Поток 2 (обрабатывающий) в цикле с проверкой на Terninated:
Смотря что пишешь в стрим.
Данные из порта (массив байт)
Да и о конкретном протоколе в этой ветке
Протокол такой
Начало - это StartByte и следующий любой не старт.
Конец - это EndByte и следующий любой не EndByte.
В
Kolan © (07.02.06 16:46) [41]procedure TPackageExtractThread.Execute;
Тут и происходит разбор...
← →
Defunct © (2006-02-09 11:57) [81]Я уже не могу смотеть на то, как вы мучаетесь.. Написал прототип класса работающего в вашем протоколе..
unit uChannel;
interface
{$define DemoMode}
uses SysUtils, SyncObjs
{$ifdef DemoMode}, Dialogs {$endif}
;
const protoPACKETSTART = $AC; // Символ старта пакета
protoPACKETSTOP = $A5; // Символ стопа
protoMAXPACKETLENGTH = 2048; // Максимальная длина пакета
protoCW : set of byte = [protoPACKETSTART, protoPACKETSTOP];
rsSIZE = protoMAXPACKETLENGTH * 2;
type
TDataChannel = class( TObject )
private
// секция обслуживания кольцевого буфера
FRing : array[0..rsSIZE -1] of byte;
FRingIndex : Integer;
FStartIndex : Integer;
FLastValue : Byte;
FMonitorPacket : boolean;
FAccumulatedPacketLength : Integer;
procedure AddToRing( Value : byte );
function StartCondition(Value : byte):boolean;
function StopCondition(Value : byte):boolean;
function GetPacketByteIndex( Offset : integer ):integer;
protected
// секция обслуживания событий
FEvent : TEvent;
procedure NotifyNextLayer;virtual;
protected
// секция обслуживания пакетов
FPacket : array[0..protoMAXPACKETLENGTH] of byte;
FPacketSize : integer;
function CheckCRC:boolean;virtual;
public
procedure AddByte( Value : byte );
constructor Create( APacketReceptionEvent : TEvent);
destructor Destory;virtual;
end;
implementation
{ TDataChannel }
constructor TDataChannel.Create(APacketReceptionEvent: TEvent);
begin
FEvent := APacketReceptionEvent;
FRingIndex := 0;
FMonitorPacket := false;
end;
procedure TDataChannel.AddByte(Value: byte);
var
i : integer;
begin
if StartCondition( Value ) then
begin
FStartIndex := FRingIndex;
FMonitorPacket := True;
FAccumulatedPacketLength := 0;
end
else if StopCondition( Value ) then
begin
if FMonitorPacket then // пакет принят
begin
FMonitorPacket := false;
FPacketSize := FAccumulatedPacketLength - 1;
for i := 0 to FPacketSize do
FPacket[ i ] := FRing[ GetPacketByteIndex(i) ];
if CheckCRC then // Пакет распознан и CRC верная
begin // Оповещаем обработчик пакета
NotifyNextLayer;
{$ifdef DemoMode}
// В Демо режиме ВЫВОДИМ НА ЭКРАН ИНФОРМАЦИЮ О ПОЛУЧЕННОМ ПАКЕТЕ
ShowMessage( Format("Принят и распознан пакет, длина = %D",[FPacketSize]) );
{$endif}
end
end;
end;
AddToRing( Value );
end;
function TDataChannel.StartCondition(Value: byte): boolean;
begin
Result := (FLastValue = protoPACKETSTART) and (Value <> protoPACKETSTART)
end;
function TDataChannel.StopCondition(Value: byte): boolean;
begin
Result := (FLastValue = protoPACKETSTOP) and (Value <> protoPACKETSTOP)
end;
function TDataChannel.GetPacketByteIndex(Offset: integer): integer;
begin
Result := Offset + FStartIndex;
if Result >= rsSIZE then
Result := Result - rsSIZE;
end;
procedure TDataChannel.AddToRing(Value: byte);
begin
if (Value in protoCW) and (Value = FLastValue) then
begin
FLastValue := 0;
exit;
end;
FRing[ FRingIndex ] := Value;
FLastValue := Value;
inc( FRingIndex );
if FRingIndex = rsSIZE then
FRingIndex := 0;
if FMonitorPacket then
begin
Inc( FAccumulatedPacketLength );
if FAccumulatedPacketLength > protoMAXPACKETLENGTH then
begin
FAccumulatedPacketLength := 0;
FMonitorPacket := false;
end;
end;
end;
function TDataChannel.CheckCRC: boolean;
begin
{ abstract CRC function }
Result := True
end;
procedure TDataChannel.NotifyNextLayer;
begin
{ abstract procedure }
if Assigned( FEvent ) then
FEvent.SetEvent
end;
destructor TDataChannel.Destory;
begin
inherited;
end;
end.
Пример использования, процедура AddByte - для помещения очередного принятого байта с COM порта:procedure TForm1.Button1Click(Sender: TObject);
var
DC : TDataChannel;
i : integer;
begin
DC := TDataChannel.Create( nil );
for i := 0 to 10000 do
DC.AddByte( $A);
DC.AddByte( protoPACKETSTART );
DC.AddByte( $A);
DC.AddByte( protoPACKETSTOP );
DC.AddByte( protoPACKETSTOP );
DC.AddByte( $A);
DC.AddByte( protoPACKETSTOP );
DC.AddByte( $A);
DC.Free;
end;
← →
Kolan © (2006-02-09 12:11) [82]Благодарю. Я правда не мучался. Обязательно разберу ваш код.
Просото я стараюсь разобраться сам. Тем более пока время есть...
Я ж еще только учус (С)
:)
← →
Defunct © (2006-02-09 12:20) [83]Kolan © (09.02.06 12:11) [82]
У меня там есть ошибка (поспешил выложить код, не проперив надлежащим образом).
после вот этой строки:
FPacketSize := FAccumulatedPacketLength - 1;
необходимо сбросить
FAccumulatedPacketLength := 0;
← →
Kolan © (2006-02-09 12:54) [84]Defunct © (09.02.06 12:20) [83]
Очень признателен за помощь :). Даже неудобно что вы так помогаете :)...
← →
evvcom © (2006-02-09 14:24) [85]1 поток - считывание данных
EnterCriticalSection(cs1); // пробуем "войти" в первую крит. секцию, если не удаётся - значит ещё не обработан предыдущий буфер (пакет данных) - ждём
В твоем коде ничто не мешает войти в cs1.
LeaveCriticalSection(cs2); // второй поток может приступать к считыванию буфера
Какой смысл покидать cs2, если поток в нее и не входил? И также ничто не мешает 2-му потоку повторно войти в cs2. Далее по тексту аналогично.
← →
Defunct © (2006-02-09 15:39) [86]> Kolan
да не за что, мне не жалко,
напротив, спасибо вам за заинтересовавший меня вопрос. ;>
← →
Eraser © (2006-02-09 20:08) [87]
> evvcom © (09.02.06 14:24) [85]
> В твоем коде ничто не мешает войти в cs1.
Мешает! А именно мешает следующая интерация цикла в потоке. Т.е. цикл не будет выполнятся далее, если буфер не обработан. В том то и "фишка".
← →
evvcom © (2006-02-10 09:24) [88]
> мешает следующая интерация цикла в потоке. Т.е. цикл не будет выполнятся далее,
Бред. Напиши в тексте программы 10 раз друг за другом EnterCriticalSection(cs1); и посмотри, что получится.
← →
Eraser © (2006-02-10 15:26) [89]
> evvcom © (10.02.06 09:24) [88]
хм... действительно я допустил ошибку, предлжив использовать крит. секции...
After a thread has ownership of a critical section, it can make additional calls to EnterCriticalSection or TryEnterCriticalSection without blocking its execution. This prevents a thread from deadlocking itself while waiting for a critical section that it already owns.
В своём проекте я использовать сходную схему, НО только применял семафоры... WaitForSingleObject работает по-другому.
Страницы: 1 2 3 вся ветка
Текущий архив: 2006.03.19;
Скачать: CL | DM;
Память: 0.77 MB
Время: 0.032 c