Форум программистов, компьютерный форум, киберфорум
Python: Базы данных
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
30 / 27 / 11
Регистрация: 03.06.2023
Сообщений: 73

Курсор. Обработка данных в параллельных процессах

08.10.2023, 18:56. Показов 852. Ответов 5

Студворк — интернет-сервис помощи студентам
Столкнулся с необходимостью обработки данных в параллельных процессах.
Данные получаю из курсора, наполняю бранчи по 1000 записей и передаю их в отдельную функцию. Функция производит обработку данных и вставку или апдейт этих записей, причем в каждом из запросов предусмотрен вариант "он конфликт" для исключения проблем с обработкой записей с ограничением уникальности полей. Для запуска функции в параллельных процессах использовал multiprocessing и concurrent.futures. Создаю пул из 10 процессов, читаю данные из курсора и наполняю бранчи по 1000 записей, как толькл набрал 10 бранчей для процессов, запускаю и жду их выполнения.
Процессы запускаются и даже обрабатывают данные, но до конца не дорабатывают, т.к. следующая партия процессов не запускается и скрипт заканчивается.
Отдельно запускал эту функцию, отработав 10000 записей не получил ни одной ошибки.
Как можно понять что мешает выполнению процессов и запуску следующих процессов?
Вот пример кода с запуском процессов:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
with concurrent.futures.ThreadPoolExecutor(max_processes) as executor:
        while True:
            data_batch = []
            tasks = []
            
            cursor.execute(query)
            rows = cursor.fetchone()
 
            if not rows:
                break
            
            for row in cursor:
                data_batch.append(row)
 
                if len(data_batch) == batch_size:
                    task = executor.submit(process_batch, data_batch.copy())
                    tasks.append(task)
                    data_batch = []
            
            # остаток записей меньше batch_size
            if data_batch:                
                task = executor.submit(process_batch, data_batch.copy())
                tasks.append(task)
                data_batch = [] 
           
            for task in concurrent.futures.as_completed(tasks):               
                result = task.result()
Добавлено через 4 минуты
Я не исключаю что в функции обработки происходит какой то конфликт и он все останавливает. Как его отловить?
0
Programming
Эксперт
39485 / 9562 / 3019
Регистрация: 12.04.2006
Сообщений: 41,671
Блог
08.10.2023, 18:56
Ответы с готовыми решениями:

Организовать рандом в параллельных процессах(потоках)
как организовать рандом в параллельных процессах(потоках)? про rand() и srand(time(0)) в main знаю но т.к. в одно время и в разных...

Обработка множества файлов в параллельных потоках
День добрый, уважаемые форумчане. Прошу помочь с моим камнем преткновения "Потоками". Имеется задача: На диске существует папка...

Обработка слова, над которым курсор
Здравствуйте. Есть задача: сделать поле ввода, в которое вводится предложение. Дальше необходимо выполнять действия над словом, на которое...

5
963 / 718 / 276
Регистрация: 10.12.2016
Сообщений: 1,761
09.10.2023, 15:40
Цитата Сообщение от dobr667 Посмотреть сообщение
Как его отловить?
try/except
Python
1
2
3
4
try:
        #make your code
except Exception as e:
        print(repr(e))
1
30 / 27 / 11
Регистрация: 03.06.2023
Сообщений: 73
09.10.2023, 23:17  [ТС]
Я реализовал запись логирования обработки в файл. Судя по нему процессы обрабатывают 1...10 записей из чанка и отпадают по неизвестной причине. До этого оператора except Exception as e даже дело не доходит.
Возможно дело в организации цикла или в специфическом взаимодействии процессов внутри пула.

Запуск процессов уже переделал так:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
   cursor.execute(query)
    records = cursor.fetchall()
    
    record_chunks = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
    
    pool = multiprocessing.Pool(processes=max_processes)
 
    pool.map(process_batch, record_chunks)
 
    pool.close()
    pool.join()
 
    connection.commit()
    cursor.close()    
    connection.close()
В моем случае 200000 записей, я создаю например 20 процессов, по 1000 записей, получаю от 1 до 6 записей в логе для одного процесса, с описанием какие данные обрабатываются и результат обработка, и больше ничего.

Добавлено через 5 часов 37 минут
Обошелся созданием пула событий библиотеки threading.
0
963 / 718 / 276
Регистрация: 10.12.2016
Сообщений: 1,761
10.10.2023, 18:01
Цитата Сообщение от dobr667 Посмотреть сообщение
20 процессов, по 1000 записей
не вник сначала
может вас банят за DDOS атаку?
0
30 / 27 / 11
Регистрация: 03.06.2023
Сообщений: 73
10.10.2023, 18:13  [ТС]
Это я еще локально тестирую. Пока обошелся созданием пула эвентов, где жду окончания запущенных процессов. Так работает. Наверное какой то баг все же есть в коде, но с библиотекой threading все же работает
0
963 / 718 / 276
Регистрация: 10.12.2016
Сообщений: 1,761
10.10.2023, 21:52
в закачках из сети я применял пул соединений, в вашем случае я бы создал несколько соединений, а не один курсор
connect->cursor->query-->обработка ->connect closed
но тут надо следить, чтобы соединения не переписывали друг друга если БД апдейтитcя
https://pythonru.com/bibliotek... postgresql
0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
inter-admin
Эксперт
29715 / 6470 / 2152
Регистрация: 06.03.2009
Сообщений: 28,500
Блог
10.10.2023, 21:52
Помогаю со студенческими работами здесь

Обработка события, если курсор мыши находится в радиусе 5-10 пикселей от точки
Есть график с точками вершин, смысл в том что бы при преближеннии курсора к точке он примагничивался примерно в радиусе 5-10пикслей(что бы...

Организация 8 параллельных SPI каналов приема данных
Добрый день. У меня есть 8 гироскопов, подключаю их сейчас к Ардуино. Канал SPI там один. Из-за этого скорость опроса всех датчиков...

Курсор ввода данных
Уважаемые гуру помогите как узнать когда на форме присутствует "Курсор вводы данных!" (Мигающий такой) для ввода с клавиатура! ...

Вставка данных в таблицу через курсор
Существует таблица просрочки(верх), нужно сформировать таблицу (низ), где data_report-отчетная дата, rest_day- кол-во просрочки в днях. ...

Курсор query, название базы данных
Я не знаю, там ли я создал тему, у меня проблема. Использую Android SDK. У меня есть spinner, в котором 4 пункта, допустим one, two,...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
6
Ответ Создать тему
Новые блоги и статьи
PhpStorm 2025.3: WSL Terminal всегда стартует в ~
and_y87 14.12.2025
PhpStorm 2025. 3: WSL Terminal всегда стартует в ~ (home), игнорируя директорию проекта Симптом: После обновления до PhpStorm 2025. 3 встроенный терминал WSL открывается в домашней директории. . .
Access
VikBal 11.12.2025
Помогите пожалуйста !! Как объединить 2 одинаковые БД Access с разными данными.
Новый ноутбук
volvo 07.12.2025
Всем привет. По скидке в "черную пятницу" взял себе новый ноутбук Lenovo ThinkBook 16 G7 на Амазоне: Ryzen 5 7533HS 64 Gb DDR5 1Tb NVMe 16" Full HD Display Win11 Pro
Музыка, написанная Искусственным Интеллектом
volvo 04.12.2025
Всем привет. Некоторое время назад меня заинтересовало, что уже умеет ИИ в плане написания музыки для песен, и, собственно, исполнения этих самых песен. Стихов у нас много, уже вышли 4 книги, еще 3. . .
От async/await к виртуальным потокам в Python
IndentationError 23.11.2025
Армин Ронахер поставил под сомнение async/ await. Создатель Flask заявляет: цветные функции - провал, виртуальные потоки - решение. Не threading-динозавры, а новое поколение лёгких потоков. Откат?. . .
Поиск "дружественных имён" СОМ портов
Argus19 22.11.2025
Поиск "дружественных имён" СОМ портов На странице: https:/ / norseev. ru/ 2018/ 01/ 04/ comportlist_windows/ нашёл схожую тему. Там приведён код на С++, который показывает только имена СОМ портов, типа,. . .
Сколько Государство потратило денег на меня, обеспечивая инсулином.
Programma_Boinc 20.11.2025
Сколько Государство потратило денег на меня, обеспечивая инсулином. Вот решила сделать интересный приблизительный подсчет, сколько государство потратило на меня денег на покупку инсулинов. . . .
Ломающие изменения в C#.NStar Alpha
Etyuhibosecyu 20.11.2025
Уже можно не только тестировать, но и пользоваться C#. NStar - писать оконные приложения, содержащие надписи, кнопки, текстовые поля и даже изображения, например, моя игра "Три в ряд" написана на этом. . .
Мысли в слух
kumehtar 18.11.2025
Кстати, совсем недавно имел разговор на тему медитаций с людьми. И обнаружил, что они вообще не понимают что такое медитация и зачем она нужна. Самые базовые вещи. Для них это - когда просто люди. . .
Создание Single Page Application на фреймах
krapotkin 16.11.2025
Статья исключительно для начинающих. Подходы оригинальностью не блещут. В век Веб все очень привыкли к дизайну Single-Page-Application . Быстренько разберем подход "на фреймах". Мы делаем одну. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru