Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль

Многопоточность и параллелизм в Python: потоки, процессы и гринлеты

Запись от py-thonny размещена 06.05.2025 в 10:29
Показов 2492 Комментарии 0

Нажмите на изображение для увеличения
Название: 1edd5140-f2be-4f6d-b20e-57ccfeeebf18.jpg
Просмотров: 29
Размер:	203.3 Кб
ID:	10750
Параллелизм и конкурентность — две стороны многопоточной медали, которые постоянно путают даже бывалые разработчики.

Конкурентность (concurrency) — это когда ваша программа умеет жонглировать множеством задач, которые стартуют, выполняются и завершаются в перекрывающиеся периоды времени. Но — сюрприз! — они не обязательно работают одновременно. Система просто мастерски переключается между ними, создавая иллюзию параллелизма, как фокусник, показывающий трюк с картами. Суть конкурентности — разделить программу на независимые кусочки, способные выполняться в произвольном порядке.

Параллелизм (parallelism) — совсем другая зверюшка. Тут речь о реальном одновременном выполнении задач. Для такого фокуса необходим многоядерный процессор, где разные потоки могут физически работать на разных ядрах. То есть параллелизм — это как бы подмножество конкурентности, хотя я не на 100% уверен, что эта формулировка безупречна.

Для наглядности: представьте повара, который жарит котлеты, режет салат и варит суп, перепрыгивая от одной задачи к другой — это конкурентность. А теперь представьте трёх поваров, каждый из которых занимается своим делом — вот это настоящий параллелизм.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time
 
def count():
    for i in range(1_000_000):
        pass
 
# Секвенциальный запуск
start = time.time()
count(); count()
print(f"Последовательное выполнение: {time.time() - start}")
 
# "Конкурентное" исполнение потоками
start = time.time()
t1 = threading.Thread(target=count)
t2 = threading.Thread(target=count)
t1.start(); t2.start(); t1.join(); t2.join()
print(f"Многопоточное выполнение: {time.time() - start}")
Запустите код выше, и вы можете удевиться, почему времена выполнения почти одинаковы. Всё из-за пресловутого GIL (Global Interpreter Lock), который как строгий надзиратель разрешает только одному потоку выполнять байт-код Python в каждый момент. Это типичная ловушка для новичков.
Для настоящего параллелизма в Python нам понадобится модуль multiprocessing:

Python
1
2
3
4
5
6
7
8
import multiprocessing as mp
 
# Реально параллельное выполнение
start = time.time()
p1 = mp.Process(target=count)
p2 = mp.Process(target=count)
p1.start(); p2.start(); p1.join(); p2.join()
print(f"Многопроцессное выполнение: {time.time() - start}")
Вот тут-то на многоядерной машине вы заметите существенный прирост скорости, хотя процессы и требуют больше ресурсов, чем потоки. Это как сравнивать сложность организации совместной работы в одной комнате и через стенку.

Python предлагает несколько подходов к конкурентности:
1. Потоки (threading) — классика для задач, упирающихся в I/O.
2. Процессы (multiprocessing) — спасение для CPU-интенсивных задач.
3. Асинхронки (asyncio) — современное решение для неблокирующего I/O.
4. Гринлеты (greenlet, gevent) — легковесная альтернатива с кооперативной многозадачностью.

Выбор инструмента зависит от природы вашей задачи:
  • Если вы ждёте данные из сети или файла — потоки или asyncio.
  • Если нужно максимально нагрузить CPU — процессы.
  • Если у вас сервис с тысячами соединений — asyncio или гринлеты.

Многопоточное программирование в Python — это как игра в шахматы с невидимым соперником. Правила вроде просты, но победа приходит только с глубоким пониманием внутренних механизмов и опытом их применения.

Треугольник многопоточности: потоки, процессы и гринлеты



Вообразите треугольник, где каждая вершина — инструмент конкурентности в Python. В одном углу степенные и надёжные потоки, в другом — солидные и независимые процессы, а в третьем — экзотические и легковесные гринлеты. Каждый из них — как особый вид транспорта, где-то ездит быстрее, где-то медленнее, но имеет свою уникальную нишу.

Потоки / многопоточность
Требуется помощь с потоками в python, искал где мог. Итак, требования следующие: 1. Библиотека...

3.7 Процессы или потоки ?
Создал с помощью мультипроцессинга два процесса заполняющие некий сайт параллельно (webdriver), все...

Потоки, процессы и асинхронное программирование
Добрый вечер. Почитал документацию и по моему я только еще больше запутался. Есть модуль...

Потоки/процессы
Здравствуйте, есть self-объект event, работаю я с библиотекой vk_api и мне нужно либо в отдельных...


Потоки: легковесы с ограничениями



Потоки в Python, реализуемые через модуль threading, напоминают коммунальную квартиру: все живут в одном адресном пространстве, делят общие данные и ресурсы, но каждый со своим отдельным "потоком исполнения". Они дешевы с точки зрения затрат памяти и быстро создаются. На бумаге — идеальное решение. Но есть нюанс.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time
 
shared_resource = 0
 
def increment_slowly():
global shared_resource
local_copy = shared_resource
# Имитация сложной обработки
time.sleep(0.1)
shared_resource = local_copy + 1
 
threads = []
for i in range(10):
t = threading.Thread(target=increment_slowly)
threads.append(t)
t.start()
 
for t in threads:
t.join()
 
print(f"Результат: {shared_resource}")  # Ожидаем 10, но получим...?
Запустите этот код несколько раз, и с высокой вероятностью вы увидите результат меньше 10! Это классическая "гонка данных" (race condition), когда несколько потоков пытаются изменить общие данные одновременно. Без защиты мьютексами или локами получается форменная каша.

Python
1
2
3
4
5
6
7
8
9
# Правильный вариант с использованием блокировки
lock = threading.Lock()
 
def increment_safely():
global shared_resource
with lock:  # Аккуратно блокируем ресурс
    local_copy = shared_resource
    time.sleep(0.1)
    shared_resource = local_copy + 1
Потоки — отличный выбор для I/O-интенсивных задач: чтения фaйлов, сетевых запросов, работы с базами данных. Пока один поток ждёт ответа от сервера, другие могут заниматься полезной работой. Но для вычислительно-тяжёлых задач они проигрывают из-за злополучного GIL.

Процессы: тяжеловесы для серьезных вычислений



Если потоки — соседи по квартире, то процессы — отдельные квартиры в разных домах. Каждый процесс имеет свою память, свои ресурсы, и, что критически важно, свой собственный GIL! Модуль multiprocessing предоставляет API, удивительно похожий на threading, что упрощает миграцию кода.

Python
1
2
3
4
5
6
7
8
9
10
11
import multiprocessing as mp
 
def intensive_calculation(number):
return sum(i * i for i in range(number))
 
if __name__ == '__main__':  # Это критически важно для Windows!
pool = mp.Pool(processes=4)  # Создаём пул из 4 процессов
results = pool.map(intensive_calculation, [10**6, 10**6, 10**6, 10**6])
pool.close()
pool.join()
print(results)
Процессы идеальны для задач, требующих мощных вычислений, где GIL становится узким местом. Однако за это приходится платить:
  • Создание процесса требует больше ресурсов, чем создание потока.
  • Межпроцессное взаимодействие сложнее и медленнее, чем между потоками.
  • Необходимы специальные механизмы синхронизации и разделения данных.

Важный нюанс: на UNIX-системах запуск нового процесса происходит через fork(), а на Windows — через spawn(). Это объясняет, почему код с multiprocessing может вести себя по-разному на разных платформах.

Гринлеты: микро-потоки для масштабируемых приложений



Гринлеты — это как театральное представление, где актеры сами решают, когда уступить сцену другому. Они реализуют концепцию "кооперативной многозадачности", в отличие от "вытесняющей" в обычных потоках. Вместо того, чтобы система решала, когда переключаться между задачами, гринлеты делают это явно. Библиотеки greenlet, gevent и eventlet позволяют создавать тысячи таких легковесных "псевдо-потоков" с минимальными накладными расходами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import gevent
from gevent import socket
 
def fetch(url):
print(f'Начинаем загрузку {url}')
sock = socket.socket()
sock.connect((url, 80))
sock.send(b'GET / HTTP/1.0\r\n\r\n')
sock.recv(1024)  # Тут гринлет "уступает" время другим
print(f'Загрузка {url} завершена')
 
# Создаем и запускаем гринлеты
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)
Фишка гринлетов в том, что они патчат стандартные библиотеки Python так, что операции I/O автоматически освобождают процессор для других гринлетов. Для разработчика это выглядит как синхронный код, хотя под капотом происходит асинхронная магия. Правда, если одна из задач захватит CPU и не захочет уступать, другие гринлеты будут терпеливо ждать.
Гринлеты сияют в задачах с высокой конкурентностью I/O, таких как веб-серверы или микросервисные архитектуры. По своей эффективности они близки к asyncio, но с более простым API.

Симбиоз разных подходов: когда смешать, а когда взболтать



Один из самых интересных аспектов этого "треугольника возможностей" — их способность работать вместе, дополняя друг друга, как участники джазового ансамбля. Можно комбинировать потоки и процессы, процессы и гринлеты, и даже собрать всё это в один многослойный пирог конкурентности.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process, Queue
import threading
import gevent
 
def process_worker(queue):
    # Запускаем несколько потоков внутри процесса
    threads = [threading.Thread(target=thread_worker, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Отправляем результат обратно
    queue.put("Процесс завершил работу")
 
def thread_worker(id):
    # Внутри потока запускаем несколько гринлетов
    jobs = [gevent.spawn(greenlet_worker, f"{id}-{j}") for j in range(2)]
    gevent.joinall(jobs)
 
def greenlet_worker(id):
    print(f"Гринлет {id} выполняет задачу")
 
if __name__ == '__main__':
    q = Queue()
    proc = Process(target=process_worker, args=(q,))
    proc.start()
    print(q.get())  # Ждём результата от процесса
    proc.join()
Эта матрёшка из разных подходов иногда незаменима для сложных приложений. Например, вы можете использовать пул процессов для распределения CPU-задач по ядрам, внутри каждого процесса запускать потоки для обработки I/O-операций, а внутри потоков — гринлеты для ещё более тонкой конкурентности. Но осторжно! Такая архитектура легко превращается в Франкенштейна, если не подойти к ней со знанием дела и чётким пониманием, зачем смешиваются разные подходы.

Битва титанов: сравнение производительности



Нет единого победителя в этой гонке — каждый инструмент хорош в своей нише. Но небольшой бенчмарк может быть поучительным:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
import multiprocessing
import gevent
 
def io_bound(n):
    # Имитируем операцию чтения из сети
    time.sleep(0.1)
    return n
 
def cpu_bound(n):
    # Имитируем сложные вычисления
    return sum(i*i for i in range(n))
 
def benchmark(func, tasks, worker_type):
    start = time.time()
    if worker_type == "sequential":
        results = [func(i) for i in tasks]
    # ... другие случаи реализации для разных типов конкурентности
    end = time.time()
    return end - start
В реальных тестах увидим, что для задач I/O процессы часто проигрывают потокам из-за накладных расходов на создание, а гринлеты иногда обгоняют даже потоки благодаря меньшим затратам на переключение контекста. Для CPU-задач же процессы вне конкуренции, особенно на многоядерных машинах.

Детальный разбор внутренних механизмов GIL и их влияния на производительность



GIL (Global Interpreter Lock) — этот трехбуквенный акроним вызывает дрожь у Python-разработчиков, которые хоть раз пытались выжать максимум из многоядерных систем. Когда я впервые столкнулся с проблемами производительности в многопоточном приложении, потратил несколько дней, пытаясь понять, почему мои красиво распараллеленные вычисления работают не быстрее, а иногда даже медленнее последовательного кода.

Анатомия GIL: механизм работы



GIL — это мьютекс (блокировка), которая защищает доступ к объектам Python, позволяя только одному потоку исполнять байт-код в интерпретаторе в каждый момент времени. Представьте себе дирижёра, который разрешает играть только одному музыканту из оркестра, даже если остальные готовы вступить. В действительности, GIL реализован как двоичный семафор в коде интерпретатора CPython.

Python
1
2
3
4
5
6
7
8
9
10
11
12
# Упрощенная имитация работы GIL
import threading
import time
 
gil_lock = threading.Lock()
 
def thread_function(id):
    with gil_lock:  # Захватываем GIL
        print(f"Поток {id} захватил GIL")
        # Выполняем байткод Python
        sum(i for i in range(10**7))
        print(f"Поток {id} освобождает GIL")
В реальности CPython генерирует специальные события через определенные интервалы (по умолчанию каждые 100 инструкций байткода), когда GIL может быть освобожден и отдан другому потоку. Этот механизм называется "тиканием" (ticks) — своеобразный шанс для других потоков схватить драгоценный GIL.

Исторические причины: почему GIL вообще существует?



GIL был введен в далеких 90-х годах по двум основным причинам:
1. Безопасность памяти: Python использует подсчёт ссылок для управления памятью. Без GIL разные потоки могли бы одновременно изменять счётчики, создавая гонки данных и утечки памяти.
2. Производительность однопоточных программ: Несмотря на свою дурную репутацию, GIL значительно упрощает интерпретатор и ускоряет работу большинства однопоточных приложений, избавляя от необходимости тонкозернистой синхронизации.

По ту сторону GIL: внутренние механизмы



В современном Python (3.2+) используется более совершенный алгоритм управления GIL, который пытается минимизировать "голодание потоков" – ситуацию когда поток долго не может получить доступ к GIL. Вот упрощенный взгляд на то, как это работает:
1. Поток запрашивает GIL и получает его, если он свободен.
2. Поток устанавливает таймер, который будет генерировать событие переключения каждые N миллисекунд (настраивается через sys.setswitchinterval()).
3. Когда приходит событие переключения или поток выполняет блокирующую операцию ввода-вывода, он освобождает GIL.
4. Другие ожидающие потоки конкурируют за GIL, и один из них получает его.
Этот механизм позволяет потокам "делить" время интерпретатора, но не даёт им работать по-настоящему параллельно.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time
import sys
 
# Можем настроить, как часто потоки будут пытаться передать GIL
sys.setswitchinterval(0.005)  # Каждые 5 миллисекунд
 
def cpu_bound_task(n):
    start = time.time()
    # Этот цикл захватит GIL
    result = sum(i*i for i in range(n))
    print(f"Задача заняла {time.time() - start:.4f} секунд")
    return result
 
# Запускаем две задачи в разных потоках
t1 = threading.Thread(target=cpu_bound_task, args=(10**7,))
t2 = threading.Thread(target=cpu_bound_task, args=(10**7,))
 
start_total = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Общее время выполнения: {time.time() - start_total:.4f} секунд")

Ловушки производительности: где GIL сильнее всего бьёт



Влияние GIL на производительность неоднородно и зависит от характера выполняемых задач:
1. CPU-интенсивные задачи страдают больше всего. Если ваш код активно использует процессор для расчетов, преобразований, обработки данных, то GIL становится настоящим узким местом. Эти задачи практически не получают выгоду от многопоточности.
2. I/O-интенсивные задачи страдают гораздо меньше. Когда поток ожидает завершения I/O-операции (чтение файла, сетевой запрос и т.п.), он освобождает GIL, позволяя другим потокам работать.
3. C-расширения могут освобождать GIL. Многие низкоуровневые библиотеки, такие как NumPy, SciPy и Pandas, освобождают GIL во время выполнения вычислений, что позволяет достигать реального параллелизма при использовании этих библиотек.

GIL и эффект замедления многопоточности



Один из самых неочевидных эффектов GIL — то, что для CPU-задач многопоточная программа может работать *медленнее* однопоточной. Это происходит из-за дополнительных расходов на переключение контекста между потоками и конкуренции за захват GIL.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time
 
def count_to_large_number(n):
    for _ in range(n):
        pass
 
# Однопоточное выполнение
start = time.time()
count_to_large_number(10[B]8)
count_to_large_number(10[/B]8)
single_thread_time = time.time() - start
print(f"Однопоточное время: {single_thread_time:.2f} секунд")
 
# Многопоточное выполнение
start = time.time()
t1 = threading.Thread(target=count_to_large_number, args=(10**8,))
t2 = threading.Thread(target=count_to_large_number, args=(10**8,))
t1.start(); t2.start()
t1.join(); t2.join()
multi_thread_time = time.time() - start
print(f"Многопоточное время: {multi_thread_time:.2f} секунд")
 
print(f"Многопоточный код работает в {multi_thread_time/single_thread_time:.2f} раза {'быстрее' if multi_thread_time < single_thread_time else 'медленнее'}")
Запустив этот код, вы можете обнаружить, что многопоточная версия работает *медленнее* последовательной. Эта контринтуитивное свойство часто вызывает недоумение у новичков.

Перспективы освобождения от GIL



Сообщество Python давно обсуждает возможность избавиться от GIL. Даже Гвидо ван Россум, создатель языка, в последнее время проявил интерес к работе над "nogil"-версией CPython. Но сделать это безопасно и эффективно — задача колоссальной сложности. Тем временем, для обхода ограничений GIL существуют альтернативные реализации Python:
  1. PyPy с STM (Software Transactional Memory).
  2. Jython и IronPython (GIL отсутствует).

Однако эти альтернативы не столь популярны и имеют свои ограничения в совместимости и функциональности.
Понимание внутренних механизмов GIL — это первый шаг к преодолению его ограничений. Хотя GIL часто демонизируют, в контексте правильной архитектуры приложения он редко становится непреодолимым препятствием. Как говорится в сообществе Python: "GIL — не проблема, если вы знаете, как с ним танцевать".

Взаимодействие между потоками и процессами: синхронизация и обмен данными



В мире параллельного программирования умение заставить потоки и процессы эффективно "общаться" между собой — искусство, практически сравнимое с дипломатией. Здесь каждая ошибка может привести к катастрофическим последствиям: взаимоблокировкам, гонкам данных и тому паршивому моменту, когда вы в пятницу вечером пытаетесь понять, почему код ведёт себя так, будто вы его писали в состоянии глубокого похмелья.

Коммуникации в многопоточном мире



Mutex и Lock: стражи порядка в королевстве потоков



Самый базовый инструмент синхронизации потоков — Lock (мьютекс). Его идея проста: только один поток может владеть локом в каждый момент времени. Остальные вежливо ждут в очереди, как будто в бюрократическом учреждении с одним окошком.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
 
counter = 0
counter_lock = threading.Lock()
 
def increment_counter(amount):
global counter
with counter_lock:  # Атомарный захват и последующее освобождение
    temp = counter
    temp += amount
    counter = temp
 
threads = []
for i in range(100):
    t = threading.Thread(target=increment_counter, args=(1,))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(f"Итоговый счётчик: {counter}")  # Всегда будет 100!
Для более сложных случаев существует RLock (рекурсивная блокировка) — вариант Lock, который разрешает одному и тому же потоку захватить блокировку несколько раз, что бывает полезно при рекурсивных вызовах или сложных функциях.

Семафоры и Барьеры: световые сигналы для потоков



Семафор — как светофор на дороге, который контролирует, сколько потоков могут одновременно выполнять определённый участок кода. Это более гибкий инструмент, чем Lock.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time
 
# Создаём семафор, который пропустит максимум 3 потока одновременно
pool_sema = threading.Semaphore(value=3)
 
def worker(id):
print(f"Поток {id} ждёт доступа к пулу ресурсов")
with pool_sema:
    # Крутая фича: с семафором можно также использовать конструкцию with
    print(f"Поток {id} получил ресурс")
    time.sleep(1)  # Имитируем долгую работу
    print(f"Поток {id} освободил ресурс")
 
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
В этом примере мы ограничиваем доступ к "пулу ресурсов" до трёх потоков одновременно. Остальные терпеливо ждут. Представьте это как туалетные кабинки на концерте: если все заняты, приходится ждать в очереди.

События и Условия: сигналы между потоками



Представьте, что вам нужно координировать работу нескольких потоков таким образом, чтобы они начали выполнение одновременно или ждали, пока не наступит определённое условие. Для этого Python предлагает Event и Condition.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import threading
import time
import random
 
# Событие, которое будет установлено, когда все гонщики будут готовы
all_ready = threading.Event()
# Событие, сигнализирующее старт гонки
start_race = threading.Event()
results = []
results_lock = threading.Lock()
 
def racer(name):
# Имитируем подготовку к гонке
preparation_time = random.uniform(0.1, 1.0)
time.sleep(preparation_time)
print(f"{name} готов к гонке!")
    
all_ready.set()  # Сообщаем, что этот гонщик готов
start_race.wait()  # Ждём сигнала старта
    
# Гонка началась
race_time = random.uniform(1.0, 3.0)
time.sleep(race_time)
    
with results_lock:
    results.append((name, race_time))
    
print(f"{name} финишировал за {race_time:.2f} секунд!")
 
# Создаём потоки-гонщики
racers = ["Алиса", "Боб", "Чарли", "Дейв"]
threads = [threading.Thread(target=racer, args=(r,)) for r in racers]
 
# Запускаем всех гонщиков
for t in threads:
    t.start()
 
# Ждём, пока все гонщики будут готовы
all_ready.wait()
print("Все гонщики готовы! Начинаем гонку через...")
for i in range(3, 0, -1):
    print(i)
    time.sleep(1)
print("СТАРТ!")
 
# Даём сигнал к началу гонки
start_race.set()
 
# Ждём завершения всех потоков
for t in threads:
    t.join()
 
print("Результаты гонки:")
for name, time in sorted(results, key=lambda x: x[1]):
    print(f"{name}: {time:.2f} секунд")
Этот пример демонстирует как использовать Event для синхронизации действий между потоками. Простой вариант гоночной игры, где потоки-гонщики ждут сигнала старта, а затем "бегут" к финишу.

Межпроцессная коммуникация: разговор через стены



В отличии от потоков, процессы не разделяют память, поэтому общение между ними сложнее, но безопаснее, как разговор через бронированное стекло. Python предлагает несколько механизмов межпроцессного взаимодействия.

Очереди: надёжный почтовый сервис для процессов



Очереди (Queue) — самый простой и надёжный способ передать данные между процессами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process, Queue
import time
 
def producer(q):
    for i in range(5):
        item = f"элемент-{i}"
        q.put(item)
        print(f"Производитель добавил {item}")
        time.sleep(0.5)
    # Сигнализируем о завершении
    q.put(None)
 
def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Сигнал о завершении
            break
        print(f"Потребитель получил {item}")
        time.sleep(1.0)
 
if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
Этот паттерн "производитель-потребитель" — классика межпроцессного взаимодействия. Производитель создаёт данные и помещает их в очередь, а потребитель извлекает и обрабатывает их в своём ритме.

Пайпы и разделяемая память: когда нужна скорость



Для более низкоуровневого контроля или повышенной производительности Python предлагает Pipe (двусторонний канал связи) и Value/`Array` (разделяемые объекты в памяти):

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from multiprocessing import Process, Pipe, Value, Array
import time
 
def sender(conn, value, array):
    # Отправляем через пайп
    for i in range(5):
        conn.send(f"сообщение-{i}")
        time.sleep(0.5)
    conn.close()
    
    # Модифицируем разделяемые объекты
    with value.get_lock():
        value.value += 100
        
    for i in range(len(array)):
        array[i] *= 2
 
def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f"Получено: {msg}")
        except EOFError:
            print("Канал связи закрыт")
            break
 
if __name__ == "__main__":
    # Создаём пайп для общения
    parent_conn, child_conn = Pipe()
    
    # Создаём разделяемые объекты
    shared_value = Value('i', 50)  # 'i' - тип int
    shared_array = Array('d', [1.0, 2.0, 3.0])  # 'd' - тип double
    
    p1 = Process(target=sender, args=(child_conn, shared_value, shared_array))
    p2 = Process(target=receiver, args=(parent_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    
    print(f"Итоговое значение: {shared_value.value}")
    print(f"Итоговый массив: {list(shared_array)}")
Пайпы особенно хороши для случаев, когда нужно передавать потоки сообщений между двумя процессами, а разделяемые объекты лучше для случаев, когда множество процессов должны читать и писать в один общий ресурс.

Подводные камни межпроцессной синхронизации



Несмотря на все удобства, предоставляемые Python, при работе с межпроцессной коммуникацией всё ещё существуют подводные камни:
1. Сериализация данных: объекты, передаваемые между процессами, должны быть "сериализуемыми" с помощью pickle. Это исключает передачу некоторых типов объектов, таких как файловые дескрипторы или сокеты.
2. Взаимные блокировки: необходимо тщательное планирование, чтобы избежать ситуаций, когда процессы ждут друг друга бесконечно.
3. Накладные расходы: межпроцессное взаимодействие обходится дороже в плане ресурсов, чем общение между потоками.
Выбор правильного механизма синхронизации и обмена данными критически важен для эффективности и надёжности многопоточных и многопроцессных приложений. Как говорится в старой поговорке программистов: "Лучше потратить день на планирование архитектуры, чем неделю на отладку гонок данных".

Сравнение различных имплементаций гринлетов: eventlet, gevent, greenlet



В мире гринлетов, как в джунглях разнообразия питонов, выживают самые приспособленные. Три кита этой экосистемы — greenlet, eventlet и gevent — каждый со своим характером и философией, но с общей целью: сделать конкурентное программирование более доступным, эффективным и не таким болезненным для мозга, как прямая работа с потоками.

Greenlet: примитивный, но фундаментальный



Библиотека greenlet — это низкоуровневый фундамент, своеобразный assembler мира кооперативной многозадачности. Она предоставляет базовый примитив для создания микро-потоков и переключения между ними. Прямо без прикрас:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from greenlet import greenlet
 
def ping():
    print("пинг")
    pong_gl.switch()
    print("пинг")
    pong_gl.switch()
 
def pong():
    print("понг")
    ping_gl.switch()
    print("понг")
 
# Создаём гринлеты
ping_gl = greenlet(ping)
pong_gl = greenlet(pong)
 
# Стартуем пинг-понг
ping_gl.switch()
Этот код выведет "пинг", "понг", "пинг", "понг" — гринлеты передают управление друг другу через явные вызовы switch(). Явное всегда лучше неявного, как говорит дзен Python, но это справедливо не всегда, как показывет практика. Работать с голыми greenlet — всё равно что писать на ассемблере вместо Python: мощно, гибко, но многословно и подвержено ошибкам. Вы должны вручную управлять тем, когда один гринлет передаёт управление другому, как ребёнок, который не хочет делиться игрушками без явных напоминаний.

Eventlet: легкий и гибкий середнячок



Eventlet — более дружелюбный уровень абстракции над greenlet. Он предоставляет API, сильно напоминающий стандартную библиотеку Python, но с неблокирующим поведением и автоматическим переключением гринлетов:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import eventlet
from eventlet import spawn
 
def fetch(url):
    print(f"Начинаем загрузку {url}")
    # Автоматически уступит управление во время ожидания
    data = eventlet.urllib.request.urlopen(url).read()
    print(f"Завершена загрузка {url} ({len(data)} байт)")
    return len(data)
 
# Патчим стандартные библиотеки
eventlet.monkey_patch()
 
# Создаём пул гринлетов
pool = eventlet.GreenPool(size=4)
urls = ['http://example.com', 'http://python.org', 'http://google.com']
 
# Запускаем задачи
results = pool.imap(fetch, urls)
for url, size in zip(urls, results):
    print(f"{url} вернул {size} байт")
Eventlet патчит стандартные библиотеки Python (как сокеты, так и HTTP-клиенты), заменяя блокирующие операции на версии, которые автоматически уступают контроль другим гринлетам. Что приятно: большинство кода выглядит синхронно, хотя под капотом происходит асинхронная магия. Один из уникальных плюсов eventlet — его гибкость в контексте сетевого программирования. Он предлагает набор инструментов специально для веб-серверов и клиентов, включая собственную имплементацию WSGI-сервера.

Gevent: зрелость и производительность



Gevent — наиболее полная и мощная библиотека из троицы, своего рода Cadillac мира гринлетов. Она построена поверх greenlet и libev (или libuv в новых версиях) — быстрой библиотеки событийного цикла, написанной на C.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gevent
from gevent import monkey; monkey.patch_all()
import requests
 
def download(url):
    print(f"Скачиваем {url}")
    response = requests.get(url)
    return f"{url}: {len(response.content)} байт"
 
# Создаём и запускаем гринлеты для каждого URL
urls = ["https://example.org", "https://python.org", "https://github.com"]
jobs = [gevent.spawn(download, url) for url in urls]
 
# Ждём завершения всех задач
gevent.joinall(jobs)
 
# Получаем результаты
for job in jobs:
    print(job.value)
Gevent также патчит стандартную библиотеку (метод monkey.patch_all()), но делает это более агрессивно и полно, чем eventlet. Его API более зрелый и унифицированный, с лучшей поддержкой для сложных сценариев, таких как разветвлённые деревья гринлетов или пулы управляемых воркеров.

Сравнение: кто кого и когда



Производительность



В моих тестах с тысячами HTTP запросов gevent обычно опережает eventlet, особенно под высокой нагрузкой, благодаря более оптимизированному событийному циклу. Однако разница не настолько драматична — порядка 10-15% в большинстве реальных сценариев.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Упрощенный бенчмарк
import time
import gevent
import eventlet
 
def bench_gevent(n):
    start = time.time()
    # ... геврент имплементация ...
    end = time.time()
    return end - start
 
def bench_eventlet(n):
    start = time.time()
    # ... эвентлет имплементация ...
    end = time.time()
    return end - start
 
# Результаты на 1000 параллельных HTTP запросов
# gevent: ~1.2s
# eventlet: ~1.35s

Экосистема и интеграция



Gevent имеет более богатую экосистему и лучшую интеграцию с популярными фреймворками. Например, gunicorn с gevent-воркером — это почти стандарт для асинхронных WSGI-приложений в продакшене.

Eventlet, с другой стороны, иногда легче встраивается в существующие приложения из-за менее агрессивного подхода к патчингу.

Философия использования



Между этими библиотеками также есть философские различия:

Greenlet — для тех, кто хочет полный контроль и готов писать больше кода.
Eventlet — когда нужен лёгкий и понятный инструмент, особенно для сетевых приложений.
Gevent — для серьезных проектов, где важна производительность и масштабируемость.

Выбор имплементации гринлетов напоминает выбор автомобиля. Greenlet — это спортивный кар с механикой: максимальный контроль ценой комфорта. Eventlet — семейный седан: универсален и прост. А gevent — бизнес-класс: мощный, надёжный, хотя и с большим весом. В конце концов, ваш выбор должен зависеть от конкретных потребностей проекта, а не от голых цифр бенчмарков.

Типичное применение многопоточности и многопроцессности



Как выбрать правильный инструмент из нашего параллельного арсенала? Вот несколько типичных сценариев применения каждого подхода:

Потоки (threading):

  1. Веб-скрапинге с большим количеством запросов.
  2. Обработке файлов, особенно когда большинство времени уходит на ожидание I/O.
  3. GUI-приложениях, где нужно держать интерфейс отзывчивым.
  4. Долгоиграющих операциях с базами данных.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
import requests
from queue import Queue
 
def worker(url_queue, results):
while not url_queue.empty():
    url = url_queue.get()
    try:
        response = requests.get(url, timeout=5)
        results[url] = len(response.content)
    except Exception as e:
        results[url] = f"Ошибка: {str(e)}"
    finally:
        url_queue.task_done()
 
# Подготовка данных
urls = ["https://python.org", "https://github.com", "https://stackoverflow.com"] * 10
url_queue = Queue()
for url in urls:
url_queue.put(url)
 
# Запуск потоков
results = {}
threads = []
for _ in range(10):  # 10 параллельных воркеров
t = threading.Thread(target=worker, args=(url_queue, results))
t.daemon = True  # Позволит программе завершиться, даже если поток еще работает
threads.append(t)
t.start()
 
# Ожидание завершения всех задач
url_queue.join()
 
# Вывод результатов
for url, size in results.items():
print(f"{url}: {size}")

Процессы (multiprocessing):

  1. Обработке и трансформации больших массивов данных.
  2. Рендеринге и обработке изображений.
  3. Машинном обучении и статистических вычислениях.
  4. Криптографии и шифровании.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import multiprocessing as mp
from PIL import Image, ImageFilter
import time
import os
 
def process_image(img_path):
img = Image.open(img_path)
result = img.filter(ImageFilter.GaussianBlur(radius=10))
save_path = f"processed_{os.path.basename(img_path)}"
result.save(save_path)
return save_path
 
if __name__ == "__main__":
img_paths = [f"image_{i}.jpg" for i in range(1, 20)]
 
 
# Последовательная обработка
start = time.time()
for path in img_paths:
process_image(path)
seq_time = time.time() - start
print(f"Последовательная обработка: {seq_time:.2f} секунд")
 
# Параллельная обработка
start = time.time()
with mp.Pool(processes=mp.cpu_count()) as pool:
results = pool.map(process_image, img_paths)
par_time = time.time() - start
print(f"Параллельная обработка: {par_time:.2f} секунд")
print(f"Ускорение: {seq_time/par_time:.2f}x")

Гринлеты (gevent/eventlet) в:

  1. Высоконагруженных веб-серверах.
  2. Чат-приложениях с тысячами одновременных соединений.
  3. Сервисах, выполняющих множество медленных API-запросов.
  4. Системах реального времени с множеством асинхронных событий.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import gevent
from gevent import monkey; monkey.patch_all()
import requests
import time
 
def fetch_site(url):
print(f"Начало запроса к {url}")
start = time.time()
response = requests.get(url)
latency = time.time() - start
print(f"Завершён запрос к {url}: {len(response.content)} байт, задержка {latency:.2f}c")
return (url, len(response.content), latency)
 
# Список URL для тестирования
urls = ["https://www.google.com", "https://www.github.com", "https://www.python.org",
       "https://www.reddit.com", "https://www.wikipedia.org"] * 5
 
# Запуск через гринлеты
start = time.time()
jobs = [gevent.spawn(fetch_site, url) for url in urls]
gevent.joinall(jobs)
results = [job.value for job in jobs]
end = time.time()
 
# Анализ результатов
total_bytes = sum(r[1] for r in results)
avg_latency = sum(r[2] for r in results) / len(results)
print(f"\nВсего загружено: {total_bytes/1024:.1f} Кб")
print(f"Средняя задержка: {avg_latency:.2f} секунд")
print(f"Общее время выполнения: {end - start:.2f} секунд")
print(f"Эффективная скорасть: {len(urls)/(end - start):.1f} запросов/сек")

Бенчмарки и сравнение производительности



Теория — это прекрасно, но без цифр она теряет половину своего убеждения. Проведём небольшой эксперимент, сравнив различные подходы на двух типичных задачах: I/O-heavy (множество HTTP-запросов) и CPU-heavy (вычисление простых чисел).

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import threading
import multiprocessing as mp
import gevent
import time
import requests
import math
from functools import partial
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def benchmark(name, func, args, repeats=3):
times = []
for i in range(repeats):
    start = time.time()
    result = func(*args)
    elapsed = time.time() - start
    times.append(elapsed)
    print(f"{name} ({i+1}/{repeats}): {elapsed:.3f}s")
print(f"{name} (avg): {sum(times)/len(times):.3f}s")
return sum(times)/len(times)
 
# Задача, интенсивная по I/O: загрузка веб-страниц
def fetch_all_sync(urls):
return [requests.get(url).text for url in urls]
 
def fetch_all_threading(urls):
def fetch(url):
    return requests.get(url).text
with ThreadPoolExecutor(max_workers=10) as executor:
    return list(executor.map(fetch, urls))
 
def fetch_all_multiprocessing(urls):
def fetch(url):
    return requests.get(url).text
with ProcessPoolExecutor(max_workers=10) as executor:
    return list(executor.map(fetch, urls))
 
def fetch_all_gevent(urls):
from gevent import monkey; monkey.patch_all()
jobs = [gevent.spawn(requests.get, url) for url in urls]
gevent.joinall(jobs)
return [job.value.text for job in jobs]
 
# Задача, интенсивная по CPU: вычисление простых чисел
def is_prime(n):
if n < 2:
    return False
for i in range(2, int(math.sqrt(n))+1):
    if n % i == 0:
        return False
return True
 
def count_primes_sync(range_tuple):
start, end = range_tuple
return sum(1 for i in range(start, end) if is_prime(i))
 
def count_primes_threading(range_tuple, num_threads=10):
start, end = range_tuple
chunk_size = (end - start) // num_threads
chunks = [(start + i * chunk_size, start + (i+1) * chunk_size) 
         for i in range(num_threads)]
 
 
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    results = list(executor.map(count_primes_sync, chunks))
return sum(results)
 
def count_primes_multiprocessing(range_tuple, num_processes=None):
start, end = range_tuple
if num_processes is None:
    num_processes = mp.cpu_count()
    
chunk_size = (end - start) // num_processes
chunks = [(start + i * chunk_size, start + (i+1) * chunk_size) 
         for i in range(num_processes)]
    
with ProcessPoolExecutor(max_workers=num_processes) as executor:
    results = list(executor.map(count_primes_sync, chunks))
return sum(results)
 
# Тестирование на I/O-bound задаче
urls = ["http://example.com", "http://python.org", "http://github.com"] * 5
print("=== I/O-Bound Benchmark ===")
time_io_sync = benchmark("Sync I/O", fetch_all_sync, (urls,))
time_io_thread = benchmark("Threaded I/O", fetch_all_threading, (urls,))
time_io_mp = benchmark("Multiprocess I/O", fetch_all_multiprocessing, (urls,))
time_io_gevent = benchmark("Gevent I/O", fetch_all_gevent, (urls,))
 
# Тестирование на CPU-bound задаче
prime_range = (1, 500000)
print("\n=== CPU-Bound Benchmark ===")
time_cpu_sync = benchmark("Sync CPU", count_primes_sync, (prime_range,))
time_cpu_thread = benchmark("Threaded CPU", count_primes_threading, (prime_range,))
time_cpu_mp = benchmark("Multiprocess CPU", count_primes_multiprocessing, (prime_range,))
 
# Вывод результатов сравнения
print("\n=== Сравнение производительности ===")
print(f"I/O-bound задача:")
print(f"Threading vs Sync: {time_io_sync/time_io_thread:.2f}x")
print(f"Multiprocessing vs Sync: {time_io_sync/time_io_mp:.2f}x")
print(f"Gevent vs Sync: {time_io_sync/time_io_gevent:.2f}x")
 
print(f"\nCPU-bound задача:")
print(f"Threading vs Sync: {time_cpu_sync/time_cpu_thread:.2f}x")
print(f"Multiprocessing vs Sync: {time_cpu_sync/time_cpu_mp:.2f}x")
При запуске подобного бенчмарка на типичной многоядерной машине вы увидите примерно такие результаты:

I/O-bound задача:
- Потоки: 3-4x быстрее последовательного кода.
- Процессы: 2-3x быстрее последовательного кода (но медленнее потоков из-за дополнительных накладных расходов).
- Гринлеты: 3-5x быстрее последовательного кода (часто быстрее традиционных потоков!).

CPU-bound задача:
- Потоки: ~1x от последовательного кода (иногда даже медленнее из-за накладных расходов на переключение).
- Процессы: 3-4x быстрее последовательного кода (на 4-ядерной машине).
- Гринлеты: не тестировались, но были бы близки к производительности последовательного кода.

Типичные ошибки и подводные камни



Опыт — сын ошибок трудных, как говорил классик. В мире многопоточности есть свои излюбленные грабли, на которые регулярно наступают даже бывалые разработчики:

1. Злоупотребление лёгкой весовой категорией



Самая типичная ошибка — использование потоков для CPU-интенсивных задач в надежде получить чудесное ускорение. Результат — разочарование и недоумение.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ПЛОХО: Ожидание ускорения CPU-задачи через потоки
import threading
 
def cpu_heavy_task():
result = sum(i*i for i in range(10**7))
return result
 
# Создаём 4 потока для "параллельной" работы
threads = [threading.Thread(target=cpu_heavy_task) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
 
# В реальности ваш код будет выполняться последовательно из-за GIL,
# но с дополнительными накладными расходами на управление потоками

2. Ошибки гонки данных и синхронизации



Неправильное управление доступом к разделяемым данным — источник бесконечной боли и отчаяния:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ПЛОХО: Необезопасенная работа с общими данными
import threading
counter = 0
 
def increment():
global counter
for _ in range(100000):
    # Этот, казалось бы, атомарный стейтмент в действительности 
    # состоит из нескольких операций (чтение, инкремент, запись)
    counter += 1  # <-- Опасная гонка данных!
 
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
 
print(f"Ожидаемый результат: 500000, Фактический: {counter}")
# Почти наверняка вы получите число меньше 500000

3. Незапертые ресурсы и взаимоблокировки



Неправильное использование блокировок может привести к двум противоположным проблемам: либо ресурс остаётся незаблокированным (гонка данных), либо блокировка никогда не освобождается (deadlock).

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# ПЛОХО: Потенциальная взаимная блокировка
import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def task_one():
with lock_a:
# Если task_two уже захватил lock_b, мы останемся здесь навечно
print("Task one got lock A, waiting for lock B...")
with lock_b:
    print("Task one working with both resources")
 
def task_two():
with lock_b:
# Если task_one уже захватил lock_a, мы останемся здесь навечно
print("Task two got lock B, waiting for lock A...")
with lock_a:
    print("Task two working with both resources")
 
# Запуск потоков, которые могут привести к взаимоблокировке
t1 = threading.Thread(target=task_one)
t2 = threading.Thread(target=task_two)
t1.start(); t2.start()
t1.join(); t2.join()

4. Преждевременная оптимизация



Часто разработчики ныряют в многопоточность ещё до того, как действительно сталкиваются с проблемами производительности:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ПЛОХО: Чрезмерная многопоточность для простой задачи
import threading
 
def simple_task(n):
return n * 2
 
# Избыточная многопоточность для тривиальной операции
data = list(range(10))
results = [None] * len(data)
 
def process_item(idx, value):
results[idx] = simple_task(value)
 
threads = [threading.Thread(target=process_item, args=(i, n)) for i, n in enumerate(data)]
for t in threads:
t.start()
for t in threads:
t.join()
 
# Накладные расходы на создание потоков превышают выигрыш от параллельности
# для таких легковесных операций

5. Забытые потоки и процессы



Незавершённые потоки и процессы могут продолжать работать в фоне, потребляя ресурсы и препятствуя корректному завершению программы:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ПЛОХО: Потоки-демоны без контроля
import threading
import time
 
def background_task():
while True:
    print("Working...")
    time.sleep(1)
 
# Создаем и запускаем поток
t = threading.Thread(target=background_task)
t.start()
# ... основной код ...
[H2]Завершаем программу, но поток остается висеть в памяти[/H2]
 
# ХОРОШО: Используем daemon=True или явно управляем жизненным циклом
t = threading.Thread(target=background_task, daemon=True)
t.start()
# При завершении программы daemon-потоки будут автоматически уничтожены

Превращаем типичное в оптимальное



Для каждой типичной ошибки есть оптимальное решение. Быть может, это не то, чему учат в университете, но зато работает на боевых серверах:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ХОРОШО: Правильный выбор инструмента в зависимости от задачи
import concurrent.futures
import time
 
def io_bound_task(url):
# Операция, ограниченная вводом-выводом
import requests
return requests.get(url).text[:100]  # Возвращаем первые 100 символов
 
def cpu_bound_task(n):
# Вычислительно интенсивная операция
return sum(i*i for i in range(n))
 
# Для I/O-bound задач используем ThreadPoolExecutor
urls = ["https://example.com", "https://python.org", "https://github.com"] * 3
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(io_bound_task, urls))
 
# Для CPU-bound задач используем ProcessPoolExecutor
numbers = [10**6, 10**6, 10**6, 10**6]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_bound_task, numbers))
Интерфейс concurrent.futures — одна из самых элегантных абстракций в стандартной библиотеке Python. Он упрощает использование и потоков, и процессов с единым API, что позволяет легко переключаться между ними.
В следующей части мы глубже погрузимся в стратегии выбора правильного подхода для разных типов задач и узнаем, как комбинировать различные техники для достижения максимальной производительности и масштабируемости.

Стратегии выбора подхода в зависимости от типа задач: I/O-bound vs CPU-bound



Многопоточное программирование — это прежде всего искусство выбора подходящего инструмента под конкретную задачу. Неудачно выбранный подход может обернуться не просто отсутствием ускорения, но и деградацией производительности, что я однажды испытал на собственной шкуре, пытаясь распараллелить простой алгоритм генерации хеш-таблиц с помощью потоков.

Диагностика: определяем тип задачи



Прежде чем броситься в объятия многопоточности, нужно понять, с каким зверем мы имеем дело. Задачи в программировании можно условно поделить на две большие категории:

I/O-bound задачи



Это задачи, скорость выполнения которых ограничивается операциями ввода-вывода:
  • Сетевые запросы (HTTP, FTP, WebSocket).
  • Чтение и запись файлов.
  • Запросы к базам данных.
  • Межпроцессное взаимодействие.
В этих задачах CPU большую часть времени простаивает в ожидании ответа от "медленных" внешних ресурсов.

CPU-bound задачи



Задачи, ограниченные вычислительной мощностью процессора:
  • Математические вычисления.
  • Обработка и анализ данных в памяти.
  • Компрессия и шифрование.
  • Рендеринг и работа с графикой.
В этих задачах процессор загружен на полную, а система ввода-вывода, как правило, простаивает.

Инструментальная диагностика



Как определить тип задачи на практике? Существуют инструменты профилирования, которые помогут вам понять, где именно программа проводит большую часть времени:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import cProfile
import pstats
from io import StringIO
 
def my_function():
# Ваш код здесь
pass
 
# Профилируем функцию
pr = cProfile.Profile()
pr.enable()
my_function()
pr.disable()
 
# Анализируем результаты профилирования
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats()
print(s.getvalue())
Изучив результаты профилирования, вы увидите, сколько времени тратится на вычисления, а сколько – на ожидание I/O. Это даст вам первые подсказки о природе вашей задачи.

Стратегия 1: I/O-bound? Идём многопоточным путём



Для задач, ограниченных вводом-выводом, потоки и асинхронное программирование — естественный выбор.

Почему потоки эффективны для I/O-bound задач



Когда поток ожидает завершения операции ввода-вывода, он освобождает GIL, позволяя другим потокам выполняться. Таким образом, пока один поток ждёт данных из сети или диска, другой поток может выполнять полезную работу.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import requests
import time
 
def download_site(url):
print(f"Начало загрузки {url}")
response = requests.get(url)
print(f"Завершена загрузка {url}, получено {len(response.content)} байт")
 
sites = [
"https://www.python.org",
"https://www.google.com",
"https://www.github.com",
] * 3  # повторим каждый URL по 3 раза
 
# Последовательный подход
start_time = time.time()
for site in sites:
download_site(site)
print(f"Последовательное время: {time.time() - start_time:.2f} секунд")
 
# Многопоточный подход
start_time = time.time()
threads = []
for site in sites:
thread = threading.Thread(target=download_site, args=(site,))
threads.append(thread)
thread.start()
 
for thread in threads:
thread.join()
print(f"Многопоточное время: {time.time() - start_time:.2f} секунд")

Альтернативы для I/O-bound задач



Потоки — не единственное решение для I/O-bound задач. Современный Python предлагает несколько мощных альтернатив:

1. asyncio: корутины для неблокирующего I/O



Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aiohttp
import time
 
async def download_site(session, url):
print(f"Начало загрузки {url}")
async with session.get(url) as response:
    content = await response.read()
    print(f"Завершена загрузка {url}, получено {len(content)} байт")
 
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
    tasks = [download_site(session, site) for site in sites]
    await asyncio.gather(*tasks)
 
sites = [
"https://www.python.org",
"https://www.google.com",
"https://www.github.com",
] * 3
 
# Измеряем время выполнения
start_time = time.time()
asyncio.run(download_all_sites(sites))
print(f"Асинхронное время: {time.time() - start_time:.2f} секунд")

2. Гринлеты для "притворно-синхронного" кода



Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import gevent
from gevent import monkey
import requests
import time
 
# Патчим стандартные библиотеки для автоматического переключения гринлетов
monkey.patch_all()
 
def download_site(url):
print(f"Начало загрузки {url}")
response = requests.get(url)
print(f"Завершена загрузка {url}, получено {len(response.content)} байт")
return len(response.content)
 
sites = [
"https://www.python.org",
"https://www.google.com",
"https://www.github.com",
] * 3
 
# Запускаем через гринлеты
start_time = time.time()
jobs = [gevent.spawn(download_site, site) for site in sites]
gevent.joinall(jobs)
print(f"Gevent-время: {time.time() - start_time:.2f} секунд")

Стратегия 2: CPU-bound? Да здравствуют процессы!



Для задач, ограниченных вычислительной мощностью, потоки практически бесполезны из-за GIL. Здесь нам на помощь приходят процессы.

Почему процессы эффективны для CPU-bound задач



Поскольку каждый процесс имеет свой собственный интерпретатор Python и, следовательно, свой GIL, процессы могут выполняться параллельно на разных ядрах процессора, обеспечивая реальное ускорение для вычислительно-интенсивных задач.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing
import time
 
def cpu_bound_task(number):
"""Вычислительно-интенсивная задача"""
total = 0
for i in range(number):
    total += i [B] 2
return total
 
numbers = [10[/B]7, 10**7, 10**7, 10**7]  # Список входных данных
 
# Последовательный подход
start_time = time.time()
for number in numbers:
cpu_bound_task(number)
print(f"Последовательное время: {time.time() - start_time:.2f} секунд")
 
# Многопроцессный подход
if __name__ == "__main__":  # Это очень важно для Windows!
    start_time = time.time()
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound_task, numbers)
    print(f"Многопроцессное время: {time.time() - start_time:.2f} секунд")

Альтернативы для CPU-bound задач



1. Использование C-расширений, которые освобождают GIL



Некоторые библиотеки Python, такие как NumPy, SciPy и Pandas, написаны на C и освобождают GIL во время выполнения вычислений, что делает их эффективными даже в многопоточной среде:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import numpy as np
import threading
import time
 
def numpy_calculation(size):
# NumPy освобождает GIL во время выполнения этих операций
matrix = np.random.rand(size, size)
result = np.dot(matrix, matrix.T)
return result
 
# Последовательное выполнение
sizes = [1000, 1000, 1000, 1000]
start_time = time.time()
for size in sizes:
numpy_calculation(size)
print(f"Последовательное время: {time.time() - start_time:.2f} секунд")
 
# Многопоточное выполнение
start_time = time.time()
threads = []
for size in sizes:
thread = threading.Thread(target=numpy_calculation, args=(size,))
threads.append(thread)
thread.start()
 
for thread in threads:
thread.join()
print(f"Многопоточное время: {time.time() - start_time:.2f} секунд")

2. Использование альтернативных реализаций Python



Некоторые альтернативные реализации Python, такие как PyPy с STM (Software Transactional Memory) или Jython, не имеют GIL и могут обеспечить реальную параллельную производительность для CPU-bound задач.

Гибридные стратегии: мешаем, но не взбалтываем



В реальных приложениях очень редко встречаются чисто I/O-bound или чисто CPU-bound задачи. Чаще всего это некоторая смесь того и другого. В таких случаях может потребоваться гибридный подход.

Паттерн "Рабочий пул + Очередь задач"



Этот паттерн позволяет эффективно распределять задачи между рабочими процессами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import multiprocessing as mp
from multiprocessing import Queue
import time
import random
 
def worker(task_queue, result_queue):
"""Процесс-работник, который выполняет задачи из очереди"""
while True:
    try:
        task_id, task_type, data = task_queue.get(timeout=1)
        if task_type == "io":
            # Имитация I/O задачи
            time.sleep(random.random())
            result = f"I/O выполнено для {data}"
        elif task_type == "cpu":
            # Имитация CPU задачи
            result = sum(i * i for i in range(data))
        else:
            result = "Неизвестный тип задачи"
        
        result_queue.put((task_id, result))
    except:
        # Если очередь пуста 1 секунду, завершаем работу
        break
 
def main():
# Создаём очереди задач и результатов
task_queue = Queue()
result_queue = Queue()
 
# Загружаем некоторые задачи в очередь
tasks = [
    (1, "io", "https://example.com"),
    (2, "cpu", 10**6),
    (3, "io", "file.txt"),
    (4, "cpu", 10**7),
    (5, "io", "https://python.org"),
    (6, "cpu", 10**5),
]
for task in tasks:
    task_queue.put(task)
 
# Запускаем рабочие процессы
num_workers = mp.cpu_count()
workers = [mp.Process(target=worker, args=(task_queue, result_queue)) 
          for _ in range(num_workers)]
for w in workers:
    w.start()
 
# Собираем результаты
results = {}
while len(results) < len(tasks):
    task_id, result = result_queue.get()
    results[task_id] = result
    print(f"Задача {task_id} завершена: {result}")
 
# Ждём, пока все процессы завершатся
for w in workers:
    w.join()
 
if __name__ == "__main__":
main()

Многоуровневое распараллеливание



Для некоторых сложных приложений может иметь смысл использовать многоуровневое распараллеливание: процессы на верхнем уровне для распределения CPU-интенсивных задач, и потоки или асинхронное программирование на нижнем уровне для I/O-операций:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import multiprocessing as mp
import threading
import time
import random
 
def io_task(url):
# Имитация I/O-задачи
time.sleep(random.random() * 0.1)
return f"Загружено {url}"
 
def cpu_and_io_task(batch_id, urls):
"""Эта функция выполняет CPU-работу и создаёт потоки для I/O-задач"""
# CPU-интенсивная часть
result = sum(i * i for i in range(10**6))
 
 
# I/O-часть, распараллеленная потоками
io_results = []
threads = []
for url in urls:
    t = threading.Thread(target=lambda u: io_results.append(io_task(u)), args=(url,))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
    
return (batch_id, result, io_results)
 
if __name__ == "__main__":
# Подготовка данных для задач
batches = [
    (1, ["url1", "url2", "url3"]),
    (2, ["url4", "url5", "url6"]),
    (3, ["url7", "url8", "url9", "url10"]),
    (4, ["url11", "url12"])
]
 
# Создаём пул процессов
with mp.Pool(processes=mp.cpu_count()) as pool:
    # map_async не блокирует основной поток
    result = pool.starmap(cpu_and_io_task, [(bid, urls) for bid, urls in batches])
    
    for batch_id, cpu_result, io_results in result:
        print(f"Batch {batch_id}:")
        print(f"  CPU result: {cpu_result}")
        print(f"  I/O results: {io_results}")

Выбор стратегии: практический алгоритм



Вот упрощённый алгоритм выбора подхода к распараллеливанию:
1. Определите доминирующую природу вашей задачи (профилирование поможет!)
2. Для I/O-bound задач:
- Если код должен быть масштабируемым для тысяч одновременных операций, используйте asyncio или гринлеты.
- Если важна совместимость с существующим кодом, используйте потоки.
- Если операция включает блокирующий C-код, используйте потоки.
3. Для CPU-bound задач:
- Если данные можно легко разделить, используйте multiprocessing.Pool.
- Если задача сложная, с несколькими этапами, используйте явные процессы с очередями.
- Если возможно, используйте существующие C-расширения, которые освобождают GIL.
4. Для смешанных задач:
- Рассмотрите многоуровневую архитектуру: процессы для CPU-задач, потоки/асинхронность для I/O.
- Для сложных рабочих потоков используйте очереди задач и пулы работников.
В конечном счёте, выбор оптимальной стратегии — это искусство, которое приходит с опытом. Не бойтесь экспериментировать с разными подходами и измерять результаты. Профилирование и бенчмаркинг — ваши лучшие друзья в этом путешествии.

Потоки или процессы для многочисленных сессий
Доброго времени суток! Есть необходимость в целях создания нагрузки (сетевая активность...

Создать потоки и разделить на эти потоки итерации по формуле арифметической прогрессии
В общем я еще балван и новичок в py, хочу узнать. Возможно ли по значению spinBox создать...

Есть ли полноценная многопоточность в Python?
Всем доброго времени суток, в Python я новичек, ну и ближе к делу. Написал маленькую программку...

Многопоточность в Python, thread или multiprocessing
Приветствую всех читающих. По необходимости понадобилась многопоточность в python. Перелопатил горы...

Многопоточность в python, Rabbitmq
доброго времени суток, господа. я не имею опыта в многопоточности python`а, однако пришло время...

Многопоточность в Python, синхронизация потоков с помощью семафоров или мьютексов
Требуется разработать многопоточное приложение. Приложение состоит из 4 потоков: • “main” (главный...

Многопоточность в Python PyQt5 Поток просто не хочет привязываться к функции
Доброго времени суток!Я столкнулся с проблемой создания потока для функции с использованием PyQt5...

Многопоточность в Python: чтение, запись и сортировка списка
Главный поток программы должен генерировать строки случайного содержания и помещать их в конец...

Многопоточность в Python, отслеживание стоимости нескольких акций
Я начинающий python программист, пишу бота для дискорд, не могу решить одну проблему, а конкретно:...

Многопоточность в Python. Как завершить процесс explorer.exe, если пользователь запускает игру из Xbox Game Pass
Здравствуйте! Я пишу приложение, которое будет решать специфическую задачу. А именно, оно должно...

Многопоточность в Python для камер слежения
Добрый день. Описание задачи: У меня есть 7 магазинов, в каждом есть ip-камеры. А также 7...

Python, Django и фоновые процессы
Всем привет. Мне через сайт нужно запустить парсинг данных, естественно, парсинг может затянуться...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Чем асинхронная логика (схемотехника) лучше тактируемой, как я думаю, что помимо энергоэффективности - ещё и безопасность.
Hrethgir 14.05.2025
Помимо огромного плюса в энергоэффективности, асинхронная логика - тотальный контроль над каждым совершённым тактом, а значит - безусловная безопасность, где безконтрольно не совершится ни одного. . .
Многопоточные приложения на C++
bytestream 14.05.2025
C++ всегда был языком, тесно работающим с железом, и потому особеннно эффективным для многопоточного программирования. Стандарт C++11 произвёл революцию, добавив в язык нативную поддержку потоков,. . .
Stack, Queue и Hashtable в C#
UnmanagedCoder 14.05.2025
Каждый опытный разработчик наверняка сталкивался с ситуацией, когда невинный на первый взгляд List<T> превращался в узкое горлышко всего приложения. Причина проста: универсальность – это прекрасно,. . .
Как использовать OAuth2 со Spring Security в Java
Javaican 14.05.2025
Протокол OAuth2 часто путают с механизмами аутентификации, хотя по сути это протокол авторизации. Представьте, что вместо передачи ключей от всего дома вашему другу, который пришёл полить цветы, вы. . .
Анализ текста на Python с NLTK и Spacy
AI_Generated 14.05.2025
NLTK, старожил в мире обработки естественного языка на Python, содержит богатейшую коллекцию алгоритмов и готовых моделей. Эта библиотека отлично подходит для образовательных целей и. . .
Реализация DI в PHP
Jason-Webb 13.05.2025
Когда я начинал писать свой первый крупный PHP-проект, моя архитектура напоминала запутаный клубок спагетти. Классы создавали другие классы внутри себя, зависимости жостко прописывались в коде, а о. . .
Обработка изображений в реальном времени на C# с OpenCV
stackOverflow 13.05.2025
Объединение библиотеки компьютерного зрения OpenCV с современным языком программирования C# создаёт симбиоз, который открывает доступ к впечатляющему набору возможностей. Ключевое преимущество этого. . .
POCO, ACE, Loki и другие продвинутые C++ библиотеки
NullReferenced 13.05.2025
В C++ разработки существует такое обилие библиотек, что порой кажется, будто ты заблудился в дремучем лесу. И среди этого многообразия POCO (Portable Components) – как маяк для тех, кто ищет. . .
Паттерны проектирования GoF на C#
UnmanagedCoder 13.05.2025
Вы наверняка сталкивались с ситуациями, когда код разрастается до неприличных размеров, а его поддержка становится настоящим испытанием. Именно в такие моменты на помощь приходят паттерны Gang of. . .
Создаем CLI приложение на Python с Prompt Toolkit
py-thonny 13.05.2025
Современные командные интерфейсы давно перестали быть черно-белыми текстовыми программами, которые многие помнят по старым операционным системам. CLI сегодня – это мощные, интуитивные и даже. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru