Форум программистов, компьютерный форум, киберфорум
Наши страницы
Assembler: Windows/protected mode
Войти
Регистрация
Восстановить пароль
 
Рейтинг 4.60/5: Рейтинг темы: голосов - 5, средняя оценка - 4.60
Jin X
Заблокирован
#1

Parallel.For, хочу посоветоваться

13.08.2018, 23:54. Просмотров 1003. Ответов 18
Метки нет (Все метки)

Всем привет!

Решил написать небольшой include для fasm под Windows, который будет позволять выполнять параллельные вычисления для for-циклов по аналогии с TParallel.For в Delphi или Parallel.For в C#, но без таких заморочек, как там, а что-то простенькое (строк на 500 асм-кода, ну может, чуть больше). Результат потом выложу.
Кто не в теме – схема такая. Запускается функция инициализации, которая создаёт ждущие потоки. Далее (сразу или в нужный момент) вызывается функция, которой передаётся адрес процедуры цикла (ProcAddr) диапазон цикла for (либо кол-во итераций). Эта функция будит потоки (SetEvent) и либо завершает свою работу, либо ждёт окончания цикла. Потоки, в свою очередь, инициализируют цикл по определённым правилам (см. ниже вопросы) и вызывают на каждом цикле процедуру цикла (ProcAddr), передавая ей номер цикла и порядковый номер потока (чтобы было удобнее группировать результаты вычислений без использования lock и критических секций). В конце программист вызывает функцию завершения, которая даёт потокам команду завершить работу.

В ходе реализации возникло несколько моментов, о которых я решил посоветоваться с местными спецами

Итак, вопрос №1: какое кол-во потоков оптимально?
Тут два варианта:
  • либо affinity mask текущего процесса,
  • либо общее число логических процессоров.
Как лучше, на ваш взгляд, и почему?
Кроме того, хочу по умолчанию умножать это число на 2 (потому что функция цикла может грузить процессор не на все 100%, а так всё же надёжнее будет).
Ну и давать программисту возможность ограничивать общее число потоков сверху.

Вопрос №1А (туда же): есть ли смысл назначать affinity mask или хотя бы номер идеального процессора для каждого потока?
Мне кажется, что нет (система сама разберётся как лучше).

Вопрос №2: как лучше распределять итерации циклов между потоками?
Представим, что у нас 4 потока и мы делаем цикл i=0...999.
  • Сначала я хотел просто тупо разделить весь цикл поровну, т.е. thread1:i=0,4,8,12..., thread2:i=1,5,9,13..., thread3:i=2,6,10,14..., thread4:i=3,7,11,15... Но потом прикинул, что так не годится, поскольку для некоторых i процедура цикла может выполняться быстрее, чем для других. В результате получится, что какой-то поток закончит свою работу раньше других и далее параллельно будут работать только 3 потока, затем 2 и потом 1.

  • Далее я подумал, что можно делать так: каждый освободившийся поток берёт следующую по счёту итерацию. К примеру:
    Код
    time thread1 thread2 thread3 thread4
     0     [0]     [1]     [2]     [3]
     1      :       :       :       :
     2     [4]      :       :       :
     3      :       :      [5]      :
     4      :      [6]      :      [7]
     5      :       :      [8]      :
     6     [9]      :       :       :
     7      :      [10]     :      [11]
     8      :       :      [12]     :
    Здесь в квадратных скобках указывается номер итерации (i), которую берёт себе поток, а двоеточием - выполнение расчётов в этой итерации. При такой схеме работы, что ни один из потоков не простаивает и захватывает итерацию в тот момент, когда закончил своё вычисление.
    Вроде бы идеальный вариант, но есть один минус: такой подход требует выполнять lock-операцию для получения каждого номера следующей итерации, что для может быть довольно дорогим по времени удовольствием для быстрых процедур цикла при большом кол-ве итераций.

  • Наконец я решил скомбинировать оба подхода и сделать так: взять какое-то число, например, 256. Нацело разделить на него кол-во итераций (в нашем случае 1000/256=3; если получится 0, принять 1). Обозначим результат как BLC (block loop size). Теперь мы выделяем каждому потоку по BLC циклов, после выполнения которых потоки захватывают (по аналогии с предыдущим алгоритмом, т.е. через lock) следующий блок. Т.о., кол-во lock-операций будет примерно равно 256 или чуть больше (максимум 511, но для циклов с большим кол-вом итераций будет 256-257 "захватов"). Такой маленький BLC=3 получился из-за небольшого кол-ва итераций цикла, для 1'000'000 итераций BLC = 3906.
    Код
    time thread1 thread2 thread3 thread4
     0     [0]     [3]     [6]     [9]
     1      :       :       7       :
     2      1       :       :       10
     3      :       4       :       11
     4      2       :       8       :
     5     [12]     5       :       :
     6      :       :       :      [15]
     7      13      [18]    :       :
     8      14      :      [21]     16
    (в квадратных скобках указывается момент захвата блока итераций, а числа без скобок - итерации внутри блока).
    Можно взять в качестве делителя не 256, а 1000, например (давайте это число тоже обсудим). И давать программисту самому задавать его. Более того, можно ограничить число BLC, скажем, значением = 1000 сверху (а реально ли это надо? и каким числом ограничивать?) и кол-вом потоков снизу (на случай, если делитель будет меньше кол-ва потоков). Либо сделать так, чтобы на последних итерациях BLC было = (BLC/(ЧислоПотоков*2)) или что-то в таком духе.
    Такой подход, с одной стороны, не сильно нагружает процессор и шину lock-операциями, с другой - не позволяет процессору, который освободился раньше, простаивать долгое время (разве что на последних итерациях).
А может, у вас есть более интересная идея (но не чересчур замороченная)?

Вопрос №3: достаточно ли указания кол-ва итераций (причём, знаковым значением, т.е. 0x7FFFFFFF для 32-х бит) или обязательно нужно указывать и начальное, и конечное значение счётчика?
Вариант с указанием обоих границ имеет некоторые сложности в реализации при кол-ве итераций, приближённых к максимально возможным.
Но всё решаемо, конечно, при необходимости. Другой вопрос: есть ли такая необходимость?

Что думаете обо всём этом?
Повторюсь, что это не претензия на какую-то грандиозную задумку, а довольно простой, но всё же полезный инструмент для ускорения циклических вычислений.
2
Лучшие ответы (1)
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
13.08.2018, 23:54
Ответы с готовыми решениями:

Домашняя сеть, хочу посоветоваться.
Привет! У меня дома есть 2 ноута, 1 десктоп, и 1 КПК, на всех этих устройствах...

Хочу посоветоваться с Java EE гуру
Привет. Хочу посоветоваться с Java EE гуру. Перед мной стоит задача разработать...

Решил обновить комп.Хочу посоветоваться.
Парни доброго времени суток! Решил обновить комп.Хочу посоветоваться. ...

хочу посоветоваться системник около 25 000 игровой
корпус thermaltake или zalman (тысячи за 2-3) ну или cooler master процессор...

Eвклидова норма вектора с помощью метода Parallel.For и Parallel.Invoke
Имеется прога (евклидова норма вектора) Преобразовать её, чтобы...

18
Kukuxumushu
976 / 533 / 97
Регистрация: 13.06.2015
Сообщений: 1,787
Завершенные тесты: 2
14.08.2018, 04:35 #2
Цитата Сообщение от Jin X Посмотреть сообщение
Решил написать небольшой include для fasm под Windows
Очень полезное дело! А вот оформить эту штуку в виде DLL было бы вообще шедевром!
Цитата Сообщение от Jin X Посмотреть сообщение
Как лучше, на ваш взгляд, и почему?
Ну с т.з. ОС конечно правильнее affinity mask, другой вопрос насколько криво оно внутри винды реализовано.
Цитата Сообщение от Jin X Посмотреть сообщение
есть ли смысл назначать affinity mask или хотя бы номер идеального процессора для каждого потока?
Никакого.
Цитата Сообщение от Jin X Посмотреть сообщение
А может, у вас есть более интересная идея (но не чересчур замороченная)?
Твой третий вариант - это по сути первый, только в увеличенном масштабе, со всеми его недостатками (тоже в увеличенном масштабе). Идеальный вариант для реализации - второй, LOCK-инструкции совершенно не страшны, т.к. задачи, в которых это как-либо может повлиять на быстродействие, ни один дурак не параллелит. Наоборот, частый лок поможет корректно работать с мьютексами (я тебе приводил пример, с которым сталкивался год назад, и там ты мне показывал мануал интела, из которого этот принцип предельно понятен).
Цитата Сообщение от Jin X Посмотреть сообщение
Вопрос №3: достаточно ли указания кол-ва итераций (причём, знаковым значением, т.е. 0x7FFFFFFF для 32-х бит) или обязательно нужно указывать и начальное, и конечное значение счётчика?
Без указания границ теряет смысл понятие "переменная цикла", если ты её принудительно руками вобьёшь от нуля до N, то возможности такой библиотеки сразу резко сократятся. Конечно можно и так сделать, но тогда лишний гемор по контролю над переменной ляжет на плечи прикладного программиста, причём каждый раз, вместо одного раза и навсегда в библиотеке.
2
Ethereal
Заблокирован
14.08.2018, 07:53 #3
Цитата Сообщение от Kukuxumushu Посмотреть сообщение
Никакого.
Задача, такая как у ТС-а решается в CPU-майнерах криптовалют. И там от скорости рассчета зависит количество намайненного бабла. Так-что стимул найти оптимальное решение там вполне материальный. Так вот в этих майнерах типично создается число потоков равное числу ядер у процессора и каждый поток пришпиливается SetThreadAffinityMask к своему ядру. А по твоему тут смысла никакого.
2
VTsaregorodtsev
519 / 447 / 67
Регистрация: 19.02.2010
Сообщений: 1,720
14.08.2018, 10:44 #4
Цитата Сообщение от Jin X Посмотреть сообщение
есть ли смысл назначать affinity mask или хотя бы номер идеального процессора для каждого потока?
Имеет, ибо отдельные L1- и L2-кэши у ядра процессора никто не отменял.
А если поток будет прыгать по ядрам - ему придётся ждать, пока модифицируемые им данные (результаты расчётов) через общий и более медленный L3-кэш переберутся на каждое новое ядро.


Цитата Сообщение от Jin X Посмотреть сообщение
можно делать так: каждый освободившийся поток берёт следующую по счёту итерацию.
...
Вроде бы идеальный вариант, но есть один минус: такой подход требует выполнять lock-операцию для получения каждого номера следующей итерации, что для может быть довольно дорогим по времени удовольствием для быстрых процедур цикла при большом кол-ве итераций.
Вместо угадываний ("может быть довольно дорогим") просто проверьте, насколько быстро прокрутится, например, миллион вызовов виндовозной функции InterlockedIncrement. Удивитесь
2
Jin X
Заблокирован
14.08.2018, 13:04  [ТС] #5
Цитата Сообщение от VTsaregorodtsev Посмотреть сообщение
А если поток будет прыгать по ядрам - ему придётся ждать, пока модифицируемые им данные (результаты расчётов) через общий и более медленный L3-кэш переберутся на каждое новое ядро.
Здравая мысль.
А надо ли основному потоку (который может тоже исполнять что-то параллельно, т.к. будет предусмотрена асинхронная работа for'а) тоже назначать affinity mask из 1 бита на время работы for'а?

Цитата Сообщение от VTsaregorodtsev Посмотреть сообщение
миллион вызовов виндовозной функции InterlockedIncrement
А что мне миллион?
Я миллиард хочу
6.8 сек против 2.2 сек без lock'а (разница в 3+ раза).

Но так тоже неинтересно.
Запустим 10 потоков, которые делают "lock inc" в цикле (по 100 млн итераций каждая).
Получим 26.5 сек против 0.7 сек (разница почти в 40 раз!).

Понятное дело, что на практике потоки не будут делать в цикле lock inc, но эксперимент вполне показательный.

Тут на другом форуме предложили ещё одну идею, где lock'ов будет ещё меньше, чуть позже опишу технологию.

Добавлено через 1 минуту
Цитата Сообщение от Kukuxumushu Посмотреть сообщение
А вот оформить эту штуку в виде DLL было бы вообще шедевром!
Можно и DLL сделать, и OBJ...

Добавлено через 2 минуты
Ethereal, VTsaregorodtsev (и другие), что думаете по поводу кол-ва потоков (вопрос 1) и границ цикла (вопрос 3).

Ещё я думаю, что нужно при запуске for'а задавать (опционально) процедуру обработки результата, которая будет запускаться после for'а (в нитке, выполнившей последнюю итерацию). Полезно при асинхронных запусках for'а.
Зачем запускать for асинхронно? К примеру, для отображения % выполнения в основном потоке или для ожидания ввода юзером какого-то значения (которое понадобится позже) и пр.

Добавлено через 2 минуты
Меня удивляет, почему в Embarcadero не додумались передавать в процедуру цикла номер потока. Ведь так можно выделить заранее массив из N элементов (где N - кол-во потоков) и сбрасывать туда результаты работы итерации безо всяких lock'ов, если результаты, например, суммируются.

Добавлено через 34 минуты
Цитата Сообщение от Jin X Посмотреть сообщение
Понятное дело, что на практике потоки не будут делать в цикле lock inc, но эксперимент вполне показательный.
Сделал сейчас (на Delphi) прогу, суммирующую побайтно в 8 потоках область памяти длиной в 1 Гб (каждый поток суммирует в свою переменную).
После суммирования делается inc отдельной переменной (в одном случае с lock'ом, в другом - без).
Результат: без lock'а - 1.2 сек, с lock'ом - 32.5 сек (в 27 раз)!
Если делать этот inc/lock inc не каждый раз, а 1 раз в 4 итерации, то получим 8.6 сек против 1.35 сек (6+ раз).
Процедуры не очень типичные, но вполне реалистичные

Добавлено через 12 минут
Тот "загадочный ещё один метод с другого форума" больше похож на первый предложенный мной вариант, поэтому ещё такой момент интересует...
Т.к. итерации всё равно будут идти не совсем последовательно (скажем, 1,3,2,5,4,6,7,9,8,11,10,13,12...), может, вообще стоит делить весь цикл на равные части и отдавать потокам целиковые куски?
Т.е. не так:
thread1=0,4,8,12...
thread2=1,5,9,13...
thread3=2,6,10,14...
thread4=3,7,11,15...
А вот так:
thread1=0..249
thread2=250..499
thread3=500..749
thread4=750..999
Для упрощения реализации (и ускорения).
В итоге итерации будут происходить примерно в таком виде: (0,250,750,1,1000,251,751,1001,252,2,752,253,1002,3,753,503,1003...).
Или не стоит, т.к. иногда этот вариант может быть более напряжным? Хотя и там, и там цикл не последовательный...

Добавлено через 6 минут
Цитата Сообщение от Kukuxumushu Посмотреть сообщение
Без указания границ теряет смысл понятие "переменная цикла", если ты её принудительно руками вобьёшь от нуля до N, то возможности такой библиотеки сразу резко сократятся.
На самом деле всё решается внутри процедуры цикла элементарным i+start.
Если делать совсем по-нормальному, тогда нужно и шаг приращения указывать (включая отрицательные значения), чего я делать не буду.
Если сравнить сишный и паскалевский for, то возможности второго тоже сильно урезаны.
Так что... реально ли нужно это начальное значение или всё же 0 достаточно?
p.s. Наверное, я всё же буду исходить из сложности реализации, но сдаётся мне, что проблем с этим быть не должно.
1
Ethereal
Заблокирован
14.08.2018, 21:14 #6
Цитата Сообщение от Jin X Посмотреть сообщение
В результате получится, что какой-то поток закончит свою работу раньше других и далее параллельно будут работать только 3 потока, затем 2 и потом 1.
Если пришпиливать каждый поток к своему ядру, то минимум один поток в Виндах будет работать не просто медленнее остальных, а в разы и иногда даже на порядки медленнее. Проверено на практике. В случае задачи с майнингом это не составляет проблем, потому-что конечный индекс цикла никогда не достигается. Сигнал потокам прекратить рассчеты все равно будет разослан раньше, чем они обсчитают все возможные варианты, коих астрономическое количество. А у тебя потоки должны обсчитывать от и до. В таком случае мне приходит в голову алгоритм "тарелки с пирожками". Пирожки - это поддиапазоны и тот поток, что крутится быстрее, может взять с тарелки больше пирожков. Делаем так - рассылаем потокам конец диапазона и величину порции в диапазоне, т.е. "пирожка". Имеем глобальной переменной начало диапазона. Каждый поток проверяет, что она меньше конца диапазона. Если не меньше (что означает, что задача уже обсчитана полностью), то ничего не делает. А если меньше, то занимает переменную, снова проверяет, что она не меньше конца диапазона, запоминает ее как начало поддиапазона (длиной "пирожок") который будет обсчитывать, увеличивает переменную на величину "пирожка", но не больше конца диапазона, освобождает ее и начинает обсчитывать свой "пирожок". В итоге потоки, выполняющиеся быстрее, расхватают с тарелки больше поддиапазонов-пирожков и обсчитают их, а те, что медленнее меньше. Но в целом будут обсчитывать задачу, каждый по своим возможностям. Просто пришло в голову, как вариант алгоритма распределения задачи по с разной скоростью выполняющимся потокам.
2
Jin X
Заблокирован
15.08.2018, 07:26  [ТС] #7
В общем, есть ещё вот такая идея...

Для каждого потока задано кол-во оставшихся итераций (обозначим его как IC). К примеру, если у нас цикл 0...999 и 4 потока, то на старте для всех нитей IC=250 (и при запуске итерации это число будет уменьшаться без блокировки). Допустим, thread1 закончил раньше. При этом у thread2:IC=30, thread3:IC=20, thread4:IC=10.
1. Ищем максимальное IC. Если это значение = 0, завершаем свою работу. Иначе мы нашли нить, у которой будем отбирать работу (это thread2, у него IC=30, обозначим его как ICx).
2. Вычисляем NewIC = ICx/2 = 15.
3. Делаем xchg ICx,NewIC (тут срабатывает блокировка).
4. Смотрим какое значение мы получили через этот обмен. Если оно <= NewIC, значит goto 1 (такое возможно, когда ICx=1 или около того, либо ещё какой-то поток тоже закончил работу и влез раньше нас).
5. Устанавливаем у себя IC = ICx-NewIC, выполняем оставшиеся итерации.

Нормальная схема?

Добавлено через 44 минуты
Ещё и начальное значение нужно хранить, чтобы "отбирающий" поток знал, с какого места ему продолжать (т.к. "отбираемый" мог тоже у кого-то отобрать)
2
Ethereal
Заблокирован
15.08.2018, 07:36 #8
Это уже алгоритм драки между потоками за тарелку с пирожками
1
Jin X
Заблокирован
15.08.2018, 11:45  [ТС] #9
Ethereal, я пока не очень понял (читал вчера ночью) твой алгоритм. Сейчас попробую ещё раз осмыслить на свежую голову

Добавлено через 8 минут
Цитата Сообщение от Jin X Посмотреть сообщение
Ещё и начальное значение нужно хранить, чтобы "отбирающий" поток знал, с какого места ему продолжать (т.к. "отбираемый" мог тоже у кого-то отобрать)
Вернее, не начальное значение нужно хранить, а номер следующей итерации.
Более того, тут уже попахивает cmpxchg8b, ибо между нашим xchg ICx и чтением номера следующей итерации отбираемый поток может закончить свою работу и отобрать у кого-то другого, поменяв оба значения (когда мы одно обновили/получили, а второе только собираемся читать). Уф...

Добавлено через 1 минуту

Не по теме:

Пойду читать про шаурму пирожки от Артура... :D

1
Jin X
Заблокирован
15.08.2018, 11:45  [ТС] #10
Я тут ещё прикинул, что лучше хранить номер последней итерации и кол-во оставшихся итераций (для каждого потока).
Рабочий поток будет на каждом цикле уменьшать кол-во оставшихся (без lock'а), а номер текущей итерации будет храниться просто в регистре (другим он особо не нужен).
Обирающий поток будет проверять кол-во оставшихся итераций и просто читать номер последней итерации, чтобы вычислить откуда ему продолжить (это всё без lock'а), а lock cmpxchg8b здесь будет работать эффективнее, т.к. поток меняет только одно из значений, и меньше вероятности "несрабатывания" (сравнения с ложным результатом). При этом решается ещё одна проблема: если хранить номер следующей (а не последней) итерации и кол-во оставшихся, рабочему потоку нужно как-то менять эти 2 значения одновременно. А тут только одно.


Кстати, кто мне скажет: может ли поток, выполняющий операцию с lock'ом вклиниться между микрооперациями, скажем, инструкции dec? Т.е.:
Код
time   thread1 (dec)   thread2 (xchg)
  0    temp = memory
  1    temp--          locked swap(memory, reg)
  2    memory = temp
???
Или lock всегда ждёт завершения инструкции, выполняемой другими ядрами? (Хотя, сдаётся мне, что не ждёт... блин... тогда вариант уменьшения кол-ва оставшихся итераций из рабочего потока без lock'а не канает ).


Ещё такой момент интересует: т.к. итерации всё равно будут идти не совсем последовательно (скажем, 1,3,2,5,4,6,7,9,8,11,10,13,12...), может, вообще стоит делить весь цикл на равные части и отдавать потокам целиковые куски?
Т.е. не так:
thread1=0,4,8,12...
thread2=1,5,9,13...
thread3=2,6,10,14...
thread4=3,7,11,15...
А вот так:
thread1=0..249
thread2=250..499
thread3=500..749
thread4=750..999
Для упрощения реализации (и ускорения).
В итоге итерации будут происходить примерно в таком виде: (0,250,750,1,1000,251,751,1001,252,2,752,253,1002,3,753,503,1003...).
Или не стоит, т.к. иногда этот вариант может быть более напряжным? Хотя и там, и там цикл не последовательный...


Цитата Сообщение от Ethereal Посмотреть сообщение
Пирожки - это поддиапазоны и тот поток, что крутится быстрее, может взять с тарелки больше пирожков. Делаем так - рассылаем потокам конец диапазона и величину порции в диапазоне, т.е. "пирожка". Имеем глобальной переменной начало диапазона. Каждый поток проверяет, что она меньше конца диапазона. Если не меньше (что означает, что задача уже обсчитана полностью), то ничего не делает. А если меньше, то занимает переменную, снова проверяет, что она не меньше конца диапазона, запоминает ее как начало поддиапазона (длиной "пирожок") который будет обсчитывать, увеличивает переменную на величину "пирожка", но не больше конца диапазона, освобождает ее и начинает обсчитывать свой "пирожок". В итоге потоки, выполняющиеся быстрее, расхватают с тарелки больше поддиапазонов-пирожков и обсчитают их, а те, что медленнее меньше. Но в целом будут обсчитывать задачу, каждый по своим возможностям. Просто пришло в голову, как вариант алгоритма распределения задачи по с разной скоростью выполняющимся потокам.
Не могу понять алгоритма. Во-первых, неясно какие переменные глобальные, а какие индивидуальны для каждого потока. Плюс, "рассылаем потокам конец диапазона" – общего или для каждого свой? Что значит "занимает переменную" (какую переменную)? И т.д. И ещё вопрос: не будет ли каждое такое "расхватывание" пирожков лочить какую-нибудь переменную?
1
Ethereal
Заблокирован
16.08.2018, 06:13 #11
Пусть надо обсчитать диапазон 0..0xFFF порциями по 0x10.
I_start = 0 лежит глобально. Пусть также
I_end = 0x1000 ; и Step = 0x10 ;
Первый поток сделал I_start = 0x10 и считает 0..0xF
Второй поток сделал I_start = 0x20 и считает 0x10..0x1F
Третий поток сделал I_start = 0x30 и считает 0x20..0x2F
Второй поток досчитал свой диапазон, сделал I_start = 0x40 и считает новый диапазон 0x30..0x3F
Первый поток досчитал свой диапазон, сделал I_start = 0x50 и считает новый диапазон 0x40..0x4F
И так далее пока I_start не будет поднята до I_end и считать будет уже нечего.

Занял и освободил переменную I_start в монопольное пользование это например так :
C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
flag = FALSE ;
pthread_mutex_lock(&mutex) ; // занял
if (I_start < I_end)
{
    j = I_start ;
    k = j + Step ;
    I_start = k ;
    flag = TRUE ;
}
pthread_mutex_unlock(&mutex) ; // освободил
if (flag)
{
    for (i = j ; i < k ; i++)
    {
        ...
    }
}
Занял ресурс в монопольное пользование и освободил - это в данном случае просто общие выражения. Неважно как, но занял и потом освободил. Без уточнения как именно это делается. Как-то.

----------------------------------------------------------------------------------------------
Тут чутка не хватает знаний, но вот если вот так без обращений к ОС и всех этих мьютексов-шмутексов :
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
    mov     ecx, I_end
    mov     ebx, Step
    mov     al, 1
@:  xchg    Semafor, al ; xchg ставит lock на шину
    test    al, al
    jnz     @
    mov     edx, I_start
    cmp     edx, ecx
    jae     @@
    add     I_start, ebx
@@: mov     Semafor, al
    ; цикл [edx..ecx[ если edx<ecx
Изначально Semafor равен нулю.
2
Jin X
Заблокирован
16.08.2018, 16:50  [ТС] #12
Цитата Сообщение от Ethereal Посмотреть сообщение
И так далее пока I_start не будет поднята до I_end и считать будет уже нечего.
А, ну это фактически мой вариант 3 в первом сообщении.

Цитата Сообщение от Ethereal Посмотреть сообщение
Занял и освободил переменную I_start в монопольное пользование это например так
А зачем тут flag, тем более, что он в локе не проверяется в fasle никогда не устанавливается?



В общем, я придумал, как сделать без локов!!!
Локи будут только у перехватчиков (это для алгоритма с перехватами, описанного здесь (#7), там я его немного модифицировал, но суть та же).

Теперь у каждого потока есть счётчик текущей и последней выполняемой для данного потока итерации (Cur и Last). В массиве ThreadCounters они расположены друг за другом (Cur0,Last0, Cur1,Last1, Cur2,Last2...).

Код работающего потока:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
; ebx - номер текущего цикла (Cur), на старте - начальный номер
; esi - номер текущего потока, начиная с 0
  cmp ebx,[ThreadCounters+esi*8+4]  ; Cur <= Last ?
  jle .start  ; да, начинаем
  jmp .finish  ; нет, пустой цикл, сразу выходим
.loop:
  mov [ThreadCounters+ebx*8],ebx  ; Cur (mov атомарен, lock ненужен) :)
.start:
  stdcall [MainProc]  ; вызов основной функции цикла
  inc ebx
  cmp ebx,[ThreadCounters+ebx*8+4]  ; Cur+1 <= Last ?
  jle .loop  ; да, продолжаем цикл
 
.findjob:
; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
Перехватчик (отбирающий поток) делает так:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
; edi содержит номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 0
;   [хотя меньше 0 она не должна быть], завершаем работу, не доходя до этого места в коде)
  mov ebp,ATTEMPT_COUNT  ; кол-во попыток отобрать работу у какого-либо потока, пусть будет = 4
  mov edx,[ThreadCounters+edi*8+4]  ; edx = Last (обновляем данные)
  mov eax,[ThreadCounters+edi*8]  ; eax = Cur
.tryagain:
  mov ebx,edx
  sub ebx,eax  ; Last - Cur
; какой результат мы можем тут получить?
; если поток завершил работу или выполняет последнюю итерацию, то Cur = Last, ebx = 0
; если поток выполняет не самую последнюю итерацию, то ebx > 0 (даже если кол-во итераций = 0x80000000, а больше
;   быть не может, т.к. при работе в 1 потоке мы будет действовать по-другому, там всё это нагромождение ни к чему)
  cmp ebx,1
  jle .findjob  ; если этот поток уже завершён, выполняет последнюю итерацию или у него осталась ещё лишь 1 итерация,
  ;   попробуем найти какой-нибудь другой поток (хотя... может, лучше сразу на выход?); последнюю итерацию отбирать
  ;   нельзя, иначе может произойти казус, если lock cmpxchg8b попадёт между cmp-jle-mov в цикле кода работающего потока
;  inc ebx  ; раскомментировать, если мы хотим делить нечётные числа в свою пользу, а не в пользу потока-владельца
  shr ebx,1  ; кол-во отбираемых итераций
  mov ecx,edx
  sub ecx,ebx  ; ecx = новый номер последней итерации, т.е. новый Last = (Last-(Last-Cur)/2)
  mov ebx,eax  ; ebx = Cur, его мы не меняем, а только проверяем (чтобы он случайно не вышел за границы нового Last'а)
  lock cmpxchg [ThreadCounters+edi*8],ecx  ; if (edx:eax == Last:Cur) { Last = ecx; Cur = ebx; )
  jnz .robfail  ; облом! :(
; ура, успех!
  lea ebx,[ecx+1]  ; наш новый Cur = Last+1 потока, у которого мы отобрали работу
  mov [ThreadCounters+esi*8+4],0x80000000  ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
  ;   (влезая между двумя следующими mov'ами)
  mov [ThreadCounters+esi*8],ebx  ; Cur
  mov [ThreadCounters+esi*8+4],edx  ; наш новый Last
  jmp .start
 
.robfail:
  cmp edx,[ThreadCounters+edi*8+4]
  jne .findjob  ; какая-то зараза (другой поток) успел влезть и перехватить работающий поток раньше нас - идём искать новую работу! :(
  dec ebp
  jnz .tryagain  ; мы просто не успели, поток перешёл на следующую итерацию, пробуем ещё ebp раз...
; если кол-во попыток исчерпано, продолжать смысла нет, поток слишком быстро отрабатывает итерации
 
.finish:
; завершение цикла для данного потока
Вроде всё должно быть чётко. Но прошу проследить за моей мыслью, вдруг я что-то не учёл...
1
Ethereal
Заблокирован
17.08.2018, 00:53 #13
Цитата Сообщение от Jin X Посмотреть сообщение
А зачем тут flag
Флаг тут тру если надо чего-то считать и фалс если вообще больше нечего делать. И цикл обсчета фор запускается только если флаг тру.
Цитата Сообщение от Jin X Посмотреть сообщение
mov edx,[ThreadCounters+edi*8+4] ; edx = Last (обновляем данные)
mov eax,[ThreadCounters+edi*8] ; eax = Cur
Пусть сразу после этих двух команд управление передается другому потоку и он досчитывает свою работу до конца и у него Cur достигает Last. Потом управление возвращается твоему потоку назад и ты, обладая в регистре eax уже устаревшими данными о того потока Cur якобы "отбираешь у него работу" и откатываешь его Cur на позиции, которые он уже обсчитал, да и сам начинаешь считать тоже уже обсчитанное.
1
Jin X
Заблокирован
17.08.2018, 07:39  [ТС] #14
Цитата Сообщение от Ethereal Посмотреть сообщение
Пусть сразу после этих двух команд управление передается другому потоку и он досчитывает свою работу до конца и у него Cur достигает Last. Потом управление возвращается твоему потоку назад и ты, обладая в регистре eax уже устаревшими данными о того потока Cur якобы "отбираешь у него работу" и откатываешь его Cur на позиции, которые он уже обсчитал, да и сам начинаешь считать тоже уже обсчитанное.
Так, у меня же cmpxchg8b проверяет соответствие этих edx и eax на актуальность, и только тогда идёт перехват.

Добавлено через 27 секунд
Цитата Сообщение от Ethereal Посмотреть сообщение
Флаг тут тру если надо чего-то считать и фалс если вообще больше нечего делать. И цикл обсчета фор запускается только если флаг тру.
Для этого используются event'ы, зачем проц грузить циклом?
1
Ethereal
Заблокирован
17.08.2018, 08:07 #15
Цитата Сообщение от Jin X Посмотреть сообщение
Для этого используются event'ы, зачем проц грузить циклом?
Так это цикл того гипотетического рассчета, который и должна осуществлять программа. Как же не загрузить проц, да как раз именно им ?

Добавлено через 12 минут
Цитата Сообщение от Jin X Посмотреть сообщение
Так, у меня же cmpxchg8b проверяет соответствие этих edx и eax на актуальность
У тебя эта команда проверяет не равен ли Cur другого потока величине Last-(Last-возможно_уже_протухшее_Cur)/2) которую ты насчитал. В чем вообще смысл такой проверки, я предложил рассмотреть ситуацию, когда Cur другого потока стал больше этой величины и даже уже достиг Last. И вообще я не понимаю смысл проверки Cur другого потока на равенство Last-(Last-Cur_в_какой-то_момент_времени)/2), эта величина ведь не имеет никакого физического смысла. Тот другой поток ведь считает от Cur до Last. Для него Last-(Last-Cur_в_какой-то_момент_времени)/2) это один из ничего не значащих промежуточных индексов рассчета. Если передача управления другому потоку произошла в тот момент, что я указал, то его Cur может оказаться как меньше, так и больше этой величины.
2
Jin X
Заблокирован
17.08.2018, 15:39  [ТС] #16
Цитата Сообщение от Ethereal Посмотреть сообщение
У тебя эта команда проверяет не равен ли Cur другого потока величине Last-(Last-возможно_уже_протухшее_Cur)/2) которую ты насчитал.
Тьфу, блин. Пардонте-с. Очепятка, ёлы-палы, не lock cmpxchg [ThreadCounters+edi*8],ecx там, а lock cmpxchg8b [ThreadCounters+edi*8] конечно же!
Сначала хотел сделать без 8b, потом всё переделал, а тут забыл поменять
Но комменты верные: if (edx:eax == Last:Cur) { Last = ecx; Cur = ebx; }. И там в любом случае нет проверки Cur на Last-(Last-Cur)). Инструкция cmpxchg сравнивает память с eax (а не со 2-м операндом), а второй операнд записывает на место памяти в случае совпадения. А cmpxchg8b - см. комменты. У меня соответственно eax и edx не меняются нигде, весь расчёт идёт в ecx/ebx.
И ещё я там по запаре вместо esi использовал ebx в паре мест (в адресации).

Добавлено через 14 минут
Цитата Сообщение от Ethereal Посмотреть сообщение
Так это цикл того гипотетического рассчета, который и должна осуществлять программа. Как же не загрузить проц, да как раз именно им ?
Ну а вообще да, можно выставить флаг (чтобы работающий поток не трогал Cur и не пошёл на новый цикл) и дальше использовать просто cmpxchg, чтобы поменять Last на новое значение. Наверное. Надо покумекать...
Но идея интересная, спасибо!

Добавлено через 2 часа 55 минут
[cut]
1
Jin X
Заблокирован
17.08.2018, 22:35  [ТС] #17
Итак, новый вариант (реализация накладывает свои коррективы на теорию).
Имеем структуру ThreadData, которая содержит 4 поля dword для каждого потока: Handle, Lock (флаг запрета обработки итераций), Current (номер текущей итерации), Last (номер последней итерации).
Обозначу смещения до них как @Handle, @Lock, @Current и @Last (0, 4, 8 и 12).

Работающий поток:
Код работающего потока:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
; esi = номер текущего потока, начиная с 0.
  mov ebp,[ForProc]  ; ebp = адрес основной процедуры цикла
  lea edi,[esi+esi]
  lea [edi*8+ThreadData]  ; edi = ThreadData + esi*16
  mov ebx,[edi+@Current]
  cmp ebx,[edi+@Last]  ; Current <= Last ?
  jle .start  ; да, стартуем
  jmp .finish  ; нет, это пустой цикл, сразу выходим
 
; Spin-цикл ожидания разблокировки (пока кто-то пытается отобрать у нас работу)
.pause:
  mov eax,16
@@:
  pause
  cmp dword [edi+@Lock],0
  je .start  ; выходим из цикла
  dec eax
  jnz @B
  invoke SwitchToThread  ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе
    ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре
  jmp .pause
 
.loop:
  mov [edi+@Current],ebx  ; mov атомарен, lock не нужен
  cmp dword [edi+@Lock],0
  jne .pause  ; у нас пытаются отобрать работу?
.start:
  stdcall ebp, ebx, esi  ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока)
  inc ebx  ; Current++
  cmp ebx,[edi+@Last]  ; Current+1 <= Last ?
  jle .loop  ; да, продолжаем цикл
 
.findjob:
; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
Перехватчик (отбирающий поток) делает так:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
; edx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1
;   завершаем работу, не доходя до этого места в коде)
; eax = Last этого потока
  lea ecx,[edx+edx]
  lea ecx,[ecx*8+ThreadData]  ; ecx = ThreadData + edx*16
  mov edx,1
  xchg [ecx+@Lock],edx  ; блокируем работающий поток (даже если мы попали не туда, и он
    ; прокрутит ещё одну итерацию, это не страшно, дальше заблокируется, у нас есть запас)
  test edx,edx
  jnz .findjob  ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас - идём искать новую работу
  mov edx,eax
  sub edx,[ecx+@Current]  ; кол-во оставшихся итераций
  cmp edx,1  ; Last-Current <= 1 ?
  jle .toolate  ; да, осталось слишком мало итераций
  shr edx,1  ; кол-во отбираемых итераций
  mov ebx,eax
  sub ebx,edx  ; новый @Last
  cmpxchg [ecx+@Last],ebx  ; if (eax = Last) Last = ebx
  mov [ecx+@Lock],0  ; освобождаем поток
  jnz .findjob  ; оказывается, другой поток успел отхватить работу аж до xchg!
 
  inc ebx  ; наш новый Current = Last+1 потока, у которого мы отобрали работу
  mov [edi+@Last],0x80000000  ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
    ; (влезая между двумя следующими mov'ами)
  mov [edi+@Current],ebx
  mov [edi+@Last],eax  ; наш новый Last
  jmp .start
 
.toolate:
  mov [ecx+@Lock],0  ; освобождаем поток
  jmp .findjob  ; идём искать новую работу
 
.finish:
; завершение цикла для данного потока
И этот вариант мне тоже не до конца нравится из-за длинной блокировки основного потока при перехвате (и SwitchToThread). Более длинной, чем при cmpxchg8b. Но вариант с cmpxchg8b может обломаться при быстрых циклах. Короче, х/з. Ещё что-то придумывать???

Добавлено через 2 часа 37 минут
Кажется, у меня есть идея, как усовершенствовать мой последний код
1
Ethereal
Заблокирован
18.08.2018, 00:51 #18
Лучший ответ Сообщение было отмечено Mikl___ как решение

Решение

Цитата Сообщение от Jin X Посмотреть сообщение
Инструкция cmpxchg сравнивает память с eax (а не со 2-м операндом), а второй операнд записывает на место памяти в случае совпадения.
Ееее ... никогда ей не пользовался и лень было посмотреть в описание. А потому интуитивно ее понял неверно. Сорри.
Но все равно работа со значениями, которые возможно уже протухли в твоем старом варианте ... надо обдумать может ли она в принципе кончиться чем-то хорошим. И сейчас обдумаю твой новый вариант.

Добавлено через 28 минут
З.Ы.
А ... понял наконец идею (только идею, реализацию уже не смотрю) твоего старого алго, если в нем поставить lock cmpxchg8b. А идея-то была абсолютно верная. Ты сначала готовишь данные для перехвата (которые возможно уже и подтухают, пока ты их обсчитываешь), а потом по lock cmpxchg8b перехватываешь, но только в том случае, если протухания не было. А если данные таки подтухли, то загружаешь данные посвежее и предпринимаешь еще одну попытку. И так до тех пор, пока между подргузкой свежачка и lock cmpxchg8b вклинения другого потока не было. Браво !!! Идея просто монстр !
2
Jin X
Заблокирован
18.08.2018, 09:49  [ТС] #19
Спасибо, Артур! Но это вполне стандартный вариант использования lock cmpxchg8b.

Итак, новый вариант...
Суть в том, что мы будем записывать в Lock не просто 1 и 0, а изначально хранить там 0x7FFFFFFF, а потом заносить новый Last. А работающий в цикле поток будет останавливаться не при любом ненулевом значении, а только когда Current доходит до Lock. Т.е. по факту он почти никогда не будет подвисать.

Работающий поток:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
; THREAD_UNLOCKED = 0x7FFFFFFF - значение, при котором поток разблокирован (MaxInt)
; esi = номер текущего потока, начиная с 0.
  mov ebp,[ForProc]  ; ebp = адрес основной процедуры цикла
  lea edi,[esi+esi]
  lea [edi*8+ThreadData]  ; edi = ThreadData + esi*16
  mov ebx,[edi+@Current]
  cmp ebx,[edi+@Last]  ; Current <= Last ?
  jle .start  ; да, стартуем
  jmp .finish  ; нет, это пустой цикл, сразу выходим
 
.loop:
  mov [edi+@Current],ebx  ; Current++ (mov атомарен, lock не нужен)
.start:
  stdcall ebp, ebx, esi  ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока)
  inc ebx  ; Current+1
  cmp ebx,[edi+@Lock]  ; Current+1 <= Lock ?
  jle .continue  ; да, продолжаем
; делаем паузу, если достигли значение Lock (spin-цикл ожидания разблокировки)
.pause:
  mov eax,16
@@:
  pause
  cmp dword [edi+@Lock],THREAD_UNLOCKED
  je .continue  ; выходим из spin-цикла
  dec eax
  jnz @B
  invoke SwitchToThread  ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе
    ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре
  jmp .pause
; продолжаем цикл
.continue:
  cmp ebx,[edi+@Last]  ; Current+1 <= Last ?
  jle .loop  ; да, продолжаем цикл
 
.findjob:
; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
Перехватчик:
Assembler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
; ecx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1
;   завершаем работу, не доходя до этого места в коде)
; ebx = Last этого потока
  add ecx,ecx
  lea ecx,[ecx*8+ThreadData]  ; ecx = ThreadData + ecx*16
  mov eax,ebx
  mov edx,ebx
  sub eax,[ecx+@Current]
  shr eax,1  ; (Last - Current) / 2
  sub edx,eax  ; edx = Last - eax = новый Lock, он же новый Last
  mov eax,THREAD_UNLOCKED
  lock cmpxchg [ecx+@Lock],edx  ; if (Lock == THREAD_UNLOCKED) { Lock = edx ; zf = 1; } else { eax = Lock; zf = 0; }
  jnz .findjob  ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас, идём искать новую работу
  cmp [ecx+@Current],edx
  jge .toolate  ; поток уже дошёл до итерации Lock (ситуация Current = Lock = новый Last нам тоже не подходит)
  mov eax,ebx
  cmpxchg [ecx+@Last],edx  ; if (eax == Last) { Last = edx; }
  mov [ecx+@Lock],THREAD_UNLOCKED  ; освобождаем поток
  jnz .findjob  ; оказывается, поток закончил работу и взял итерации у кого-то другого,
    ; либо другой поток успел отжать работу у нашего подопечного ещё до lock cmpxchg!
  inc edx  ; наш новый Current = новый Last потока, у которого мы отобрали работу + 1
  mov [edi+@Last],0x80000000  ; наш Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
    ; (влезая между двумя следующими mov'ами)
  mov [edi+@Current],edx
  mov [edi+@Last],eax
  jmp .start
 
.toolate:
  mov [ecx+@Lock],THREAD_UNLOCKED  ; освобождаем поток
  jmp .findjob  ; идём искать новую работу
 
.finish:
; завершение цикла для данного потока
Добавлено через 3 минуты
Осталось теперь проверить на практике
3
18.08.2018, 09:49
MoreAnswers
Эксперт
37091 / 29110 / 5898
Регистрация: 17.06.2006
Сообщений: 43,301
18.08.2018, 09:49

Просто посоветоваться, насчет оперативки
Плохо разбираюсь в железе. Вот в чем проблема, хочу поставить ещё 1 планку...

посоветоваться о эффективном алгоритме резервирования данных
Стоит задача реализовать резервирование данных в локальной одноранговой сети....

Нужно посоветоваться с тру фулстек-программистом
Думаю, из названия темы всё понятно. P.S. тема создана не в том разделе,...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
19
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2018, vBulletin Solutions, Inc.
Рейтинг@Mail.ru