Главная страница
    Top.Mail.Ru    Яндекс.Метрика
Форум: "Прочее";
Текущий архив: 2014.04.13;
Скачать: [xml.tar.bz2];

Вниз

Событие для всех потоков.   Найти похожие ветки 

 
Дмитрий СС   (2013-10-21 16:10) [0]

Подскажите, пожалуйста, как проще всего организовать следующее:
- Есть несколько потоков ожидающие события.
- Событие происходит.
- *Все* ожидающие потоки начинают работать.

Что-то вроде надежной версии PulseEvent.


 
Eraser ©   (2013-10-21 16:39) [1]

Думаю событие с флагом bManualReset должно подойти.


 
Дмитрий СС   (2013-10-21 16:52) [2]


> Думаю событие с флагом bManualReset должно подойти.

Получается не очень красиво - т.к. не понятно когда этот самый reset делать.
К тому же поток может получить сигнал во время обработки предыдущего сигнала. В таком случае получится что поток пропустит второй сигнал.

Сейчас у меня только одна идея:
1 поток = 1 event.


 
DVM ©   (2013-10-21 17:31) [3]


> Дмитрий СС

Если число потоков известно заранее, то можно скомбинировать событие и какой нибудь атомарный счетчик, которому при взведении события присваивается значение равное числу потоков, а каждый поток получив событие уменьшал бы значение этого счетчика, последний поток сбрасывал бы событие.


 
НЛО   (2013-10-21 18:03) [4]

Семафор?


 
Медвешонок Порошок   (2013-10-21 20:28) [5]

К тому же поток может получить сигнал во время обработки предыдущего сигнала.

Тогда тебе не надо того, чего ты спросил.

- *Все* ожидающие потоки начинают работать.

как же они "начинают работать", если они в это время не могут начать работать, так как заняты другой важной фигней?


 
NoUser ©   (2013-10-21 21:24) [6]

НЛО, не - даёшь антиСемафор!


>Сейчас у меня только одна идея:
>1 поток = 1 event.


Ага, а как тот поток, который будет их взводить узнает что уже можно/нужно это делать - неужели еще по одному евенту на поток добавишь ?


> Получается не очень красиво - т.к. не понятно когда этот
> самый reset делать.


Задачу озвучивай, а то
http://delphimaster.net/view/2-1382127402/
тебе тож подходит.


 
Дмитрий СС   (2013-10-22 11:25) [7]

Задача такая:
Есть некое хранилище, данные в котором время от времени изменяются.
И есть несколько потоков, которые за этими данными следят (должны отправлять их при каждом изменении).


> DVM ©   (21.10.13 17:31) [3]

Интересная идея, но тоже возможна ситуация, когда данные изменятся, а событие не сработает.
1. Поток А и Б ждут события Event.
2. Хранилище изменяется. Счетчик К=2. SetEvent(Event).
3. Поток А разблокируется и уменьшает счетчик К=1. Начинает работу с данными.
4. Хранилище изменяется еще раз. Счетчик К=2. SetEvent(Event).
5. Поток Б разблокируется и уменьшает счетчик К=1. Начинает работу с данными.
Ну и далее понятно, что получится ерунда.
Получается, что хранилище, прежде чем делать SetEvent должно ждать, пока этот самый Event станет NotSignaled (или как его там), а это не очень хорошо, ввиду того, что какой-нибудь поток может хорошенько затупить.

То что предлагаю я - это у каждого потока есть свой Event, а у хранилища есть блокируемый список Евентов. Когда поток начинает работу, он добавляет свой Евент в список; когда заканчивает - удаляет. Соответственно каждый поток ждет своей евент, а хранилище, в случае изменения, делает SetEvent для каждого = никто никого не ждет, каждый работает сам по себе.


 
vuk ©   (2013-10-22 11:38) [8]

А потокам сообщения отправлять - не?


 
DVM ©   (2013-10-22 11:45) [9]


> Дмитрий СС   (22.10.13 11:25) [7]


> но тоже возможна ситуация, когда данные изменятся, а событие
> не сработает.

Да, я прочитал твой пост [2] и потом уже подумал, что если события повторно могут возникать в процессе обработки потоками еще первого события, то такой метод отпадает. Тут надо перестраивать логику как то.


> То что предлагаю я - это у каждого потока есть свой Event,
>  а у хранилища есть блокируемый список Евентов. Когда поток
> начинает работу, он добавляет свой Евент в список; когда
> заканчивает - удаляет.

А зачем городить такую петрушку с этими эвентами, ведь тебе надо данные обрабатывать а не эвенты.

Я бы сделал что-то типа очереди заданий у каждого потока и раздавал задания в очереди потоков извне, потоки же будут ждать появления данных в очереди и просыпаться для обработки данных, а потом опять засыпать.


 
Медвешонок Порошок   (2013-10-22 12:09) [10]

А потокам сообщения отправлять - не?


а смысл-то?
если он говорит, что поток не может в любой момент времени отвлечься на выполнение команды извне.

при такой схеме не потоки надо оповещать, а сами потоки, освободившись должны лезть в главный модуль/поток за очередной порцией задания.


 
Владислав ©   (2013-10-22 12:30) [11]

Есть хранилище данных, есть нечто, изменяющее данные, есть потоки, обрабатывающие эти изменения.
Создаем очередь заданий для потоков. Обработка очереди потоками управляется событием с автосбросом.
В обработанном состоянии заданий в очереди нет, и потоки ждут сигнального состояния события.
Нечто, изменяющее данные при изменении добавляет задачу в очередь (возможно с указанием, какие данные изменились) и устанавливает событие в сигнальное состояние.
При переходе события в сигнальное состояние просыпается один поток. Состояние события сбрасывается. Поток забирает одну задачу из очереди. Если в очереди есть еще задания, поток переводит событие в сигнальное состояние, позволяя тем самым начать обработку следующего задания другим потоком. После такой работы с очередью заданий поток приступает собственно к обработке своего задания.
Обращение к самой очереди заданий тоже нужно синхронизировать. Можно использовать критическую секцию, например (если внутри одного процесса).


 
vuk ©   (2013-10-22 12:30) [12]

to Медвешонок Порошок   (22.10.13 12:09) [10]:

> если он говорит, что поток не может в любой момент времени
> отвлечься на выполнение команды извне.
> при такой схеме не потоки надо оповещать, а сами потоки,
>  освободившись должны лезть в главный модуль/поток за очередной
> порцией задания.

Зависит от того, как все устроено. Если потоки, например, занимаются разными вещами и по разным алгоритмам, о которых главный модуль ни сном и ни духом, то возможно, что им будет достаточно оповещения, что что-то изменилось. При этом, если будут использованы сообщения, потоки достанут их из своих очередей сообщений в тот момент, когда смогут это сделать и не будут нужны всякие замороченные схемы.


 
Медвешонок Порошок   (2013-10-22 12:35) [13]

конечно зависит.
и у него все устроено так, что рассылать одно глобальное событие всем потокам смысла лишено.

К тому же поток может получить сигнал во время обработки предыдущего сигнала. В таком случае получится что поток пропустит второй сигнал.


 
vuk ©   (2013-10-22 12:45) [14]

to Медвешонок Порошок   (22.10.13 12:35) [13]:

> К тому же поток может получить сигнал во время обработки
> предыдущего сигнала. В таком случае получится что поток
> пропустит второй сигнал.

Вот-вот. Если это именно и только нотификация об изменениях, то сообщение из очереди никуда не пропадет и ни второй сигнал, ни десятый не будут пропущены. Если же что-то типа "задания на обработку", то тут нужна простая очередь с заданиями конкретному потоку. Собственно, разница здесь только в том, что очередь заданий формируется приложением, а очередь сообщений встроена в систему.


 
Медвешонок Порошок   (2013-10-22 13:00) [15]

а какая разница, пропадет нотификация или нет?

вопрос в том, нужна ли она вообще, если на нее никто особо и не готов откликнуться в момент ее прихода.


 
Медвешонок Порошок   (2013-10-22 13:02) [16]

это как когда вы за рулем на магистрали, и вам звонит жена типа купи молока и далее по списку.

вместо :

вы приехали в магазин и звоните жене с вопросом а что там надо купить?


 
Дмитрий СС   (2013-10-22 15:41) [17]

Блин. Прошу прощения, видимо я неясно поставил задачу. Попробую совсем конкретно:
Есть некое хранилище данных, которое может кем-то изменяться.
Есть несколько потоков, каждый из которых при изменении данных в хранилище должен эти данные отправить (в сеть), после чего снова встать в ожидание.
Каждый раз отправляются все данные, а не только изменения, поэтому количество отправок не обязательно должно быть равно количеству изменений.
Задача потоков в том, чтобы их сетевые клиенты имели максимально актуальные данные из хранилища, т. е. поток не должен зависать в ожидании, если с момента последней его отправки что-то изменилось.

Что еще можно уточнить? Связь односторонняя. Поток может передавать данные, а принимать не может.
Ну и еще, что код делается под Linux, насчет очереди сообщений не уверен.


 
Eraser ©   (2013-10-22 15:54) [18]


> Дмитрий СС   (22.10.13 15:41) [17]

я бы делал так.
основной поток (который предоставляет данные остальным), по мере поступления данных записывает их в каждый подчиненный поток.

внутри подчиненного потока, соответственно, идет цикл отправки данных. через какой-то промежуток времени (например, 1 секунду) поток проверяет появились ли в его буфере данные, если да - отправляет их и очищает буфер. естественно, копирование данных из буфера и очистка буфера должны быть закрыты критической секцией, в свою очередь, управляющий поток при заполнении буфера, тоже должен использовать эту же критическую секцию подчиненного потока. отправка данных и любые более-менее длительные операции должны быть, конечно же, вне это секции.


 
Медвешонок Порошок   (2013-10-22 15:54) [19]

каждый из которых при изменении данных в хранилище должен эти данные отправить

но каждый из которых в момент изменения данных может быть занят другой важной фигней и ничего отправить в момент изменения данных не может.

посему и оповещать их незачем.
пусть они сами, когда они освободятся, и когда они будут готовы что-то там кому-то там заслать - вот тогда пусть они сами и спросят а есть ли чего для отправки.


 
Юрий Зотов ©   (2013-10-22 17:33) [20]

1. У хранилища есть список потоков, первоначально пустой.

2. При изменении данных хранилище находит в списке спящий поток, будит его (Resume) и передает ему данные для отправки. Если в этот момент спящих потоков в списке нет, то хранилище создает еще один поток, заносит его в список и передает ему данные для отправки.

3. Заканчивая отправку данных, каждый поток проверяет свое свойство Terminated. Если оно true, то поток удаляет себя из списка и завершается, а если оно false, то поток просто засыпает (Suspend).

4. При завершении программы хранилище проставляет каждому потоку в списке Terminate и будит спящие потоки - а потом ждет, пока не опустеет список и тоже завершается.

-------------------------------------

Таким образом, скорость отправки данных (то есть, количество потоков) автоматически подстраивается под скорость изменения данных. Правда, только в одну сторону (в сторону увеличения) - но если доработать алгоритм так, чтобы хранилище завершало потоки, которые слишком долго спят, то получится нормальный пул.


 
Владислав ©   (2013-10-22 18:15) [21]

Дмитрий СС   (22.10.13 15:41) [17]

Для каждого потока флаг.
Если флаг установлен, данные изменились и поток должен их отправить. Если флаг сброшен, то данные после предыдущей отправки не изменялись.
Нечто, изменяющее данные устанавливает флаги.
Поток периодически проверяет свой флаг. Если флаг установлен, сбрасывает его и отправляет данные клиенту. Если поток стоит в ожидании, пусть по таймауту проверяет свой флаг.

Для синхронизации при изменении/чтении флагов их можно поместить в одну структуру (массив?..) и использовать одну критическую секцию.

Только сдается мне, что этот вопрос нужно решать в комплексе с хранилищем данных. Что будет, если потоки читают и отправляют данные, а их (данные) пытается изменить нечто?..


 
Дмитрий СС   (2013-10-22 18:19) [22]


> Что будет, если потоки читают и отправляют данные, а их
> (данные) пытается изменить нечто?..

Этот вопрос просто и решен крит. секцией.


> Для каждого потока флаг.

А в чем разница флага для каждого потока и Евента для каждого потока?


 
DVM ©   (2013-10-22 18:20) [23]


> Дмитрий СС   (22.10.13 18:19) [22]

флаг хуже, его ожидать нельзя


 
Дмитрий СС   (2013-10-22 18:22) [24]


> Юрий Зотов ©   (22.10.13 17:33) [20]

Немного не та логика.
Потоки создаются извне (сетевым сервером) и их количество не зависит от хранилища.


 
Владислав ©   (2013-10-22 18:34) [25]

Дмитрий СС   (22.10.13 18:19) [22]

> Этот вопрос просто и решен крит. секцией.
Отправлять данные можно параллельно нескольким потокам. Критическая секция не позволит это делать.

> А в чем разница флага для каждого потока и Евента для каждого потока?
В линуксе не силен. В Windows событие - системные ресурсы, флаг - байт в памяти.
Судя по логике, которую Вы описали, у Вас поток или трудится над другой задачей, или что-то ожидает. Если он уже что-то ожидает, зачем дополнительное событие?

DVM ©   (22.10.13 18:20) [23]
Судя по описанию задачи ждать ничего не нужно, нужно узнать, изменялись ли данные, и если изменялись, то отправить. Тут я согласен с Медвешонок Порошок   (22.10.13 15:54) [19]


 
Rouse_ ©   (2013-10-22 19:30) [26]


> Дмитрий СС   (22.10.13 11:25) [7]

Оно?
http://www.frolov-lib.ru/books/bsp/v26/ch4_6.htm


 
DVM ©   (2013-10-22 21:57) [27]

Чем больше я смотрю на эту ветку, тем больше убеждаюсь, что тут вообще никакие объекты синхронизации не нужны (ну разве что для доступа к источнику данных если делать в лоб). Делаем каждому потоку по 1 атомарному счетчику, если счетчик больше 0, то поток просыпается и отсылает все что нужно, если 0, то спит. Если произошло несколько событий подряд в процессе работы потока, то и дарные он отошлет несколько раз. После каждой отсылки поток уменьшает значение счетчика.
Извне счетчики потоков каждый раз увеличиваются при возникновении события.
Вот и все имхо.
Плохо тут лишь то, что потоки будут крутиться пусть и со слипами, но будут крутиться всегда проверяя значение счетчика, в вин это плохой подход имхо.


 
Eraser ©   (2013-10-22 22:03) [28]


> DVM ©   (22.10.13 21:57) [27]

подход нормальный, зря что-ли по 6 ядер в процессоры ставят, хоть на что-то сгодятся ) да и даже если поставить slpee(1) на любом более-менее современном процессоре, даже с сотней потоков нагрузка будет доли процента на проверки, это в лучше случае. а если поставить slpee(1000) - как ни старайся - никакой нагрузки не будет, просто столько потоков не позволит создать ОС.


 
DVM ©   (2013-10-22 22:05) [29]


> Eraser ©   (22.10.13 22:03) [28]

а дело не в загрузке, дело в том, что такой код не может быть выгружен при необходимости


 
Eraser ©   (2013-10-22 22:08) [30]


> DVM ©   (22.10.13 22:05) [29]

почему же, проверять флаг terminated там же.


 
DVM ©   (2013-10-22 22:14) [31]


> Eraser ©   (22.10.13 22:08) [30]

я не про завершение потока, хотя завершение потока ждущего события завершения обычно происходит оперативнее, особенно при больших интервалах в Sleep, я про выгрузке кода в файл подкачки.


 
NoUser ©   (2013-10-23 04:21) [32]


> Есть некое хранилище данных, которое может кем-то изменяться.
> Есть несколько потоков, каждый из которых при изменении
> данных в хранилище должен эти данные отправить (в сеть),
>  после чего снова встать в ожидание.

Тот кто инициировал/увидел изменение должен оповестить валидаторов и ждать завершения/отработки последнего (или ждать еще перед оповещением). Такая вот обратная связь по производительности.

Соответственно (как ты и сказал) по евенту на поток для старта задачи и еще по одному для контроля выполнения (или как в [3] – атомарный счетчик и всего два общих евента)

Усё


 
Дмитрий СС   (2013-10-23 09:59) [33]


> Соответственно (как ты и сказал) по евенту на поток для
> старта задачи и еще по одному для контроля выполнения (или
> как в [3] – атомарный счетчик и всего два общих евента)
>

А зачем ждать завершения?


 
Владислав ©   (2013-10-23 10:46) [34]

DVM ©   (22.10.13 21:57) [27]
Можно и с атомарными счетчиками. Только счетчик, как таковой, не нужен. Нужен именно флаг. Судя по тому, что пишет Дмитрий, важно самую свежую версию данных отправить. Так что если поток данные уже отправил, повторно это нужно делать только после того, как данные опять изменились.

> Плохо тут лишь то, что потоки будут крутиться пусть и со слипами
Опять же, Дмитрий пишет, что потоки либо выполняют другую полезную работу, либо ожидают. Если из ожидания можно выходить по таймауту и проверять атомарные счетчики, то и слипы не нужны.


 
НЛО   (2013-10-23 11:03) [35]

Rouse_ ©   (22.10.13 19:30) [26]

>> Дмитрий СС   (22.10.13 11:25) [7]

>Оно?
>http://www.frolov-lib.ru/books/bsp/v26/ch4_6.htm

Уже предлагали, нет надо через события. :-)

-А мы удалили гланды!
-Так уже лет 500 удаляют гланды.
-А мы через анус. (с)Анекдот


 
Медвешонок Порошок   (2013-10-23 11:36) [36]

а почему анекдот?
есть два способа удаления желчного пузыря для мужчин
и три способа того же самого для женщин.

первые два способа :
- полостная операция  (вскрывают живот)
- лапароскопическая операция (делают три-четыре дырочки в животе)

а для женщин есть третий способ, не портящий красоту живота.


 
Владислав ©   (2013-10-23 12:22) [37]

НЛО   (23.10.13 11:03) [35]

А можете алгоритмы накидать, как то же самое можно сделать с помощью семафоров (для общего развития)?


 
NoUser ©   (2013-10-23 14:42) [38]

> А зачем ждать завершения?

Исходя из

> > Думаю событие с флагом bManualReset должно подойти.
>
> Получается не очень красиво - т.к. не понятно когда этот
> самый reset делать.
> К тому же поток может получить сигнал во время обработки
> предыдущего сигнала. В таком случае получится что поток
> пропустит второй сигнал.

+> [5]

думаю ты и сам понимаешь зачем.

type
TMultiSend = class
  fEvGo    :THandle;  
  fEvOk    :THandle;
  fAtomic  :Cardinal;
  fThCount :Cardinal;
 private
  procedure   WorkTh;
  procedure   SendOk;
 public
  constructor Create;
  procedure   SendGo;  
end;

{ TMultiSend }

constructor TMultiSend.Create;
begin
 fEvOk := CreateEvent(nil,True,True, nil);
 fEvGo := CreateEvent(nil,True,False,nil);
end;

procedure TMultiSend.SendGo;
begin
 if (WaitForSingleObject(fEvOk,INFINITE) <> WAIT_OBJECT_0) then Exit;
 fAtomic := AtomicIncrement(fThCount,0);
 if (fAtomic > 0) then begin
  ResetEvent(fEvOk);
  SetEvent(fEvGo);
 end;
end;

procedure TMultiSend.SendOk;
begin
 if (AtomicDecrement(fAtomic) > 0)
  then WaitForSingleObject(fEvOk,INFINITE)
  else begin
   ResetEvent(fEvGo);
   SetEvent(fEvOk);
  end;
end;

procedure TMultiSend.WorkTh;
begin
 AtomicIncrement(fThCount);
 while (True) do begin
  if (WaitForSingleObject(fEvGo,INFINITE) <> WAIT_OBJECT_0) then Break;
  // Work;
  SendOk;
 end;
 AtomicDecrement(fThCount);
end;


( Владислав ©   (23.10.13 12:22) [37] ) ++;


 
НЛО   (2013-10-23 15:36) [39]

Владислав ©   (23.10.13 12:22) [37]

НЛО   (23.10.13 11:03) [35]

А можете алгоритмы накидать, как то же самое можно сделать с помощью семафоров (для общего развития)?

НЛО   (23.10.13 11:03) [35]

Rouse_ ©   (22.10.13 19:30) [26]

>> Дмитрий СС   (22.10.13 11:25) [7]

>Оно?
>http://www.frolov-lib.ru/books/bsp/v26/ch4_6.htm

Ссылка Розыча как раз все обьясняет что, куда, сколько и как это работает.


 
NoUser ©   (2013-10-23 16:15) [40]

>>> http://www.frolov-lib.ru/books/bsp/v26/ch4_6.htm

> но чтобы при этом в каждый момент работало только ограниченное
> количество задач, например, не больше трех. Для этого мы
> создаем семафор,

> Ссылка Розыча
НЛО, семафоры это там где и стрелки, но не нужно их переводить. Код в студию ;)



Страницы: 1 2 вся ветка

Форум: "Прочее";
Текущий архив: 2014.04.13;
Скачать: [xml.tar.bz2];

Наверх





Память: 0.58 MB
Время: 0.003 c
15-1382905802
Юрий
2013-10-28 00:30
2014.04.13
С днем рождения ! 28 октября 2013 понедельник


15-1382006712
stas
2013-10-17 14:45
2014.04.13
Проблема VS 2010+Devexpress 2012 1.5


2-1372673534
Анастасия
2013-07-01 14:12
2014.04.13
Дельфи начинающим


3-1297857711
Avil
2011-02-16 15:01
2014.04.13
Firebird формат строки


15-1381896201
KilkennyCat
2013-10-16 08:03
2014.04.13
в Си функция объявлена как static





Afrikaans Albanian Arabic Armenian Azerbaijani Basque Belarusian Bulgarian Catalan Chinese (Simplified) Chinese (Traditional) Croatian Czech Danish Dutch English Estonian Filipino Finnish French
Galician Georgian German Greek Haitian Creole Hebrew Hindi Hungarian Icelandic Indonesian Irish Italian Japanese Korean Latvian Lithuanian Macedonian Malay Maltese Norwegian
Persian Polish Portuguese Romanian Russian Serbian Slovak Slovenian Spanish Swahili Swedish Thai Turkish Ukrainian Urdu Vietnamese Welsh Yiddish Bengali Bosnian
Cebuano Esperanto Gujarati Hausa Hmong Igbo Javanese Kannada Khmer Lao Latin Maori Marathi Mongolian Nepali Punjabi Somali Tamil Telugu Yoruba
Zulu
Английский Французский Немецкий Итальянский Португальский Русский Испанский