Форум программистов, компьютерный форум, киберфорум
C# для начинающих
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.55/11: Рейтинг темы: голосов - 11, средняя оценка - 4.55
0 / 0 / 0
Регистрация: 19.03.2017
Сообщений: 3
.NET 4.x

Threads Потоки синхронизация, один producer два consumer`a и очередь

21.03.2020, 19:05. Показов 2258. Ответов 3

Студворк — интернет-сервис помощи студентам
Привет. Только начал в шарпы, не говоря уже про потоки
и мне тут такую задачку дали
Итак дано: один produser два consumer`a.
Продюсер добавляет задачи в очередь, а эти два их обрабатывают.
Если задача поступает в момент когда 1 консюмер свободен, тот её обрабатывает, иначе в очередь. Второй вступает в работу только когда в очереди наберется допустим более 10 задач.
Я уже накидал таймер в продюсере, что добавляет задачи каждые 2 секунды в очередь, два потока потребителей у меня используют один и тот же метод, где я пытался изобразить поочередность вступления в работу с EventWaitHandle.
Наделал костылей, что работает только на выключения второго потока вовсе, потом он не включается.
Кажется, что делать две EventWaitHandle бред, ибо их видят сразу все потоки.
Потом тот же результат получил с Семафор(1,2).
Собственно вопрос, как сделать такую поочередность доступа, чтобы второй потребитель включался в зависимости от количества елементов в очереди. Что использовать? хелп ми плиз
Метод с которым работают потребители:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
EventWaitHandle SignalForConsumerOne = new ManualResetEvent(true);
EventWaitHandle SignalForConsumerTwo = new ManualResetEvent(false);
 
public Semaphore semaphore = new Semaphore(1,2); 
 
 
public void Consume()
        {
 
            //Вот тут я пробовал понять как работаетEventWaitHandle, так в работу вступает один потребитель
            SignalForConsumerOne.Set();
            SignalForConsumerTwo.WaitOne();
            if (Thread.CurrentThread.Name == "Thread_1" && _queue.Lenght <= 10)
                SignalForConsumerTwo.WaitOne();
            else
            {
                if (Thread.CurrentThread.Name == "Thread_0")
                {
                    SignalForConsumerOne.Set();
                    if (_queue.Lenght > 10)
                        SignalForConsumerTwo.Set();
                }
 
            }
            //ИЛИ
            semaphore.WaitOne(); //Вот так приступает в роботу сразу два потребителя
 
            Console.WriteLine($" Consumer {Thread.CurrentThread.Name} started ");
            Thread.Sleep(1000);
            while (true)
            {
                                  
                string task = null;
                lock (_queue.locker)
                {
                    if (_queue.Lenght > 0)
                    {
                        task = _queue.Dequeue();
                        if (task == null)
                        {
                            Console.WriteLine("Queue is empty");
                            return;
                        }
                    }
                }
                if (task != null)
                {
                    Console.WriteLine($"Number of elements in queue: {_queue.Lenght}");
                    Console.WriteLine("Produser prodused: {0} ---- and Consumer consume in Thread {1}", task, Thread.CurrentThread.Name);
                    Thread.Sleep(2000);
                }
                else
                {
                   // dispose
                }
                if ( _queue.Lenght == 10)
                {
                    //Отсюда делать Semaphore.Release() или SignalForConsumerTwo.Set() и снова вызывать Consume, чтобы уже с двумя потребителями началась работа, но получается, что далее работает только один.
                    //Consume;
                    //break; 
                }
            }
        }
0
Лучшие ответы (1)
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
21.03.2020, 19:05
Ответы с готовыми решениями:

Синхронизация потоков через Event-ы (задача producer/consumer)
Задание :Реализуйте синхронизацию для простейшего случая задачи producer/consumer (spsc – single producer single consumer). Я уже...

Consumer-Producer на семафорах
Всем добрый день. Есть программа #include &lt;windows.h&gt; #include &lt;stdio.h&gt; #include &quot;iostream&quot; using namespace std; int Sklad = 0; ...

Классическая задача producer\consumer
Базовая задачка по потокам\семафорам. Поставщиц потребитель. Помогите вынести Buffer как отдельный класс. Чтобы он отвечал за переменную...

3
490 / 167 / 98
Регистрация: 23.01.2020
Сообщений: 324
22.03.2020, 07:20
Лучший ответ Сообщение было отмечено OlegLysovych как решение

Решение

OlegLysovych, Здравствуйте, я новичок в C# и потоках, но стала интересна ваша задача, посидел поковырялся, один из вариантов который пришел в голову:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
        private static Queue<string> works;
        static Thread producerThread,worker1, worker2;
        static Random rndSleep;
        static void Main(string[] args)
        {
            works = new Queue<string>();
            rndSleep = new Random();
            producerThread = new Thread(new ThreadStart(producer));
            producerThread.Start();
 
            worker1 = new Thread(new ThreadStart(worker));
            worker1.Name = "woker1";
            worker1.Start();
 
            worker2 = new Thread(new ThreadStart(worker));
            worker2.Name = "worker2";
            worker2.Start();
 
            Console.ReadKey();
        }
 
        static void producer()
        {
            int workId = 0;
            while (true)
            {
                try
                {
                    works.Enqueue("work" + workId);
                    workId++;
                    Thread.Sleep(rndSleep.Next(200,1000));
                }
                catch(Exception e)
                {
                    Console.WriteLine(e.Message);
                    Console.WriteLine(e.StackTrace);
                    Console.WriteLine(works.Count);
                }
            }
        }
 
        static void worker()
        {
            while (true)
            {
                if (Thread.CurrentThread == worker2)
                {
                    if (works.Count > 10)
                    {
                        Console.Write("Помогаю ");
                        doWork();
                    }
                }
                else
                {
                    doWork();
                }
            }
        }
 
        static void doWork()
        {
            if (works.Count != 0)
            {
                Console.WriteLine($"WORK:{works.Dequeue()} THREAD:{Thread.CurrentThread.Name} WORKS:{works.Count}");
                Thread.Sleep(rndSleep.Next(600, 1500));
            }
        }
Я не уверен что так правильно и что так можно делать, но вроде что-то работает)) Надеюсь кто-то из опытных поправит и подскажет как правильно)

Добавлено через 13 минут
OlegLysovych, Хотя прошу прощения, не совсем верно, не все условия задачи выполняются) Например проверка на занятость потока)
1
Неадекват
 Аватар для freeba
1501 / 1237 / 248
Регистрация: 02.04.2010
Сообщений: 2,807
22.03.2020, 07:48
Как то так.
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
 
namespace ConsoleApp46
{
    class WorkData
    {
        public string Message { get; set; }
        public int Delay { get; set; }
    }
 
    class Program
    {
        static readonly Random rnd = new Random();
 
        static readonly Channel<WorkData> channel1 = Channel.CreateBounded<WorkData>(new BoundedChannelOptions(10) { SingleWriter = true });
        static readonly Channel<WorkData> channel2 = Channel.CreateUnbounded<WorkData>(new UnboundedChannelOptions() { SingleWriter = true });
 
        static void Main(string[] args)
        {
            Task.Run(async () => await Worker1());
            Task.Run(async () => await Worker2());
 
            Task task = Go();
            Console.ReadKey();
        }
 
        static async Task Go()
        {
            var count = 100;
 
            while (count-- > 0)
            {
                var data = new WorkData() { Delay = rnd.Next(100, 300), Message = $"Operation: {100 - count}" };
 
                var isWrite = channel1.Writer.TryWrite(data);
                if (!isWrite) await channel2.Writer.WriteAsync(data);
 
                await Task.Delay(100);
            }
        }
 
        static async Task Worker1()
        {
            while (await channel1.Reader.WaitToReadAsync())
            {
                if (channel1.Reader.TryRead(out WorkData data))
                {
                    await Task.Delay(data.Delay);
                    Console.WriteLine($"[Worker1]: {data.Message}");
                }
            }
        }
 
        static async Task Worker2()
        {
            while (await channel2.Reader.WaitToReadAsync())
            {
                if (channel2.Reader.TryRead(out WorkData data))
                {
                    await Task.Delay(data.Delay);
                    Console.WriteLine($"[Worker2]: {data.Message}");
                }
            }
        }
    }
}
1
1152 / 860 / 263
Регистрация: 30.04.2009
Сообщений: 3,603
22.03.2020, 22:49
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
 
namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentQueue<int> workItems = new ConcurrentQueue<int>();
            AutoResetEvent mainWorkerSignal = new AutoResetEvent(false);
            AutoResetEvent reserveWorkerSignal = new AutoResetEvent(false);
            CancellationTokenSource cancellation = new CancellationTokenSource();
 
            var mainWorker = StartThread(() =>
            {
                while (!cancellation.Token.IsCancellationRequested)
                {
                    mainWorkerSignal.WaitOne();
 
                    while (workItems.TryDequeue(out int item))
                    {
                        Console.WriteLine($"Processing work item {item} in Main Worker. Remaining items in queue: {workItems.Count}");
                        Thread.Sleep(1000);
                    }
                }
            });
            
            var reserveWorker = StartThread(() =>
            {
                while (!cancellation.Token.IsCancellationRequested)
                {
                    reserveWorkerSignal.WaitOne();
                    
                    while (workItems.Count >= 10 && workItems.TryDequeue(out int item))
                    {
                        Console.WriteLine($"Processing work item {item} in Reserve Worker. Remaining items in queue: {workItems.Count}");
                        Thread.Sleep(500);
                    }
                }
            });
 
            var producer = StartThread(() =>
            {
                foreach (var workItem in Enumerable.Range(1, 30))
                {
                    workItems.Enqueue(workItem);
                    mainWorkerSignal.Set();
 
                    if (workItems.Count >= 10)
                    {
                        reserveWorkerSignal.Set();
                    }
                    
                    Thread.Sleep(200);
                }
            });
 
            producer.Join();
            mainWorker.Join();
            reserveWorker.Join();
 
            Console.WriteLine("Finished");
            Console.ReadKey();
        }
 
        static Thread StartThread(Action threadTask)
        {
            var thread = new Thread(() => threadTask());
            thread.Start();
            return thread;
        }
    }
}
1
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
22.03.2020, 22:49
Помогаю со студенческими работами здесь

Подскажите по потокам, не могу разобраться с producer/consumer
Всех приветствую, подскажите , не могу понять как всетаки правильно имплементировать producer/consumer, подскажите по русски как нужно с...

Threads, потоки
Здравствуйте, ребят, помогите решить задачу, не могу понять как все реализовать. Создать класс Flood, который получает в конструкторе...

C++, Curl и threads-потоки
Есть такой пример у разработчиков CURL: http://curl.haxx.se/libcurl/c/multithread.html Суть - в 4 потока одновременно открывается 4...

Потоки демоны (Daemon threads)
Объясните зачем нужны потоки-демоны. В каких случаях они применяются ?

Потоки (threads) и сжатие файлов
Всем привет! Я недавно изучаю C#, пытаюсь самостоятельно выполнять практические задания. Есть задание, общая суть такая:...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
4
Ответ Создать тему
Новые блоги и статьи
SDL3 для Web (WebAssembly): Подключение Box2D v3, физика и отрисовка коллайдеров
8Observer8 12.02.2026
Содержание блога Box2D - это библиотека для 2D физики для анимаций и игр. С её помощью можно определять были ли коллизии между конкретными объектами и вызывать обработчики событий столкновения. . . .
SDL3 для Web (WebAssembly): Загрузка PNG с прозрачным фоном с помощью SDL_LoadPNG (без SDL3_image)
8Observer8 11.02.2026
Содержание блога Библиотека SDL3 содержит встроенные инструменты для базовой работы с изображениями - без использования библиотеки SDL3_image. Пошагово создадим проект для загрузки изображения. . .
SDL3 для Web (WebAssembly): Загрузка PNG с прозрачным фоном с помощью SDL3_image
8Observer8 10.02.2026
Содержание блога Библиотека SDL3_image содержит инструменты для расширенной работы с изображениями. Пошагово создадим проект для загрузки изображения формата PNG с альфа-каналом (с прозрачным. . .
Установка Qt-версии Lazarus IDE в Debian Trixie Xfce
volvo 10.02.2026
В общем, достали меня глюки IDE Лазаруса, собранной с использованием набора виджетов Gtk2 (конкретно: если набирать текст в редакторе и вызвать подсказку через Ctrl+Space, то после закрытия окошка. . .
SDL3 для Web (WebAssembly): Работа со звуком через SDL3_mixer
8Observer8 08.02.2026
Содержание блога Пошагово создадим проект для загрузки звукового файла и воспроизведения звука с помощью библиотеки SDL3_mixer. Звук будет воспроизводиться по клику мышки по холсту на Desktop и по. . .
SDL3 для Web (WebAssembly): Основы отладки веб-приложений на SDL3 по USB и Wi-Fi, запущенных в браузере мобильных устройств
8Observer8 07.02.2026
Содержание блога Браузер Chrome имеет средства для отладки мобильных веб-приложений по USB. В этой пошаговой инструкции ограничимся работой с консолью. Вывод в консоль - это часть процесса. . .
SDL3 для Web (WebAssembly): Обработчик клика мыши в браузере ПК и касания экрана в браузере на мобильном устройстве
8Observer8 02.02.2026
Содержание блога Для начала пошагово создадим рабочий пример для подготовки к экспериментам в браузере ПК и в браузере мобильного устройства. Потом напишем обработчик клика мыши и обработчик. . .
Философия технологии
iceja 01.02.2026
На мой взгляд у человека в технических проектах остается роль генерального директора. Все остальное нейронки делают уже лучше человека. Они не могут нести предпринимательские риски, не могут. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru