Форум программистов, компьютерный форум, киберфорум
C# для начинающих
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.56/25: Рейтинг темы: голосов - 25, средняя оценка - 4.56
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191

Получение результата вызова функции из очереди

16.08.2021, 21:22. Показов 4840. Ответов 17
Метки нет (Все метки)

Студворк — интернет-сервис помощи студентам
Есть некая функция, которая получает данные, обрабатывает и возвращает результат обработки. Функция, как есть =)
Данные, которые она получает, можно обрабатывать только по очереди (где-то увидел интересный тип очереди: BlockingCollection<T>).
Одновременная обработка возможна, но приведёт к ооочень большим затратам по памяти и процессору, т.к. создание объектов обработчика — дорогое занятие в этом плане, поэтому желательно исходить из того, что это невозможно, но варианты рассмотрю.

Итого, я хочу не плодить обработчики, а пользоваться одним, но последовательно, притом, что вызовы обработчика происходят асинхронно. Если обработчик начинает работу, а работа уже шла, то он благополучно падает на середине пути.

Идея с очередью показалась близкой с искомой, но моя проблема в том, что я не могу сообразить, как мне связать добавление объекта в очередь и получить выполнение функции с аргументом в виде этого объекта из очереди для конкретного вызова?
Либо, я вполне вероятно, упускаю какой-то более разумный подход к организации подобного процесса.

Рабочего кода относящегося к теме особо нет, поэтому
далее условный код, коротко иллюстрирующий искомое:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
        public static BlockingCollection<object>? BlockingCollectionQueue { get; set; }
 
        public static void ProcessQueue()
        {
            object itemNext;
            while (!IsCancelled && BlockingCollectionQueue .TryTake(out itemNext, Timeout.Infinite)) // [OFF] p.s. я знаю что такое CancellationToken, пока не до этого, сбыдлокодил как было =)[/OFF]
            {
                [B]ProcessingFunction(itemNext);[/B] // отсюда хочу вернуть значение каким-то образом, может передавать функцию в очередь? Но тогда всё равно неясно, как получить в вызывающий метод результат выполнения именно того, что было передано.
            }
        }
 
        public static async Task<object> CallOfProcessing(object argObject)
        {
            BlockingCollectionQueue .Add(argObject);
            return Task<object>.Run(() => [B]??????????[/B]); // Как вернуть сюда искомое?
        }
Пока писал это, подумал, что я мог бы нумеровать вызовы и возвращать результаты в дополнительную очередь с номером вызова, а в методе CallOfProcessing ожидать в цикле результат вызова с нужным номером, но выглядит тааааак костыльно...

Точно есть способ лучше, подскажите плз, или хотя бы пните в нужную сторону =)
0
IT_Exp
Эксперт
34794 / 4073 / 2104
Регистрация: 17.06.2006
Сообщений: 32,602
Блог
16.08.2021, 21:22
Ответы с готовыми решениями:

Встраивание результата функции в место ее вызова
Здравствуйте. Можно ли как-то реализовать чтобы перед компиляцией программы выполнолась функция, а ее результат встраивался в место ее...

Получение результата скалярной функции Sql Server из VBA
Не понимаю, почему не могу получить результат выполнения именно хранимой функции (скалярной) SQL Server, если вызывать хранимую процедуру...

Получение результата из функции. Поиск нужного значения по матрице
Задача: Найти значения &quot;а&quot; в функциях f4 и f5 в т.С 1. f2(x, a) = x+cos(a); =&gt; выход 2. f4(x) = f2(x, f2(x)); =&gt; выход ...

17
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
16.08.2021, 21:34
big1991, вы как бы противоречите сами себе.
Вам нужна асинхронность, но обработка последовательная....

Последовательная по какой причине?
Если для обработки следующего элемента нужны результаты обработки предыдущего, то здесь никакую асинхронность не запихнёшь.
Максимум можно занять один поток и в нём последовательно запускать задачи в порядке элементов.

Если же последовательность нужна для возврата результатов в той же последовательности, то ваше предположение в конце поста - один из возможных вариантов реализации.
0
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191
16.08.2021, 21:44  [ТС]
Этим обработчиком пользуются разные службы и разные инстансы одинаковых служб, которые работают независимо друг от друга.

Обработчик их обрабатывает последовательно, т.к. он один, а вот запросы поступают вразнобой и иногда одновременно, что заставляет обработчик захлебнуться.

Их время блокировки даже при одном единственном обработчике несущественное, поэтому на данный момент меня только интересует возможность отправлять запросы асинхронно и дальше выполнять независимые вычисления, пока запрошенное то значение не понадобится, а далее я уже подумаю, как плодить обработчики не тормозя работу и снижать нагрузку на очередь (мб буду добавлять их при достижении очереди выше порога).

Смысл не в том, чтобы забирать ответы в том же порядке, а в том, чтобы каждый вызов забрал точно своё значение потокобезопасным образом.
А в приведённом участке этого не происходит.
0
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
16.08.2021, 21:57
big1991, стало ещё меньше понятно....
Если это метод-обработчик, то при чём здесь очередь?

Произошло событие, вызвался обработчик в том же потоке что и событие, получил результат и вернул его.
Если в это время произойдёт другое событие, то этот же обработчик будет в потоке другого события обрабатывать другие данные.
Обработчик это метод и какая разница сколько событий его одновременно вызывают?
0
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
16.08.2021, 21:58
big1991, по интерфейсу Форума:
 Комментарий модератора 
При обращении к другому пользователю указывайте его ник в тегах [NICK][/NICK] или цитируйте часть сообщения на которое отвечаете.
В противном случае ему не придёт уведомление о вашем обращении и вы можете не дождаться ответа на своё сообщение.

Для вставки ника: введите ник, выделите его и нажмите кнопку "Динамик" на панели редактора сообщений.
Или кликните по нику автора сообщения в панели слева от текста его сообщения.

Для вставки цитаты: выделите нужную цитату, должна появиться всплывающая кнопка "Цитировать", нажмите её.
1
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191
17.08.2021, 00:11  [ТС]

Не по теме:

Элд Хасп, спасибо за пояснение по обращению, всё думал, как же это делается.



Цитата Сообщение от Элд Хасп Посмотреть сообщение
Обработчик это метод
В том-то и дело, что в моём случае обработчик - это Объект сторонней подключаемой библиотеки. Извиняюсь, что сразу некорректно выразился. Думаю, что это очень многое прояснит.

Просто я использую этот объект в методе, который вызывает этот обработчик, поэтому так описал этот момент в коде.

Если я создам на каждый вызов по обработчику, то потеряю приблизительно в 300-400 раз больше времени, чем используя один и тот же для разных вызовов (работа заранее созданного обработчика для одного вызова в среднем занимает ~1мс, а создать новый — 300-400мс), поэтому пользуюсь одним, созданным при инициализации программы и записанном в статическом свойстве, к которому обращаются все вызовы в коде.
Ускорение на пару порядков таким образом получил, но иногда происходит эксепшен о том, что к обработчику обратилось больше вызовов, чем он может потянуть одновременно.

Добавлено через 1 час 8 минут
Пока обошёлся костылём:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    public static class SomeStaticClass
    {
        private static BlockingCollection<QueueEntry>? BlockingCollectionQueue { get; set; }
 
        private static List<QueueEntry>? ResultsList { get; set; }
 
        public static void ProcessQueue()
        {
            while (!IsCancelled && BlockingCollectionQueue.TryTake(out var objToRecognizeNext, Timeout.Infinite))
            {
                objToRecognizeNext.result =
                    ProcessingFunction(objToRecognizeNext);;
                ResultsList.Add(objToRecognizeNext);
            }
        }
 
        public static object CallOfProcessing(object argObject)
        {
            var entry = new QueueEntry(argObject);
            BlockingCollectionQueue.TryAdd(entry);
            
            var result = ResultsList.FirstOrDefault(x => x._guid == entry._guid);
            while (result._guid == Guid.Empty)
            {
                result = ResultsList.FirstOrDefault(x => x._guid == entry._guid);
                Thread.Sleep(5);
            }
 
            ResultsList.Remove(result);
            return result.result;
        }
 
        public struct QueueEntry
        {
            public Guid _guid;
            public object args;
            public string result;
 
            public QueueEntry(object argsParam)
            {
                _guid = Guid.NewGuid();
                args = argsParam;
                result = string.Empty;
            }
        }
    }
Но всё ещё открыт к предложениям по улучшению организации или избранию иного подхода к проблеме!
В перспективе уйду от статического объекта обработчика (используемый в функции ProcessingFunction), к набору объектов, а эти методы порефакторю до чего-то более адекватного, но осознаю, что хоть оно и работает, но это не более, чем костыль =(
0
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
17.08.2021, 09:18
Цитата Сообщение от big1991 Посмотреть сообщение
Пока обошёлся костылём:
Хоть бы комменты вставили - как понять что здесь что?

Вызывается асинхронно метод CallOfProcessing?
Он создаёт новый QueueEntry добавляет его в BlockingCollectionQueue.
Потом в цикле получает этот же QueueEntry и ожидает пока _guid не перестанет быть равным Guid.Empty.

Метод ProcessQueue удаляет objToRecognizeNext из коллекции и после удаления записывает его в ResultsList.
Но абсолютно непонятно откуда берётся этот objToRecognizeNext и что за флаг IsCancelled .

Так же непонятно как эти методы вызываются.

Информация очень скупая и сделать по ней какие-либо выводы просто невозможно.

Цитата Сообщение от big1991 Посмотреть сообщение
работа заранее созданного обработчика для одного вызова в среднем занимает ~1мс, а создать новый — 300-400мс)
Тоже непонятно.
Пусть обработчик объект.
Но в каждом вызове вы обращаетесь к его методу.
Это не означает, что нужно для каждого вызова создавать новый объект обработчика.
Ничто не мешает асинхронно юзать его методы в любом количестве.


Покажите как вы реализуете эти вызовы в случае если для каждого надо создавать новый объект-обработчик и для случая использования единного.
Код сократите до существенного только для обсуждаемого вопроса.
0
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191
17.08.2021, 11:46  [ТС]
Цитата Сообщение от Элд Хасп Посмотреть сообщение
Код сократите до существенного только для обсуждаемого вопроса.
Сейчас код исключительно сокращенный, поэтому и нет контекста вызовов этих методов. И ни одной лишней реализации тоже не воткнул. А т.к. я его практически "обфусцировал" вручную, то в вызывающем контексте не так чтобы много смысла...

Насчет комментариев к коду - мой косяк, согласен.

Цитата Сообщение от Элд Хасп Посмотреть сообщение
Вызывается асинхронно метод CallOfProcessing?
Этот метод может быть вызван в асинхронном контексте из разных потоков, да. Но сам он синхронный, как и весь класс в приведённом листинге.

Цитата Сообщение от Элд Хасп Посмотреть сообщение
Потом в цикле получает этот же QueueEntry и ожидает пока _guid не перестанет быть равным Guid.Empty.
Ну да. Он пытается получить этот же объект, но из другой очереди (позже я немного переделал класс и метод, т.к. List тоже ни... разу не потокобезопасен), и если не получил, то уходит в цикл, пока не получит. Ну а появится там этот объект, только после того, как обработается в цикле в ProcessQueue, как вы позже описали.

Цитата Сообщение от Элд Хасп Посмотреть сообщение
Метод ProcessQueue удаляет objToRecognizeNext из коллекции и после удаления записывает его в ResultsList.
А ещё этот objToRecognizeNext обрабатывается в ProcessingFunction, и её результатом заполняет своё поле objToRecognizeNext.result.

Цитата Сообщение от Элд Хасп Посмотреть сообщение
Но абсолютно непонятно откуда берётся этот objToRecognizeNext и что за флаг IsCancelled.
Вот отсюда он взялся — из очереди, в которую мы заполняем будущую работу обработчика, строка № 9:
Цитата Сообщение от big1991 Посмотреть сообщение
while (!IsCancelled && BlockingCollectionQueue.TryTake(out var objToRecognizeNext, Timeout.Infinite))
А по существу:
Цитата Сообщение от Элд Хасп Посмотреть сообщение
Пусть обработчик объект.
Но в каждом вызове вы обращаетесь к его методу.
Это не означает, что нужно для каждого вызова создавать новый объект обработчика.
Ничто не мешает асинхронно юзать его методы в любом количестве.
В общем виде, вы правы, конечно, но если бы в моём случае это было так, то тема не появилась бы. Я ещё в начале предложил принять это за аксиому: один объект-обработчик может работать только последовательно с каждым обращением к нему по очереди.
Объект сам налагает ограничение на один вызов своих методов одновременно, т.к. это не просто метод на 5 строчек кода, а немаленькая библиотека на 400кб оптимизированного кода. (написана не мной).

В коде не описан флаг
Code
1
IsCancelled
, который предлагается в роли аналога CancellationToken (можно обойтись и без этого флага), а также не описана
Code
1
ProcessingFunction
, т.к. её сигнатура очевидна, а реализация не существенна для этой темы, достаточно знать, что она может выполняться только для последовательно.
Я бы внёс ясность, если бы сделал класс нестатическим, либо если бы добавил объект-обработчик, полагаю:

Расширил код до обсуждаемого объёма с комментариями. Если вставить его в IDE, то должно быть понятно.
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
    public static class SomeStaticClass
    {
        /// <summary>
        /// Очередь на обработку
        /// </summary>
        private static BlockingCollection<QueueEntry>? BlockingCollectionQueue { get; set; }
 
        /// <summary>
        /// Результаты выполнения
        /// </summary>
        private static BlockingCollection<QueueEntry>? ResultsList { get; set; }
 
        /// <summary>
        /// Флаг отмены операции
        /// </summary>
        private static bool IsCancelled { get; set; }
 
        /// <summary>
        /// Запуск обработки очереди
        /// </summary>
        private static void ProcessQueue()
        {
            while (!IsCancelled && BlockingCollectionQueue.TryTake(out var objToRecognizeNext, Timeout.Infinite))
            {
                objToRecognizeNext.result =
                    processingObject.ProcessingFunction(objToRecognizeNext);;
                ResultsList.Add(objToRecognizeNext);
            }
        }
 
        /// <summary>
        /// Тот самый объект-обработчик.
        /// Который раньше я инициализировал для каждого вызова.
        /// </summary>
        private static ProcessingObjectType processingObject = new ProcessingObjectType();
        
        /// <summary>
        /// Моя функция для использования этого обработчика
        /// </summary>
        /// <param name="processingObject">Собственно обработчик</param>
        /// <param name="objToRecognizeNext">Обрабатываемая информация</param>
        /// <returns>Результат</returns>
        private static string ProcessingFunction(this ProcessingObjectType processingObject, QueueEntry objToRecognizeNext)
        {
            return processingObject.SomeProcess(objToRecognizeNext);
        }
 
        /// <summary>
        /// Вызываемый в контексте метод
        /// </summary>
        /// <param name="argObject">Обрабатываемые аргументы</param>
        /// <returns>Обработанный объект</returns>
        public static object CallOfProcessing(object argObject)
        {
            var entry = new QueueEntry(argObject);
            BlockingCollectionQueue.TryAdd(entry);
            
            
            var result = ResultsList.FirstOrDefault(x => x._guid == entry._guid);
            while (result._guid == Guid.Empty)
            {
                result = ResultsList.FirstOrDefault(x => x._guid == entry._guid);
                Thread.Sleep(5);
            }
 
            ResultsList.TryTake(out result);
            return result.result;
        }
 
        internal struct QueueEntry
        {
            public Guid _guid;
            public object args;
            public string result;
 
            public QueueEntry(object argsParam)
            {
                _guid = Guid.NewGuid();
                args = argsParam;
                result = string.Empty;
            }
        }
    }
 
    /// <summary>
    /// Внешняя библиотека, которой я пользуюсь
    /// </summary>
    internal class ProcessingObjectType
    {
        /// <summary>
        /// Какой-то её внутренний процесс, который не позволяет запускать обработчик асинхронно
        /// </summary>
        /// <param name="objToRecognizeNext">Аргумент</param>
        /// <returns>Результат</returns>
        public string SomeProcess(SomeStaticClass.QueueEntry objToRecognizeNext)
        {
            // someWork with objToRecognizeNext.args; 
            return string.Empty;
        }
    }
0
Эксперт .NET
 Аватар для kolorotur
17823 / 12973 / 3382
Регистрация: 17.09.2011
Сообщений: 21,261
17.08.2021, 13:31
Лучший ответ Сообщение было отмечено big1991 как решение

Решение

big1991, по-моему, вам нужна асинхронная очередь с TaskCompletionSource.
Как-то так:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Объект из сторонней сбоки, выполняющий всю непосредственную работу. В одном экземпляре.
private readonly ProcessingObjectType processor = new();
 
// Обработчик запросов со встроенным буфером/очередью.
// Добавьте using System.Threading.Tasks.Dataflow
private readonly ActionBlock<QueueEntry> Requests = new(
    ProcessQueueEntry, // Ссылка на метод, обрабатывающий каждый элемент очереди (см. ниже)
    new ExecutionDataflowBlockOptions 
    { 
        BoundedCapacity = 100, // Максимальное количество элементов в очереди. Если очередь полна, то SendAsync (см. ниже) будет асинхронно ждать освобождения.
        MaxDegreeOfParallelism = 1, // Количество элементов, обрабатываемых одновременно. У вас ограничение на один, потому единица.
    });
 
// Пользователи вызывают этот метод: string result = await instance.CallOfProcessing(arg, token);
public async Task<string> CallOfProcessing(object args, CancellationToken token = default)
{
    using QueueEntry entry = new(args, token);
 
    // Добавление элемента в очередь на обработку.
    // Если очередь полна (InputCount == BoundedCapacity), то выполнение будет асинхронно ожидать свободного места.
    await Requests.SendAsync(entry, token).ConfigureAwait(false);
 
    Console.WriteLine($"Enqueued {args} (current queue: {Requests.InputCount})");
 
    // Ожидание окончания работы: пока до элемент дойдет очередь и обработчик не произведет результат.
    return await entry.Completion.ConfigureAwait(false);
}
 
// Этот метод вызывается для каждого элемента очереди.
// Может быть асинхронным — зависит от реализации метода из сторонней сборки.
private void ProcessQueueEntry(QueueEntry entry)
{
    string result = processor.SomeProcess(entry.Args, entry.CancellationToken);
 
    // Уведомление о том, что выполнение закончено: в этот момент код с string result = await CallOfProcessing(arg, token) завершится.
    entry.Complete(result);
}
 
private readonly struct QueueEntry : IDisposable
{
    private readonly TaskCompletionSource<string> _tcs;
    private readonly CancellationTokenSource _cancellation;
    private readonly CancellationTokenRegistration _cancellationRegistration;
 
    public QueueEntry(object argsParam, CancellationToken token = default) : this()
    {
        Args = argsParam;
        _tcs = new();
 
        // Обработка отмены
        _cancellation = CancellationTokenSource.CreateLinkedTokenSource(token);
        _cancellationRegistration = _cancellation.Token.Register(Cancel);
    }
    public object Args { get; }
    public CancellationToken CancellationToken => _cancellation.Token;
    public Task<string> Completion => _tcs.Task;
 
    public void Dispose()
    {
        _cancellation.Dispose();
        _cancellationRegistration.Dispose();
    }
    internal void Cancel()
    {
        _tcs.TrySetCanceled(CancellationToken);
    }
    internal void Complete(string result) => _tcs.TrySetResult(result);
}
 
private class ProcessingObjectType
{
    static readonly Random rand = new();
    public string SomeProcess(object arg, CancellationToken cancellationToken)
    {
        // Имитируем бурную деятельность.
        TimeSpan timeToComplete = TimeSpan.FromSeconds(rand.Next(15));
        Console.WriteLine($"Work on {arg} will complete in {timeToComplete}");
        Task.Delay(timeToComplete, cancellationToken).Wait();
 
        return $"Work on {arg} completed in {timeToComplete}";
    }
}
Ниже простой код для тестирования.
Каждая введенная в консоль строка будет отправлена на "обработку" как Arg.
Ввод пустой строки отменяет все текущие задачи.
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ProcessorWrapper processor = new(); // Класс, инкапсулирующий всю вышеприведенную логику.
CancellationTokenSource massCancellation = new();
while (true)
{
    string arg = Console.ReadLine();
    if (string.IsNullOrEmpty(arg)) 
        break;
 
    // Ожидаем окончания в отдельном потоке из пула — чтобы не блокировать ввод консоли для следующей задачи и для имитации одновременных запросов разными пользователями
    _ = Task.Run(async () => Console.WriteLine(await processor.CallOfProcessing(arg, massCancellation.Token)));
}
 
// Отменяем все висящие и активные запросы из очереди.
massCancellation.Cancel();
2
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
17.08.2021, 13:34
Цитата Сообщение от big1991 Посмотреть сообщение
Расширил код до обсуждаемого объёма с комментариями. Если вставить его в IDE, то должно быть понятно.
Да, намного понятнее стало.

Если правильно понял, то "бутылочное горлышко" это метод SomeProcess который может вызываться одновременно только один?
Для следующего вызова надо подождать завершения предыдущего вызо?
0
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191
17.08.2021, 14:00  [ТС]
Цитата Сообщение от Элд Хасп Посмотреть сообщение
"бутылочное горлышко" это метод SomeProcess который может вызываться одновременно только один
Элд Хасп, точно так!

Добавлено через 18 минут
Цитата Сообщение от kolorotur Посмотреть сообщение
асинхронная очередь с TaskCompletionSource
Это выглядит как то, что я искал!

Сегодня-завтра попробую внедрить.

Но здесь вроде бы "из коробки" параллелизм доступен только для одного инстанса обработчика.

А если я хочу, чтобы при таком же подходе при "переполнении" определённого порога наполненности очереди (пусть те же 100 или меньше), он не в ожидание вставал, а асинхронно создавался новый инстанс и работал уже параллельно первому, то мне нужно будет:
1. Переделать поле обработчика на список.
2. Добавить механизм, который:
2.1. Добавляет и убирает обработчики по мере необходимости
2.2. Отмечает, когда обработчик свободен (возможно записывает рядом с ним в какой-то коллекции значение флага по факту Completion последнего обработанного элемента или типа того... Пока этого не понял, как попробую в коде — отпишусь, как сделал.
2.3. Ну и чтобы выбирал, какой из обработчиков запустить к текущему элементу.
3. Устанавливать MaxDegreeOfParallelism в значение .Count списка обработчиков.

Выглядит просто. Спасибо огромное, kolorotur!

Не по теме:

Кликните здесь для просмотра всего текста
Осталось эту асинхронность запилить в мой повсеместно синхронный код, ничего не сломав в логике.
Разумеется асинхронность в программе присутствует, но намного дальше кода, вызывающего описанные выше участки, они выполняются последовательно, но каждый в своём потоке.
Но это уже к теме не относится.

0
Эксперт .NET
 Аватар для kolorotur
17823 / 12973 / 3382
Регистрация: 17.09.2011
Сообщений: 21,261
17.08.2021, 14:11
Лучший ответ Сообщение было отмечено big1991 как решение

Решение

Цитата Сообщение от big1991 Посмотреть сообщение
то мне нужно будет
Создайте сразу количество обработчиков равное количеству логических процессоров — проще, чем потом списками манипулировать.

Цитата Сообщение от big1991 Посмотреть сообщение
станавливать MaxDegreeOfParallelism в значение .Count списка обработчиков.
Если обработка производится на ЦП, то ставить значение выше количества логических процессоров контрпродуктивно.

Цитата Сообщение от big1991 Посмотреть сообщение
Осталось эту асинхронность запилить в мой повсеместно синхронный код
Дык если у вас везде синхронность, то вопрос не имеет смысла.
Вам же надо было прицепить именно асинхронные вызовы к синхронной обработке.
1
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
17.08.2021, 15:31
Лучший ответ Сообщение было отмечено big1991 как решение

Решение

Цитата Сообщение от big1991 Посмотреть сообщение
точно так!
Если понял правильно ваши разъяснения, то нечто подобное:
C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    public static class ProcessingObjectTypeSync
    {
        // Единственный экземпляр обработчика
        private static readonly ProcessingObjectType processing = new();
 
        // Синхронный локирующий метод
        // Если было несколько вызовов, то блокируется выполнени
        // до завершение предыдущих вызово
        public static string SomeProcess(QueueEntry objToRecognizeNext)
        {
            lock (processing)
                return processing.SomeProcess(objToRecognizeNext);
        }
 
        // Перегрузка для Фабрики Task, чтобы не создавать лямбд
        private static string SomeProcess(object obj)
            => SomeProcess((QueueEntry)obj);
 
        // Асинхронный метод не блокирующий вызывающий поток,
        // если одновременно несколько вызовов.
        public static async Task<string> SomeProcessAsync(QueueEntry objToRecognizeNext)
            => await Task.Factory.StartNew(SomeProcess, objToRecognizeNext);
    }
1
Эксперт .NET
 Аватар для kolorotur
17823 / 12973 / 3382
Регистрация: 17.09.2011
Сообщений: 21,261
17.08.2021, 15:51
Цитата Сообщение от Элд Хасп Посмотреть сообщение
нечто подобное
Я бы не стал такой подход использовать.
Это реализация "очереди", где потоки из пула используются вместо массива элементов, только на каждый элемент выделяется 1 МБ памяти вместо пары десяткой байтов на ссылки и целый поток, который тут же блокируется вместо того, чтобы заниматься полезной работой.
Судя по имени параметра objToRecognizeNext можно предположить, что у автора может быть какая-нибудь обработка изображений. Несложно представить хаос, который начнется в пуле потоков, если пользователь скормит программе папочку с базой фотографий, на каждую из которых будет вызван SomeProcessAsync.
Плюс очередь выполнения не гарантируется (что не так страшно, если автору не нужно именно последовательное выполнение).
1
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
17.08.2021, 16:14
Цитата Сообщение от kolorotur Посмотреть сообщение
Это реализация "очереди", где потоки из пула используются вместо массива элементов...
Да.
Но исходя из условий задачи, не думаю, что стоит заморачиваться:
Цитата Сообщение от big1991 Посмотреть сообщение
работа заранее созданного обработчика для одного вызова в среднем занимает ~1мс
Добавлено через 4 минуты
Цитата Сообщение от kolorotur Посмотреть сообщение
Плюс очередь выполнения не гарантируется
Да.
Этот момент я специально у TC уточнял.
0
Эксперт .NET
 Аватар для kolorotur
17823 / 12973 / 3382
Регистрация: 17.09.2011
Сообщений: 21,261
17.08.2021, 16:19
Цитата Сообщение от Элд Хасп Посмотреть сообщение
работа заранее созданного обработчика для одного вызова в среднем занимает ~1мс
Это да.
Правда, тогда непонятна вот эта ситуация:
Цитата Сообщение от big1991 Посмотреть сообщение
при таком же подходе при "переполнении" определённого порога наполненности очереди (пусть те же 100 или меньше)
То есть запросы могут прилетать с периодичностью чаще, чем раз в 1мс, да еще так, что может заполниться очередь?
0
Модератор
Эксперт .NET
 Аватар для Элд Хасп
16140 / 11264 / 2888
Регистрация: 21.04.2018
Сообщений: 33,111
Записей в блоге: 2
17.08.2021, 16:27
Цитата Сообщение от kolorotur Посмотреть сообщение
То есть запросы могут прилетать с периодичностью чаще, чем раз в 1мс, да еще так, что может заполниться очередь?
Я тоже не совсем это понял.
Пытался выяснить, но как-то ясного ответа не получил.

Так как если возможна такая большая очерёдность, то нужно создавать несколько экземпляров объектов-обработчиков и "дирижировать" ими.
Иначе, как не делай, будут возможны лаги.

Добавлено через 1 минуту
big1991, АУ !!!
Треба большей ясности по вероятному количеству одновременных запросов и допускам по лагам в связи с этим.
0
Уважайте чужое время
75 / 23 / 8
Регистрация: 01.02.2013
Сообщений: 191
17.08.2021, 16:49  [ТС]
Цитата Сообщение от Элд Хасп Посмотреть сообщение
lock (processing)
Только не кидайтесь помидорами, но на данный момент меня устраивает эта конкретная конструкция полностью.

Цитата Сообщение от Элд Хасп Посмотреть сообщение
не думаю, что стоит заморачиваться
Верно, когда добавлю больше потоков или подключу больше форматов данных, то рассмотрю необходимость реализации с TaskCompletionSource.
И у меня с асинхронными и параллельными операциями пока что всё плохо, так что спасибо за "пинок"!

Цитата Сообщение от kolorotur Посмотреть сообщение
То есть запросы могут прилетать с периодичностью чаще, чем раз в 1мс, да еще так, что может заполниться очередь?
В контексте этого проекта, в перспективе — да, переполнение возможно даже при столь низкой нагрузке на одну итерацию, а при расширении функционала нагрузка может и возрасти кратно, тогда-то возможно будет и не обойтись без TaskCompletionSource с кол-вом движком по кол-ву процессоров.

Всем спасибо, для меня — решено.

Добавлено через 11 минут
Цитата Сообщение от Элд Хасп Посмотреть сообщение
если возможна такая большая очерёдность, то нужно создавать несколько экземпляров объектов-обработчиков и "дирижировать" ими
Да-да, вот я это и описал, что именно этим я и займусь, когда такая необходимость возникнет.

Цитата Сообщение от big1991 Посмотреть сообщение
Их время блокировки даже при одном единственном обработчике несущественное, поэтому на данный момент меня только интересует возможность отправлять запросы асинхронно... а далее ...
В общем, блокирование движка на момент вычислений меня устраивает на данном этапе развития проекта.

Цитата Сообщение от kolorotur Посмотреть сообщение
То есть запросы могут прилетать с периодичностью чаще, чем раз в 1мс, да еще так, что может заполниться очередь?
Когда я создал эту тему, то у меня даже эксепшен без блокировки выскакивал в среднем 1 раз в ~час.
Наплыв запросов происходит не 100% времени работы, но это систематическая обычная ситуация даже при средней нагрузке приложения.
Дальше будет больше, и соответственно начнется переполнение очереди и понадобится асинхронная параллельная обработка разными обработчиками.

Допускаю, что сейчас проблема ещё и в архитектуре самого приложения, но эти грабли пока что вне моего поля зрения =)

Добавлено через 10 минут
Цитата Сообщение от Элд Хасп Посмотреть сообщение
по вероятному количеству одновременных запросов и допускам по лагам в связи с этим.
В цифрах это затруднительно для меня описать.

В зависимости от настроек и загруженности приложения, частота будет от 300 запросов (т.е. и 1 обработчик справится с блокировками) в секунду и до нескольких тысяч, тут уже возможны задержки, но если наплодить обработчиков на каждый поток, как посоветовал уважаемый kolorotur, то проблем не станет.

По лагам: не существенно. В зависимости от того, насколько хорошо получится разобраться с другими модулями, может быть удастся снизить частоту запросов и проблемы вообще не станет. Это узкое место только в данный момент.
Так что ещё раз спасибо!
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
BasicMan
Эксперт
29316 / 5623 / 2384
Регистрация: 17.02.2009
Сообщений: 30,364
Блог
17.08.2021, 16:49
Помогаю со студенческими работами здесь

Вывод на экран результата вызова ls
Есть задача вывести на экран результат программы. Суть программы в том что она повторяет функцию ls! Прошу вас, уважаемые пользователи...

После вызова второго запроса, запросы начинаются повторяться по очереди
Здравствуйте,есть на одной странице AJAX запросы.Выполняются по нажатию на кнопку. Прикрутил интервал запуска. Проблема: После вызова...

Зачем нужно преобразование при присваивании указателю результата вызова malloc?
Здравия всем! Когда я читал уроки по выделению памяти, там, в большинстве случаев, был примерно такой код: char *cptr = (char *)...

Получение коллекции из ассинхронного вызова
Здравствуйте. Есть классы, с помощью которых парсится json файл public class SampleDataItem { public...

Получение телефонного номера входящего вызова в ОС windows phone 7
Пишу программку для ОС windows phone 7. Вопрос в следующем: возможно ли получить телефонный номер входящего вызова? Ответа...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
18
Ответ Создать тему
Новые блоги и статьи
Отправка уведомления на почту при изменении наименования справочника
Maks 24.03.2026
Программная отправка письма электронной почты на примере изменения наименования типового справочника "Склады" в конфигурации БП3. Перед реализацией необходимо выполнить настройку системной учетной. . .
модель ЗдравоСохранения 5. Меньше увольнений- больше дохода!
anaschu 24.03.2026
Теперь система здравосохранения уменьшает количество увольнений. 9TO2GP2bpX4 a42b81fb172ffc12ca589c7898261ccb/ https:/ / rutube. ru/ video/ a42b81fb172ffc12ca589c7898261ccb/ Слева синяя линия -. . .
Midnight Chicago Blues
kumehtar 24.03.2026
Такой Midnight Chicago Blues, знаешь?. . Когда вечерние улицы становятся ночными, а ты не можешь уснуть. Ты идёшь в любимый старый бар, и бармен наливает тебе виски. Ты смотришь на пролетающие. . .
SDL3 для Desktop (MinGW): Вывод текста со шрифтом TTF с помощью библиотеки SDL3_ttf на Си и C++
8Observer8 24.03.2026
Содержание блога Финальные проекты на Си и на C++: finish-text-sdl3-c. zip finish-text-sdl3-cpp. zip
Жизнь в неопределённости
kumehtar 23.03.2026
Жизнь — это постоянное существование в неопределённости. Например, даже если у тебя есть список дел, невозможно дойти до точки, где всё окончательно завершено и больше ничего не осталось. В принципе,. . .
Модель здравоСохранения: работники работают быстрее после её введения.
anaschu 23.03.2026
geJalZw1fLo Корпорация до введения программа здравоохранения имела много невыполненных работниками заданий, после введения программы количество заданий выросло. Но на выплатах по больничным это. . .
Контроль уникальности заводского номера
Maks 23.03.2026
Алгоритм контроля уникальности заводского (или серийного) номера на примере нетипового документа выдачи шин для спецтехники с табличной частью, разработанного в конфигурации КА2. Данные берутся из. . .
Хочу заставить корпорации вкладываться в здоровье сотрудников: делаю мат модель здравосохранения
anaschu 22.03.2026
e7EYtONaj8Y Z4Tv2zpXVVo https:/ / github. com/ shumilovas/ med2. git
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru