1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
| import json
from pydantic import BaseModel, model_validator, model_serializer, Field
from pydantic._internal._model_construction import ModelMetaclass
import typing
from typing import List
import types
import inspect
import numpy as np
"""
Определяем расширение для типа BaseModel из библиотеки pydantic в виде класса-наследника ExtendedBaseModel(BaseModel),
которое позволяет определять в модели атрибуты нестандартных типов, в частности, типа numpy.array
и выполнять сериализацию и десериализацию объектов с этими атрибутами.
Для атрибутов типа numpy.array можно также указать дополнительные ограничения:
dimension_limits - определяется разрешенная размерность массива (количество измерений), а также верхний и нижний предел
для каждого измерения.
Задаётся в виде списка пар [<мин.значение>, <макс.значения>] для каждого измерения:
Например:
dimension_limits = [[3,3],[3,3]] - поле может быть только матрицей строгой размерности 3 на 3
dimension_limits = [[2,None]] - поле может быть одномерным массивом минимум из двух элементов
elem_type - задаётся ограничение на тип элементов массива, например:
elem_type = float
Если эти ограничения заданы для определённых полей - то при инициализации значения этих полей (типа numpy.array)
проверяются на соответcтвие этип ограничениям, и в случае нарушения генерируется ошибка
Если необходимо задать ограничения dimension_limits и/или elem_type, поле в классе должно описываться с помощью
конструкции Field, например:
class SomeClass(ExtendedBaseModel):
...
array_field: np.ndarray = Field(dimension_limits = [[3,3],[3,3]], elem_type = float)
"""
"""
Класс для хранения дополнительной информации о составе полей модели BaseModel в виде статических полей
ExtendedBaseModel будет содержать эти статические поля, т.к. от будет также и наследником ExtendedBaseModelInfo
через множественное наследование
"""
class ExtendedBaseModelInfo(object):
need_custom_json_serializing = False # Нужна ли кастомная сериализация (присутствуют ли поля нестандартных типов)
list_of_basemodel_fields = [] # Список полей модели, которые в свою очередь являются списками объектов типа BaseModel
dimension_limits_error = "'dimension_limits' must be a list of pairs [<minimum size>, <maximum size>] defining limitations for array dimensions, where <minimum size> and <maximum size> are integer value or None"
"""
Чтобы поля, определённые в ExtendedBaseModelInfo, инициализировались сразу при определении классов-наследников ExtendedBaseModel
- используем магию метакласса (https://habr.com/ru/articles/145835/)
Т.К. для BaseBodel в pydantic уже определён метакласс - наш иетакласс должен быть его наследником
"""
class ExtendedBaseModelMetaClass(ModelMetaclass):
def __new__(mcs, *args, **kwargs) -> type:
# Класс - наследник ExtendedBaseModel создаётся с помощью метода родительского метакласса ModelMetaclass
ebm_class = super(ExtendedBaseModelMetaClass, mcs).__new__(mcs, *args, **kwargs)
cls_name = args[0]
if (cls_name != "ExtendedBaseModel") and (len(ebm_class.model_fields) > 0):
# Перехватываем его после создания, когда в нём уже определены описания полей в списке model_fields
# Проходим по списку полей
for fln in ebm_class.model_fields:
if ebm_class.model_fields[fln].exclude != True:
# Определяем, не является ли поле списком объектов, являющихся наследниками BaseModel
list_of_basemodel = False
if isinstance(ebm_class.model_fields[fln].annotation, types.GenericAlias) or isinstance(
ebm_class.model_fields[fln].annotation, typing._GenericAlias):
""" Как работает магтческий метод __reduce__ - не вполне понятно, но в случае, если тип поля (в annotation)
- это types.GenericAlias или typing._GenericAlias, вызов __reduce__ от этого типа
позволяет получить информацию об основном типе (напр. list или List), и о типе элемента
"""
rdc = ebm_class.model_fields[fln].annotation.__reduce__()
if len(rdc) == 2:
if len(rdc[1]) == 2:
if (rdc[1][0] is list) or (rdc[1][0] is List):
if inspect.isclass(rdc[1][1]) == True:
if issubclass(rdc[1][1], BaseModel):
list_of_basemodel = True
if issubclass(rdc[1][1], ExtendedBaseModel):
ebm_class.need_custom_json_serializing = True
elif isinstance(rdc[1][1], tuple):
for te in rdc[1][1]:
if inspect.isclass(te) == True:
if issubclass(te, BaseModel):
list_of_basemodel = True
if issubclass(te, ExtendedBaseModel):
ebm_class.need_custom_json_serializing = True
break
if list_of_basemodel == True:
ebm_class.list_of_basemodel_fields.append(fln)
elif (ebm_class.model_fields[fln].annotation is np.ndarray) or (issubclass(ebm_class.model_fields[fln].annotation, ExtendedBaseModel)):
ebm_class.need_custom_json_serializing = True
# Контролируем правильность значений в "dimension_limits" и/или "elem_type", если они заданы при определении поля
if ebm_class.model_fields[fln].json_schema_extra != None:
if "dimension_limits" in ebm_class.model_fields[fln].json_schema_extra:
dls = ebm_class.model_fields[fln].json_schema_extra["dimension_limits"]
if isinstance(dls, list) == False:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+dimension_limits_error)
if len(dls) < 1:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+"'dimension_limits' list must contain at least one dimension limitation descriptor [<minimum size>, <maximum size>]")
for dl in dls:
if isinstance(dl, list) == False:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+dimension_limits_error)
if len(dl) != 2:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+dimension_limits_error)
if ((dl[0] != None) and (isinstance(dl[0], int) == False)) or (
(dl[1] != None) and (isinstance(dl[1], int) == False)):
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+dimension_limits_error)
if (dl[0] != None) and (dl[1] != None):
if dl[0] > dl[1]:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': "+"'dimension_limits' list must contain limitation descriptors [<minimum size>, <maximum size>] where minimum size <= maximum size")
if "elem_type" in ebm_class.model_fields[fln].json_schema_extra:
if isinstance(ebm_class.model_fields[fln].json_schema_extra["elem_type"], type) == False:
raise ValueError("Class '"+cls_name+"', field '"+fln+"': 'elem_type' must be a type")
return ebm_class
class ExtendedBaseModel(BaseModel, ExtendedBaseModelInfo, metaclass=ExtendedBaseModelMetaClass):
class Config: # Разрешение использования нестандартных типов (numpy.array и пр)
arbitrary_types_allowed = True
# Кастомный валидатор свойств
@model_validator(mode='before')
@classmethod
def validate_cust(cls, dt: any):
""" При создании объекта конструктором класса (например: p = Person(name="max", age=34) )
в dt передаётся словарь со значениями атрибутов, например:
{
"name": "max",
"age": 34
}
"""
data = dt
""" А при создании объекта парсингом из json-строки (например: p = <Имя класса>.parse_raw(<json-строка>) )
в dt передаётся содержимое этой json-строки (тип str)
и её надо загрузить в словарь с помощью стандартного метода json.loads
"""
if isinstance(data, str):
data = json.loads(data)
if isinstance(data, dict):
# Проходим по описаниям полей класса
for fldn in cls.model_fields:
# и если тип поля - нестандартный (например, numpy.array) - выполняем его кастомную инициализацию и валидацию
if cls.model_fields[fldn].annotation is np.ndarray:
if fldn in data:
dim = None
elt = None
if cls.model_fields[fldn].json_schema_extra != None:
if "dimension_limits" in cls.model_fields[fldn].json_schema_extra:
dim = cls.model_fields[fldn].json_schema_extra["dimension_limits"]
if "elem_type" in cls.model_fields[fldn].json_schema_extra:
elt = cls.model_fields[fldn].json_schema_extra["elem_type"]
vl = data[fldn]
if isinstance(vl, list):
if elt != None:
data[fldn] = np.array(vl, dtype=elt)
else:
data[fldn] = np.array(vl)
elif isinstance(vl, str):
if elt != None:
data[fldn] = np.array(json.loads(vl), dtype=elt)
else:
data[fldn] = np.array(json.loads(vl))
# Проверка на соответствие типа элемента массива, если он задан
elif (isinstance(vl, np.ndarray) == True) and (elt != None):
if vl.dtype != elt:
raise ValueError(fldn + " field must be a numpy array of type '"+str(elt)+"'")
elif (isinstance(vl, np.ndarray) == False) and (vl != None):
raise ValueError(fldn +" field may be numpy array or list or json string for list")
# Проверка на соответствие размерности, если она задана
if dim != None:
rdim = data[fldn].shape
if len(rdim) != len(dim):
raise ValueError("the number of dimensions in "+ fldn + " bust be "+str(len(dim))+" as defined in 'dimension_limits'" )
dn = 0
for td in rdim:
if ((dim[dn][0] != None) and (td < dim[dn][0])) or ((dim[dn][1] != None) and (td > dim[dn][1])):
raise ValueError("shape of " + fldn + " numpy array "+str(rdim)+" violates limitations defined in 'dimension_limits' "+str(dim))
dn = dn + 1
return data
# Кастомный сериализатор
@model_serializer(mode='wrap')
def serialize_cust(self, std, _info):
if _info.mode == "json":
"""
Если в классе есть нестандартные поля, требующие кастомной сериализации - генерируем json-строку "вручную", проходя по полям объекта
"""
if self.need_custom_json_serializing == True:
rv = "{"
first_fld = True
for fln in self.model_fields:
if self.model_fields[fln].exclude == True:
continue
if first_fld == False:
rv = rv + ","
first_fld = False
rv = rv + "\"" + fln + "\":"
flv = self.__getattribute__(fln)
if flv is None:
rv = rv + "null"
else:
# Если поле - это список объектов-наследников BaseModel
# - проходим по нему и вызываем метод .json() для каждого члена списка
if fln in self.list_of_basemodel_fields:
rv = rv + "["
first_list_elem = True
for le in flv:
if first_list_elem == False:
rv = rv + ","
rv = rv + le.json()
first_list_elem = False
rv = rv + "]"
# для поля типа numpy.array преобразуем его значение обратно в список и сериализуем с помощью json
elif self.model_fields[fln].annotation is np.ndarray:
rv = rv + json.dumps(flv.tolist())
# Для поля, являющегося наследником BaseModel - вызываем метод .json()
elif isinstance(flv, BaseModel):
rv = rv + flv.json()
# Прочие поля стандартных типов сериализуем с помощью стандартной библиотеки json
else:
rv = rv + json.dumps(flv)
rv = rv + "}"
return rv
else:
# Если в классе нет полей нестандартных типов - используем стандартную серивлизацию от BaseModel
return std(self)
else:
return std(self) |