Форум программистов, компьютерный форум, киберфорум
Python
Войти
Регистрация
Восстановить пароль
Карта форума Темы раздела Блоги Сообщество Поиск Заказать работу  
 
Рейтинг 4.75/4: Рейтинг темы: голосов - 4, средняя оценка - 4.75
Эксперт Python
295 / 108 / 57
Регистрация: 07.12.2016
Сообщений: 209
1

Threading и Queue непонятки в какакоде

09.07.2017, 12:59. Показов 633. Ответов 3
Метки нет (Все метки)

Author24 — интернет-сервис помощи студентам
Доброго времени суток, Уважаемые Форумчане. Решил поковыряться в потоках на питоне. Написал простейший скриптик для этого - банально чекает прокси. Проблема следующая - когда запускаю небольшое кол-во(и потоков, и проксей), то код отрабатывает и завершается. Вроде бы все нормально. Но когда кидаю побольше(и того, и другого) то вроде бы все работает, но не завершается. ждёт по queue.join(). Укажите, пожалуйста, на ошибки.
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import requests
import re
from queue import Queue, LifoQueue
import threading
 
 
class Proxy_check(object):
    def __init__(self):
        self.queue_wrt = Queue()
        self.queue_read = LifoQueue()
        self.queue_parse = Queue()
        self.check = re.compile(r'(banned)|(приостановлен)|(оступ заблокирован)|(одозрительная активность)')
 
    def read_file(self): # считываем проксики из файла и засовываем в очередь
        with open('Proxy.txt', 'r', encoding='utf-8') as read_file:
            for proxy_rd in read_file.read().split('\n'):
                if proxy_rd!='':
                    self.queue_read.put(proxy_rd.strip())
 
 
    def get_page(self,url): # кидаем на проверку и при удачном response добавляем в другую очередь(на дальнейшую проверку)
        while not self.queue_read.empty():
            proxy = self.queue_read.get()
            proxies = {'https':'https://'+str(proxy), 'http':'http://'+str(proxy)}
            try:
                page = requests.get(url, proxies=proxies, timeout=5)
            except:
                pass
            else:
                if page.status_code==200:
                    self.queue_parse.put((page.text, proxy))
                    self.parse_page()
            finally:
                self.queue_read.task_done()
 
 
    def parse_page(self):# проверяем полученную страницу на баны и тому подобное и кладем в очередь на запись
        while not self.queue_parse.empty():
            page = self.queue_parse.get()
            if not self.check.search(page[0]):
                self.queue_wrt.put(page[1])
                self.wrt_file()
            self.queue_parse.task_done()
 
 
    def wrt_file(self):# записываем в новый файл
        while not self.queue_wrt.empty():
            proxy = self.queue_wrt.get()
            with open('Proxy_GOOD.txt', 'a', encoding='utf-8') as wrt_file:
                wrt_file.write(proxy + '\n')
                print('Good Proxy: {} / {}'.format(proxy, str(self.queue_read.qsize())))
            self.queue_wrt.task_done()
 
 
 
if __name__ == '__main__':
    p = Proxy_check()
    url = 'https://www.yandex.ru/' # Например яндекс чекаем
    p.read_file()
    for i in range(500):
        i = threading.Thread(target=p.get_page, args={url})
        i.start()
    p.queue_parse.join()
    p.queue_wrt.join()
    p.queue_read.join()
    print('ГОТОВО')
Добавлено через 10 часов 44 минуты
Хмм...добавил в класс функцию main(), в которую и засунул создание потоков и ожидание завершения очередей. Скрипт стал завершаться. Но ошибка все равно не до конца понятна. Если кто-то сможет объяснить на пальцах, то буду очень благодарен
0
Programming
Эксперт
94731 / 64177 / 26122
Регистрация: 12.04.2006
Сообщений: 116,782
09.07.2017, 12:59
Ответы с готовыми решениями:

Работа Threading Queue
Изучаю потоки python по данному примеру. # -*- coding: utf-8 -*- import os import threading...

main.cpp:(.text+0x1f4): undefined reference to `Queue<int>::~Queue()
Выдаёт ошибку о проблеме деструктора. Пытался саму функцию destroy вставить в деструктор, но то же...

Разница между queue.synchronized и concurrent queue
По сути 2 коллекции потокобезопасные, что лучше использовать? Queue que =...

Разница между finalization queue и freachable queue
Всем привет, все как-то никак не могу понять, в чем разница

3
394 / 122 / 48
Регистрация: 26.10.2013
Сообщений: 734
10.07.2017, 18:44 2
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor as Pool
import requests
 
THREADS_COUNT = 100
INPUT_FILE = 'Proxy.txt'
PROXY_TYPE = 'http'
PROXY_TYPE_SEP = '://'
CONFIG = {'timeout': 10, 'headers': {'User-Agent': 'bot'}, 'url': 'https://www.yandex.ru/'}
 
def prepare_requests(config):
    def wrapper(proxy):
        try:
            proxy = PROXY_TYPE_SEP.join((PROXY_TYPE, proxy))
            proxy = {'http': proxy, 'https': proxy}
            r = requests.get(proxies=proxy, **config)
            r.raise_for_status()
        except Exception as e:
            print('Error with message:', e)
            return False
        else:
            return True
    return wrapper
 
 
if __name__ == '__main__':
    session = prepare_requests(CONFIG)
    with open(INPUT_FILE, 'r') as f:
        lines = (x.strip() for x in f.readlines() if x.strip())
    
    with Pool(THREADS_COUNT) as pool:
        outlineы = tuple((x for x in pool.map(session, lines) if x))
    
    with open('good.txt', 'r') as f:
        for line in outlines:
            f.write(line)
            f.write('\n')
        f.flush()
 
    print('ГОТОВО')
Ну вот как то так.... Разберешься?
1
Эксперт Python
295 / 108 / 57
Регистрация: 07.12.2016
Сообщений: 209
10.07.2017, 20:08  [ТС] 3
Wi0M, спасибо. Сейчас буду разбираться. Но если что, обращусь

Добавлено через 18 минут
Wi0M, А здесь можно сделать параллельную запись в файл? Я у себя так делал. Хотя может из за этого у меня блочится по join() очередь
0
394 / 122 / 48
Регистрация: 26.10.2013
Сообщений: 734
10.07.2017, 20:17 4
можно. если сделаешь метод для записи и через map будешь писать порциями.

Добавлено через 2 минуты
ой. я там ошибку допустил. ну ничего. найдешь исправишь. за одно и разберешься как ThreadPoolExecutor работает. есть кстати еще map_acync в который можно передать callback, который и будет писать в файл.
1
10.07.2017, 20:17
IT_Exp
Эксперт
87844 / 49110 / 22898
Регистрация: 17.06.2006
Сообщений: 92,604
10.07.2017, 20:17
Помогаю со студенческими работами здесь

Переполнение Queue, методы оптимизации Queue
Доброго времени суток. Я тут планирую пенгатон взять под контроль.:- StreamReader...

std threading library взамен boost threading library
изучаю распараллеливание на примере &quot;примера&quot; с http://www.linux.org.ru/forum/development/4152264 ,...

Queue vs Queue.Synchronized vs ConcurrentQueue
Ситуация такова, что один поток постоянно толкает в очередь объекты (Enqueue), а второй забирает...

Error C2664: Client::First: невозможно преобразовать параметр 1 из "std::queue<_Ty>" в "std::queue<_Ty> &"
barbershop.h: #ifndef __BARBERSHOP_H__ #define __BARBERSHOP_H__ #include &lt;vector&gt; #include...

Multiprocessing Queue vs multiprocessing.Manager Queue
Если кому интересно: обычная Queue из мультитпроцессинга имеет ограничение по размеру, если туда...

Threading и selenium
Приветствую форумчан, никак не могу понять, в чем проблема. Собственно, вот мой код: import...

PyQt + Threading
Добрый день, и в гуи и в трединге я нуб, с инета скопипастил и модифицировал тестовый пример,...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
4
Ответ Создать тему
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2024, CyberForum.ru