Текущий архив: 2006.03.19;
Скачать: CL | DM;
Вниз
Как усыпить поток. Найти похожие ветки
← →
Digitman © (2006-02-07 16:42) [40]
> А что будет ошибка если один поток сделает
> FWriteIndex := J;
> А второй в этоже время проверит
> if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
> ?
Бардак будет.
← →
Kolan © (2006-02-07 16:46) [41]Индыксы обнулятся только если состояние
sIdle
и доп поток гарантированно не использует эти переменные
Вообщем вот полный код:procedure TPackageExtractThread.AddArray(Arr: array of Byte);
var
I, J: Longint;
begin
if FState = sIdle then
begin
FWriteIndex := 0;
FReadIndex := 0;
end;
J := FWriteIndex;
for I := Low(Arr) to High(Arr) do
begin
FBuffer[J] := Arr[I];
J := J + 1;
end;
FWriteIndex := J;
if FState = sIdle then
begin
FState := sWaitForStart;
end;
end;procedure TPackageExtractThread.Execute;
begin
FreeOnTerminate := True;
while not Terminated do
begin
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
begin
case FState of
sWaitForStart:
begin
if (FBuffer[FReadIndex] = StartByte) then
begin
FState := sWaitForEnd;
FStartIndex := FReadIndex;
end;
FReadIndex := FReadIndex + 1;
end;
sWaitForEnd:
begin
if (FBuffer[FReadIndex] = EndByte) then
begin
FState := sEndFound;
end;
FReadIndex := FReadIndex + 1;
end;
sEndFound:
begin
if (FBuffer[FReadIndex] <> EndByte) then
begin
FState := sPackegeReady;
FReadIndex := FReadIndex - 1;
end
else
begin
FState := sWaitForEnd;
end;
FReadIndex := FReadIndex + 1;
end;
sPackegeReady:
begin
FEndIndex := FReadIndex;
FState := sIdle;
Synchronize(MakeAndSendPackage);
end;
end;
end
else
begin
Sleep(1);
end;
end;
end;
procedure TPackageExtractThread.MakeAndSendPackage;
var
I, J: Longint;
Answer: array of Byte;
begin
SetLength(Answer, (FEndIndex - FStartIndex) + 1);
J := Low(Answer);
for I := FStartIndex to FEndIndex do
begin
Answer[J] := FBuffer[I];
J := J + 1;
end;
FState := sIdle;
FReadIndex := 0;
FWriteIndex := 0;
FPackageReadyEvent(Self, Answer);
end;
← →
Digitman © (2006-02-07 17:15) [42]
> Kolan © (07.02.06 16:46) [41]
Эта логика ведет к непредсказуемым (печальным) последствиям...
Доступ к массиву и тем самым индексам в нем должен быть защищен крит.секцией.
← →
Kolan © (2006-02-07 17:44) [43]Я непонял
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
Это же тоже доступ?
Те весь этот if надо зашитить?
← →
Defunct © (2006-02-07 19:30) [44]> Kolan
Почитайте последний пост, в вашей предыдущей ветке.
← →
Kolan © (2006-02-07 20:58) [45]Спасибо за ответ. Завтра разберусь....
:)
← →
evvcom © (2006-02-08 08:39) [46]Это же не весь код. AddArray только описан, но где вызывается не ясно. В коде доступ к FWriteIndex, FReadIndex осуществляется из всех этих 3-х методов. Execute и MakeAndSendPackage синхронизированы, а AddArray не ясно.
FWriteIndex <> FReadIndex - это тоже доступ.
> Те весь этот if надо зашитить?
Все зависит от того, синхронизированы ли уже доступы из разных потоков.
← →
Digitman © (2006-02-08 09:20) [47]
> Kolan © (07.02.06 20:58) [45]
Насколько я понял, тебе требуется реализовать следующее :
Есть некая очередь данных, требующих обработки.
Один код.поток (1) помещает данные по мере их формирования в хвост очереди, другой код.поток (2) извлекает данные из головы очереди и обрабатывает их (либо "спит" если очередь пуста).
Если так, то использование массива в кач-ве очереди вряд ли оправдано.
Например, есть готовый класс TQueue, реализующий логику очереди. Можно защитить доступ к методам/св-вам его экз-ра крит.секцией, при добавлении в хвост очереди устанавливать событие (SetEvent), при опустошении очереди сбрасывать событие (ResetEvent).
Поток 1, добавляя очередной элемент (или несколько очередных элементов) в хвост, "сигналит" об этом установкой события. Поток 2 "просыпается" по этому сигналу и начинает циклически выбирать элементы из головы и обрабатывать их, до тех пор пока не обнаружит опустошение очереди, после чего вновь "засыпает", сбросив непосредственно перед этим событие в несигналящее состояние.
← →
Defunct © (2006-02-08 09:43) [48]
> Digitman © (08.02.06 09:20) [47]
> Если так, то использование массива в кач-ве очереди вряд ли оправдано.
нет, у него там немного иная задача.
Один поток обслуживает канальный уровень по приему пакетов от некоего устройства через COM порт. Второй поток должен обрабатывать уже готовые пакеты. Очередь (TQueue) можно использовать в качестве согласующего звена между первым и вторым потоком, но вот отказываться от кольцевого буфера в принимающем потоке - не стоит.
← →
Digitman © (2006-02-08 09:49) [49]
> отказываться от кольцевого буфера в принимающем потоке -
> не стоит
Тогда потери пакетов не исключены.
← →
Digitman © (2006-02-08 09:57) [50]Да и не кольцевой это буфер при ближайшем рассмотрении..
Для принимающего трэда это обычный аккумулирующий поток.
А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream
← →
Defunct © (2006-02-08 10:00) [51]Digitman © (08.02.06 09:49) [49]
не вижу причин для этого.
Учитывая что протокол предполагает байт-стаффинг, а кольцевой буфер в два или более раз превышает максимально допустимый объем пакета, то можно гарантировать работу без потерь если придерживаться следующих условий:
1. выделение пакета осуществлять "на лету" (т.е. прием вести по одному байту и каждый принятый байт анализировать на предмет старта/стопа пакета)
2. Распознанный пакет сразу же сбрасывать в очередь и оповещать получателя сл. уровеня.
← →
Defunct © (2006-02-08 10:07) [52]> Для принимающего трэда это обычный аккумулирующий поток.
> А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream
а смысл? добавить еще одну заботу очистки/контроля размера потока?
← →
Digitman © (2006-02-08 10:21) [53]
> Defunct
> а смысл? добавить еще одну заботу очистки/контроля размера
> потока?
Забота треда транспортного уровня - писать в стрим все что принято (с уведомлением обрабатывающего треда о факте очередной записи в стрим).
Забота обрабатывающего треда - читать из стрима, распознавать пакеты, извлекать их из стрима (с соотв. коррекцией его размера и позиции записи) и обрабатывать.
Все !
И никаких кольц.буферов.
По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.
← →
Defunct © (2006-02-08 10:38) [54]Digitman © (08.02.06 10:21) [53]
По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.
Да Вы правы в том, что так как эту задачу пытается решить автор (обращаться к одному и тому же массиву на канальном и транспортном уроване) возможны проблемы.
А так как я предлагаю решить эту задачу, проблемы ислючены.
Описываю алгоритм работы потока обслуживающего канальный уровнь:
1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.
При этом в качестве буфера на канальном уровне используется как положено - кольцевой буфер, на транспортном - очередь, все гладко. И events можно прикрутить, с помощью events канальный уровень должен взаимодействовать с транспортом.
← →
Digitman © (2006-02-08 10:47) [55]
> Defunct © (08.02.06 10:38) [54]
Что ж ...
Вполне нормальное решение.
← →
Kolan © (2006-02-08 11:39) [56]evvcom © (08.02.06 08:39) [46]
Кроме выше описанных потоков есть поток, который проверяет очередь порта, если они есть вызывает событие:procedure TForm1.ReadFromComm(Sender: TObject; ReadBytes: array of Byte);
begin
FPackageExtractor.AddBayteArray(ReadBytes);
StatusBar1.Panels[2].Text := "Ïîëó÷åííî áàéò: "
+ IntToStr(Length(ReadBytes)) + ".";
end;
1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.
2. Чтение по 1 байту критично ка я понял?
Итак у меня 3 потока
1. Тот что читает из порта. Он вызовет событие при получении 1 байта.
Тут ничего не меняю
2. По возникновении этого события, основной поток этот байт кладет в кольцевой буффер.
И устанавливаетEvent
.
3. В свою очередь поток, выделяющий пакеты, просыпается поEvent
и выполняет проверку(она у меня есть, тут тоже не меняю ничего?)
4. Прочитав байт он сравнивает индексы(они равны) и сбрасываетEvent
.
5. Обнаружив пакет он копирует его из кольц. буффера, обнуляет индексы, сбрасываетEvent
и через событие вызываемое через гл поток(как в п.2) кладет этот массив в очередь потока №3. Устанавливаепт егоEvent2
.
6. Тот поток прсыпается и пока очередь непуста парсит пакеты вызывая соотв событие, когда пакет распарсен...
Господи помоги мне понять где тут что надо зашитить и как... :)
Благодарю за помощь. :)
Правильно ли я понял идею?
Больше всего меня беспокоет то, что потоки как бы общаются через главный,
вызавая соотв. события сSynchronize
. Может на прямую можно. Как?
← →
Digitman © (2006-02-08 12:09) [57]
> Итак у меня 3 потока
Один из 3-х, imho, явно лишний.. "дармоед" он)
Я все же настаиваю на реализации со стримом, а не с кольц.буфером.
Поток 1 (транспортный) работает с портом :
- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции
Поток 2 (обрабатывающий) в цикле с проверкой на Terninated:
1 Спит, ожидая сигнал event"а
2. по сигналу event"а просыпается, входит в крит.секцию,
3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.
4. извлекает из стрима очередной обнаруженный пакет, корректирует (уменьшает) размер стрима и св-во Position на величину извлеченного пакета (оставшиеся в стриме данные сдвигаются в начало стрима)
5. обрабатывает пакет нужным образом и идет на 3.
7. Выходит из крит секции, идет на 1.
← →
Digitman © (2006-02-08 12:16) [58]
> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize
Если беспокоит, то есть повод для предметного и детального анализа такой необходимости.
← →
Leonid Troyanovsky © (2006-02-08 12:17) [59]
> Digitman © (08.02.06 12:09) [57]
> 4. извлекает из стрима очередной обнаруженный пакет, корректирует
> (уменьшает) размер стрима и св-во Position на величину извлеченного
> пакета (оставшиеся в стриме данные сдвигаются в начало стрима)
IMHO, "уплотнять" данные лучше тогда, когда очередь пуста.
Ну, и делать это так, чтобы пришедший запрос мог сразу
прервать оное занятие.
--
Regards, LVT.
← →
Digitman © (2006-02-08 12:25) [60]
> Leonid Troyanovsky © (08.02.06 12:17) [59]
> "уплотнять" данные лучше тогда, когда очередь пуста
Можно и так.
← →
Kolan © (2006-02-08 13:22) [61]
> - по событию приема принимает из порта данные (сколько бы
> их ни было принято - хоть один хоть косой десяток)
Вот так сейчас и есть.
Попробую сделать ваш вариант.
Стирим это -TMemoryStream
?
Просто с этим не работал. Надо изучить...
С крит секциями у меня проблеммы т.к. я неточно понимая логику их работы.
Что будет если обрабатывающий поток занял её. А транспортный получил байты и пытается тоже занять её. Байты не пропадут?
А если эта ситуация неск раз подрад будет. Те приходит допустим 3 массива из порта. Все патаются заити в кс, а она занята.
Когда она освободится что будет?..
Буду пытатся сделать :)
Еще раз благодарю за подсказки. Оч. интересно и позновательно :)
← →
Eraser © (2006-02-08 13:37) [62]
> Kolan ©
скажу и своё имхо.
Думаю поток данных с COM порта не на чтолько велик, чтобы надо было создавать какие-то дуфера для его обработки.
А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся" критическими секциями (или же соотв. объектами ядра). Примерно так.
Схаматично.
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs1);
впринципе к такой схеме можно и двойную-тройную и т.д. буфферизацию не сложно приделать.
← →
Eraser © (2006-02-08 13:38) [63]пардон.
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);
← →
Eraser © (2006-02-08 13:56) [64]ещё одно уточнение!
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.
1 поток - считывание данных
чтобы код работал эффективно считываем данные из портма
здесь! т.к. перед крит. секциями!
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);
← →
Digitman © (2006-02-08 14:47) [65]
> Стрим это -TMemoryStream?
Да.
> Что будет если обрабатывающий поток занял её. А транспортный
> получил байты и пытается тоже занять её. Байты не пропадут?
>
Нет, не пропадут.
Когда обрабатывающий тред закончит обработку очередных данных в стриме и выйдет из крит.секции, в нее тут же войдет транспортный тред (все это время покорно ждавший освобождения занятой трансп.потоком крит.секции) и наполнит стрим новыми данными ..
← →
evvcom © (2006-02-08 15:40) [66]
> А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся"
> критическими секциями
> EnterCriticalSection(cs2);
> LeaveCriticalSection(cs1);
Интересное решение. И что? Думаешь будет работать?
← →
Digitman © (2006-02-08 15:49) [67]
> Eraser © (08.02.06 13:56) [64]
Ну и куда ты их, крит.секций, наплодил столько ?
Ты хоть сам в состоянии кратко и внятно сказать, какая из трех крит.секций у тебя какой ресурс защищает ?
← →
Eraser © (2006-02-08 17:25) [68]
> evvcom © (08.02.06 15:40) [66]
насчёт крит. секций - не пробовал, в вот точно такое решение, но на мьютексах работает.
> Digitman © (08.02.06 15:49) [67]
> Ты хоть сам в состоянии кратко и внятно сказать, какая из
> трех крит.секций у тебя какой ресурс защищает ?
1 поток - считывание данных
EnterCriticalSection(cs1); // пробуем "войти" в первую крит. секцию, если не удаётся - значит ещё не обработан предыдущий буфер (пакет данных) - ждём
EnterCriticalSection(cs3); // т.к. буфер у нас один - защищаем его на момент записи в него данных
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2); // второй поток может приступать к считыванию буфера
2 поток - обработака данных
EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1); // буфер обработан (или сохранён для послудующей обработки за критической секцией)! разреаем первому потоку писать слеудющую порцию данных
На базе семафоров можно смастерить таким образом n-ю буфферизацию.
← →
Digitman © (2006-02-08 17:47) [69]
> Eraser © (08.02.06 17:25) [68]
Общий ресурс для транспортного и обрабатывающего тредов в случае автора - один единственный буфер (или поток). Зачем ему куча КС, когда речь пока идет об одном-единственном ресурсе ?
← →
Eraser © (2006-02-08 18:02) [70]
> Digitman © (08.02.06 17:47) [69]
ага.. понятно.
Однако тут почти всё зависит от того какая скорость "распарсивания" и какая скорость "считывания"... надо чтобы автор прояснил этот момент.
Если скорость распарсивания намного выше чем скорость чтения (а я подозреваю, что оно так и есть), то тут вопрос - стоит ли вообще плодить кучу потоков? не проще чтение/распарсивание сделать в одном - последовательно. Т.к. большую часть времени распарсевающий поток будут ждать или проверять "не готов ли буффер/стрим?"
← →
Defunct © (2006-02-08 23:02) [71]
> Kolan © (08.02.06 11:39) [56]
> 1. Ждать активности COM порта.
> 2. Считать байт, если не управляющий символ - то записать
> в буфер и сместить индекс.
> 3. Если символ "старта" - запонить позицию старта пакета.
>
> 4. Если символ "стопа" - (для начала можно расчитать CRC)
> начиная с запомненной позиции старта скопировать данные
> по текущую позицию и положить в очередь принятых пакетов,
> оповестить транспортный уровень о том, что был получен
> очередной пакет очередной пакет.
> 5. Goto 1.
>
> 2. Чтение по 1 байту критично ка я понял?
>
Да - критично.
> Итак у меня 3 потока
>
> 1. Тот что читает из порта. Он вызовет событие при получении
> 1 байта.
> Тут ничего не меняю
Не надо никакого события вызывать при приеме байта. Событие надо вызывать только тогда, когда пакет целиком выделен.
> 2. По возникновении этого события, основной поток этот байт
> кладет в кольцевой буффер.
> И устанавливает Event.
Как следствие - основной поток даже не знает о наличии кольцевого буфера.
> 3. В свою очередь поток, выделяющий пакеты, просыпается
> по Event и выполняет проверку(она у меня есть, тут тоже
> не меняю ничего?)
Поток который принимает - он же и пакеты выделяет. Т.е. не надо разделять задачу приема байт с выделением пакета на две задачи.
> 4. Прочитав байт он сравнивает индексы(они равны) и сбрасывает
> Event.
Как следствие вышесказанного никто кроме принимающего потока даже и не знает о наличии кольцевого буфера и об индексах.
> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize. Может на прямую можно.
Зачем там Synchronize, зачем 3 потока?
зззз. мне проще написать пример программы чем объяснить вам как должно работать.
Еще раз внимательно прочитайте [54].
> Digitman
Почему Вы предлагаете совместить канальный уровень с транспортным?
реализация со Stream/Queue хороша для транспорта, там где данные уже лежат в виде пакетов. Для канального же уровня (где ведется сборка пакетов) Stream добавит лишь дополнительную проблему - по его очистке.
← →
Defunct © (2006-02-08 23:05) [72]упс, забыл поставить закрывающий тэг..
← →
Германн © (2006-02-09 02:35) [73]Очередное моё ФИ насчёт применения доп-потоков для чтения информации с ком-порта. Да, забыл добавить - для "циклического" чтения информации с кои-порта. Ззз, как говаривала ...
← →
Kolan © (2006-02-09 10:19) [74]Германн © (09.02.06 02:35) [73]
А шо если данные не пришли то гл поток должен повиснуть?
← →
Digitman © (2006-02-09 10:23) [75]
> если данные не пришли то гл поток должен повиснуть?
"Вис" любого потока "лечится" использованием асинхронного режима ввода/вывода.
← →
Kolan © (2006-02-09 10:46) [76]3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.
Непонял как анализировать. И что делать если его нет. Получается что анализировать стрим надо каждый раз зананово.
Или надо запоминать тек позицию?....
← →
Digitman © (2006-02-09 10:50) [77]
> Непонял как анализировать
Тебе видней. Протокол мне неизвестен.
> что делать если его нет
Ничего. Просто спать ..
> Или надо запоминать тек позицию?
Разве в этом проблема ?
← →
Kolan © (2006-02-09 10:58) [78]Вот я зашел в КС и хочу анализировать. Я просматриваю состояние, пусть они сейчас
sWaitForStart
Раньше я проверял тек эл кольцевого буфера на равенствоStartByte
Вот пример:case FState of
sWaitForStart:
begin
if (FBuffer[FReadIndex] = StartByte) then
Непойму ка это будет выглядеть в случае стрима?
← →
Digitman © (2006-02-09 11:07) [79]Смотря что пишешь в стрим.
Да и о конкретном протоколе в этой ветке ты ни словом не обмолвился.
← →
Kolan © (2006-02-09 11:15) [80]Поток 1 (транспортный) работает с портом :
- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции
Поток 2 (обрабатывающий) в цикле с проверкой на Terninated:
Смотря что пишешь в стрим.
Данные из порта (массив байт)
Да и о конкретном протоколе в этой ветке
Протокол такой
Начало - это StartByte и следующий любой не старт.
Конец - это EndByte и следующий любой не EndByte.
В
Kolan © (07.02.06 16:46) [41]procedure TPackageExtractThread.Execute;
Тут и происходит разбор...
Страницы: 1 2 3 вся ветка
Текущий архив: 2006.03.19;
Скачать: CL | DM;
Память: 0.68 MB
Время: 0.045 c