0 / 0 / 0
Регистрация: 19.03.2017
Сообщений: 3
.NET 4.x

Threads Потоки синхронизация, один producer два consumer`a и очередь

21.03.2020, 19:05. Показов 2283. Ответов 3

Студворк — интернет-сервис помощи студентам
Привет. Только начал в шарпы, не говоря уже про потоки
и мне тут такую задачку дали
Итак дано: один produser два consumer`a.
Продюсер добавляет задачи в очередь, а эти два их обрабатывают.
Если задача поступает в момент когда 1 консюмер свободен, тот её обрабатывает, иначе в очередь. Второй вступает в работу только когда в очереди наберется допустим более 10 задач.
Я уже накидал таймер в продюсере, что добавляет задачи каждые 2 секунды в очередь, два потока потребителей у меня используют один и тот же метод, где я пытался изобразить поочередность вступления в работу с EventWaitHandle.
Наделал костылей, что работает только на выключения второго потока вовсе, потом он не включается.
Кажется, что делать две EventWaitHandle бред, ибо их видят сразу все потоки.
Потом тот же результат получил с Семафор(1,2).
Собственно вопрос, как сделать такую поочередность доступа, чтобы второй потребитель включался в зависимости от количества елементов в очереди. Что использовать? хелп ми плиз
Метод с которым работают потребители:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
EventWaitHandle SignalForConsumerOne = new ManualResetEvent(true);
EventWaitHandle SignalForConsumerTwo = new ManualResetEvent(false);
 
public Semaphore semaphore = new Semaphore(1,2); 
 
 
public void Consume()
        {
 
            //Вот тут я пробовал понять как работаетEventWaitHandle, так в работу вступает один потребитель
            SignalForConsumerOne.Set();
            SignalForConsumerTwo.WaitOne();
            if (Thread.CurrentThread.Name == "Thread_1" && _queue.Lenght <= 10)
                SignalForConsumerTwo.WaitOne();
            else
            {
                if (Thread.CurrentThread.Name == "Thread_0")
                {
                    SignalForConsumerOne.Set();
                    if (_queue.Lenght > 10)
                        SignalForConsumerTwo.Set();
                }
 
            }
            //ИЛИ
            semaphore.WaitOne(); //Вот так приступает в роботу сразу два потребителя
 
            Console.WriteLine($" Consumer {Thread.CurrentThread.Name} started ");
            Thread.Sleep(1000);
            while (true)
            {
                                  
                string task = null;
                lock (_queue.locker)
                {
                    if (_queue.Lenght > 0)
                    {
                        task = _queue.Dequeue();
                        if (task == null)
                        {
                            Console.WriteLine("Queue is empty");
                            return;
                        }
                    }
                }
                if (task != null)
                {
                    Console.WriteLine($"Number of elements in queue: {_queue.Lenght}");
                    Console.WriteLine("Produser prodused: {0} ---- and Consumer consume in Thread {1}", task, Thread.CurrentThread.Name);
                    Thread.Sleep(2000);
                }
                else
                {
                   // dispose
                }
                if ( _queue.Lenght == 10)
                {
                    //Отсюда делать Semaphore.Release() или SignalForConsumerTwo.Set() и снова вызывать Consume, чтобы уже с двумя потребителями началась работа, но получается, что далее работает только один.
                    //Consume;
                    //break; 
                }
            }
        }
0
Лучшие ответы (1)
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
21.03.2020, 19:05
Ответы с готовыми решениями:

Синхронизация потоков через Event-ы (задача producer/consumer)
Задание :Реализуйте синхронизацию для простейшего случая задачи producer/consumer (spsc – single producer single consumer). Я уже...

Consumer-Producer на семафорах
Всем добрый день. Есть программа #include &lt;windows.h&gt; #include &lt;stdio.h&gt; #include &quot;iostream&quot; using namespace std; int Sklad = 0; ...

Классическая задача producer\consumer
Базовая задачка по потокам\семафорам. Поставщиц потребитель. Помогите вынести Buffer как отдельный класс. Чтобы он отвечал за переменную...

3
490 / 167 / 98
Регистрация: 23.01.2020
Сообщений: 324
22.03.2020, 07:20
Лучший ответ Сообщение было отмечено OlegLysovych как решение

Решение

OlegLysovych, Здравствуйте, я новичок в C# и потоках, но стала интересна ваша задача, посидел поковырялся, один из вариантов который пришел в голову:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
        private static Queue<string> works;
        static Thread producerThread,worker1, worker2;
        static Random rndSleep;
        static void Main(string[] args)
        {
            works = new Queue<string>();
            rndSleep = new Random();
            producerThread = new Thread(new ThreadStart(producer));
            producerThread.Start();
 
            worker1 = new Thread(new ThreadStart(worker));
            worker1.Name = "woker1";
            worker1.Start();
 
            worker2 = new Thread(new ThreadStart(worker));
            worker2.Name = "worker2";
            worker2.Start();
 
            Console.ReadKey();
        }
 
        static void producer()
        {
            int workId = 0;
            while (true)
            {
                try
                {
                    works.Enqueue("work" + workId);
                    workId++;
                    Thread.Sleep(rndSleep.Next(200,1000));
                }
                catch(Exception e)
                {
                    Console.WriteLine(e.Message);
                    Console.WriteLine(e.StackTrace);
                    Console.WriteLine(works.Count);
                }
            }
        }
 
        static void worker()
        {
            while (true)
            {
                if (Thread.CurrentThread == worker2)
                {
                    if (works.Count > 10)
                    {
                        Console.Write("Помогаю ");
                        doWork();
                    }
                }
                else
                {
                    doWork();
                }
            }
        }
 
        static void doWork()
        {
            if (works.Count != 0)
            {
                Console.WriteLine($"WORK:{works.Dequeue()} THREAD:{Thread.CurrentThread.Name} WORKS:{works.Count}");
                Thread.Sleep(rndSleep.Next(600, 1500));
            }
        }
Я не уверен что так правильно и что так можно делать, но вроде что-то работает)) Надеюсь кто-то из опытных поправит и подскажет как правильно)

Добавлено через 13 минут
OlegLysovych, Хотя прошу прощения, не совсем верно, не все условия задачи выполняются) Например проверка на занятость потока)
1
Неадекват
 Аватар для freeba
1501 / 1237 / 248
Регистрация: 02.04.2010
Сообщений: 2,807
22.03.2020, 07:48
Как то так.
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
 
namespace ConsoleApp46
{
    class WorkData
    {
        public string Message { get; set; }
        public int Delay { get; set; }
    }
 
    class Program
    {
        static readonly Random rnd = new Random();
 
        static readonly Channel<WorkData> channel1 = Channel.CreateBounded<WorkData>(new BoundedChannelOptions(10) { SingleWriter = true });
        static readonly Channel<WorkData> channel2 = Channel.CreateUnbounded<WorkData>(new UnboundedChannelOptions() { SingleWriter = true });
 
        static void Main(string[] args)
        {
            Task.Run(async () => await Worker1());
            Task.Run(async () => await Worker2());
 
            Task task = Go();
            Console.ReadKey();
        }
 
        static async Task Go()
        {
            var count = 100;
 
            while (count-- > 0)
            {
                var data = new WorkData() { Delay = rnd.Next(100, 300), Message = $"Operation: {100 - count}" };
 
                var isWrite = channel1.Writer.TryWrite(data);
                if (!isWrite) await channel2.Writer.WriteAsync(data);
 
                await Task.Delay(100);
            }
        }
 
        static async Task Worker1()
        {
            while (await channel1.Reader.WaitToReadAsync())
            {
                if (channel1.Reader.TryRead(out WorkData data))
                {
                    await Task.Delay(data.Delay);
                    Console.WriteLine($"[Worker1]: {data.Message}");
                }
            }
        }
 
        static async Task Worker2()
        {
            while (await channel2.Reader.WaitToReadAsync())
            {
                if (channel2.Reader.TryRead(out WorkData data))
                {
                    await Task.Delay(data.Delay);
                    Console.WriteLine($"[Worker2]: {data.Message}");
                }
            }
        }
    }
}
1
1152 / 860 / 263
Регистрация: 30.04.2009
Сообщений: 3,603
22.03.2020, 22:49
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
 
namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentQueue<int> workItems = new ConcurrentQueue<int>();
            AutoResetEvent mainWorkerSignal = new AutoResetEvent(false);
            AutoResetEvent reserveWorkerSignal = new AutoResetEvent(false);
            CancellationTokenSource cancellation = new CancellationTokenSource();
 
            var mainWorker = StartThread(() =>
            {
                while (!cancellation.Token.IsCancellationRequested)
                {
                    mainWorkerSignal.WaitOne();
 
                    while (workItems.TryDequeue(out int item))
                    {
                        Console.WriteLine($"Processing work item {item} in Main Worker. Remaining items in queue: {workItems.Count}");
                        Thread.Sleep(1000);
                    }
                }
            });
            
            var reserveWorker = StartThread(() =>
            {
                while (!cancellation.Token.IsCancellationRequested)
                {
                    reserveWorkerSignal.WaitOne();
                    
                    while (workItems.Count >= 10 && workItems.TryDequeue(out int item))
                    {
                        Console.WriteLine($"Processing work item {item} in Reserve Worker. Remaining items in queue: {workItems.Count}");
                        Thread.Sleep(500);
                    }
                }
            });
 
            var producer = StartThread(() =>
            {
                foreach (var workItem in Enumerable.Range(1, 30))
                {
                    workItems.Enqueue(workItem);
                    mainWorkerSignal.Set();
 
                    if (workItems.Count >= 10)
                    {
                        reserveWorkerSignal.Set();
                    }
                    
                    Thread.Sleep(200);
                }
            });
 
            producer.Join();
            mainWorker.Join();
            reserveWorker.Join();
 
            Console.WriteLine("Finished");
            Console.ReadKey();
        }
 
        static Thread StartThread(Action threadTask)
        {
            var thread = new Thread(() => threadTask());
            thread.Start();
            return thread;
        }
    }
}
1
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
22.03.2020, 22:49
Помогаю со студенческими работами здесь

Подскажите по потокам, не могу разобраться с producer/consumer
Всех приветствую, подскажите , не могу понять как всетаки правильно имплементировать producer/consumer, подскажите по русски как нужно с...

Threads, потоки
Здравствуйте, ребят, помогите решить задачу, не могу понять как все реализовать. Создать класс Flood, который получает в конструкторе...

C++, Curl и threads-потоки
Есть такой пример у разработчиков CURL: http://curl.haxx.se/libcurl/c/multithread.html Суть - в 4 потока одновременно открывается 4...

Потоки демоны (Daemon threads)
Объясните зачем нужны потоки-демоны. В каких случаях они применяются ?

Потоки (threads) и сжатие файлов
Всем привет! Я недавно изучаю C#, пытаюсь самостоятельно выполнять практические задания. Есть задание, общая суть такая:...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
4
Ответ Создать тему
Опции темы

Новые блоги и статьи
Отчёт о спецтехнике находящейся в ремонте
Maks 20.04.2026
Отчёт из решения ниже размещен в конфигурации КА2. Задача: отобразить спецтехнику, которая на данный момент находится в ремонте. Есть нетиповой документ "Заявка на ремонт спецтехники" который. . .
Памятка для бота и "визитка" для читателей "Semantic Universe Layer (Слой семантической вселенной)"
Hrethgir 19.04.2026
Сгенерировано для краткого описания по случаю сборки и компиляции скелета серверного приложения. И пусть после этого скажут, что статьи сгенерированные AI - туфта и не интересно. И это не реклама -. . .
Запрет удаления строк ТЧ документа при определенном условии
Maks 19.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "Аккумуляторы", разработанного в конфигурации КА2. У данного документа есть ТЧ, в которой в зависимости от прав доступа. . .
Модель заражения группы наркоманов
alhaos 17.04.2026
Условия задачи сформулированы тут Суть: - Группа наркоманов из 10 человек. - Только один инфицирован ВИЧ. - Колются одной иглой. - Колются раз в день. - Колются последовательно через. . .
Мысли в слух. Про "навсегда".
kumehtar 16.04.2026
Подумалось тут, что наверное очень глупо использовать во всяких своих установках понятие "навсегда". Это очень сильное понятие, и я только начинаю понимать край его смысла, не смотря на то что давно. . .
My Business CRM
MaGz GoLd 16.04.2026
Всем привет, недавно возникла потребность создать CRM, для личных нужд. Собственно программа предоставляет из себя базу данных клиентов, в которой можно фиксировать звонки, стадии сделки, а также. . .
Знаешь почему 90% людей редко бывают счастливыми?
kumehtar 14.04.2026
Потому что они ждут. Ждут выходных, ждут отпуска, ждут удачного момента. . . а удачный момент так и не приходит.
Фиксация колонок в отчете СКД
Maks 14.04.2026
Фиксация колонок в СКД отчета типа Таблица. Задача: зафиксировать три левых колонки в отчете. Процедура ПриКомпоновкеРезультата(ДокументРезультат, ДанныеРасшифровки, СтандартнаяОбработка) / / . . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru