Форум программистов, компьютерный форум CyberForum.ru

Создание параллельных задач - C++

Восстановить пароль Регистрация
 
Рейтинг: Рейтинг темы: голосов - 20, средняя оценка - 4.90
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
01.02.2013, 01:37     Создание параллельных задач #1
Начал потихоньку изучать параллельное программирование. И вот, застрял на такой реальной задачи - есть функция которую требуется вычислить, допустим 20 раз параллельно с разными аргументами, результаты вывести на экран. Суть создания потоков я понял, передача в future, но никак не могу все это слепить вместе, понять как определять, что потоки уже завершились и заново перезапускать новые с новыми данными для вычисления. Кому не лень, объясните как правильно сделать. Короче, вот пока все что мне в голову пришло:
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <thread>
#include <chrono>
#include <iostream>
#include <future>
#include <memory>
#include <vector>
 
auto fib = [](const unsigned n)
{
    if (n<3) return 1;
    else return fib(n-1) + fib(n-2);
};
 
int main()
{
    std::vector<std::future<int> > vf(std::thread::hardware_concurrency());
    unsigned i = 40;  //аргумент для лямбды, надо вычислить от 40 до 60
    for (auto it = vf.begin(), end = vf.end(); it!=end; ++it)
    {
        *it = std::async(std::launch::async, fib, i++); //создаются у меня тут два потока и все
         ....
    }
    /*unsigned p = 1;
    for (auto it = vf.begin(), end = vf.end(); it!=end; ++it)
    {
        (*it).get();
        std::cout<<"thread "<<p++<<" passed\n";
    }*/
    system("pause");
    return 0;
}
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
Shepard90
5 / 5 / 0
Регистрация: 18.10.2010
Сообщений: 140
01.02.2013, 02:36     Создание параллельных задач #2
Тоже потихоньку изучаю сейчас параллельное программирование. future не юзал.
Не пойму, что вам мешает объявить массив из 20 потоков(thread) и в цикле запустить их, передав каждому функцию и соответствующий параметр
C++
1
2
3
4
5
6
7
8
9
10
11
std::vector<std::thread> th;
 
    int nr_threads = 20;
 
    for (int i = 0; i < nr_threads; ++i) {
        th.push_back(std::thread(func,i+40));
    }
 
    for(auto &t : th){
        t.join();
    }
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
01.02.2013, 12:31  [ТС]     Создание параллельных задач #3
Цитата Сообщение от Shepard90 Посмотреть сообщение
что вам мешает объявить массив из 20 потоков(thread) и в цикле запустить их, передав каждому функцию и соответствующий параметр
Да ничто не мешает, просто я не знаю как потом вытащить из этих потоков возвращаемые значения. Вот еще сделал набросок но бросается исключение:
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <thread>
#include <iostream>
#include <future>
#include <memory>
#include <vector>
 
auto fib = [](const unsigned n)  //функция для вычислений
{
    if (n<3) return 1;
    else return fib(n-1) + fib(n-2);
};
 
int main()
{
    auto myPromise = std::make_shared<std::promise<int>>();
    std::future<int> waiter = myPromise->get_future();
    auto fibadapt = [myPromise](const unsigned val)
    {
        myPromise->set_value(fib(val));
    };
    std::vector<std::thread> vt(std::thread::hardware_concurrency());
    unsigned num = 20;
    auto it = vt.begin(), end = vt.end();
    while(num<30)
    {
        for (it = vt.begin(); it!=end; ++it)
        {
            *it = std::move(std::thread(fibadapt, num));
            (*it).detach();
            ++num;
        }
        for (it = vt.begin(); it!=end; ++it)
        {
            std::cout<<"Result of "<<num<<" = "<<waiter.get()<<"\n";  // как привязать waiter к определенному потоку?
        }
    }
    system("pause");
    return 0;
}
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
01.02.2013, 13:07     Создание параллельных задач #4
yuron_477, Вы хотите перезапускать вычисление после того как значение уже будет рассчитано? Опрашивать на то готов-ли future, если готов - выводить результат и перезапускать.
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
01.02.2013, 13:22  [ТС]     Создание параллельных задач #5
Цитата Сообщение от ForEveR Посмотреть сообщение
Опрашивать на то готов-ли future, если готов - выводить результат и перезапускать
Вот как раз с этим у меня пока затруднения. Особенно с перезапускать. Как это все грамотно реализовать в коде?
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
01.02.2013, 13:43     Создание параллельных задач #6
Мой gcc косячный, wait_for должен возвращать future_status, а не bool.
+ все можно сделать красивше, но как пример пойдет.

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <algorithm>
 
std::pair<int, int> func(const std::pair<int, int>& p)
{
   return p.first < 3 ? std::make_pair(1, 1) : std::make_pair(func(std::make_pair(p.first - 1, p.second)).first + 
      func(std::make_pair(p.first - 2, p.second)).first, p.second);
}
 
int main()
{
   const int number = 5;
   const int max = 20;
   std::vector<std::future<std::pair<int, int>>> futures(std::max(std::thread::hardware_concurrency(), 6u));
   auto find_ready = [](std::future<std::pair<int, int>>& fut) -> bool
   {
      auto status = fut.wait_for(std::chrono::milliseconds(1));
      return status == true;
   };
   int num = number;
   for (auto& fut : futures)
   {
      fut = std::async(std::launch::async, func, std::make_pair(num, num));
      ++num;
   }
   for (int i = num; i < max;)
   {
      auto it = std::find_if(futures.begin(), futures.end(), find_ready);
      if (it == futures.end())
      {
         continue;
      }
      auto res = it->get();
      std::cout << "Number: " << res.second << " result: " << res.first << std::endl;
      *it = std::async(std::launch::async, func, std::make_pair(i, i));
      std::cout << "Start new future for: " << i << std::endl;
      ++i;
   }
   std::cout << "Get results from rest futures" << std::endl;
   for (auto& fut : futures)
   {
      auto res = fut.get();
      std::cout << "Number: " << res.second << " result: " << res.first << std::endl;
   }
}
Код тот же, однако подсчет от 20 до 40.

forever@pterois:~/My_pro1/cpp_pro$ ./new.exe
Number: 20 result: 6765
Start new future for: 26
Number: 21 result: 10946
Start new future for: 27
Number: 22 result: 17711
Start new future for: 28
Number: 23 result: 28657
Start new future for: 29
Number: 24 result: 46368
Start new future for: 30
Number: 25 result: 75025
Start new future for: 31
Number: 26 result: 121393
Start new future for: 32
Number: 27 result: 196418
Start new future for: 33
Number: 28 result: 317811
Start new future for: 34
Number: 29 result: 514229
Start new future for: 35
Number: 30 result: 832040
Start new future for: 36
Number: 31 result: 1346269
Start new future for: 37
Number: 32 result: 2178309
Start new future for: 38
Number: 33 result: 3524578
Start new future for: 39
Get results from rest futures
Number: 38 result: 39088169
Number: 39 result: 63245986
Number: 34 result: 5702887
Number: 35 result: 9227465
Number: 36 result: 14930352
Number: 37 result: 24157817
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
01.02.2013, 14:55  [ТС]     Создание параллельных задач #7
ForEveR, да, то что надо, принцип понятен спасибо. Я так понял, find_ready я могу переделать и так:
C++
1
2
3
4
auto find_ready = [](std::future<std::pair<int, int>>& fut) -> bool
{
    return fut.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready;
};
Еще один вопрос у меня возникает по поводу оптимального количества потоков. Если у меня двухведерный процессор, то это:
C++
1
std::max(std::thread::hardware_concurrency(), 6u)
определит 6 потоков, почему 6 а не 2 или 4?
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
01.02.2013, 15:39     Создание параллельных задач #8
yuron_477, Ну у меня 6. А в gcc 4.6.3 std::thread::hardware_concurrency возвращает 0... Потому и поставил max, дабы работало.)

Добавлено через 42 минуты
Подрефакторил. Как-то так.

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <thread>
#include <future>
#include <vector>
#include <iostream>
#include <algorithm>
 
int fib(int i) { return i < 3 ? 1 : fib(i - 1) + fib(i - 2); }
 
int main()
{
   typedef std::pair<int, std::future<int>> future;
 
   const int start = 20, end = 40;
   auto is_future_valid = [](future& fut)
   {
      return fut.second.valid();
   };
   auto is_future_ready = [&is_future_valid](future& fut)
   {
      return is_future_valid(fut) && fut.second.wait_for(std::chrono::microseconds(0));
   };
   std::vector<future> futures(std::thread::hardware_concurrency());
   int number = start;
   for (future& f : futures)
   {
      f = std::make_pair(number, std::async(std::launch::async, fib, number));
      ++number;
   }
   while (!std::none_of(futures.begin(), futures.end(), is_future_valid))
   {
      auto next = std::find_if(futures.begin(), futures.end(), is_future_ready);
      if (next == futures.end())
      {
         continue;
      }
      std::cout << "Result of fib(" << next->first << ") is " << next->second.get() << std::endl;
      if (number != end)
      {
         *next = std::make_pair(number, std::async(std::launch::async, fib, number));
         ++number;
      }
   }
}
Bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
forever@pterois:~/My_pro1/cpp_pro$ ./new.exe 
Result of fib(20) is 6765
Result of fib(21) is 10946
Result of fib(26) is 121393
Result of fib(22) is 17711
Result of fib(28) is 317811
Result of fib(27) is 196418
Result of fib(23) is 28657
Result of fib(24) is 46368
Result of fib(25) is 75025
Result of fib(29) is 514229
Result of fib(30) is 832040
Result of fib(31) is 1346269
Result of fib(32) is 2178309
Result of fib(33) is 3524578
Result of fib(34) is 5702887
Result of fib(36) is 14930352
Result of fib(35) is 9227465
Result of fib(37) is 24157817
Result of fib(38) is 39088169
Result of fib(39) is 63245986
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
01.02.2013, 15:39  [ТС]     Создание параллельных задач #9
Я не к этому веду. Если код будет запущен на одноядерной машине и будет создано 6 потоков они как будут обрабатываться - параллельно с переключениями между потоками или последовательно?
silent_1991
Эксперт C++
4938 / 3014 / 149
Регистрация: 11.11.2009
Сообщений: 7,024
Завершенные тесты: 1
04.02.2013, 09:52     Создание параллельных задач #10
Цитата Сообщение от yuron_477 Посмотреть сообщение
параллельно с переключениями между потоками или последовательно?
Это одно и то же. Если ядро одно - как вы получите действительно параллельную обработку? Параллельность в данном случае будет эмулироваться за счёт переключения между потоками (обработали часть одного - переключились - обработали часть второго - переключились и т.д.). Визуально обработка параллельная (одновременная), на деле - последовательная.
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
06.02.2013, 19:40  [ТС]     Создание параллельных задач #11
ForEveR, я немного разобрался в вашем последнем коде и нашел в нем одну проблему - он не совсем параллельно работает, только одно ядро у меня грузит. Походу, проблема в функции is_future_ready. Вот немного переделал, теперь все ядра по 100% в загрузке и вывод результатов по порядку. Вот, может кому надо будет.
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <thread>
#include <future>
#include <vector>
#include <iostream>
#include <algorithm>
#include <iterator>
 
long long fib(long long i) { return i < 3 ? 1 : fib(i - 1) + fib(i - 2); }
 
int main()
{
    typedef std::pair<int, std::future<long long>> future;
 
    const int start = 30, end = 48;
    auto is_future_valid = [](future& fut)
    {
        return fut.second.valid();
    };
    auto is_future_ready = [&is_future_valid](future& fut)
    {
        return is_future_valid(fut) && fut.second.wait_for(std::chrono::microseconds(100)) == std::future_status::ready;
    };
    std::vector<future> futures(std::thread::hardware_concurrency());
    std::cout<<"threads = "<<futures.size()<<"\n\n";
    int number = start;
    for (future& f : futures)
    {
        f = std::make_pair(number, std::async(std::launch::async, fib, number));
        ++number;
    }
 
    while (std::any_of(futures.begin(), futures.end(), is_future_valid))
    {
        auto next = std::find_if(futures.begin(), futures.end(), is_future_ready);
        if (next == futures.end())
        {
            continue;
        }
        std::cout<<"thread = "<<std::distance(futures.begin(), next)<<"\n";
        std::cout << "Result of fib(" << next->first << ") is " << next->second.get() << std::endl;
        if (number <= end)
        {
            
            *next = std::make_pair(number, std::async(std::launch::async, fib, number));
            ++number;
        }
    }
    system("pause");
    return 0;
}
MoreAnswers
Эксперт
37091 / 29110 / 5898
Регистрация: 17.06.2006
Сообщений: 43,301
27.03.2013, 02:20     Создание параллельных задач
Еще ссылки по теме:

C++ Разработка параллельных алгоритмов для решения СЛАУ
C++ Шаблон параллельных вычислений
C++ Найти суммы диагоналей параллельных главной

Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
27.03.2013, 02:20  [ТС]     Создание параллельных задач #12
Вот еще один элегантный способ решения данной задачи. Вместо того что бы заводить вектор потоков, вылавливать готовые задачи и перезапускать новые мы просто заводим вектор будущих результатов, запускаем необходимое количество задач с помощью async и потом в цикле ждем когда get() вернет результаты по каждому запущеному потоку. Правильное распределение по потокам берет на себя async.
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <future>
#include <vector>
 
unsigned long long fib(const unsigned i) { return i < 3 ? 1 : fib(i - 1) + fib(i - 2); }
 
int main() 
{   
    const unsigned numberOfTask = 20;
    unsigned n = 30;
    std::vector<std::future<unsigned long long>> futures(numberOfTask);
    for (auto& f : futures) f = std::async(std::launch::async, fib, ++n);
    n = 30;
    for (auto& f : futures) std::cout<<"fib("<<++n<<") = "<<f.get()<<"\n";
    
    return 0;
}
Yandex
Объявления
27.03.2013, 02:20     Создание параллельных задач
Ответ Создать тему
Опции темы

Текущее время: 22:37. Часовой пояс GMT +3.
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2016, vBulletin Solutions, Inc.
Рейтинг@Mail.ru