Форум программистов, компьютерный форум, киберфорум
Python
Войти
Регистрация
Восстановить пароль
Карта форума Темы раздела Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.73/11: Рейтинг темы: голосов - 11, средняя оценка - 4.73
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
1

Управление потоками

03.04.2018, 19:09. Показов 2090. Ответов 13
Метки нет (Все метки)

Author24 — интернет-сервис помощи студентам
Задача: запустить в поток в котором что-то делается в цикле и в конце цикла в очередь добавляется параметр.
Как только в очередь появляется элемент, запустить группу потоков не превышающего n - ого количества потоков, которые будут брать параметры из очереди и что-то делать пока не очистится очередь. Причем поток, который добавляет в очередь параметры (1 строчка) заканчивается быстрей, чем группа. Как это сделать? C потоками только сегодня познакомился, так что не закидывайте тапками
main_threading - метода для запуска первого потока
starting_threadin - метод для запуска группы потоков не превышающего n-ого числа потоков
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import queue
q=queue.Queue()
threads_count=5
 
def get_city_parameters(driver):
     for city in cities_all:
         ....
         q.put(subject)
 
def main_threading():
    driver = init_driver()
    get_city_parameters(driver)
    driver.quit()
 
def starting_threading():
    while  q.empty()==False:
        if(threading.active_count()<=threads_count+1):
            t = threading.Thread(target=parsing, args=(q,))
            t.start()
 
def parsing(q):
    #print('Запуск потока', q.get()[0], q.get()[1], q.get()[2])
    print('Запуск потока', q.get())
    print("Количество активных потоков",threading.activeCount())
    driver=init_driver()
    driver.get(url_main)
    driver.quit()
 
if __name__ == '__main__':
    start_time=time.time()
    main_thread = threading.Thread(target=parsing, args=(q,))
    main_thread.start()  
    print(time.time()-start_time)
Добавлено через 3 минуты
Думаю, нужно как-то .join применить, но как пока не вдупляю

Добавлено через 54 минуты
Чуток код перепутал
Python
1
2
3
4
5
if __name__ == '__main__':
    start_time=time.time()
    main_thread = threading.Thread(target=main_threading, args=' ')
    main_thread.start()  
    print(time.time()-start_time)
0
Лучшие ответы (1)
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
03.04.2018, 19:09
Ответы с готовыми решениями:

Управление двумя потоками и одновременное их "окончание"
Возникла проблема в реализации следующего - какую-то часть времени выполняем произвольный первый...

Управление потоками в Python
С использованием многопоточности для заданного значения найти сумму ряда с точностью члена...

Работа с потоками
Всем привет. Есть мультипотоки которые собирают из файла логин , пароль и запускают . Я не как...

Управление потоками
Доброго времени суток. с потоками я вроде как разобрался, теперь проблема в их управлении......

Управление потоками
Необходимо реализовать синхронизацию потоков таким образом, чтобы если первый поток был занят...

13
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 19:53 2
в код страшновато глядеть, объясните на пальцах – поток, управляющий очередью, должен быть асинхронным по отношению к основному потоку? или же просто надо сформировать очередь из готового к-ва заданий и распараллелить это на n max?
0
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
03.04.2018, 20:31  [ТС] 3
Цитата Сообщение от blz Посмотреть сообщение
в код страшновато глядеть, объясните на пальцах – поток, управляющий очередью, должен быть асинхронным по отношению к основному потоку? или же просто надо сформировать очередь из готового к-ва заданий и распараллелить это на n max?
Что понимается под основным потоком? Это поток, который запускает программу?
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 21:04 4
да. или же Ваше приложение должно быть более сложным, с «динамической» очередью, которая не может быть сформирована единомоментно и может управляться из другого потока. в общем, каков алгоритм?

Добавлено через 3 минуты
в общем, мне лично видится 2 варианта:
1. простой, реализуемый в несколько строк – мы получаем список задач и распараллеливаем его на n max потоков.
2. более сложный – у нас есть поток, управляющий распределением задач и есть потом (возможно главный) который управляет очередью задач.
0
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
03.04.2018, 21:13  [ТС] 5
Цитата Сообщение от blz Посмотреть сообщение
да. или же Ваше приложение должно быть более сложным, с «динамической» очередью, которая не может быть сформирована единомоментно и может управляться из другого потока. в общем, каков алгоритм?
Добавлено через 3 минуты
в общем, мне лично видится 2 варианта:
1. простой, реализуемый в несколько строк – мы получаем список задач и распараллеливаем его на n max потоков.
2. более сложный – у нас есть поток, управляющий распределением задач и есть потом (возможно главный) который управляет очередью задач.
Есть главный поток, который создается при запуске программы
Главный поток создает поток 1, который парсит данные и добавляет в очередь
И вот в этот момент, пока очередь не пуста, должна запуститься группа потоков до n, которые также создаются главным потоком
Поток 1 отработал, заполнил очередь, умер. Группа потоков продолжает работать, пока не закончится очередь
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 21:14 6
Цитата Сообщение от danilshik Посмотреть сообщение
Есть главный поток, который создается при запуске программы
К.О., привет
0
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
03.04.2018, 21:16  [ТС] 7
Цитата Сообщение от blz Посмотреть сообщение
Сообщение от danilshik
Есть главный поток, который создается при запуске программы
К.О., привет
Случайно нажал, сообщение выше вашего
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 21:29 8
предлагаю называть негров – неграми Вы в потоках разбираетесь пока от слова никак, так что о5 же предлагаю упростить сначала задачу до п1. По мере осиления переходя к п 2. Если согласны – кивните.
0
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
03.04.2018, 21:42  [ТС] 9
Цитата Сообщение от blz Посмотреть сообщение
предлагаю называть негров – неграми Вы в потоках разбираетесь пока от слова никак, так что о5 же предлагаю упростить сначала задачу до п1. По мере осиления переходя к п 2. Если согласны – кивните.
Хорошо

Добавлено через 1 минуту
Как бы я уже реализовал поток 1 и группу потоков, но у меня группа потоков начинается только после окончания потока 1, что затрачивает лишнее время
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 22:29 10
если хотите что-то освоить – тогда пока отложите в сторону то, что Вы делаете. вот пример скелетной реализации распараллеливания n задач на k потоков.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time
 
MAX_WORKERS = 10
TASK_COUNT = 25
 
def run_task(name, value):
    sleep(1)
    return name, value // 5
 
if __name__ == '__main__':
    tasks = { f'task{i}': i for i in range(TASK_COUNT) }
 
start = time()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    workers = [executor.submit(run_task, n, v) for n, v in tasks.items()]
    for worker in as_completed(workers):
        task, res = worker.result()
        elapsed = time() - start
        print(f'+{elapsed:.2f}: {task} {res}')
здесь смысл в том, что прямое управление потоками и использование объектов синхронизации не всегда необходимо.

Добавлено через 2 минуты
тут все понятно пока?
0
4 / 4 / 2
Регистрация: 04.04.2015
Сообщений: 186
03.04.2018, 22:42  [ТС] 11
Цитата Сообщение от blz Посмотреть сообщение
если хотите что-то освоить – тогда пока отложите в сторону то, что Вы делаете. вот пример скелетной реализации распараллеливания n задач на k потоков.
PythonВыделить код
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time
MAX_WORKERS = 10
TASK_COUNT = 25
def run_task(name, value):
* * sleep(1)
* * return name, value // 5
if __name__ == '__main__':
* * tasks = { f'task{i}': i for i in range(TASK_COUNT) }
start = time()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
* * workers = [executor.submit(run_task, n, v) for n, v in tasks.items()]
* * for worker in as_completed(workers):
* * * * task, res = worker.result()
* * * * elapsed = time() - start
* * * * print(f'+{elapsed:.2f}: {task} {res}')
здесь смысл в том, что прямое управление потоками и использование объектов синхронизации не всегда необходимо.
Добавлено через 2 минуты
тут все понятно пока?
Да, только желательно комментарии, а то приходится смотреть, что методы делают
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
03.04.2018, 23:52 12
Лучший ответ Сообщение было отмечено Black Fregat как решение

Решение

Цитата Сообщение от danilshik Посмотреть сообщение
а то приходится смотреть, что методы делают
не барское это дело – документацию читать?

Добавлено через 46 минут
А теперь вариант, который не дожидается окончания формирования очереди задач, а начинает их выполнять сразу по мере поступления (соблюдая ограничение на max к-во потоков). И снова никакие «ручные» потоки нам не нужны.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from time import sleep, time
 
MAX_WORKERS = 5
TASK_COUNT = 35
 
def msg(text):
    if not hasattr(msg, 'lock'):
        msg.lock = Lock()
    with msg.lock:
        print(text)
 
def run_task(name, value):
    sleep(2)
    return name, value // 5
 
def task_complete(task):
    name, res = task.result()
    elapsed = time() - start
    msg(f'+{elapsed:.2f}: {name} {res}')
 
 
if __name__ == '__main__':
    start = time()
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    workers = []
 
    msg('creating tasks...')
    for i in range(TASK_COUNT):
        t = executor.submit(run_task, f'task{i}', i)
        t.add_done_callback(task_complete)
        workers.append(t)
        sleep(0.2)
    msg('done creating tasks')
    wait(workers)
    msg('done executing tasks')
Код
creating tasks...
+2.00: task0 0
+2.20: task1 0
+2.40: task2 0
+2.60: task3 0
+2.80: task4 0
+4.00: task5 1
+4.20: task6 1
+4.40: task7 1
+4.60: task8 1
+4.81: task9 1
+6.00: task10 2
+6.21: task11 2
+6.41: task12 2
+6.61: task13 2
+6.81: task14 2
done creating tasks
+8.01: task15 3
+8.21: task16 3
+8.41: task17 3
+8.61: task18 3
+8.81: task19 3
+10.01: task20 4
+10.21: task21 4
+10.41: task22 4
+10.61: task23 4
+10.81: task24 4
+12.01: task25 5
+12.21: task26 5
+12.41: task27 5
+12.61: task28 5
+12.81: task29 5
+14.01: task30 6
+14.22: task31 6
+14.41: task32 6
+14.62: task33 6
+14.81: task34 6
done executing tasks
Добавлено через 6 минут
пара моментов, которые надо пояснить, думаю.
1. msg(). она вызывается асинхронно и мы можем получить некоторое месиво в stdout. поэтому там Lock()
2. в данной реализации мы не опрашиваем список futures, попутно получая результаты. вместо этого мы просто ожидаем, когда они все выполнятся. получение же результатов производится через callback (который регистрируется при создании и вызывается при завершении каждого future).
1
║XLR8║
1212 / 909 / 270
Регистрация: 25.07.2009
Сообщений: 4,361
Записей в блоге: 5
06.04.2018, 13:38 13
danilshik, blz, GIL никто не отменял.

In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.

Non-CPython implementations
  • Jython and IronPython have no GIL and can fully exploit multiprocessor systems
  • PyPy currently has a GIL like CPython
  • in Cython the GIL exists, but can be released temporarily using a "with" statement
Если у вас CPython - забудьте о потоках, процессы - максимум что вам доступно.
0
151 / 102 / 33
Регистрация: 11.08.2016
Сообщений: 574
06.04.2018, 14:25 14
Цитата Сообщение от outoftime Посмотреть сообщение
Если у вас CPython - забудьте о потоках, процессы - максимум что вам доступно.
в такой формулировке подразумевается, что:
1. мы почему то ничего не знаем о потоках, процессах и GIL, ничего не читали по теме и не сталкивались на практике
2. распараллеливание потоками сферически бессмысленно, что не соответствуют действительности.

Раз уж Вы выразили свое мнение – продолжите и расскажите на что именно на практике GIL накладывает существенные ограничения. и на что он не накладывает.
0
06.04.2018, 14:25
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
06.04.2018, 14:25
Помогаю со студенческими работами здесь

Управление потоками
Здравствуйте. Столкнулся с такой проблемой, как контроль потоков. Добавляю в коллекцию, все токены...

Управление процессами и потоками
Помогите пожалуйста. Нужно вывести ID текущего процесса, затем вывести ID созданного процесса и их...

Удаленное управление потоками
Здравствуйте. Задача следующая: Необходимо удаленно подключиться к компьютеру в домене, найти все...

Управление процессами и потоками
Найти сумму (n и 1), (n-1 и 2) и т.д. элементов массива. Обработка каждой суммы в порожденном...

Управление процессами и потоками
Необходимо разработать программу, демонстрирующую возможности управления процессами и потоками:...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
14
Ответ Создать тему
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2024, CyberForum.ru