Как корректно вызвать деструктор в демоне
05.02.2019, 14:51. Показов 1978. Ответов 0
Доброго времени суток, коллеги. Делаю демона, который "слушает" RabbitMQ и ждет сообщения, как только получает сообщение, делает довольно продолжительную работу. Подскажите как корректно вызвать завершение процесса "прослушки" connection.close() + дождаться пока все треды доработают
Процесс породивший демона волшебным os.fork(), принят вызов SIGTERM должен корректно все завершить, но он не имеет self._demon.connection.close()
Закомменченный код в SigFunctions -> SIGTERM завершается с ошибкой
| Python | 1
2
| self.__demon.channel.stop_consuming()
AttributeError: Demon instance has no attribute 'channel' |
|
собственно код :
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
| #!/usr/bin/env python
import sys
import time
import signal
import syslog
import demon
import pika
import threading
import functools
import atexit
from pika import exceptions
QUEUE_NAME = 'queue_name'
RABBITMQ_HOST = '10.128.152.30'
RABBITMQ_PORT = 5672
HEARTBEAT_INTERVAL = 600
BLOCKED_CONNECTION_TIMEOUT = 300
PID_FILE_PATH = "/var/run/security_agent_manager_demon.pid"
HELP = """Automation has be applied to distribution sistem
feeder for a long time, a specially as related to protection
and the restoration of some parts of the feeder."""
class SigFunctions:
def __init__(self, daemon):
self._demon = daemon
def SIGTERM(self):
sys.stderr.write("Goodbye!\n")
# self._demon.channel.stop_consuming()
# self._demon.connection.close()
# self._demon.delpid()
# # Wait for all threads to complete
# for thread in self._demon.threads:
# thread.join()
sys.exit(0)
class ReactFunction:
def __init__(self, daemon):
self.__demon = daemon
def start(self):
self.__demon.start()
def stop(self):
# self.__demon.channel.stop_consuming()
# self.__demon.connection.close()
# # Wait for all threads to complete
# for thread in self.__demon.threads:
# thread.join()
self.__demon.stop()
def restart(self):
# self.__demon.channel.stop_consuming()
# self.__demon.connection.close()
# # Wait for all threads to complete
# for thread in self.__demon.threads:
# thread.join()
self.__demon.restart()
def stmess(self, message):
print message
self.__demon.start()
class DaemonConfigurator:
def __init__(self, daemon):
self.demon = daemon
def get_signals_dor_demon(self):
localCon = SigFunctions(self.demon)
sig_dict = {}
for sig in dir(localCon):
if sig[0:1] != '_':
sig_dict[getattr(signal, sig)] = getattr(localCon, sig)
return sig_dict
def get_reacts_for_demon(self):
localCon = ReactFunction(self.demon)
react_dict = {}
for react in dir(localCon):
if react[0:1] != '_':
react_dict[react] = getattr(localCon, react)
return react_dict
class AgentDemon(demon.BaseDemon):
def close_connect(self):
self.channel.stop_consuming()
self.connection.close()
# Wait for all threads to complete
for thread in self.threads:
thread.join()
def run(self):
atexit.register(self.close_connect)
while (True):
try:
self.connect()
self.threads = []
on_message_callback = functools.partial(self.on_message)
self.channel.basic_consume(on_message_callback,
queue=self.queue_name)
except exceptions.ConnectionClosed:
# Wait for all threads to complete
for thread in self.threads:
thread.join()
# write log here
def connect(self):
"""
NOTE: prefetch is set to 1 here for test to keep the number of threads created
to a reasonable amount. We can to test with different prefetch values
to find which one provides the best performance and usability for your solution
:return: None (void)
"""
self.params = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=pika.credentials.PlainCredentials(self.principal, self.token),
heartbeat_interval=HEARTBEAT_INTERVAL,
blocked_connection_timeout=BLOCKED_CONNECTION_TIMEOUT,
)
self.connection = pika.BlockingConnection(
parameters=self.params,
)
self.channel = self.connection.channel()
self.queue = self.channel.queue_declare(
queue=self.queue_name,
durable=True,
exclusive=False,
auto_delete=False,
)
self.channel.basic_qos(prefetch_count=1)
def ack_message(self):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if self.channel.is_open:
self.channel.basic_ack(self.delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def do_work(self, body):
# thread_id = threading.get_ident()
# fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
# LOGGER.info(fmt1.format(thread_id, delivery_tag, body))
# Sleeping to simulate 10 seconds of work (we need to code work here)
time.sleep(10)
callback = functools.partial(self.ack_message)
self.connection.add_callback_threadsafe(callback)
def on_message(self, method_frame, header_frame, body):
self.delivery_tag = method_frame.delivery_tag
thr = threading.Thread(target=self.do_work, args=(body))
thr.start()
self.threads.append(thr)
@property
def queue_name(self):
return QUEUE_NAME
@property
def principal(self):
return 'guest'
@property
def token(self):
return 'guest'
if __name__ == "__main__":
daemon = AgentDemon(PID_FILE_PATH)
config = DaemonConfigurator(daemon)
sig_dict = config.get_signals_dor_demon()
daemon.metaInit(sig_dict)
react_dict = config.get_reacts_for_demon()
if len(sys.argv) > 1:
if sys.argv[1] in react_dict.keys():
try:
react_dict[sys.argv[1]](*sys.argv[2:len(sys.argv)])
sys.exit(0)
except TypeError, error:
print error
print HELP
sys.exit(2)
else:
print "usage: %s %s" % (sys.argv[0], react_dict)
sys.exit(2) |
|
и сам класс реализующий демонизацию demon.py:
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
| #!/usr/bin/env python
import sys
import os
import time
import atexit
import signal
class SignalHandler:
SIGNALS = ()
def register(self, signum, callback):
self.SIGNALS += (SigAction(signum, callback),)
def getActions(self):
return self.SIGNALS
def handler(self, signum, frame):
assert 0, "You must define a handler(signum, frame) method in %s" % (self)
def __repr__(self):
return "<Class:%s>" % (self.__class__.__name__)
class SigAction(SignalHandler):
def __init__(self, signum, callback):
self.signum = signum
self.callback = callback
signal.signal(self.signum, self.handler)
def handler(self, signum, frame):
self.callback()
def __repr__(self):
return "<Class:%s signal:%s>" % (self.__class__.__name__, self.signum)
class BaseDemon:
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def metaInit(self, sig_dict):
self.sigDict = sig_dict
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile, 'w+').write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def signalAssign(self):
assignee = SignalHandler()
for i in iter(self.sigDict):
assignee.register(i, self.sigDict[i])
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.signalAssign()
self.run()
sigDict = {}
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
print "dummy" |
|
Буду рад любой помощи и рекомендациям по коду. Спасибо за потраченное время!
0
|