Форум программистов, компьютерный форум, киберфорум
Python
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
 Аватар для TimeTwo
102 / 95 / 104
Регистрация: 29.11.2009
Сообщений: 407

Как корректно вызвать деструктор в демоне

05.02.2019, 14:51. Показов 1978. Ответов 0

Студворк — интернет-сервис помощи студентам
Доброго времени суток, коллеги. Делаю демона, который "слушает" RabbitMQ и ждет сообщения, как только получает сообщение, делает довольно продолжительную работу. Подскажите как корректно вызвать завершение процесса "прослушки" connection.close() + дождаться пока все треды доработают

Процесс породивший демона волшебным os.fork(), принят вызов SIGTERM должен корректно все завершить, но он не имеет self._demon.connection.close()

Закомменченный код в SigFunctions -> SIGTERM завершается с ошибкой
Python
1
2
self.__demon.channel.stop_consuming()
AttributeError: Demon instance has no attribute 'channel'

собственно код :

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
import sys
import time
import signal
import syslog
import demon
import pika
import threading
import functools
import atexit
from pika import exceptions
 
 
QUEUE_NAME = 'queue_name'
RABBITMQ_HOST = '10.128.152.30'
RABBITMQ_PORT = 5672
HEARTBEAT_INTERVAL = 600
BLOCKED_CONNECTION_TIMEOUT = 300
PID_FILE_PATH = "/var/run/security_agent_manager_demon.pid"
HELP = """Automation has be applied to distribution sistem 
feeder for a long time, a specially as related to protection 
and the restoration of some parts of the feeder."""
 
 
class SigFunctions:
 
    def __init__(self, daemon):
        self._demon = daemon
 
    def SIGTERM(self):
        sys.stderr.write("Goodbye!\n")
        # self._demon.channel.stop_consuming()
        # self._demon.connection.close()
        # self._demon.delpid()
        # # Wait for all threads to complete
        # for thread in self._demon.threads:
        #     thread.join()
        sys.exit(0)
 
 
class ReactFunction:
 
    def __init__(self, daemon):
        self.__demon = daemon
 
    def start(self):
        self.__demon.start()
 
    def stop(self):
        # self.__demon.channel.stop_consuming()
        # self.__demon.connection.close()
        # # Wait for all threads to complete
        # for thread in self.__demon.threads:
        #     thread.join()
        self.__demon.stop()
 
    def restart(self):
        # self.__demon.channel.stop_consuming()
        # self.__demon.connection.close()
        # # Wait for all threads to complete
        # for thread in self.__demon.threads:
        #     thread.join()
        self.__demon.restart()
 
    def stmess(self, message):
        print message
        self.__demon.start()
 
 
class DaemonConfigurator:
 
    def __init__(self, daemon):
        self.demon = daemon
 
    def get_signals_dor_demon(self):
        localCon = SigFunctions(self.demon)
        sig_dict = {}
        for sig in dir(localCon):
            if sig[0:1] != '_':
                sig_dict[getattr(signal, sig)] = getattr(localCon, sig)
        return sig_dict
 
    def get_reacts_for_demon(self):
        localCon = ReactFunction(self.demon)
        react_dict = {}
        for react in dir(localCon):
            if react[0:1] != '_':
                react_dict[react] = getattr(localCon, react)
        return react_dict
 
 
 
class AgentDemon(demon.BaseDemon):
 
    def close_connect(self):
        self.channel.stop_consuming()
        self.connection.close()
        # Wait for all threads to complete
        for thread in self.threads:
            thread.join()
 
    def run(self):
        atexit.register(self.close_connect)
        while (True):
            try:
                self.connect()
                self.threads = []
                on_message_callback = functools.partial(self.on_message)
                self.channel.basic_consume(on_message_callback,
                                           queue=self.queue_name)
            except exceptions.ConnectionClosed:
                # Wait for all threads to complete
                for thread in self.threads:
                    thread.join()
                # write log here
 
 
 
    def connect(self):
        """
        NOTE: prefetch is set to 1 here for test to keep the number of threads created
        to a reasonable amount. We can to test with different prefetch values
        to find which one provides the best performance and usability for your solution
 
        :return: None (void)
        """
        self.params = pika.ConnectionParameters(
            host=RABBITMQ_HOST,
            port=RABBITMQ_PORT,
            credentials=pika.credentials.PlainCredentials(self.principal, self.token),
            heartbeat_interval=HEARTBEAT_INTERVAL,
            blocked_connection_timeout=BLOCKED_CONNECTION_TIMEOUT,
        )
        self.connection = pika.BlockingConnection(
            parameters=self.params,
        )
        self.channel = self.connection.channel()
        self.queue = self.channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
        )
        self.channel.basic_qos(prefetch_count=1)
 
    def ack_message(self):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if self.channel.is_open:
            self.channel.basic_ack(self.delivery_tag)
        else:
            # Channel is already closed, so we can't ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass
 
    def do_work(self, body):
        # thread_id = threading.get_ident()
        # fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
        # LOGGER.info(fmt1.format(thread_id, delivery_tag, body))
        # Sleeping to simulate 10 seconds of work (we need to code work here)
        time.sleep(10)
        callback = functools.partial(self.ack_message)
        self.connection.add_callback_threadsafe(callback)
 
    def on_message(self, method_frame, header_frame, body):
        self.delivery_tag = method_frame.delivery_tag
        thr = threading.Thread(target=self.do_work, args=(body))
        thr.start()
        self.threads.append(thr)
 
    @property
    def queue_name(self):
        return QUEUE_NAME
 
    @property
    def principal(self):
        return 'guest'
 
    @property
    def token(self):
        return 'guest'
 
 
 
 
 
if __name__ == "__main__":
    daemon = AgentDemon(PID_FILE_PATH)
    config = DaemonConfigurator(daemon)
    sig_dict = config.get_signals_dor_demon()
    daemon.metaInit(sig_dict)
    react_dict = config.get_reacts_for_demon()
 
    if len(sys.argv) > 1:
        if sys.argv[1] in react_dict.keys():
            try:
                react_dict[sys.argv[1]](*sys.argv[2:len(sys.argv)])
                sys.exit(0)
            except TypeError, error:
                print error
                print HELP
                sys.exit(2)
        else:
            print "usage: %s %s" % (sys.argv[0], react_dict)
            sys.exit(2)
и сам класс реализующий демонизацию demon.py:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python
import sys
import os
import time
import atexit
import signal
 
 
class SignalHandler:
    SIGNALS = ()
 
    def register(self, signum, callback):
        self.SIGNALS += (SigAction(signum, callback),)
 
    def getActions(self):
        return self.SIGNALS
 
    def handler(self, signum, frame):
        assert 0, "You must define a handler(signum, frame) method in %s" % (self)
 
    def __repr__(self):
        return "<Class:%s>" % (self.__class__.__name__)
 
 
class SigAction(SignalHandler):
 
    def __init__(self, signum, callback):
        self.signum = signum
        self.callback = callback
        signal.signal(self.signum, self.handler)
 
    def handler(self, signum, frame):
        self.callback()
 
    def __repr__(self):
        return "<Class:%s signal:%s>" % (self.__class__.__name__, self.signum)
 
 
class BaseDemon:
 
    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
 
    def metaInit(self, sig_dict):
        self.sigDict = sig_dict
 
    def daemonize(self):
        """
        do the UNIX double-fork magic, see Stevens' "Advanced
        Programming in the UNIX Environment" for details (ISBN 0201563177)
        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
        """
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
 
        # decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)
 
        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                # exit from second parent
                sys.exit(0)
        except OSError, e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
 
            # redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        si = file(self.stdin, 'r')
        so = file(self.stdout, 'a+')
        se = file(self.stderr, 'a+', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())
 
        # write pidfile
        atexit.register(self.delpid)
        pid = str(os.getpid())
        file(self.pidfile, 'w+').write("%s\n" % pid)
 
    def delpid(self):
        os.remove(self.pidfile)
 
    def signalAssign(self):
        assignee = SignalHandler()
        for i in iter(self.sigDict):
            assignee.register(i, self.sigDict[i])
 
    def start(self):
        """
        Start the daemon
        """
        # Check for a pidfile to see if the daemon already runs
        try:
            pf = file(self.pidfile, 'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None
 
        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
 
        # Start the daemon
        self.daemonize()
        self.signalAssign()
        self.run()
 
    sigDict = {}
 
    def stop(self):
        """
        Stop the daemon
        """
        # Get the pid from the pidfile
        try:
            pf = file(self.pidfile, 'r')
            pid = int(pf.read().strip())
            pf.close()
        except IOError:
            pid = None
 
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
 
        # Try killing the daemon process
        try:
            while 1:
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.1)
        except OSError, err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print str(err)
                sys.exit(1)
 
    def restart(self):
        """
        Restart the daemon
        """
        self.stop()
        self.start()
 
    def run(self):
        print "dummy"

Буду рад любой помощи и рекомендациям по коду. Спасибо за потраченное время!
0
IT_Exp
Эксперт
34794 / 4073 / 2104
Регистрация: 17.06.2006
Сообщений: 32,602
Блог
05.02.2019, 14:51
Ответы с готовыми решениями:

Наследование: как вызвать деструктор родителя?
набросал такой код: class a(): def __del__(self): print u'del-a' # class b(a): def __del__(self): ...

Как вызвать деструктор для обобщенного типа
Здравствуйте. У меня проблема с вызовом деструктора для обобщенного типа. Такой код: class MyClass&lt;TData&gt; { ...

Stl map: Как вызвать деструктор мапа
1) как вызвать деструктор мапа?)) 2) если мап хранит класс он вызовит его деструктор при своём уничтожении?

0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
BasicMan
Эксперт
29316 / 5623 / 2384
Регистрация: 17.02.2009
Сообщений: 30,364
Блог
05.02.2019, 14:51
Помогаю со студенческими работами здесь

Почему не вызывается деструктор, а точнее как его вызвать?
#include &lt;iostream&gt; using namespace std; class MyClass { private: int n; int* fibs; public: MyClass(int n) { ...

Как корректно вызвать процедуру
вечер добрый из одного массива формирую два массива (критерий четный и нечетный индекс) + суммирую сумму двух новых массивов вот...

Как корректно вызвать EnumWindows в классе?
собственно, сам вопрос уже описан в теме, но вот к примеру, есть у меня заголовочный файл в котором я описал класс ...

Как корректно вызвать ModelView propertychanged из команды?
ситуация: есть коллекция объектов, отображаемых на форме в DataGrid есть команда, которая эту коллекцию объектов каким-либо образом...

Как корректно вызвать свойство какого-либо объекта?
Вот код: public static Food apple = new Food(&quot;Яблоко&quot;, 25, &quot;Обыкновенное спелое яблоко&quot;); BackpackLObjects.Add(apple); ...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
1
Ответ Создать тему
Новые блоги и статьи
Уведомление о неверно выбранном значении справочника
Maks 06.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "НарядПутевка", разработанного в конфигурации КА2. Задача: уведомлять пользователя, если в документе выбран неверный склад. . .
Установка Qt Creator для C и C++: ставим среду, CMake и MinGW без фреймворка Qt
8Observer8 05.04.2026
Среду разработки Qt Creator можно установить без фреймворка Qt. Есть отдельный репозиторий для этой среды: https:/ / github. com/ qt-creator/ qt-creator, где можно скачать установщик, на вкладке Releases:. . .
AkelPad-скрипты, структуры, и немного лирики..
testuser2 05.04.2026
Такая программа, как AkelPad существует уже давно, и также давно существуют скрипты под нее. Тем не менее, прога живет, периодически что-то не спеша дополняется, улучшается. Что меня в первую очередь. . .
Отображение реквизитов в документе по условию и контроль их заполнения
Maks 04.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "ПланированиеСпецтехники", разработанного в конфигурации КА2. Данный документ берёт данные из другого нетипового документа. . .
Фото всей Земли с борта корабля Orion миссии Artemis II
kumehtar 04.04.2026
Это первое подобное фото сделанное человеком за 50 лет. Снимок называют новым вариантом легендарной фотографии «The Blue Marble» 1972 года, сделанной с борта корабля «Аполлон-17». Новое фото. . .
Вывод диалогового окна перед закрытием, если документ не проведён
Maks 04.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "СписаниеМатериалов", разработанного в конфигурации КА2. Задача: реализовать программный контроль на предмет проведения документа. . .
Программный контроль заполнения реквизитов табличной части документа
Maks 02.04.2026
Алгоритм из решения ниже реализован на примере нетипового документа "СписаниеМатериалов", разработанного в конфигурации КА2. Задача: 1. Реализовать контроль заполнения реквизита. . .
wmic не является внутренней или внешней командой
Maks 02.04.2026
Решение: DISM / Online / Add-Capability / CapabilityName:WMIC~~~~ Отсюда: https:/ / winitpro. ru/ index. php/ 2025/ 02/ 14/ komanda-wmic-ne-naydena/
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2026, CyberForum.ru