Форум программистов, компьютерный форум, киберфорум
С++ для начинающих
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск  
 
 
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70

Многопоточная обработка большого файла

04.04.2025, 12:22. Показов 5745. Ответов 33

Студворк — интернет-сервис помощи студентам
Всем привет. Передо мной стоит такая задача. Есть очень большой файл с отсортированными по возрастанию числами типа double в тектовом формате, разделенными символом новой строки. Нужно посчитать среднее и стандартное отклонение по этому файлу, обязательно параллельно.

Сначала я написал однопоточную версию, которая использует алгоритм Вэллфорда для подсчета стандартного отклонения:

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool FileAnalyzer::analyze_file(const std::string& file_name, const std::string& analytics_report_file_name)
{
    std::ifstream fin(file_name);
    if (!fin.is_open())
        return false;
 
    long double mean = 0.0;
    long double sum = 0.0; // sum of squared differences needed for standard deviation calculation
    std::size_t count = 0;   // number of elements in the file
 
    long double number{};
    while (fin >> number)
    {
        ++count;
        long double old_mean = mean;
        long double delta = number - mean;
        mean += delta / count;
        sum += (number - old_mean) * delta;
    }
    if (count == 0)
        return false;
    // standard deviation
    long double stddev = count > 1 ? std::sqrt(sum / (count - 1)) : 0.0;
 
    output_results(std::cout, mean, stddev);
 
    return true;
}
А потом параллельную версию:

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
bool FileAnalyzer::parallel_analyze_file(const std::string& file_name, const std::string& analytics_report_file_name)
{
    struct ThreadEntry
    {
        std::thread thread;
        long double mean = 0.0;
        long double sum = 0.0;
        std::size_t count = 0;
    };
 
    const std::size_t FILE_SIZE = std::filesystem::file_size(file_name);
    const std::size_t CHUNKS = std::thread::hardware_concurrency();
    const std::size_t CHUNK_SIZE = FILE_SIZE / CHUNKS;
    std::vector<ThreadEntry> entries(CHUNKS);
    for (std::size_t i = 0; i < CHUNKS; ++i)
    {
        std::size_t begin = i * CHUNK_SIZE;
        std::size_t end = (i == CHUNKS - 1) ? FILE_SIZE : begin + CHUNK_SIZE;
        entries[i].thread = std::thread([i, &entries, &file_name](std::size_t begin, std::size_t end)
        {
            std::ifstream fin(file_name);
            if (!fin.is_open())
                return;
            fin.seekg(begin);
 
            long double number{};
            long double mean = 0.0;
            long double sum = 0.0; // sum of squared differences needed for standard deviation calculation
            std::size_t count = 0;   // number of elements in the chunk
 
            if (i != 0)
                fin >> number;
            while (fin.tellg() <= end && (fin >> number))
            {
                ++count;
                long double old_mean = mean;
                long double delta = number - mean;
                mean += delta / count;
                sum += (number - old_mean) * delta;
            }
            entries[i].mean = mean;
            entries[i].sum = sum;
            entries[i].count = count;
        }, begin, end);
    }
    for (auto& entry : entries)
        entry.thread.join();
 
    long double mean = 0.0;
    long double sum = 0.0;
    std::size_t count = 0;
    for (const auto& entry : entries)
    {
        if (entry.count == 0)
            continue;
        if (count == 0)
        {
            mean = entry.mean;
            sum = entry.sum;
            count = entry.count;
        }
        else
        {
            std::size_t next_count = count + entry.count;
            long double delta = entry.mean - mean;
            mean += delta * entry.count / next_count;
            sum += entry.sum + delta * delta * count * entry.count / next_count;
            count = next_count;
        }
    }
    std::cout << "count = " << count << "\n";
    if (count == 0)
        return false;
    long double stddev = count > 1 ? std::sqrt(sum / (count - 1)) : 0.0; // standard deviation
 
    output_results(std::cout, mean, stddev, p50, p99, p01);
 
    return true;
}
Вторая версия работает так же как первая (предположим, что это правильно, хотя не уверен) только если количество потоков (CHUNKS) <= std::thread::hardware_concurrency(). Иначе пропускается очень много чисел почему-то.
Подскажите в чем причина, должно ли быть так? Может нужно вообще иначе распраллеливать? Просто я впервые столкнулся с необходимостью многопоточной обработки очень больших файлов.
0
Лучшие ответы (1)
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
04.04.2025, 12:22
Ответы с готовыми решениями:

Многопоточная обработка блоков данных
Всех приветствую, проблема вот в чем, работая с многопоточностью в Qt столкнулся с вопросом как правильно обрабатывать блоки данных. ...

Обработка большого xml файла в CBuilder6
Здраствуйте. Необходимо из программы, написанной в Builder 6 разобрать xml файл. Пробовал работать с XMLDocument - обработка идет...

TCP обработка большого файла
Добрый день, коллеги :) Я в TCP новичок, и мой вопрос теоретический. Задача - обработать большой текстовый файл, например 1-й том...

33
 Аватар для SmallEvil
4086 / 2975 / 813
Регистрация: 29.06.2020
Сообщений: 11,000
04.04.2025, 17:56
Студворк — интернет-сервис помощи студентам
И ещё раз, если формат файла текстовый, вы не сможете его взять и разбить на части, никак.
0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
04.04.2025, 18:01  [ТС]
Мда, я затупил, извиняюсь. Работает, если число потоков кратно трем.

Добавлено через 1 минуту
Нет, оказывается правильно)

Добавлено через 49 секунд
Почему никак? У меня ж вроде получилось.
0
 Аватар для SmallEvil
4086 / 2975 / 813
Регистрация: 29.06.2020
Сообщений: 11,000
04.04.2025, 18:08
Цитата Сообщение от MeXaL Посмотреть сообщение
Работает, если число потоков кратно трем.
Это не вся проблема.
Даже если файл имеет строки равной длины, в символах. То делить файл можно только на части кратных этой длине.
Иначе вы "порежете" числа.

Цитата Сообщение от MeXaL Посмотреть сообщение
У меня ж вроде получилось.
Подчеркнуто ключевое слово.
0
2736 / 891 / 331
Регистрация: 10.02.2018
Сообщений: 2,128
04.04.2025, 18:22
Цитата Сообщение от MeXaL Посмотреть сообщение
А если больше, то десятки тысяч чисел куда-то теряются.
Как вы это поняли? Суммарное количество count во всех entity получилось меньше необходимого или результат вычислений не соответствует ожиданиям?

Я отсекаю "пограничные числа" (для всех потоков, кроме первого)
А предыдущий поток его "добирает"
Да, это вы хитро придумали, я сразу и не уловил всей задумки. Тогда вопрос работы при удачном разбиении фрагментов, не будет ли в таком случае "скушано" первое число фрагмента?
0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
04.04.2025, 18:26  [ТС]
Цитата Сообщение от Ygg Посмотреть сообщение
Суммарное количество count во всех entity получилось меньше необходимого
и соответственно
Цитата Сообщение от Ygg Посмотреть сообщение
результат вычислений не соответствует ожиданиям
0
2736 / 891 / 331
Регистрация: 10.02.2018
Сообщений: 2,128
04.04.2025, 18:39
Лучший ответ Сообщение было отмечено MeXaL как решение

Решение

MeXaL, а если посмотреть на результирующее количество чисел по фрагментам, насколько они "плохи", среди них попадаются нолики или все фрагменты содержат хоть сколько-нибудь чисел? Может тут какой-нибудь эффект связанный с сиком по текстовому файлу?
1
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
04.04.2025, 19:17  [ТС]
Ну да, все просто. Почему я сам не посмотрел. В некоторых потоках std::ifstream сразу же фэйлится. И видимо понятно почему. В некоторых местах указатель буффера может попасть на точку, либо символ 'E', либо '+', либо '-' в тектовом представлении числа))) Поэтому писать
C++
1
2
if (i != 0)
    fin >> number;
явно нельзя))

Заменил на
C++
1
2
3
4
5
if (i != 0)
{
       std::string dummy;
       std::getline(fin, dummy, '\n');
}
сейчас буду тестить.

Можно более элегантно:
C++
1
2
if (i != 0)
    fin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
Добавлено через 22 минуты
Проблема решилась, спасибо за помощь!
0
04.04.2025, 19:47

Не по теме:

:D
Убийца HDD.

Цитата Сообщение от MeXaL Посмотреть сообщение
Проблема решилась
Ага. Погроммист готов.

0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
04.04.2025, 21:34  [ТС]
А почему убийца HDD? Что не так?

Добавлено через 1 час 32 минуты
Цитата Сообщение от SmallEvil Посмотреть сообщение
Убийца HDD.
Мне правда интересно, почему?
0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
05.04.2025, 10:10  [ТС]
Цитата Сообщение от SmallEvil Посмотреть сообщение
Убийца HDD
Вы можете объяснить, что вы имели в виду?
0
фрилансер
 Аватар для Алексей1153
6495 / 5724 / 1133
Регистрация: 11.10.2019
Сообщений: 15,286
05.04.2025, 10:36
MeXaL, HDD, в отличие от SSD, имеет механическую часть. И если в двух потоках с двух удалённых друг от друга частей файла начать читать данные, механика будет бешено двигаться туда-сюда.
0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
05.04.2025, 10:52  [ТС]
Ясно, а что тогда будет лучше?
0
Эксперт функциональных языков программированияЭксперт С++
 Аватар для Royal_X
6301 / 3023 / 1053
Регистрация: 01.06.2021
Сообщений: 11,475
05.04.2025, 10:57
Цитата Сообщение от MeXaL Посмотреть сообщение
Ясно, а что тогда будет лучше?
я тебе уже писал. Читаешь файл один раз и работаешь с оперативной памятью. В твоем условии нет никаких ограничений по памяти. В потоки передаешь указатели.
0
2 / 2 / 0
Регистрация: 10.09.2023
Сообщений: 70
05.04.2025, 11:18  [ТС]
Спасибо тебе/
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
05.04.2025, 11:18

многопоточная программа
есть вот такая программа-при нажатии символа, добавляет его справа(1-ая операция); при нажатии клавиши Backspace-стирает последний...

Многопоточная запись в TRichEdit
Доброго времени суток! Появилась проблема. Есть 2 потока и функция для записи текста в компонент TRichEdit. Все это работает в DLL. При...

Многопоточная программа работает через раз.
Доброго времени суток, друзья. Только начал изучать написание многопоточных программ. Использую Qt 4.8.0 Вот что у меня получилось:...

SQLITE и многопоточная работа
Как организовать работу с SQLITE в многопоточном приложении, т.е. каждый дочерний поток может читать/писать данные в БД. Если каждый...

Реализация класса "Многопоточная очередь"
Добрый вечер! Может вопрос немного глупый, но.. Задание: Нужно реализовать класс многопоточной очереди есть вопрос: Например,...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
34
Ответ Создать тему
Новые блоги и статьи
Сезонность и суточность закисления почв
anaschu 04.07.2026
200 часов это все равно моловато. Есть ситуации, но нестандартные, когда смена происходит за 5 лет. Но обычно это 50 лет и более. Наверное, закисление почвы происходит сезонно в средней. . .
В чем ценность человеческого опыта в глобальном смысле?
kumehtar 03.07.2026
Возможно, ценность человека не в том, что он однажды достигает мудрости, а в том, что он становится носителем карты пути. Он знает не только истину, но и последовательность внутренних изменений,. . .
интеграция AnyLogic с самописным REST API и переход на Odoo
anaschu 03.07.2026
Успешная интеграция AnyLogic с самописным REST API и переход на промышленную Odoo WMS Сегодня проделал огромный путь от простой симуляции физических процессов до построения полноценной. . .
Поиск всех путей на ориентированном графе. Linux
dcc0 02.07.2026
Переработка старого кода из моей статьи. Через несколько переработок от PHP кода к C89 (надеюсь, 89). Но довольно запутанно получилось. Код для Linux. Но если убрать time и то, что с ним. . .
Сам себя обучал rest api
anaschu 02.07.2026
Педагогический лайфхак: Почему чистый REST API для ученика намного круче, чем готовые библиотеки Когда мы отказались от капризного JAR-файла AnyLogic и переписали код на стандартный HttpClient,. . .
rest api anylogic - выполнение модели на своём русском сайте
anaschu 02.07.2026
Как подружиться с AnyLogic Cloud API, победить провайдеров и развернуться Java-бэкенд в Docker на бесплатном хостинге: Двухдневный лог борьбы Всем привет! Хочу поделиться свежим (и довольно. . .
Где деньги лежат
kumehtar 02.07.2026
Это - японская подводная лодка I-52 (тип C2, кодовое имя Momi) вышла из Японии в марте 1944 года с миссией в оккупированную немцами Францию (Лорьян). Это была одна из «Янаги»-миссий по обмену. . .
Krabik для WoW 3.3.5a, многоязычный
AmbA 02.07.2026
Допилил бота, думаю что окончательно. Изменения: - добавлена многоязычность - добавлено снятие скриншотов - добавлено поддержание бафов хождения по воде (для жреца, дк и шамана) - и так, по. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru