Главная страница
Top.Mail.Ru    Яндекс.Метрика
Текущий архив: 2006.03.19;
Скачать: CL | DM;

Вниз

Как усыпить поток.   Найти похожие ветки 

 
Kolan ©   (2006-02-06 21:50) [0]

Здравствуйте,
 Много раз слышал при обсуждении работы с потоками
"если то-то то поток засыпает, если еще что-то то просыпается..."

А как это. Можно пример усыпления.
Я просто проверяю какое-нибудь условие и если потоку делать нечего делаю например Sleep(1). А иначе загрузка процессора на 97% - 99%


 
Eraser ©   (2006-02-06 22:41) [1]


> Kolan ©   (06.02.06 21:50)

чаще всего под "усыплением" потока подразумевают ожидание в этом потоке какого-либо объекта синхронизации, какай-либо ф-ей, предназначеной для этого, например WaitForSingleObject или EnterCriticalSection.
Sleep это всё таки немного иное...


 
Kolan ©   (2006-02-06 23:54) [2]

Понятно.
Я димал мож есть функция какая. Что-то вроде Thread.Sleep и Thread.Awake...


 
Gero ©   (2006-02-07 00:01) [3]

> Kolan ©   (06.02.06 23:54)

Suspend, Resume


 
evvcom ©   (2006-02-07 11:36) [4]


> Я димал мож есть функция какая.

Думал? Есть, но не функция, а метод, если речь идет о TThread. См. [3]


 
Kolan ©   (2006-02-07 12:59) [5]

Интересует вот что: Допустим поток будет выполнять работу если какой-то флаг = True.

В случае если прошло какое-то время и флаг всевремя был равен False( Те поток бездействовал), поток засыпал бы....


 
Digitman ©   (2006-02-07 13:05) [6]


> В случае если прошло какое-то время и флаг всевремя был
> равен False( Те поток бездействовал), поток засыпал бы


Вряд ли такая затея чем-то оправдана


 
Kolan ©   (2006-02-07 13:09) [7]

Вряд ли такая затея чем-то оправдана
Наверно потому что...
http://delphimaster.net/view/4-1138784983/
Digitman ©   (01.02.06 12:41) [3]
Ведь создание и старт трэда сродни пуску холодного автодвигателя !

Да?

Я просто попробовать такое сделать хотел...


 
Digitman ©   (2006-02-07 13:09) [8]

Если потоку нечего делать, лучшее чем он может заняться в это время - "спать".

"Пустая" циклическая же проверка же флага (если имеется ввиду флаг как результат user-time-вычисления некоего условия) ничем не оправдана.


 
Digitman ©   (2006-02-07 13:13) [9]


> Наверно потому что


> ... автодвигателя


Нет, вовсе не поэтому.


 
wal ©   (2006-02-07 13:14) [10]

Можно в качестве такого "флага" использовать какой-либо объект синхронизации, например крит.секцию, только продумать все надо, чтобы дед-локов не натворить.


 
Kolan ©   (2006-02-07 13:20) [11]

"спать"
Те надо сделать Suspend?

Вот часть моей программы:

procedure TPackageExtractThread.Execute;
var
 State: TState;
 CurrentIndex: Longint;
 BufferEnd: Longint;
begin

 FreeOnTerminate := True;
 while not Terminated do
 begin
    if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
    begin
   {Working}
    end
    else
    begin
      Sleep(2);
    end;
 end;
end;


Если я Вместо Sleep буду вызовать Suspend, тогода при частой смене условия  (FState <> sIdle) and (FWriteIndex <> FReadIndex)(а оно так и происходит когда идет работа), получится что поток постоянно просыпается и завсыпает(я думаю, что это гораздо хуже будет).

Поэтому я хочу наложить условие Если ничего не делал 1мин то засапаю...


 
Digitman ©   (2006-02-07 13:27) [12]


> получится что поток постоянно просыпается и завсыпает(я
> думаю, что это гораздо хуже будет).


В этом-то как раз нет ничего страшного.

Но вcе те 2 мс, пока поток спит на строчке Sleep(2), он не способен ни на что реагировать, вплоть до своего "пробуждения"... Если это не критично, то логика вида

while not Terminated do
 if Нечего_делать then
   sleep(..)
 else
   делать_дело

в принципе имеет право на жизнь.

Хотя гораздо резонней будет подход, когда поток "спит" и "слышит" сигналы "пробуждения" (см. Wait-ф-ции)


 
Kolan ©   (2006-02-07 13:35) [13]

Ага нашел... Читаю :)


 
Kolan ©   (2006-02-07 13:48) [14]

Допустим я создам

FEvent := CreateEvent(nil, False, False, nil);

И буду ждать его WaitForSingleObject

Вопрос: а как управлять FEvent. Те как установить его в положение "случилось/неслучилось"?

И дальше я жду это событие с INFINITE Таймаутом. И пока оно не наступит поток будет спать? Так?


 
wal ©   (2006-02-07 14:03) [15]


> Те как установить его в положение "случилось/неслучилось"?
Set/Reset/PulseEvent

> И пока оно не наступит поток будет спать? Так?
Так


 
Fay ©   (2006-02-07 14:03) [16]

SetEvent/ResetEvent


 
Digitman ©   (2006-02-07 14:03) [17]


> как установить его в положение "случилось/неслучилось"?


SetEvent() = "случилось


> я жду это событие с INFINITE Таймаутом. И пока оно не наступит
> поток будет спать? Так?


Да.


 
Kolan ©   (2006-02-07 14:05) [18]

О. Шас попробую...


 
Fay ©   (2006-02-07 14:10) [19]

> И пока оно не наступит  поток будет спать?
или пока event не сдохнет


 
Kolan ©   (2006-02-07 14:19) [20]

Вопрос такой: Если я уже сделал SetEvent и сделаю ео повторно - что будет?

И еще а посмотреть состояние Event"а можно как-нибудб(для отладки).


 
Fay ©   (2006-02-07 14:22) [21]

> и сделаю ео повторно - что будет?
Event будет в сигнальном состоянии. А что ещё может быть? 8)


 
Digitman ©   (2006-02-07 14:23) [22]


> Если я уже сделал SetEvent и сделаю ео повторно - что будет?


Тоже самое что установка в True булевой переменной, уже содержащей на этот момент True.


> посмотреть состояние Event"а можно как-нибудб(для отладки).


Не нужно его "смотреть".

Просто поставь брейкпойнт на следующей строчке за вызовом Wait-ф-ции и поймай его.


 
Kolan ©   (2006-02-07 14:32) [23]

Fay ©   (07.02.06 14:22) [21]
Просто с ходу не получилось...

Вот что получилось.

FreeOnTerminate := True;
 while not Terminated do
 begin
   WaitForSingleObject(FEvent, INFINITE);
    if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
    begin
     {Packge parsing}
    begin
      ResetEvent(FEvent);
    end;
 end;


И еще две процедуры


procedure TPackageExtractThread.AddArray(Arr: array of Byte);
var
 I, J: Longint;
begin
{Adding}
 SetEvent(FEvent);
end;


procedure TPackageExtractThread.MakeAndSendPackage;
var
 I, J: Longint;
 Answer: array of Byte;
begin
 ResetEvent(FEvent);
 {Converting...}
 FState := sIdle;
 FReadIndex := 0;
 FWriteIndex := 0;
 FPackageReadyEvent(Self, Answer);
end;


Почемуто затыкается на 3 вызове Wait функции. Немогу отладить...

До MakeAndSendPackage
дело не доходит..

Fay ©   (07.02.06 14:22) [21]
Вот я и подумал, что может повторный вызов SetEvent причина...


 
Kolan ©   (2006-02-07 14:37) [24]

Поясню суть работы потока.

Он ананлизирует циклический массив на предмет нахождения пакета данных. Добавляет элементы в цикл массив другой поток...

Если нашел пакет то
Synchronize(MakeAndSendPackage);

Спать должен если нашел пакет или дошел до конца цикл массива и ждет когда добавят продолжение.


 
Digitman ©   (2006-02-07 14:41) [25]


> циклический массив


Это еще что за зверь ?!


 
Kolan ©   (2006-02-07 14:44) [26]

Да этопросто массив фиксированный длинны. И два индекса по которому читаю и по которуму добовляю элементы.

FWriteIndex <> FReadIndex
Вот они. FReadIndex как бы догоняет FWriteIndex если они сравнялись, то жду пока еще добавят байты...

Как только пакет найден я его передаю, а индексы обнуляю(начинаю заполнять массив занаво).

Тут все работает проверял....


 
Digitman ©   (2006-02-07 14:56) [27]


> Вот они. FReadIndex как бы догоняет FWriteIndex если они
> сравнялись, то жду пока еще добавят байты


Ты, вероятно, хотел сказать "кольцевой буфер" ..

Кстати, а какой поток формирует значения FReadIndex и FWriteIndex ?


 
Kolan ©   (2006-02-07 15:00) [28]

формирует
Не понял, что это занчит

Но изменяют их главный и доп поток...

Но повторюсь что все проверял на больших покетах м все работало... 100%

"кольцевой буфер"
циклический массив


Да уж оба слова перепутал...
:)


 
Fay ©   (2006-02-07 15:03) [29]

Pocket M - это кто?


 
Kolan ©   (2006-02-07 15:06) [30]

Извените, отвечу позже :)


 
evvcom ©   (2006-02-07 15:07) [31]


> Но изменяют их главный и доп поток...

Что-то в твоем коде синхронизации доступа к ним не видно... :( Очень подозрительна фраза "все работает"


 
evvcom ©   (2006-02-07 15:09) [32]


> FReadIndex как бы догоняет FWriteIndex если они сравнялись,
>  то жду пока еще добавят байты

А если Write на круг обогнал Read?


 
Digitman ©   (2006-02-07 15:33) [33]


> Kolan ©   (07.02.06 15:00) [28]


Да уж .. где синхронизация-то ? доступа к этим данным ?


 
Kolan ©   (2006-02-07 16:13) [34]

Да уж .. где синхронизация-то ? доступа к этим данным ?
Нету

Я думая она ненужна... Ведь записываю и читаю всегда по разным индексам...
Может я и неправ.

Вообщето она была, но я убрал ее...

А если Write на круг обогнал Read?
Нет такое невозможно... Если по идее до конца массива они не дойдут( если дойдут массив увеличу...)


 
Digitman ©   (2006-02-07 16:16) [35]


> Может я и неправ


с учетом


> изменяют их главный и доп поток


оч даже неправ !


 
evvcom ©   (2006-02-07 16:28) [36]


> Ведь записываю и читаю всегда по разным индексам

Ты сами переменные читаешь и пишешь в разных потоках, уже это надо синхронизировать.


 
Kolan ©   (2006-02-07 16:32) [37]

Доп поток изменяет только индекс чтения. Гл поток изменяет индекс записи.
Причем если состояние Idle(те доп поток не работает) обнуляет индексы.

Когда поток нашел покет, он вызовет процедуру передач и пакета и обнулит индексы. Не пойму где они пересекутся.

Или если один поток пишет по индексу A а второй по индексу B одного и тогоже массива(причем точно индексы разные) - будет ошибка?

Скажите - это может повлиять на правильность работы с Event. Те из - за чего межет засыпать...


 
Digitman ©   (2006-02-07 16:36) [38]


> Когда поток нашел покет, он вызовет процедуру передач и
> пакета и обнулит индексы. Не пойму где они пересекутся



> Доп поток изменяет только индекс чтения


"Как тебя понимать, Саид ?" (с)


 
Kolan ©   (2006-02-07 16:40) [39]

Kolan ©   (07.02.06 14:19) [20]

Вопрос такой: Если я уже сделал SetEvent и сделаю ео повторно - что будет?

Я это не просто так спрашивал. Я сначала все позашишал кр. секциями...

Возникала сит когда я дважды делал Release и потом Acquire уже не вызывался. Как проверить тек сост кр. секции я ненашел...
Поэтому переделал без кс.

А что будет ошибка если один поток сделает
FWriteIndex := J;
А второй в этоже время проверит
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
?


 
Digitman ©   (2006-02-07 16:42) [40]


> А что будет ошибка если один поток сделает
> FWriteIndex := J;
> А второй в этоже время проверит
> if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
> ?


Бардак будет.


 
Kolan ©   (2006-02-07 16:46) [41]

Индыксы обнулятся только если состояние sIdle и доп поток гарантированно не использует эти переменные

Вообщем вот полный код:

procedure TPackageExtractThread.AddArray(Arr: array of Byte);
var
 I, J: Longint;
begin
 if FState = sIdle then
 begin
   FWriteIndex := 0;
   FReadIndex := 0;
 end;
 J := FWriteIndex;
 for I := Low(Arr) to High(Arr) do
 begin
   FBuffer[J] := Arr[I];
   J := J + 1;
 end;
 FWriteIndex := J;
 if FState = sIdle then
 begin
   FState := sWaitForStart;
 end;
end;


procedure TPackageExtractThread.Execute;
begin

 FreeOnTerminate := True;
 while not Terminated do
 begin
    if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
    begin
      case FState of
        sWaitForStart:
          begin
            if (FBuffer[FReadIndex] = StartByte) then
            begin
              FState := sWaitForEnd;
              FStartIndex := FReadIndex;
            end;
              FReadIndex := FReadIndex + 1;
           end;

        sWaitForEnd:
          begin
            if (FBuffer[FReadIndex] = EndByte) then
            begin
              FState := sEndFound;
            end;
            FReadIndex := FReadIndex + 1;
          end;

        sEndFound:
          begin
            if (FBuffer[FReadIndex] <> EndByte) then
            begin
              FState := sPackegeReady;
              FReadIndex := FReadIndex - 1;
            end
            else
            begin
              FState := sWaitForEnd;
            end;
            FReadIndex := FReadIndex + 1;
          end;

        sPackegeReady:
          begin
            FEndIndex := FReadIndex;
            FState := sIdle;
            Synchronize(MakeAndSendPackage);
          end;
      end;
    end
    else
    begin
      Sleep(1);
    end;
 end;
end;



procedure TPackageExtractThread.MakeAndSendPackage;
var
 I, J: Longint;
 Answer: array of Byte;
begin
 SetLength(Answer, (FEndIndex - FStartIndex) + 1);
 J := Low(Answer);
 for I := FStartIndex to FEndIndex do
 begin
   Answer[J] := FBuffer[I];
   J := J + 1;
 end;
 FState := sIdle;
 FReadIndex := 0;
 FWriteIndex := 0;
 FPackageReadyEvent(Self, Answer);
end;


 
Digitman ©   (2006-02-07 17:15) [42]


> Kolan ©   (07.02.06 16:46) [41]


Эта логика ведет к непредсказуемым (печальным) последствиям...

Доступ к массиву и тем самым индексам в нем должен быть защищен крит.секцией.


 
Kolan ©   (2006-02-07 17:44) [43]

Я непонял
if (FState <> sIdle) and (FWriteIndex <> FReadIndex) then
Это же тоже доступ?
Те весь этот if надо зашитить?


 
Defunct ©   (2006-02-07 19:30) [44]

> Kolan

Почитайте последний пост, в вашей предыдущей ветке.


 
Kolan ©   (2006-02-07 20:58) [45]

Спасибо за ответ. Завтра разберусь....
:)


 
evvcom ©   (2006-02-08 08:39) [46]

Это же не весь код. AddArray только описан, но где вызывается не ясно. В коде доступ к FWriteIndex, FReadIndex осуществляется из всех этих 3-х методов. Execute и MakeAndSendPackage синхронизированы, а AddArray не ясно.
FWriteIndex <> FReadIndex - это тоже доступ.

> Те весь этот if надо зашитить?

Все зависит от того, синхронизированы ли уже доступы из разных потоков.


 
Digitman ©   (2006-02-08 09:20) [47]


> Kolan ©   (07.02.06 20:58) [45]


Насколько я понял, тебе требуется реализовать следующее :

Есть некая очередь данных, требующих обработки.
Один код.поток (1) помещает данные по мере их формирования в хвост очереди, другой код.поток (2) извлекает данные из головы очереди и обрабатывает их (либо "спит" если очередь пуста).

Если так, то использование массива в кач-ве очереди вряд ли оправдано.

Например, есть готовый класс TQueue, реализующий логику очереди. Можно защитить доступ к методам/св-вам его экз-ра крит.секцией, при добавлении в хвост очереди устанавливать событие (SetEvent), при опустошении очереди сбрасывать событие (ResetEvent).
Поток 1, добавляя очередной элемент (или несколько очередных элементов) в хвост, "сигналит" об этом установкой события. Поток 2 "просыпается" по этому сигналу и начинает циклически выбирать элементы из головы и обрабатывать их, до тех пор пока не обнаружит опустошение очереди, после чего вновь "засыпает", сбросив непосредственно перед этим событие в несигналящее состояние.


 
Defunct ©   (2006-02-08 09:43) [48]


> Digitman ©   (08.02.06 09:20) [47]
> Если так, то использование массива в кач-ве очереди вряд ли оправдано.


нет, у него там немного иная задача.

Один поток обслуживает канальный уровень по приему пакетов от некоего устройства через COM порт. Второй поток должен обрабатывать уже готовые пакеты. Очередь (TQueue) можно использовать в качестве согласующего звена между первым и вторым потоком, но вот отказываться от кольцевого буфера в принимающем потоке - не стоит.


 
Digitman ©   (2006-02-08 09:49) [49]


> отказываться от кольцевого буфера в принимающем потоке -
>  не стоит


Тогда потери пакетов не исключены.


 
Digitman ©   (2006-02-08 09:57) [50]

Да и не кольцевой это буфер при ближайшем рассмотрении..
Для принимающего трэда это обычный аккумулирующий поток.
А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream


 
Defunct ©   (2006-02-08 10:00) [51]

Digitman ©   (08.02.06 09:49) [49]

не вижу причин для этого.
Учитывая что протокол предполагает байт-стаффинг, а кольцевой буфер в два или более раз превышает максимально допустимый объем пакета, то можно гарантировать работу без потерь если придерживаться следующих условий:
1. выделение пакета осуществлять "на лету" (т.е. прием вести по одному байту и каждый принятый байт анализировать на предмет старта/стопа пакета)
2. Распознанный пакет сразу же сбрасывать в очередь и оповещать получателя сл. уровеня.


 
Defunct ©   (2006-02-08 10:07) [52]

> Для принимающего трэда это обычный аккумулирующий поток.
> А раз поток, то гораздо логичней было бы вместо массива использовать TStream-наследника, например, TMemoryStream


а смысл? добавить еще одну заботу очистки/контроля размера потока?


 
Digitman ©   (2006-02-08 10:21) [53]


> Defunct



> а смысл? добавить еще одну заботу очистки/контроля размера
> потока?


Забота треда транспортного уровня - писать в стрим все что принято (с уведомлением обрабатывающего треда о факте очередной записи в стрим).

Забота обрабатывающего треда - читать из стрима, распознавать пакеты, извлекать их из стрима (с соотв. коррекцией его размера и позиции записи) и обрабатывать.

Все !
И никаких кольц.буферов.

По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и  если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.


 
Defunct ©   (2006-02-08 10:38) [54]

Digitman ©   (08.02.06 10:21) [53]
По сути автор как раз и пытается приспособить массив для работы в кач-ве стрима. Но массив этот имеет фикс.размер, и  если обрабатывающий тред не будет поспевать за транспортным (что не так уж и маловероятно), то рано или поздно возникнет ситуация либо выхода индекса записи за пределы верх.границы массива либо потери инф-ции в случае использования массива транспортным тредом в кач-ве кольц.буфера.

Да Вы правы в том, что так как эту задачу пытается решить автор (обращаться к одному и тому же массиву на канальном и транспортном уроване) возможны проблемы.

А так как я предлагаю решить эту задачу, проблемы ислючены.
Описываю алгоритм работы потока обслуживающего канальный уровнь:

1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.

При этом в качестве буфера на канальном уровне используется как положено - кольцевой буфер, на транспортном - очередь, все гладко. И events можно прикрутить, с помощью events канальный уровень должен взаимодействовать с транспортом.


 
Digitman ©   (2006-02-08 10:47) [55]


> Defunct ©   (08.02.06 10:38) [54]


Что ж ...
Вполне нормальное решение.


 
Kolan ©   (2006-02-08 11:39) [56]

evvcom ©   (08.02.06 08:39) [46]

Кроме выше описанных потоков есть поток, который проверяет очередь порта, если они есть вызывает событие:

procedure TForm1.ReadFromComm(Sender: TObject; ReadBytes: array of Byte);
begin
 FPackageExtractor.AddBayteArray(ReadBytes);
 StatusBar1.Panels[2].Text  := "&#207;&#238;&#235;&#243;&#247;&#229;&#237;&#237;&#238; &#225;&#224;&#233;&#242;: "
   + IntToStr(Length(ReadBytes)) + ".";
end;


1. Ждать активности COM порта.
2. Считать байт, если не управляющий символ - то записать в буфер и сместить индекс.
3. Если символ "старта" - запонить позицию старта пакета.
4. Если символ "стопа" - (для начала можно расчитать CRC) начиная с запомненной позиции старта скопировать данные по текущую позицию и положить в очередь принятых пакетов, оповестить транспортный уровень о том, что был получен очередной пакет очередной пакет.
5. Goto 1.


2. Чтение по 1 байту критично ка я понял?

Итак у меня 3 потока

1. Тот что читает из порта. Он вызовет событие при получении 1 байта.
Тут ничего не меняю

2. По возникновении этого события, основной поток этот байт кладет в кольцевой буффер.
И устанавливает Event.

3. В свою очередь поток, выделяющий пакеты, просыпается по Event и выполняет проверку(она у меня есть, тут тоже не меняю ничего?)

4. Прочитав байт он сравнивает индексы(они равны) и сбрасывает Event.

5. Обнаружив пакет он копирует его из кольц. буффера, обнуляет индексы, сбрасывает Event и через событие вызываемое через гл поток(как в п.2) кладет этот массив в очередь потока №3. Устанавливаепт его
Event2.

6. Тот поток прсыпается и пока очередь непуста парсит пакеты вызывая соотв событие, когда пакет распарсен...

Господи помоги мне понять где тут что надо зашитить и как... :)

Благодарю за помощь. :)
Правильно ли я понял идею?

Больше всего меня беспокоет то, что потоки как бы общаются через главный,
вызавая соотв. события с Synchronize. Может на прямую можно. Как?


 
Digitman ©   (2006-02-08 12:09) [57]


> Итак у меня 3 потока


Один из 3-х, imho, явно лишний.. "дармоед" он)

Я все же настаиваю на реализации со стримом, а не с кольц.буфером.

Поток 1 (транспортный) работает с портом :

- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции

Поток 2 (обрабатывающий)  в цикле с проверкой на Terninated:

1  Спит, ожидая сигнал event"а
2. по сигналу event"а просыпается, входит в крит.секцию,
3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.
4. извлекает из стрима очередной обнаруженный пакет, корректирует (уменьшает) размер стрима и св-во Position на величину извлеченного пакета (оставшиеся в стриме данные сдвигаются в начало стрима)
5. обрабатывает пакет нужным образом и идет на 3.
7. Выходит из крит секции, идет на 1.


 
Digitman ©   (2006-02-08 12:16) [58]


> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize


Если беспокоит, то есть повод для предметного и детального анализа такой необходимости.


 
Leonid Troyanovsky ©   (2006-02-08 12:17) [59]


> Digitman ©   (08.02.06 12:09) [57]

> 4. извлекает из стрима очередной обнаруженный пакет, корректирует
> (уменьшает) размер стрима и св-во Position на величину извлеченного
> пакета (оставшиеся в стриме данные сдвигаются в начало стрима)


IMHO, "уплотнять" данные лучше тогда, когда очередь пуста.
Ну, и делать это так, чтобы пришедший запрос мог сразу
прервать оное занятие.

--
Regards, LVT.


 
Digitman ©   (2006-02-08 12:25) [60]


> Leonid Troyanovsky ©   (08.02.06 12:17) [59]


> "уплотнять" данные лучше тогда, когда очередь пуста


Можно и так.


 
Kolan ©   (2006-02-08 13:22) [61]


> - по событию приема принимает из порта данные (сколько бы
> их ни было принято - хоть один хоть косой десяток)


Вот так сейчас и есть.
Попробую сделать ваш вариант.
Стирим это -TMemoryStream?
Просто с этим не работал. Надо изучить...

С крит секциями у меня проблеммы т.к. я неточно понимая логику их работы.

Что будет если обрабатывающий поток занял её. А транспортный получил байты и пытается тоже занять её. Байты не пропадут?

А если эта ситуация неск раз подрад будет. Те приходит допустим 3 массива из порта. Все патаются заити в кс, а она занята.

Когда она освободится что будет?..

Буду пытатся сделать :)

Еще раз благодарю за подсказки. Оч. интересно и позновательно :)


 
Eraser ©   (2006-02-08 13:37) [62]


> Kolan ©

скажу и своё имхо.
Думаю поток данных с COM порта не на чтолько велик, чтобы надо было создавать какие-то дуфера для его обработки.
А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся" критическими секциями (или же соотв. объектами ядра). Примерно так.
Схаматично.

var
 cs1, cs2, cs3: TКритическаяСекция;
 buf: Буффер;
...
инициализируем крит. секции.

1 поток - считывание данных

EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs2);

2 поток - обработака данных

EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
EnterCriticalSection(cs3);
LeaveCriticalSection(cs1);

впринципе к такой схеме можно и двойную-тройную и т.д. буфферизацию не сложно приделать.


 
Eraser ©   (2006-02-08 13:38) [63]

пардон.

var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.

1 поток - считывание данных

EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
считываем данные в буффер;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);

2 поток - обработака данных

EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);


 
Eraser ©   (2006-02-08 13:56) [64]

ещё одно уточнение!
var
cs1, cs2, cs3: TКритическаяСекция;
buf: Буффер;
...
инициализируем крит. секции.

1 поток - считывание данных

чтобы код работал эффективно считываем данные из портма
здесь! т.к. перед крит. секциями!
EnterCriticalSection(cs1);
EnterCriticalSection(cs3);
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2);

2 поток - обработака данных

EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1);


 
Digitman ©   (2006-02-08 14:47) [65]


> Стрим это -TMemoryStream?


Да.


> Что будет если обрабатывающий поток занял её. А транспортный
> получил байты и пытается тоже занять её. Байты не пропадут?
>


Нет, не пропадут.

Когда обрабатывающий тред закончит обработку очередных данных в стриме и выйдет из крит.секции, в нее тут же войдет транспортный тред (все это время покорно ждавший освобождения занятой трансп.потоком крит.секции) и наполнит стрим новыми данными ..


 
evvcom ©   (2006-02-08 15:40) [66]


> А задачи синхронного чтения/обработки данных решается "взаимопересекающимеся"
> критическими секциями
> EnterCriticalSection(cs2);
> LeaveCriticalSection(cs1);

Интересное решение. И что? Думаешь будет работать?


 
Digitman ©   (2006-02-08 15:49) [67]


> Eraser ©   (08.02.06 13:56) [64]


Ну и куда ты их, крит.секций, наплодил столько ?

Ты хоть сам в состоянии кратко и внятно сказать, какая из трех крит.секций у тебя какой ресурс защищает ?


 
Eraser ©   (2006-02-08 17:25) [68]


> evvcom ©   (08.02.06 15:40) [66]

насчёт крит. секций - не пробовал, в вот точно такое решение, но на мьютексах работает.

> Digitman ©   (08.02.06 15:49) [67]


> Ты хоть сам в состоянии кратко и внятно сказать, какая из
> трех крит.секций у тебя какой ресурс защищает ?

1 поток - считывание данных
EnterCriticalSection(cs1); // пробуем "войти" в первую крит. секцию, если не удаётся - значит ещё не обработан предыдущий буфер (пакет данных) - ждём
EnterCriticalSection(cs3); // т.к. буфер у нас один - защищаем его на момент записи в него данных
заполняем буффер, предварительно считанными данными;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs2); // второй поток может приступать к считыванию буфера

2 поток - обработака данных

EnterCriticalSection(cs2);
EnterCriticalSection(cs3);
обрабатываем данные, кторые в буффере;
LeaveCriticalSection(cs3);
LeaveCriticalSection(cs1); // буфер обработан (или сохранён для послудующей обработки за критической секцией)! разреаем первому потоку писать слеудющую порцию данных

На базе семафоров можно смастерить таким образом n-ю буфферизацию.


 
Digitman ©   (2006-02-08 17:47) [69]


> Eraser ©   (08.02.06 17:25) [68]


Общий ресурс для транспортного и обрабатывающего тредов в случае автора - один единственный буфер (или поток). Зачем ему куча КС, когда речь пока идет об одном-единственном ресурсе ?


 
Eraser ©   (2006-02-08 18:02) [70]


> Digitman ©   (08.02.06 17:47) [69]

ага.. понятно.
Однако тут почти всё зависит от того какая скорость "распарсивания" и какая скорость "считывания"... надо чтобы автор прояснил этот момент.

Если скорость распарсивания намного выше чем скорость чтения (а я подозреваю, что оно так и есть), то тут вопрос - стоит ли вообще плодить кучу потоков? не проще чтение/распарсивание сделать в одном - последовательно. Т.к. большую часть времени распарсевающий поток будут ждать или проверять "не готов ли буффер/стрим?"


 
Defunct ©   (2006-02-08 23:02) [71]


> Kolan ©   (08.02.06 11:39) [56]

> 1. Ждать активности COM порта.
> 2. Считать байт, если не управляющий символ - то записать
> в буфер и сместить индекс.
> 3. Если символ "старта" - запонить позицию старта пакета.
>
> 4. Если символ "стопа" - (для начала можно расчитать CRC)
> начиная с запомненной позиции старта скопировать данные
> по текущую позицию и положить в очередь принятых пакетов,
>  оповестить транспортный уровень о том, что был получен
> очередной пакет очередной пакет.
> 5. Goto 1.
>
> 2. Чтение по 1 байту критично ка я понял?
>
Да - критично.

> Итак у меня 3 потока
>
> 1. Тот что читает из порта. Он вызовет событие при получении
> 1 байта.
> Тут ничего не меняю

Не надо никакого события вызывать при приеме байта. Событие надо вызывать только тогда, когда пакет целиком выделен.

> 2. По возникновении этого события, основной поток этот байт
> кладет в кольцевой буффер.
> И устанавливает Event.

Как следствие - основной поток даже не знает о наличии кольцевого буфера.

> 3. В свою очередь поток, выделяющий пакеты, просыпается
> по Event и выполняет проверку(она у меня есть, тут тоже
> не меняю ничего?)

Поток который принимает - он же и пакеты выделяет. Т.е. не надо разделять задачу приема байт с выделением пакета на две задачи.

> 4. Прочитав байт он сравнивает индексы(они равны) и сбрасывает
> Event.

Как следствие вышесказанного никто кроме принимающего потока даже и не знает о наличии кольцевого буфера и об индексах.

> Больше всего меня беспокоет то, что потоки как бы общаются
> через главный,
> вызавая соотв. события с Synchronize. Может на прямую можно.

Зачем там Synchronize, зачем 3 потока?
зззз. мне проще написать пример программы чем объяснить вам как должно работать.
Еще раз внимательно прочитайте [54].


> Digitman

Почему Вы предлагаете совместить канальный уровень с транспортным?

реализация со Stream/Queue хороша для транспорта, там где данные уже лежат в виде пакетов. Для канального же уровня (где ведется сборка пакетов) Stream добавит лишь дополнительную проблему - по его очистке.


 
Defunct ©   (2006-02-08 23:05) [72]

упс, забыл поставить закрывающий тэг..


 
Германн ©   (2006-02-09 02:35) [73]

Очередное моё ФИ насчёт применения доп-потоков для чтения информации с ком-порта. Да, забыл добавить - для "циклического" чтения информации с кои-порта. Ззз, как говаривала ...


 
Kolan ©   (2006-02-09 10:19) [74]

Германн ©   (09.02.06 02:35) [73]
А шо если данные не пришли то гл поток должен повиснуть?


 
Digitman ©   (2006-02-09 10:23) [75]


> если данные не пришли то гл поток должен повиснуть?


"Вис" любого потока "лечится" использованием асинхронного режима ввода/вывода.


 
Kolan ©   (2006-02-09 10:46) [76]

3. анализирует имеющиеся на сей момент в стриме данные на предмет наличия хотя бы одного целостного пакета, если такового нет, то сбрасывает event и идет на 7.

Непонял как анализировать. И что делать если его нет. Получается что анализировать стрим надо каждый раз зананово.

Или надо запоминать тек позицию?....


 
Digitman ©   (2006-02-09 10:50) [77]


> Непонял как анализировать


Тебе видней. Протокол мне неизвестен.


> что делать если его нет


Ничего. Просто спать ..


> Или надо запоминать тек позицию?


Разве в этом проблема ?


 
Kolan ©   (2006-02-09 10:58) [78]

Вот я зашел в КС и хочу анализировать. Я просматриваю состояние, пусть они сейчас sWaitForStart

Раньше я проверял тек эл кольцевого буфера на равенство  StartByte

Вот пример:
case FState of
        sWaitForStart:
          begin
            if (FBuffer[FReadIndex] = StartByte) then


Непойму ка это будет выглядеть в случае стрима?


 
Digitman ©   (2006-02-09 11:07) [79]

Смотря что пишешь в стрим.

Да и о конкретном протоколе в этой ветке ты ни словом не обмолвился.


 
Kolan ©   (2006-02-09 11:15) [80]

Поток 1 (транспортный) работает с портом :

- по событию приема принимает из порта данные (сколько бы их ни было принято - хоть один хоть косой десяток), убирает если нужно упр.символы, входит в крит.секцию, записывает принятое в стрим (св-во Position увеличивается на размер записанных данных), устанавливает event, выходит из крит.секции

Поток 2 (обрабатывающий)  в цикле с проверкой на Terninated:


Смотря что пишешь в стрим.
Данные из порта (массив байт)

Да и о конкретном протоколе в этой ветке
Протокол такой
Начало - это StartByte и следующий любой не старт.
Конец - это EndByte и  следующий любой не EndByte.

В
Kolan ©   (07.02.06 16:46) [41]

procedure TPackageExtractThread.Execute;

Тут и происходит разбор...


 
Defunct ©   (2006-02-09 11:57) [81]

Я уже не могу смотеть на то, как вы мучаетесь.. Написал прототип класса работающего в вашем протоколе..

unit uChannel;

interface

{$define DemoMode}

uses SysUtils, SyncObjs
    {$ifdef DemoMode}, Dialogs {$endif}
    ;

const protoPACKETSTART = $AC;        // Символ старта пакета
     protoPACKETSTOP  = $A5;        // Символ стопа
     protoMAXPACKETLENGTH = 2048;    // Максимальная длина пакета

     protoCW : set of byte = [protoPACKETSTART, protoPACKETSTOP];
     rsSIZE = protoMAXPACKETLENGTH * 2;

type

   TDataChannel = class( TObject )
   private
   // секция обслуживания кольцевого буфера
      FRing : array[0..rsSIZE -1] of byte;
      FRingIndex : Integer;
      FStartIndex : Integer;
      FLastValue : Byte;
      FMonitorPacket : boolean;
      FAccumulatedPacketLength : Integer;
      procedure AddToRing( Value : byte );
      function StartCondition(Value : byte):boolean;
      function StopCondition(Value : byte):boolean;
      function GetPacketByteIndex( Offset : integer ):integer;
   protected
   // секция обслуживания событий
      FEvent : TEvent;
      procedure NotifyNextLayer;virtual;
   protected
   // секция обслуживания пакетов
      FPacket : array[0..protoMAXPACKETLENGTH] of byte;
      FPacketSize : integer;

      function CheckCRC:boolean;virtual;
   public
      procedure AddByte( Value : byte );
      constructor Create( APacketReceptionEvent : TEvent);
      destructor Destory;virtual;
   end;

implementation

{ TDataChannel }
constructor TDataChannel.Create(APacketReceptionEvent: TEvent);
begin
  FEvent := APacketReceptionEvent;
  FRingIndex := 0;
  FMonitorPacket := false;
end;

procedure TDataChannel.AddByte(Value: byte);
var
 i : integer;
begin
  if StartCondition( Value ) then
     begin
        FStartIndex := FRingIndex;
        FMonitorPacket := True;
        FAccumulatedPacketLength := 0;
     end
  else if StopCondition( Value ) then
    begin
       if FMonitorPacket then // пакет принят
       begin
          FMonitorPacket := false;
          FPacketSize := FAccumulatedPacketLength - 1;
          for i := 0 to FPacketSize do
             FPacket[ i ] := FRing[ GetPacketByteIndex(i) ];
          if CheckCRC then // Пакет распознан и CRC верная
          begin // Оповещаем обработчик пакета
             NotifyNextLayer;
              {$ifdef DemoMode}
                // В Демо режиме ВЫВОДИМ НА ЭКРАН ИНФОРМАЦИЮ О ПОЛУЧЕННОМ ПАКЕТЕ
                ShowMessage( Format("Принят и распознан пакет, длина = %D",[FPacketSize]) );
              {$endif}
          end
       end;
    end;

   AddToRing( Value );
end;

function TDataChannel.StartCondition(Value: byte): boolean;
begin
 Result := (FLastValue = protoPACKETSTART) and (Value <> protoPACKETSTART)
end;

function TDataChannel.StopCondition(Value: byte): boolean;
begin
 Result := (FLastValue = protoPACKETSTOP) and (Value <> protoPACKETSTOP)
end;

function TDataChannel.GetPacketByteIndex(Offset: integer): integer;
begin
 Result := Offset + FStartIndex;
 if Result >= rsSIZE then
    Result := Result - rsSIZE;
end;

procedure TDataChannel.AddToRing(Value: byte);
begin
  if (Value in protoCW) and (Value = FLastValue) then
  begin
     FLastValue := 0;
     exit;
  end;

  FRing[ FRingIndex ] := Value;
  FLastValue := Value;
  inc( FRingIndex );
  if FRingIndex = rsSIZE then
     FRingIndex := 0;

   if FMonitorPacket then
      begin
         Inc( FAccumulatedPacketLength );
         if FAccumulatedPacketLength > protoMAXPACKETLENGTH then
         begin
            FAccumulatedPacketLength := 0;
            FMonitorPacket := false;
         end;
      end;
end;

function TDataChannel.CheckCRC: boolean;
begin
{ abstract CRC function  }
  Result := True
end;

procedure TDataChannel.NotifyNextLayer;
begin
{ abstract procedure }
  if Assigned( FEvent ) then
     FEvent.SetEvent
end;

destructor TDataChannel.Destory;
begin
 inherited;
end;

end.


Пример использования, процедура AddByte - для помещения очередного принятого байта с COM порта:

procedure TForm1.Button1Click(Sender: TObject);
var
 DC : TDataChannel;
 i : integer;
begin
 DC := TDataChannel.Create( nil );

 for i := 0 to 10000 do
    DC.AddByte( $A);

 DC.AddByte( protoPACKETSTART );
 DC.AddByte( $A);
 DC.AddByte( protoPACKETSTOP );
 DC.AddByte( protoPACKETSTOP );
 DC.AddByte( $A);
 DC.AddByte( protoPACKETSTOP );
 DC.AddByte( $A);

 DC.Free;
end;


 
Kolan ©   (2006-02-09 12:11) [82]

Благодарю. Я правда не мучался. Обязательно разберу ваш код.
Просото я стараюсь разобраться сам. Тем более пока время есть...
Я ж еще только учус (С)
:)


 
Defunct ©   (2006-02-09 12:20) [83]

Kolan ©   (09.02.06 12:11) [82]

У меня там есть ошибка (поспешил выложить код, не проперив надлежащим образом).

после вот этой строки:
    FPacketSize := FAccumulatedPacketLength - 1;

необходимо сбросить
FAccumulatedPacketLength := 0;


 
Kolan ©   (2006-02-09 12:54) [84]

Defunct ©   (09.02.06 12:20) [83]
Очень признателен за помощь :). Даже неудобно что вы так помогаете :)...


 
evvcom ©   (2006-02-09 14:24) [85]

1 поток - считывание данных
EnterCriticalSection(cs1); // пробуем "войти" в первую крит. секцию, если не удаётся - значит ещё не обработан предыдущий буфер (пакет данных) - ждём

В твоем коде ничто не мешает войти в cs1.
LeaveCriticalSection(cs2); // второй поток может приступать к считыванию буфера
Какой смысл покидать cs2, если поток в нее и не входил? И также ничто не мешает 2-му потоку повторно войти в cs2. Далее по тексту аналогично.


 
Defunct ©   (2006-02-09 15:39) [86]

> Kolan
да не за что, мне не жалко,
напротив, спасибо вам за заинтересовавший меня вопрос. ;>


 
Eraser ©   (2006-02-09 20:08) [87]


> evvcom ©   (09.02.06 14:24) [85]


> В твоем коде ничто не мешает войти в cs1.

Мешает! А именно мешает следующая интерация цикла в потоке. Т.е. цикл не будет выполнятся далее, если буфер не обработан. В том то и "фишка".


 
evvcom ©   (2006-02-10 09:24) [88]


> мешает следующая интерация цикла в потоке. Т.е. цикл не будет выполнятся далее,

Бред. Напиши в тексте программы 10 раз друг за другом EnterCriticalSection(cs1); и посмотри, что получится.


 
Eraser ©   (2006-02-10 15:26) [89]


> evvcom ©   (10.02.06 09:24) [88]

хм... действительно я допустил ошибку, предлжив использовать крит. секции...

After a thread has ownership of a critical section, it can make additional calls to EnterCriticalSection or TryEnterCriticalSection without blocking its execution. This prevents a thread from deadlocking itself while waiting for a critical section that it already owns.

В своём проекте я использовать сходную схему, НО только применял семафоры... WaitForSingleObject работает по-другому.



Страницы: 1 2 3 вся ветка

Текущий архив: 2006.03.19;
Скачать: CL | DM;

Наверх




Память: 0.77 MB
Время: 0.032 c
3-1138175325
Sergey_S
2006-01-25 10:48
2006.03.19
Работа с Access через ADO


2-1141127705
denis24
2006-02-28 14:55
2006.03.19
Зыкрытие всех форм созданных с главной


15-1140778361
Суслик
2006-02-24 13:52
2006.03.19
Вопрос про перечилимые типы


2-1141581940
49 Cent
2006-03-05 21:05
2006.03.19
Как скопировать запись в Adotable?


2-1141057077
John_Doe
2006-02-27 19:17
2006.03.19
SQL не воспринимает дату