Форум программистов, компьютерный форум CyberForum.ru

Зависание потоков. Работа с файлами - C++

Восстановить пароль Регистрация
 
razor_ua
10 / 10 / 0
Регистрация: 20.05.2011
Сообщений: 71
16.07.2014, 15:57     Зависание потоков. Работа с файлами #1
Всем еще раз привет. Очень прошу помощи.
Пишу на С++ под винду, с учетом специфики проекта выбор остановился на boost library.

Пишу многопоточное приложение следующего вида:

создается несколько потоков. далее потоки поочереди читают файл, выполняют какую-то длительную логику и поочереди записывают в другой файл.

т.е. здесь важна очередность записи в файл, каждый поток ждет предыдущего:
например есть 3 потока.
- 1й поток прочитал, выполнил логику, записал и ждет пока 3й не закончит чтение файла.
- 2й ждал пока первый закончит чтение и начал читать файл далее, выполнять логику, потом проверяет, не записывает ли 1й в файл. Если нет - записывает 2й и т.д.

Для решения задачи
использую:
C++
1
2
3
4
5
    boost:: thread_group, делаю join_all();
    boost:: ptr_vector<boost::mutex> _readMutexArray;
    boost:: ptr_vector<boost::mutex> _writeMutexArray;
    boost:: ptr_vector<boost::condition_variable> _readConditionVarArray;
    boost:: ptr_vector<boost::condition_variable> _writeConditionVarArray;
Ниже примерный алгоритм работы программ:

Зависание потоков. Работа с файлами

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void Decrypter::DecryptMultithreading(const std::string& sourceFilePath, const std::string& destinationFilePath, size_t threadId )
{
    while (_byteWrote != _fileSize)
    {
        WaitForEvent(threadId, _readMutexArray, _readConditionVarArray);
        _firstPart = false;
 
//       open file stream for sourceFilePath and read required bytes.
        
        _readConditionVarArray[threadId].notify_all();
 
//        smth decrypt logics here
 
        WaitForEvent(threadId, _writeMutexArray, _writeConditionVarArray);
 
//        open file stream for destinationFilePath and write required bytes.
//   _byteWrote += currentBlockSize;
 
        _writeConditionVarArray[threadId].notify_all();
    }
}
 
void Decrypter::WaitForEvent(int threadId, boost::ptr_vector<boost::mutex>& eventsArrayGuard, boost::ptr_vector<boost::condition_variable>& cond)
{
    size_t waitedThread = -1;
 
    if (0 == threadId && (_firstPart || 0 == _byteWrote))
    {
        return;
    }
    else
    {
        waitedThread = 0 == threadId ? _threadsCount: threadId;
    }
    --waitedThread;
 
    boost::unique_lock<boost::mutex> lock(eventsArrayGuard[waitedThread]);
    cond[waitedThread].wait(lock);
}
 
void Decrypter::CreateReadWriteEvents()
{
    for (size_t i = 0; i < _threadsCount; ++i)
    {
        _readMutexArray.push_back(new boost::mutex());
        _writeMutexArray.push_back(new boost::mutex());
 
        _readConditionVarArray.push_back(new boost::condition_variable());
        _writeConditionVarArray.push_back(new boost::condition_variable());
    }
}
У меня такая проблема: поток выходит из ожидания, только когда вызывается notify_() метод у cond. var.
В итоге все потоки у меня зависают на cond[waitedThread].wait(lock); после первого прохода.
Как бы тут выкрутиться?
было бы не плохо если бы было сигнальное состояние и нет.
Может кто-то подсказать куда копать?
Similar
Эксперт
41792 / 34177 / 6122
Регистрация: 12.04.2006
Сообщений: 57,940
16.07.2014, 15:57     Зависание потоков. Работа с файлами
Посмотрите здесь:

работа с файлами C++
Работа с файлами C++
C++ Реализация 2х потоков, работа с буфером, механизм семафоров
C++ Работа с файлами
C++ Работа с файлами
После регистрации реклама в сообщениях будет скрыта и будут доступны все возможности форума.
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
16.07.2014, 16:31     Зависание потоков. Работа с файлами #2
Цитата Сообщение от razor_ua Посмотреть сообщение
- 1й поток прочитал, выполнил логику, записал и ждет пока 3й не закончит чтение файла.
- 2й ждал пока первый закончит чтение и начал читать файл далее, выполнять логику, потом проверяет, не записывает ли 1й в файл. Если нет - записывает 2й и т.д.
Читать файл можно же одновременно всемя потоками, затем вылавливать завершенные потоки и писать результат в файл (естественно уже синхронизировано). Или тут как то по другому надо?
razor_ua
10 / 10 / 0
Регистрация: 20.05.2011
Сообщений: 71
16.07.2014, 16:44  [ТС]     Зависание потоков. Работа с файлами #3
да, вы правы.. на чтение - лишнее.

я вообще верно делаю: на 1 поток один мютекс и одна condition variable ?
DiffEreD
 Аватар для DiffEreD
1420 / 757 / 95
Регистрация: 21.06.2011
Сообщений: 1,740
Записей в блоге: 2
16.07.2014, 16:51     Зависание потоков. Работа с файлами #4
Один глобальный мютекс и одна глобальная condition variable.

Добавлено через 4 минуты
Я бы вообще все это реализовал бы на будущих результатах и std::async. Вылавливание завершенных потоков можно осуществить примерно так (код с другой темы взят):
C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <vector>
#include <algorithm>
#include <future>
#include <thread>
#include <chrono>
 
int main()
{
   using fut_t = std::future<int>;
   std::vector<fut_t> futures;
 
   auto is_future_valid = [](fut_t& fut)
   {
      return fut.valid();
   };
   auto is_future_ready = [&is_future_valid](fut_t& fut)
   {
      return is_future_valid(fut) && fut.wait_for(
               std::chrono::microseconds(100)) == std::future_status::ready;
   };
 
   futures.emplace_back(std::async(std::launch::async, []{
      std::this_thread::sleep_for(std::chrono::seconds(5));
      return 1;
   }));
 
   futures.emplace_back(std::async(std::launch::async, []{
      std::this_thread::sleep_for(std::chrono::seconds(15));
      return 2;
   }));
 
   futures.emplace_back(std::async(std::launch::async, []{
      std::this_thread::sleep_for(std::chrono::seconds(10));
      return 3;
   }));
 
   futures.emplace_back(std::async(std::launch::async, []{
      std::this_thread::sleep_for(std::chrono::seconds(3));
      return 42;
   }));
 
   while (std::any_of(futures.begin(), futures.end(), is_future_valid)) {
      auto next = std::find_if(futures.begin(), futures.end(), is_future_ready);
      if (next == futures.end())
      {
         continue;
      }
 
      std::cout << next->get() << std::endl;
   }
 
   return 0;
}
Переделать под вашу задачу думаю будет не сложно.
razor_ua
10 / 10 / 0
Регистрация: 20.05.2011
Сообщений: 71
16.07.2014, 17:16  [ТС]     Зависание потоков. Работа с файлами #5
не могу пользоваться с++11... особенности проекта.

Цитата Сообщение от DiffEreD Посмотреть сообщение
Один глобальный мютекс и одна глобальная condition variable.
мютекс - boost:: mutex ? или другой ?
только не пойму как.
потоков, к примеру 4, в первом проходе первым к записи в файл придет, к примеру 3й поток, залочит и будет писать, а мне нужно чтобы 3й дождался 2го, а 2й - 1го.
так с 1м не получиться
ForEveR
Модератор
Эксперт C++
 Аватар для ForEveR
7927 / 4709 / 318
Регистрация: 24.06.2010
Сообщений: 10,524
Завершенные тесты: 3
16.07.2014, 17:25     Зависание потоков. Работа с файлами #6
В boost если что тоже есть future.
Yandex
Объявления
16.07.2014, 17:25     Зависание потоков. Работа с файлами
Ответ Создать тему
Опции темы

Текущее время: 13:20. Часовой пояс GMT +3.
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2016, vBulletin Solutions, Inc.
Рейтинг@Mail.ru