Python Quart(Flask) server + Rpi GPIO
23.06.2019, 19:58. Показов 1365. Ответов 0
Всем привет, разрабатываю веб-приложение на базе Quart на python3.7, встала проблема с асинхронными процессами. Суть задачи в том, чтобы при введённой пользователем температуры в форму HTML, запускался параллельный процесс, который бы подавал на ножку Rpi GPIO 1 или 0 в зависимости от разности текущей и введённой температур. В коде эта функция уже частично написана (On() - функция). Как создать параллельный процесс?
| Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
| from quart import Quart,request, render_template, redirect, url_for
import asyncio
import RPi.GPIO as GPIO
import time
import sqlite3 as lite
import sys
#******************** установка метрики ножек RPi
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
#******************** класс датчика температуры
class DS1620:
# ***************************************************************************************************
# Настройка пинов, к которым подключён датчик DS1620. *
# ***************************************************************************************************
def __init__(self, rst, dq, clk):
""" DS1620 sensor constructor.
Keyword arguments:
rst -- Integer corresponding to the RST GPIO Pin.
dq -- Integer DAT/DQ GPIO Pin.
clk -- Integer CLK GPIO Pin.
"""
GPIO.setmode(GPIO.BCM) # Set the mode to get pins by GPIO #
self._rst = rst
self._dq = dq
self._clk = clk
# ***************************************************************************************************
# Отправка команды датчику. *
# ***************************************************************************************************
def __send_command(self, command):
""" Sends an 8-bit command to the DS1620 """
for n in range(0, 8):
bit = ((command >> n) & (0x01))
GPIO.output(self._dq, GPIO.HIGH if (bit == 1) else GPIO.LOW)
GPIO.output(self._clk, GPIO.LOW)
GPIO.output(self._clk, GPIO.HIGH)
# ***************************************************************************************************
# Чтение ответа с датчика. *
# ***************************************************************************************************
def __read_data(self):
""" Read 8 bit data from the DS1620 """
raw_data = 0 # Go into input mode.
for n in range(0, 9):
GPIO.output(self._clk, GPIO.LOW)
GPIO.setup(self._dq, GPIO.IN)
if GPIO.input(self._dq) == GPIO.HIGH:
bit = 1
else:
bit = 0
GPIO.setup(self._dq, GPIO.OUT)
GPIO.output(self._clk, GPIO.HIGH)
raw_data = raw_data | (bit << n)
return raw_data
# ***************************************************************************************************
# Чтение текущей температуры с датчика. *
# ***************************************************************************************************
def get_temperature(self):
""" Send the commands to retrieve the temperature in Celsuis """
# Prepare the pins for output.
GPIO.setup(self._rst, GPIO.OUT)
GPIO.setup(self._dq, GPIO.OUT)
GPIO.setup(self._clk, GPIO.OUT)
GPIO.output(self._rst, GPIO.LOW)
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0x0c) # Write config command.
self.__send_command(0x02) # CPU Mode.
GPIO.output(self._rst, GPIO.LOW)
time.sleep(0.2) # Wait until the config register is written.
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0xEE) # Start conversion.
GPIO.output(self._rst, GPIO.LOW)
time.sleep(0.2)
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0xAA)
raw_data = self.__read_data()
GPIO.output(self._rst, GPIO.LOW)
return raw_data / 2.0
#******************** Инициализация пинов датчика
t_sensor = DS1620(23, 18, 24)
control = 17
#******************** вкл чайника
async def On():
t=t_sensor.get_temperature()
a = read_db1()
while t <= a:
GPIO.setup(control, GPIO.OUT, initial=GPIO.HIGH)
GPIO.setup(control, GPIO.OUT, initial=GPIO.LOW)\
#******************** Database
def init_db(b):
db = lite.connect('test.db')
cursor = db.cursor()
cursor.execute("DROP TABLE IF EXISTS TEMP")
cursor.execute("CREATE table TEMP (Temp_id text,Temp_val int)")
cursor.execute("INSERT INTO TEMP VALUES (?,?)",(1,t_sensor.get_temperature()))
db.commit()
cursor.execute("INSERT INTO TEMP VALUES (?,?)",(2,b))
db.commit()
db.close()
def read_db1():
db = lite.connect("test.db")
cursor = db.cursor()
cursor.execute("SELECT Temp_val FROM TEMP WHERE Temp_id = '2'")
temp1=cursor.fetchone()
db.close()
return temp1[0]
def read_db():
db = lite.connect("test.db")
cursor = db.cursor()
cursor.execute("SELECT Temp_val FROM TEMP WHERE Temp_id = '1'")
temp=cursor.fetchone()
db.close()
return temp[0]
#******************** Инициализация приложения
app = Quart(__name__)
init_db(100)
#******************** Метод-ссылка запроса
@app.route('/', methods = ['POST'])
async def create():
form = (await request.form)
init_db(form['temp1'])
return redirect(url_for('index'))
#******************** Метод-ссылка ответа
@app.route('/', methods = ['GET'])
async def index():
temp1 = read_db1()
init_db(temp1)
temp=read_db()
return await render_template('main.html', temp=temp, temp1=temp1)
#******************** Запуск
app.run() |
|
Это шаблон самой страницы.
| HTML5 | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| <!DOCTYPE html>
<html lang="rus">
<head><title>Temperature of teapot</title></head>
<body>
<center>
<h1>
Temp now: {{temp}}
</h1>
<h1>
Temp last: {{temp1}}
</h1>
<form action="{{url_for('create')}}" method="post">
<label for="temp1">Temp at last moment:</label>
<input name="temp1" id="temp1" placeholder="Temp of teapot">
<input type="submit" values="Send">
</form>
</center>
</body>
</html> |
|
Добавлено через 5 часов 13 минут
Решил проблему: | Python | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
| from quart import Quart,request, render_template, redirect, url_for, jsonify
import asyncio
import RPi.GPIO as GPIO
import time
import sqlite3 as lite
import sys
#******************** установка метрики ножек RPi
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
#******************** класс датчика температуры
class DS1620:
# ***************************************************************************************************
# Настройка пинов, к которым подключён датчик DS1620. *
# ***************************************************************************************************
def __init__(self, rst, dq, clk):
""" DS1620 sensor constructor.
Keyword arguments:
rst -- Integer corresponding to the RST GPIO Pin.
dq -- Integer DAT/DQ GPIO Pin.
clk -- Integer CLK GPIO Pin.
"""
GPIO.setmode(GPIO.BCM) # Set the mode to get pins by GPIO #
self._rst = rst
self._dq = dq
self._clk = clk
# ***************************************************************************************************
# Отправка команды датчику. *
# ***************************************************************************************************
def __send_command(self, command):
""" Sends an 8-bit command to the DS1620 """
for n in range(0, 8):
bit = ((command >> n) & (0x01))
GPIO.output(self._dq, GPIO.HIGH if (bit == 1) else GPIO.LOW)
GPIO.output(self._clk, GPIO.LOW)
GPIO.output(self._clk, GPIO.HIGH)
# ***************************************************************************************************
# Чтение ответа с датчика. *
# ***************************************************************************************************
def __read_data(self):
""" Read 8 bit data from the DS1620 """
raw_data = 0 # Go into input mode.
for n in range(0, 9):
GPIO.output(self._clk, GPIO.LOW)
GPIO.setup(self._dq, GPIO.IN)
if GPIO.input(self._dq) == GPIO.HIGH:
bit = 1
else:
bit = 0
GPIO.setup(self._dq, GPIO.OUT)
GPIO.output(self._clk, GPIO.HIGH)
raw_data = raw_data | (bit << n)
return raw_data
# ***************************************************************************************************
# Чтение текущей температуры с датчика. *
# ***************************************************************************************************
def get_temperature(self):
""" Send the commands to retrieve the temperature in Celsuis """
# Prepare the pins for output.
GPIO.setup(self._rst, GPIO.OUT)
GPIO.setup(self._dq, GPIO.OUT)
GPIO.setup(self._clk, GPIO.OUT)
GPIO.output(self._rst, GPIO.LOW)
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0x0c) # Write config command.
self.__send_command(0x02) # CPU Mode.
GPIO.output(self._rst, GPIO.LOW)
time.sleep(0.2) # Wait until the config register is written.
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0xEE) # Start conversion.
GPIO.output(self._rst, GPIO.LOW)
time.sleep(0.2)
GPIO.output(self._clk, GPIO.HIGH)
GPIO.output(self._rst, GPIO.HIGH)
self.__send_command(0xAA)
raw_data = self.__read_data()
GPIO.output(self._rst, GPIO.LOW)
return raw_data / 2.0
#******************** инициализация пинов датчика
t_sensor = DS1620(23, 18, 24)
control = 17
#******************** вкл чайника
async def On():
t=t_sensor.get_temperature()
a = read_db1()
if t<a:
GPIO.setup(control, GPIO.OUT, initial=GPIO.HIGH)
return 1
else:
GPIO.setup(control, GPIO.OUT, initial=GPIO.LOW)
return 0
#******************** Database
def init_db(b):
db = lite.connect('test.db')
cursor = db.cursor()
cursor.execute("DROP TABLE IF EXISTS TEMP")
cursor.execute("CREATE table TEMP (Temp_id text,Temp_val int)")
cursor.execute("INSERT INTO TEMP VALUES (?,?)",(1,t_sensor.get_temperature()))
db.commit()
cursor.execute("INSERT INTO TEMP VALUES (?,?)",(2,b))
db.commit()
db.close()
def read_db1():
db = lite.connect("test.db")
cursor = db.cursor()
cursor.execute("SELECT Temp_val FROM TEMP WHERE Temp_id = '2'")
temp1=cursor.fetchone()
db.close()
return temp1[0]
def read_db():
db = lite.connect("test.db")
cursor = db.cursor()
cursor.execute("SELECT Temp_val FROM TEMP WHERE Temp_id = '1'")
temp=cursor.fetchone()
db.close()
return temp[0]
#******************** инициализация приложения
app = Quart(__name__)
init_db(100)
#******************** Метод-ссылка
@app.route('/', methods = ['POST'])
async def create():
form = (await request.form)
if int(form['temp1'])>=20:
if int(form['temp1'])<=100:
init_db(form['temp1'])
else:
init_db(100)
else:
init_db(100)
return redirect(url_for('index'))
#******************** Метод-ссылка
@app.route('/')
async def index():
return """
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="utf-8">
<title>Temperature of teapot</title>
<link rel="stylesheet" href="//code.jquery.com/ui/1.12.1/themes/base/jquery-ui.css">
<link rel="stylesheet" href="/resources/demos/style.css">
<script src="https://code.jquery.com/jquery-1.12.4.js"></script>
<script src="https://code.jquery.com/ui/1.12.1./jquery-ui.js"></script>
<script>
var state;
function checkStatus()
{
$.getJSON('""" + url_for('check_status') + """',function (data)
{
console.log(data);
state = parseFloat(data.state);
temp = parseFloat(data.temp);
temp1 = parseFloat(data.temp1);
update_text(state);
update_text(temp);
});
setTimeout(checkStatus,1000)
}
function update_text(val)
{
document.getElementById("temp").innerHTML = " <center><h2>"+temp+"℃</h2></center>";
document.getElementById("temp1").innerHTML = " <center><h2>"+temp1+"℃</h2></center>";
if (val != 1)
{
document.getElementById("state").innerHTML = " <center><h2>"+state+"</h2></center>";
}
else
{
document.getElementById("state").innerHTML = " <center><h2>"+state+"</h2></center>";
}
}
checkStatus();
</script>
</head?
<body>
<body>
<center>
<h1>
Temperature of teapot now:
</h1>
<div id="temp" name="temp"></div>
<h1>
Temperature of teapot at last moment:</h1>
<div id="temp1" name="temp1"></div>
<h1>
State:
</h1>
<div id="state" name="state"></div>
<form action=' """+url_for('create') +"""' method="post">
<label for="temp1">Temperature of teapot at you moment:</label>
<input name="temp1" id="temp1" placeholder="Temp. of teapot" >
<input type="submit" valuse="Send">
</form>
</center>
</body>
</html>
"""
#******************** Метод-ссылка
@app.route('/check_status/')
async def check_status():
status=dict()
try:
status['temp1']=read_db1()
status['temp']=t_sensor.get_temperature()
if await On()==0:
status['state']=0
else:
status['state']=1
except:
pass
print(status)
return jsonify(status)
#******************** Запуск
if __name__=="__main__":
app.run() |
|
0
|