Форум программистов, компьютерный форум, киберфорум
C++
Войти
Регистрация
Восстановить пароль
 
Рейтинг 4.82/11: Рейтинг темы: голосов - 11, средняя оценка - 4.82
0 / 0 / 0
Регистрация: 17.11.2018
Сообщений: 4
1

Неблокирующие алгоритмы на C++, многопоточность

29.11.2019, 15:59. Показов 2116. Ответов 7
Метки нет (Все метки)

Добрый день!
Прошу помочь понять задание. Я новичок.
Тема: неблокирующие алгоритмы. Задание дано для java, но мне нужно выполнить на C++ с помощью соответствующих конструкций.

Разработать многопоточную программу, которая с помощью AtomicInteger и метода compareAndSet выполняет следующие операции для одномерного массива.
Создать параллельные функции для нахождения:
- количества элементов массива типа int;
- контрольной суммы с использованием XOR для элементов массива типа int.
- минимального и максимального элементов массива типа long, а также их индексы.

Если я правильно понимаю, мне нужно создать массив, потом эти три функции для работы с ним, потом вручить разным потокам выполнение этих функций. Если правильно поняла, в C++ в неблокирующих алгоритмах используется <atomic>. Но я не могу понять, к чему тут применить <atomic>. Каждый поток будет выполнять свою функцию, которая заключается в чтении данных массива, а не в их модификации. Зачем же тогда тут этот неблокирующий алгоритм или я совсем неправильно поняла задание?
0

Помощь в написании контрольных, курсовых и дипломных работ здесь.

Лучшие ответы (1)
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
29.11.2019, 15:59
Ответы с готовыми решениями:

Неблокирующие сокеты
Пожалуйста помогите разобраться с темой неблокирующих сокетов. Литература или если есть у кого...

Неблокирующие сокеты
Всем привет! Столкнулся с проблемой при выполнении лабораторной работы. В ней нужно реализовать...

MPI программы: неблокирующие сообщения
...доброго времени суток, уважаемые форумчане!... имеется следующая задача: в исходном тексте...

Неблокирующие сокеты, проверка установки соединения
Создал неблокирующий сокет. Когда делаю connect - возвращает -1 Operation now in progress Как я...

7
2146 / 691 / 265
Регистрация: 10.02.2018
Сообщений: 1,622
29.11.2019, 19:38 2
Чисто теоретически...
Например, нужно найти минимум.

Можно разбить массив на несколько частей (первая и вторая половины). Запустить для каждой части поток (два потока). В потоках найти минимумы по заданным частям (минимум первой половины и минимум второй половины). В конце выбрать минимум из полученных результатов каждого потока (минимальный из двух минимумов).

Как можно тут прикрутить атомарные операции?
В каждом потоке ищем не локальный минимум, а общий. Заводим атомарный минимум. Каждый поток сравнивает значение с ним и, если нужно, заносит в него новый минимум. Стрёмный какой-то вариант, но почему бы и нет...

Стрёмный, потому что на запись требуется несколько операций, а они уже не будут атомарными по своей природе, только если их завернуть в какой-нибудь мьютекс... или атомарный локер.

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
atomic_int global_min; // атомарное значение, можем одновременно читать и записывать из разных потоков
int global_min_index; // не атомарное значение
 
// функция потока
void thread(int* array, int start, int stop)
{
  // цикл по заданным элементам
  for (int pos=start; pos<stop; pos++)
  {
    // сравнение с атомиком
    if (array[pos] < global_min)
    {
      // << lock ??? на запись нужно как-то заблокировать, что бы два потока не лезли одновременно
 
      // повторная проверка из-за возможности пересечения нескольких потоков
      // если какой-то поток уже внёс свои изменения, то нужно перепроверить минимум
      //  если всё ещё минимум, то обновить
      //  если минимум устарел, то ничего не делаем
      if (array[pos] < global_min)
      {
        global_min = array[pos];// запись в атомик
        global_min_index = pos;
      }
 
      // << unlock ???
    }
  }
}
Проще всё таки считать локальные минимумы, а потом свести результат по потокам в одну переменную. Можно сделать сведение с блокировкой на спинлоке, это типа и будет неблокирующий алгоритм.

С количеством проще, наверное имеется в виду количество элементов равных чему-то. Заводим атомарный счётчик и увеличиваем его из разных потоков.

С XOR не очень понятно. Можно попробовать воспользоваться fetch_xor для каждого элемента, но какой смысл? Проще опять же посчитать локальные XOR и потом объединить результат.
1
468 / 444 / 71
Регистрация: 29.05.2015
Сообщений: 2,907
29.11.2019, 20:21 3
Цитата Сообщение от juliyasos Посмотреть сообщение
Зачем же тогда тут этот неблокирующий алгоритм или я совсем неправильно поняла задание?
Потому неблокирующий, что все три потока выполняют вычисления независимо друг от друга. Не нужно блокировать один поток, что-бы дождаться результатов работы другого. Я так думаю.

Добавлено через 6 минут
А вообще, нужно уточнить задание. В двух случаях фигурирует массив int, в третьем - long. Это что, два разных массива нужно? А может три - по массиву на каждый поток?
1
0 / 0 / 0
Регистрация: 17.11.2018
Сообщений: 4
29.11.2019, 20:51  [ТС] 4
Ygg, благодарю! Сейчас попробую с атомарным минимумом.
Нам дали образец выполнения. На java. Но поскольку я не знакома с java, не могу понять, где там деление действий между потоками и почему считается 3 суммы?..

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.company;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class Main {
   static int sum = 0;
   public static void main(String[] args) {
       int array[] = new int [5000000]; /* long array[] = new Random().longs(SIZE).toArray();*/ 
       AtomicInteger atomicSum = new AtomicInteger(); /*создание атомарной переменной (atomic variable)*/
       Arrays.fill(array,2);
       int serialSum = IntStream.of(array).sum();
       IntStream.of(array).parallel().forEach( arrayElement -> {   /*встроенный метод для выполнения параллельных вычислений в Java*/
           sum += arrayElement;    /*нахождение суммы без использования атомарной переменной*/
           int oldValue;
           int newValue;
           do{ //смена суммы с использованием метода compareAndSet
               oldValue = atomicSum.get();
               newValue = oldValue + arrayElement;
           }while(!atomicSum.compareAndSet(oldValue , newValue));
       });
       System.out.println("Serial sum:   " + serialSum);
       System.out.println("Blocking sum: " + sum);
       System.out.println("Atomic sum:   " + atomicSum.get());
   }
}
0
2146 / 691 / 265
Регистрация: 10.02.2018
Сообщений: 1,622
29.11.2019, 23:07 5
Лучший ответ Сообщение было отмечено juliyasos как решение

Решение

Java
1
2
3
4
5
6
7
8
9
IntStream.of(array).parallel().forEach( arrayElement -> { /*встроенный метод для выполнения параллельных вычислений в Java*/
  sum += arrayElement; /*нахождение суммы без использования атомарной переменной*/
  int oldValue;
  int newValue;
  do{ //смена суммы с использованием метода compareAndSet
    oldValue = atomicSum.get();
    newValue = oldValue + arrayElement;
  }while(!atomicSum.compareAndSet(oldValue , newValue));
});
Я Java не знаю, но чисто логически...
IntStream.of(array) - создаём какой-то объект "стрим" из массива "array"
IntStream.of(array).parallel() - создаём из стрима параллельный стрим
IntStream.of(array).parallel().forEach(<function>) - для каждого элемента стрима выполняем функцию "function"
В случае параллельного стрима весь диапазон автоматически разбивается на несколько частей и для каждой части в отдельном потоке выполняется функция "function"
Код ниже выполняет атомарное сложение:
Java
1
2
3
4
  do{ //смена суммы с использованием метода compareAndSet
    oldValue = atomicSum.get();
    newValue = oldValue + arrayElement;
  }while(!atomicSum.compareAndSet(oldValue , newValue));
atomicSum.compareAndSet(oldValue , newValue) - поместить в "atomicSum" значение "newValue" при условии что там сейчас лежит значение "oldValue". В противном случае возвращается ошибка и повторяем операцию.
Жуть какая-то с точки зрения скорости выполнения на мой взгляд, но если оно так нужно, то можно посмотреть в сторону compare_exchange.

C++
1
2
3
4
5
6
7
8
9
// поиск минимума для части диапазона по аналогии с кодом джава
for (int pos=start; pos<stop; pos++)
{
  int prev = atomic_min_index;
  do{
    if (array[pos] >= array[prev])
      break;
  } while (!atomic_min_index.compare_exchange_strong(prev, pos));
}
1
Mental handicap
1245 / 623 / 171
Регистрация: 24.11.2015
Сообщений: 2,429
30.11.2019, 02:08 6
Цитата Сообщение от alexu_007 Посмотреть сообщение
Потому неблокирующий, что все три потока выполняют вычисления независимо друг от друга. Не нужно блокировать один поток, чтобы дождаться результатов работы другого. Я так думаю.
Почти так, но не стоит полагать, что lock-free - это магия, как бы не так. За все надо платить, за атомарные операции, за копирование и тд. Но исходя из задания ТС ей это не надо, тк потоки будут только читать данные и синхронизации не нужны (данные еще и можно разделить), тогда почему в этом мире мы должны платить за это?
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <thread>
#include <future>
#include <random>
#include <algorithm>
 
template <size_t N>
void fill(int(&array)[N], int lower_bound, int upper_bound) noexcept {
    static std::mt19937 gen { std::random_device()() }; 
    std::uniform_int_distribution<int> uid(lower_bound, upper_bound);
 
    std::generate(array, array + N, [&uid] { return uid(gen); });
}
 
template <size_t N>
void print(int(&array)[N]) noexcept {
    for (size_t i = 0; i < N; i++) {
        printf("%d ", array[i]);
    }
}
 
int find_max(int* begin, int* end) noexcept {
    return *std::max_element(begin, end);
}
 
int main() {
    int data[100];
    fill(data, 1, 99);
    print(data);
 
    auto worker1 = std::async(find_max, data,      data + 50 );
    auto worker2 = std::async(find_max, data + 50, data + 100);
 
    int max_from_thread_1 = worker1.get();
    int max_from_thread_2 = worker2.get();
 
    printf("\nmax value = %d", std::max(max_from_thread_1, max_from_thread_2));
}
Код
Out:
54 48 84 80 74 13 54 21 16 96 61 15 36 90 35 44 77 41 79 5 79 24 50 24 95 80 41 75 80 26 40 56 75 12 60 68 19 66 66 75 45 19 84 3 55 89 77 60 46 50 39 51 22 51 85 65 29 94 67 42 27 85 59 28 82 21 70 72 91 87 22 65 54 8 35 20 53 63 55 55 28 44 39 63 17 60 1 14 64 22 14 31 60 98 71 80 11 78 32 80 
max value = 98
1
2146 / 691 / 265
Регистрация: 10.02.2018
Сообщений: 1,622
30.11.2019, 11:00 7
Цитата Сообщение от juliyasos Посмотреть сообщение
и почему считается 3 суммы?
serialSum - это сумма посчитанная обычным последовательным сложением без распараллеливания, эталон
sum - это сумма посчитанная параллельным сложением без атомарности, может быть ошибочной
atomicSum - это сумма посчитанная параллельным сложением с атомарностью, должна быть равна эталону
1
6738 / 4537 / 1839
Регистрация: 07.05.2019
Сообщений: 13,725
Записей в блоге: 1
30.11.2019, 16:51 8
Цитата Сообщение от Azazel-San Посмотреть сообщение
Почти так, но не стоит полагать, что lock-free - это магия, как бы не так. За все надо платить, за атомарные операции, за копирование и тд. Но исходя из задания ТС ей это не надо, тк потоки будут только читать данные и синхронизации не нужны (данные еще и можно разделить), тогда почему в этом мире мы должны платить за это?
Там просто задание безграмотное, никакие блокировки для решения этих задач не нужны (ну, возможно, кроме контрольной суммы, тут тупо не знаю).
Единственная синхронизация здесь - ожидание завершения всех потоков.
1
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
30.11.2019, 16:51

Неблокирующие сокеты и тайм-аут подключения
Имеется tcp-соединение с некоторым устройством. Устройство выступает в качестве сервера, клиент -...

Неблокирующие сокеты блокируют во время передачи данных или нет
Если использовать не блокирующие сокеты то приложение не будет зависать на ожидании данных. Но...

Реализовать алгоритмы построения прямой: простой пошаговый алгоритм и алгоритмы Брезенхема
1. Написать на языке PASCAL программу, реализующую алгоритмы построения прямой: простой пошаговый...

Циклические алгоритмы. Алгоритмы обработки последовательностей чисел
Помогите пожалуйста program Lab_3_1; const x1=1; xn=3; dx=0.2; a=3.9; ...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
8
Ответ Создать тему
Опции темы

КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2021, vBulletin Solutions, Inc.