Форум программистов, компьютерный форум, киберфорум
C# для начинающих
Войти
Регистрация
Восстановить пароль
Карта форума Темы раздела Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.82/11: Рейтинг темы: голосов - 11, средняя оценка - 4.82
65 / 65 / 16
Регистрация: 07.04.2014
Сообщений: 334
1

Событие добавления нового члена в коллекцию FIFO

26.01.2015, 15:43. Показов 2016. Ответов 8
Метки нет (Все метки)

Author24 — интернет-сервис помощи студентам
Всем добрый день (утро/вечер)!

Суть такова. Принимаю большое количество данных, записываю их в FIFO. При записи в FIFO выдаю событие. Один из подписчиков события вынимает из FIFO сообщение и начинает длительную обработку. В итоге начинаются проблемы, так как подписчик не "отпускает" событие пока не выполнит обработку => часть передаваемых сообщений теряется (передаются с МК, у него буфер программный переполняется если долго не принимать сообщения). Игрался с потоками, всё едино. Можно сделать for() опрос коллекции например в режиме sleep(5), но это некрасиво.

Итого цель: сделать постоянный приём сообщений, и оповещение о приёме сообщения которое не блочит приём (плюс обработка сообщений в порядке их прихода).

Как сделать событие которое оповестит подписчика и сразу же вернёт управление назад? Может тут какой-то паттерн придуман? тестовый вариант проблемы ниже.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//очередь новых сообщений
static Queue<object> qe = new Queue<object>();
 
//оповещение  о новом сообщение
public delegate void StatisticMessage();
public static event StatisticMessage newMessage;
 
private static Thread thread1;
private static Thread thread2;
 
static void Main(string[] args)
{
     newMessage +=messageProcessor;
     Thread thread1 = new Thread(AddToFIFO);
     thread1.Start();
     Console.ReadKey();
}
 
//добавляю в очередь 10 сообщений
public static void AddToFIFO()
{
     for (var i = 10; i > 0; i--)
     {
          string message = i.ToString();;
 
          //"в очередь, сукины дети, в очередь"
          EnqueueQueueMessage(message);
     }
}
 
//функция добавления + оповещение о добавлении
public static void EnqueueQueueMessage(object message)
{
     qe.Enqueue(message); 
 
     try
     {
          newMessage();
     }
     catch (Exception e)
     {
          Console.WriteLine(e.Message + " " + e.TargetSite + " " + e.StackTrace + "\n");
     }
}
 
//подписчик оповещения о добавлении
public static void messageProcessor()
{
     processing(qe.Dequeue); // в таком виде обрабатывается по порядку, но пока 
//обработчик  3 секунды не поспит, новое сообщение не принимается
 
     /*
     если сделать тут поток, то вроде всё ок, но последовательность обработки сообщений сбивается.
 
     thread2=new Thread(Processing);
     thread2.Start(qe.Dequeue)
 
     предполагаю что нужно делать какую-то блокировку потоков, чтобы они обрабатывали 
     сообщения в том порядке в каком поток вызывается, а не как им вздумается.
     */
}
 
//обработка сообщения
public static void Processing(object message)
{
     Thread.Sleep(3000);
     Console.WriteLine((string)message);
}
0
Лучшие ответы (1)
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
26.01.2015, 15:43
Ответы с готовыми решениями:

Максимально быстрый способ добавления миллионов объектов в коллекцию
Всем привет. Собственно имеем объекты содержащие два поля long, double; таких объектов свыше...

Событие передачи объектов в коллекцию
Хочу реализовать следующую задачу: Есть абстрактный класс, есть его наследники. Хочу чтобы при...

Есть небольшая проблема. После добавления слов в коллекцию не переводит. Как исправить ошибку?
using System; using System.Collections.Generic; namespace ConsoleApp3 { class Program ...

Добавления нового человека
Здравствуйте уважаемые форумчане, у меня такая проблема: Есть Контроллер в котором прописанный два...

8
Эксперт .NET
5534 / 4298 / 1217
Регистрация: 12.10.2013
Сообщений: 12,332
Записей в блоге: 2
26.01.2015, 15:49 2
fidgi, вы не очень понятно рассказали задачу, но возможно вам поможет реализация Producer/Consumer?
1
65 / 65 / 16
Регистрация: 07.04.2014
Сообщений: 334
26.01.2015, 16:06  [ТС] 3
Цитата Сообщение от insite2012 Посмотреть сообщение
вы не очень понятно рассказали задачу
Попробую объяснить.

Я принимаю сообщения от микроконтроллера. Вплоть до 1000 в секунду. Далее мне нужно их обрабатывать. Если я не буду вовремя забирать сообщения из программного буфера МК, то сообщения будут затираться. Обработка довольно ресурсоёмкая, поэтому я не могу обрабатывать сообщения на лету (точнее я это сейчас примерно и делаю в том виде который представлен выше, но после 400к сообщений накапливается запаздывание и сообщения в программном буфере начинают затираться выдавая ошибки переполнения. И это на core i7)

Следовательно я решил записывать их в FIFO коллекцию и уже оттуда вытаскивать по одному и обрабатывать.
Т.е. задача состоит в том чтобы не потерять сообщения от МК и постепенно их обработать и вывести результаты в том порядке в котором они приходили от МК.

В том виде в каком программа дана выше коллекция бесполезна потому что в неё новое сообщение записывается только после завершения обработки, а если добавить доп. поток, то первая часть задачи выполняется и сообщения пишутся в коллекцию независимо, но ломается обработка, так как сообщения начинают обрабатываться вразнобой.

Цитата Сообщение от insite2012 Посмотреть сообщение
возможно вам поможет реализация Producer/Consumer?
посмотрю
0
Эксперт .NETАвтор FAQ
10410 / 5140 / 1825
Регистрация: 11.01.2015
Сообщений: 6,226
Записей в блоге: 34
26.01.2015, 16:17 4
Почему бы не вызывать событие в отдельном потоке ?
1
65 / 65 / 16
Регистрация: 07.04.2014
Сообщений: 334
26.01.2015, 16:25  [ТС] 5
Цитата Сообщение от Storm23 Посмотреть сообщение
Почему бы не вызывать событие в отдельном потоке ?
Пробовал. Результат аналогичен созданию потока в обработчике события. Обработка и вывод сообщений будет идти не по порядку.
0
Эксперт .NETАвтор FAQ
10410 / 5140 / 1825
Регистрация: 11.01.2015
Сообщений: 6,226
Записей в блоге: 34
26.01.2015, 16:36 6
Тогда вам нужно сделать локальную очередь у каждого подписчика.
Подписчик принимает сообщение о новых данных и копирует принятые данные в свое FIFO (это будет быстро), а обрабатывает эту локальную очередь уже отдельный поток.
1
Эксперт .NET
5534 / 4298 / 1217
Регистрация: 12.10.2013
Сообщений: 12,332
Записей в блоге: 2
26.01.2015, 16:48 7
Лучший ответ Сообщение было отмечено fidgi как решение

Решение

fidgi, если порядок обработки элементов критичен, то только однопоточный Поставщик-Потребитель поможет, ятд. Вот его классическая реализация (в однопоточном варианте)
Класс:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace Producer_Consumer {
    public class ProducerConsumerQueue: IDisposable {
        EventWaitHandle wh = new AutoResetEvent(false);
        Thread worker;
        object locker = new object();
        Queue<string> tasks = new Queue<string>();
 
        public ProducerConsumerQueue() {
            worker = new Thread(Work);
            worker.Start();
        }
        public void EnqueueTask(string task) {
            lock (locker) {
                tasks.Enqueue(task);
                wh.Set();
            }
        }
        public void Dispose() {
            EnqueueTask(null);
            worker.Join();
            wh.Close();
        }
        void Work() {
            while (true) {
                string task = null;
                lock (locker) {
                    if (tasks.Count > 0) {
                        task = tasks.Dequeue();
                        if (task == null)
                            return;
                    }
                }
                if (task != null) {
                    Console.WriteLine("Produce task: {0}", task.ToString());
                    Thread.Sleep(2000);
                }
                else
                    wh.WaitOne();
            }
        }
    }
}
Использование:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
namespace Producer_Consumer {
    class Program {
        static void Main(string[] args) {
            using (ProducerConsumerQueue q=new ProducerConsumerQueue()){
                for (int i = 0; i < 10; i++) {
                    q.EnqueueTask(i.ToString());
                }
            }
            Console.ReadLine();
        }
    }
}
Поставщик ставит задачи в очередь, а Потребитель их обрабатывает. Если задач нет-очередь ожидает их поступления.
1
Master of Orion
Эксперт .NET
6098 / 4954 / 905
Регистрация: 10.07.2011
Сообщений: 14,522
Записей в блоге: 5
26.01.2015, 17:22 8
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
 
class Program
{
    static Random random = new Random();
    static void Main()
    {
        var proc = new MyProcessor();
        proc.NewMessage += (sender, args) =>
        {
            Thread.Sleep(random.Next(500, 4000)); // симулируем задержку
            Console.WriteLine("Object {0} sended message {1}", sender, args.Data);
        };
        proc.Start();
        Thread.Sleep(TimeSpan.FromSeconds(10));
        proc.Stop();
    }
}
 
 
 
class MyProcessor
{
    public class MessageEventArgs : EventArgs
    {
        public string Data { get; set; }
 
        public MessageEventArgs(string data)
        {
            Data = data;
        }
    }
 
    public EventHandler<MessageEventArgs> NewMessage = delegate { };
    public bool IsWorking { get; set; }
 
    private int maxmessages = 10; //для тестовых целей
 
    public void Start()
    {
        IsWorking = true;
        Task.Factory.StartNew(Work);
    }
 
    public void Stop()
    {
        IsWorking = false;
    }
 
    private void Work()
    {
        while (IsWorking && maxmessages-->0)
        {
            var msg = ReadMessage();
            Task.Factory.StartNew(() => NewMessage(this, new MessageEventArgs(msg)));
            Console.WriteLine("Message proceeded");
        }
    }
 
    private string ReadMessage()
    {
        var count = Environment.TickCount;
        return count%2 == 0 ? "tick" : "tack";
    }
}
как видите, сообщения все получены сразу, а обработка может занимать любое время.

Добавлено через 26 минут
Извиняюсь, там должно быть:
C#
1
public event EventHandler<MessageEventArgs> NewMessage = delegate { };
2
65 / 65 / 16
Регистрация: 07.04.2014
Сообщений: 334
26.01.2015, 17:27  [ТС] 9
Спасибо за помощь! Сижу осознаю.
0
26.01.2015, 17:27
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
26.01.2015, 17:27
Помогаю со студенческими работами здесь

Добавления нового инстанса
Имеется сервер БД Oracle, на котором имеется один инстанс БД. Задача добавить ещё один инстанс и...

Макрос добавления нового Листа
Доброй ночи Всем! Написал простенький макрос для добавления нового листа(ПОКАЗАТЬ(выбираю...

Метод добавления нового заказа
Всем здрасте :) Помогите задачку решить нужно сформировать заказ на сайте без участия пользователя....

Форма добавления нового пользователя
как сделать форму добавления нового пользователя? ACCESS 2007

Авторизация. проверка и добавления нового пользователя
Здравствуйте . У меня есть окно авторизации где вводятся имя пользователя и пароль . Если ...

Добавления нового узла в дерево XML файла
Еще раз приветствую, пытаюсь разобраться с &quot;linq to xml&quot;, на этот раз хотел бы спросить по данному...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
9
Ответ Создать тему
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2024, CyberForum.ru