Форум программистов, компьютерный форум, киберфорум
ArchitectMsa
Войти
Регистрация
Восстановить пароль

Микросервис на Python с FastAPI и Docker

Запись от ArchitectMsa размещена 23.04.2025 в 16:13
Показов 2844 Комментарии 0

Нажмите на изображение для увеличения
Название: b07e282a-852b-4d14-952d-8776c49e9520.jpg
Просмотров: 66
Размер:	251.5 Кб
ID:	10634
В эпоху облачных вычислений и растущей сложности программных продуктов классическая монолитная архитектура всё чаще уступает место новым подходам. Микросервисная архитектура становится фаворитом среди разработчиков, стремящихся создавать масштабируемые и гибкие системы. Но что же такое микросервисы на самом деле?

Концепция микросервисной архитектуры



Микросервисная архитектура — это метод разработки программного обеспечения, при котором приложение строится как набор небольших, независимых служб, каждая из которых отвечает за отдельную функциональность и процесс. Каждый микросервис работает в собственном процессе и коммуницирует с другими через чётко определённые API, обычно HTTP-ресурсы или легковесные протоколы вроде gRPC. Кардинальное отличие от монолитной архитектуры состоит в том что монолит представляет собой единое приложение, где все компоненты тесно связаны и зависимы друг от друга. Изменение одной части может потребовать перекомпиляции и развертывания всего приложения. Микросервисы же разбивают эту монолитную структуру на независимые части. Каждый микросервис:
  1. Реализует конкретную бизнес-функцию.
  2. Имеет ограниченную зону ответственности.
  3. Может быть разработан, развернут и масштабирован независимо.
  4. Часто имеет собственную базу данных или хранилище.

Docker, (Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?)
До появления ошибки работал с Docker, запускал контейнеры, останавливал и удалял их. Но внезапно в...

Docker, IP Host -> Docker responce
есть некий сервис достучатся к которому возможно по IP (но только через VPN), задался вопросом, а...

Не могу создать образ Docker, подскажите как сделать. Вылазить ошибка. docker-file. Новичок в докере
Если можно обясните как строить докер файл. столько видео посмотрел ничего не понял Step 4/5 :...

Запуск linux контейнеров Docker в windows без Docker Desktop
Всем доброго времени суток! Пытаюсь разворачивать локальный веб-сервер на ПК С ОС windows с...


Преимущества и недостатки микросервисов



Микросервисная архитектура несёт ряд ощутимых преимуществ для современной разработки:

Технологическая независимость. Разные микросервисы могут использовать разные технологии, языки программирования и фреймворки, что позволяет выбирать оптимальный стек для каждой конкретной задачи. Например, для интенсивных вычислений можно выбрать Go или Rust, а для API взаимодействия — Python с FastAPI.
Масштабируемость. Микросервисы позволяют масштабировать отдельные компоненты системы независимо. Нет необходимости масштабировать всё приложение, если высокой нагруке подвергается лишь конкретный функционал.
Отказоустойчивость. Изолированная природа микросервисов предотвращает каскадные отказы. Если один сервис выходит из строя, остальная часть системы продолжает функционировать.
Простота разработки. Небольшие команды могут работать над отдельными микросервисами параллельно, не мешая друг другу. Это повышает производительность и уменьшает время вывода новых функций на рынок.

Однако архитектура микросервисов не лишена и серьезных недостатков:

Сложность управления. С ростом числа микросервисов возрастает и сложность их оркестрации, мониторинга и обнаружения проблем.
Распределенные транзакции. Обеспечение согласованности данных между разными микросервисами требует особых подходов и механизмов.
Увеличение задержек. Межсервисная коммуникация может приводить к дополнительным задержкам, особенно при последовательных запросах.
Дублирование кода. Некоторые общие функциональности могут дублироваться между сервисами, что усложняет поддержку и вносит риски несогласованности.

Когда стоит выбирать микросервисы



Выбор микросервисной архитектуры должен быть обоснованным решением, а не данью моде. Микросервисы оптимальны в следующих ситуациях:

Сложные и масштабные продукты. Если ваше приложение имеет множество разнородных функций, логично разделить их на независимые компоненты.
Команды, распределенные географически или функционально. Микросервисы позволяют разным командам работать над своими компонентами независимо.
Необходимость частых обновлений. При необходимости быстро и регулярно обновлять отдельные части системы микросервисы дают преимущество, позволяя модифицировать компоненты независимо.
Неравномерная нагрузка. Если разные части приложения испытывают разную нагрузку, микросервисы позволяют масштабировать только те компоненты, которые в этом нуждаются.

Однако, если приложение небольшое и простое, команда разработки мала, или у вас нет необходимой инфраструктуры и опыта для управления распределенной системой, монолитная архитектура может оказаться более практичным решением. Микросервисы вносят существенную операционную сложность, и ее нужно уметь эффективно контролировать.

В этой статье мы сосредоточимся на создании микросервисов с использованием Python, FastAPI и Docker — технологического стека, который стал одним из самых популярных для разработки современных API-cервисов благодаря своей скорости, простоте и мощи.

Технический стек



Python как язык для микросервисов



Python стал одним из популярнейших языков для разработки микросервисов благодаря своей универсальности и доступности. Язык предлагает чистый, читаемый синтаксис, что делает код понятным даже для новичков — качество, которое нельзя недооценивать при работе в командах с разным уровнем подготовки. Ключевые преимущества Python в микросервисной архитектуре:
  • Скорость разработки — Python позволяет создавать прототипы и готовые решения значительно быстрее многих конкурентов.
  • Богатая экосистема библиотек упрощает интеграцию с различными технологиями .
  • Гибкость языка позволяет адаптировать код под растущие потребности проекта.
  • Поддержка современных асинхронных подходов через модуль asyncio.

Несмотря на распространённое мнение о низкой производительности Python современные реализации интерпретатора и асинхронные фреймворки сводят этот недостаток к минимуму, особенно для I/O-bound операций, характерных для микросервисов.

FastAPI: высокопроизводительный фреймворк



FastAPI вырвался в лидеры среди Python-фреймворков для разработки API благодаря умному сочетанию современных технологий и понятного пользовательского интерфейса. Этот ASGI-фреймворк создан Себастьяном Рамирезом и впервые выпущен в 2018 году, но уже успел завоевать признание в сообществе. В основе FastAPI лежат два мощных компонента:
  1. Starlette — асинхронный веб-фреймворк.
  2. Pydantic — библиотека для валидации данных через аннотации типов Python.

Главные особенности FastAPI:

Производительность. FastAPI впечатляет скоростью работы, не уступая таким технологиям как Node.js и даже Go. Это возможно благодаря асинхронной природе и оптимизированному ASGI-серверу.

Типизация и валидация. Фреймворк использует подсказки типов Python для автоматической валидации данных, что снижает количество ошибок и упрощает отладку.

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import FastAPI
from pydantic import BaseModel
 
app = FastAPI()
 
class User(BaseModel):
    id: int
    name: str
    email: str
    active: bool = True
 
@app.post("/users/")
async def create_user(user: User):
    # Данные уже проверены и соответствуют модели
    return {"user_id": user.id}
Автоматическая документация. FastAPI генерирует интерактивную документацию API (через Swagger UI и ReDoc) автоматически на основе кода — функция, которая экономит массу времени и сохраняет документацию всегда актуальной.

Особенности асинхронного программирования в Python для микросервисов



Асинхронное программирование — настоящий прорыв для Python-бэкенда. Оно позволяет микросервисам обрабатывать тысячи запросов одновременно без создания дополнительных потоков или процессов, что существенно снижает накладные расходы.
Ключевые элементы асинхронности в Python:
  1. Ключевые слова async/await, введенные в Python 3.5.
  2. Цикл событий (event loop) из модуля asyncio.
  3. Неблокирующие I/O-операции.

Асинхронный подход особенно эффективен для микросервисов, которые часто выполняют операции ввода-вывода: запросы к базам данных, обращения к внешним API, чтение файлов. В этих случаях основное время тратится на ожидание ответа, а не на CPU-вычисления. Пример асинхронного обработчика в FastAPI:

Python
1
2
3
4
5
6
7
8
9
10
11
@app.get("/items/{item_id}")
async def get_item(item_id: int):
    # Предположим, что db_client — это асинхронный клиент БД
    item = await db_client.get_item(item_id)
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    
    # Параллельный запрос к другому сервису
    related_data = await external_service.get_data(item.external_id)
    
    return {"item": item, "related_data": related_data}

Альтернативные фреймворки и их сравнение с FastAPI



Выбор фреймворка для микросервисов — не тривиальная задача. Ниже представлены основные альтернативы FastAPI и их сравнение:

Flask — минималистичный WSGI-фреймворк, популярный благодаря простоте и гибкости. Основное отличие от FastAPI — отсутствие встроенной асинхронности и автоматической валидации данных. Flask требует множества дополнительных расширений для достижения функциональности, которая в FastAPI доступна из коробки.

Django + DRF — мощный комплекс для построения веб-приложений. Включает ORM, админ-панель и множество других компонентов. Страдает от избыточности для микросервисов — слишком много функций, которые редко используются в API-сервисах. По производительности заметно уступает FastAPI.

aiohttp — асинхронный HTTP-клиент/сервер. Более низкоуровневый, чем FastAPI, и требует больше ручной работы для настройки API. Лишен встроенной документации и валидации данных.

Sanic — асинхронный фреймворк, ориентированный на скорость. По производительности сопоставим с FastAPI, но уступает в удобстве использования и документировании.

Фреймворки можно сравнить по нескольким ключевым параметрам:

1. Производительность: FastAPI ≈ Sanic > aiohttp > Flask > Django.
2. Удобство разработки: FastAPI > Django > Flask > Sanic > aiohttp.
3. Документирование: FastAPI > Django > Flask > Sanic > aiohttp.
4. Экосистема и расширения: Django > Flask > FastAPI > aiohttp > Sanic.

Особенности работы с зависимостями в FastAPI-проектах



Система зависимостей — одно из главных преимуществ FastAPI. Она позволяет создавать модульный, тестируемый код, что особенно важно в микросервисной архитектуре. Основные возможности системы зависимостей FastAPI:

Инъекция зависимостей — позволяет передавать общие компоненты (БД, кэш, конфигурация) в обработчики запросов:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
 
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
Цепочки зависимостей — возможность строить иерархии зависимостей, где одна зависимость использует другую:

Python
1
2
3
4
5
6
7
8
9
def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
    user = authenticate_user(db, token)
    if not user:
        raise HTTPException(status_code=401)
    return user
 
@app.get("/items/")
async def read_items(current_user: User = Depends(get_current_user)):
    return current_user.items
Кэширование — FastAPI автоматически кэширует результаты зависимостей в рамках одного запроса, что повышает производительность.

В завершение, FastAPI предоставляет специальный инструментарий для работы с внешними зависимостями через файл requirements.txt или более современный подход с Poetry. Это позволяет чётко определить все необходимые пакеты и их версии, упрощая воспроизведение окружения при развёртывании микросервиса.

Практическая реализация



Теория микросервисов и преимущества FastAPI очевидны, но как это выглядит на практике? Давайте перейдём от слов к делу и создадим полноценный микросервис с нуля.

Создание базового микросервиса



Первый шаг в создании микросервиса — настройка окружения и структуры проекта. Рекомендуется использовать виртуальное окружение для изоляции зависимостей:

Bash
1
2
3
4
5
6
7
8
9
10
11
# Создание виртуального окружения
python -m venv venv
 
# Активация в Linux/macOS
source venv/bin/activate
 
# Активация в Windows
[H2]venv\Scripts\activate[/H2]
 
# Установка необходимых пакетов
pip install fastapi uvicorn
Uvicorn — это ASGI-сервер, который будет запускать наше FastAPI-приложение. Он обеспечивает высокую производительность благодаря асинхронной природе работы. Теперь создадим простейший микросервис. Для начала достаточно одного файла main.py:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from fastapi import FastAPI
 
app = FastAPI(
    title="Product API",
    description="Микросервис для управления продуктами",
    version="0.1.0"
)
 
@app.get("/")
async def root():
    return {"message": "Добро пожаловать в API продуктов!"}
 
@app.get("/health")
async def health_check():
    return {"status": "healthy"}
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Запустить сервис можно двумя способами:

Bash
1
2
3
4
5
# Запуск через Python
python main.py
 
# Или напрямую через Uvicorn
uvicorn main:app --reload
Флаг --reload включает автоматическую перезагрузку сервера при изменении кода — удобно при разработке.

Структура проекта и зависимости



По мере роста сложности микросервиса, хранить весь код в одном файле становится неудобно. Организованная структура проекта критически важна для поддерживаемости кода. Вот рекомендуемая структура:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
my_microservice/
├── app/
│   ├── __init__.py
│   ├── main.py              # Точка входа приложения
│   ├── api/                 # Маршруты API
│   │   ├── __init__.py
│   │   ├── routes/          # Маршруты, сгруппированные по функциональности
│   │   │   ├── __init__.py
│   │   │   ├── products.py
│   │   │   └── users.py
│   ├── core/                # Основные компоненты
│   │   ├── __init__.py
│   │   ├── config.py        # Конфигурация приложения
│   │   └── security.py      # Механизмы безопасности
│   ├── db/                  # Код для работы с базой данных
│   │   ├── __init__.py
│   │   ├── models.py        # Модели данных ORM
│   │   └── repository.py    # Абстракции доступа к данным
│   ├── schemas/             # Pydantic-модели для валидации
│   │   ├── __init__.py
│   │   ├── product.py
│   │   └── user.py
│   └── services/            # Бизнес-логика
│       ├── __init__.py
│       └── product_service.py
├── tests/                   # Тесты
│   ├── __init__.py
│   ├── conftest.py
│   └── test_api/
│       ├── __init__.py
│       └── test_products.py
├── .env                     # Переменные окружения
├── .gitignore
├── Dockerfile               # Конфигурация для Docker
├── docker-compose.yml
├── pyproject.toml          # Метаданные проекта и зависимости
└── README.md
Для управления зависимостями современным подходом является использование Poetry вместо традиционного requirements.txt. Poetry позволяет точно фиксировать версии пакетов и разделять зависимости разработки и продакшена:

Bash
1
2
3
4
5
6
7
8
# Установка Poetry
pip install poetry
 
# Инициализация проекта
poetry init
 
# Добавление зависимостей
poetry add fastapi uvicorn sqlalchemy pydantic python-dotenv
Это создаст файл pyproject.toml с зависимостями и poetry.lock с их точными версиями.

Документирование API с помощью OpenAPI и Swagger UI



Одно из самых мощных преимуществ FastAPI — автоматическая документация API. Без дополнительных настроек вы получаете интерактивную документацию с возможностью тестирования эндпоинтов прямо в браузере.
При запуске приложения документация доступна по адресам:
/docs — Swagger UI, интерактивная документация,
/redoc — ReDoc, более удобная для чтения версия.

Чтобы сделать документацию максимально полезной, используйте docstrings в коде и правильно аннотируйте параметры и возвращаемые значения:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
 
class Product(BaseModel):
    id: Optional[int] = None
    name: str
    description: Optional[str] = None
    price: float
    available: bool = True
 
router = APIRouter()
 
@router.get("/products/", response_model=List[Product])
async def get_products(
    skip: int = Query(0, ge=0, description="Сколько записей пропустить"),
    limit: int = Query(10, ge=1, le=100, description="Максимальное количество записей")
):
    """
    Получение списка продуктов с пагинацией.
    
    - [B]skip[/B]: Количество записей, которые нужно пропустить
    - [B]limit[/B]: Максимальное количество записей для возврата
    """
    return [
        {"id": 1, "name": "Смартфон", "price": 699.99, "available": True},
        {"id": 2, "name": "Ноутбук", "price": 1299.99, "available": False}
    ]
FastAPI автоматически включит эти описания в генерируемую документацию.

Реализация валидации данных и обработки ошибок



Валидация входящих данных — важнейший аспект любого API. FastAPI использует Pydantic для автоматической валидации данных на основе типов Python:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import FastAPI, HTTPException, Path
from pydantic import BaseModel, Field, validator
 
app = FastAPI()
 
class ProductCreate(BaseModel):
    name: str = Field(..., min_length=3, max_length=50)
    description: str = Field(None, max_length=1000)
    price: float = Field(..., gt=0)
    category_id: int
    
    @validator('name')
    def name_must_be_valid(cls, v):
        if v.strip() == "":
            raise ValueError('Название не может быть пустым')
        return v.strip()
    
    class Config:
        schema_extra = {
            "example": {
                "name": "Игровая мышь",
                "description": "Высокоточная игровая мышь с RGB подсветкой",
                "price": 59.99,
                "category_id": 5
            }
        }
 
@app.post("/products/", response_model=ProductCreate)
async def create_product(product: ProductCreate):
    # В реальном приложении здесь был бы код для сохранения в БД
    return product
Pydantic выполняет следующие проверки:
  1. Типы данных (строка, число и т.д.).
  2. Ограничения на длину строк (min_length, max_length).
  3. Ограничения на числа (gt - больше чем, le - меньше или равно и т.д.).
  4. Пользовательские валидаторы (метод name_must_be_valid).

При нарушении любого из этих условий FastAPI автоматически вернёт клиенту ошибку 422 с детальным описанием проблемы.
Для обработки других типов ошибок используются исключения:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
@app.get("/products/{product_id}")
async def get_product(product_id: int = Path(..., gt=0)):
    # Имитация поиска в базе данных
    product = None  # предположим, что продукт не найден
    
    if not product:
        raise HTTPException(
            status_code=404,
            detail=f"Продукт с ID {product_id} не найден",
            headers={"X-Error": "PRODUCT_NOT_FOUND"},
        )
    
    return {"id": product_id, "name": "Тестовый продукт"}
FastAPI также позволяет создавать глобальные обработчики исключений для централизованной обработки ошибок:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
 
app = FastAPI()
 
class CustomException(Exception):
    def __init__(self, code: str, message: str):
        self.code = code
        self.message = message
 
@app.exception_handler(CustomException)
async def custom_exception_handler(request: Request, exc: CustomException):
    return JSONResponse(
        status_code=400,
        content={"code": exc.code, "message": exc.message}
    )
 
@app.get("/error-test")
async def error_test():
    raise CustomException(code="BUSINESS_RULE_VIOLATED", message="Невозможно выполнить операцию")
Такой подход позволяет стандартизировать формат ошибок во всём приложении и делает код более читаемым.

Разработка API эндпоинтов



При развитии микросервиса структурирование эндпоинтов становится критически важным. FastAPI предлагает удобный механизм группировки маршрутов через роутеры (APIRouter), что позволяет организовать код логично и легко масштабировать его:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from fastapi import APIRouter, Depends
from app.dependencies import get_token_header
 
router = APIRouter(
    prefix="/products",
    tags=["products"],
    dependencies=[Depends(get_token_header)],
    responses={404: {"description": "Не найдено"}},
)
 
@router.get("/")
async def read_products():
    return [{"name": "Часы", "price": 49.99}, {"name": "Наушники", "price": 99.99}]
 
@router.get("/{product_id}")
async def read_product(product_id: int):
    return {"name": "Часы", "product_id": product_id}
А затем подключаем роутер к основному приложению:

Python
1
2
3
4
5
6
from fastapi import FastAPI
from app.api.routes import products, users
 
app = FastAPI()
app.include_router(products.router)
app.include_router(users.router)
Такой подход позволяет:
  1. Группировать эндпоинты по функциональности.
  2. Применять общие зависимости и префиксы.
  3. Удобно тестировать изолированные компоненты.

Обработка лимитов и пагинации в API микросервиса



Правильная пагинация необходима для эффективной работы с большими наборами данных. Рассмотрим подход, который обеспечивает как удобство для клиентов, так и оптимальное использование ресурсов сервера:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from typing import List
from fastapi import APIRouter, Query
from pydantic import BaseModel
 
router = APIRouter()
 
class Product(BaseModel):
    id: int
    name: str
    price: float
 
@router.get("/products/", response_model=List[Product])
async def get_products(
    offset: int = Query(0, ge=0, description="Смещение от начала списка"),
    limit: int = Query(10, ge=1, le=100, description="Максимальное число элементов")
):
    # Здесь был бы код получения данных из БД с указанными параметрами
    products = [{"id": i, "name": f"Продукт {i}", "price": float(i * 10)} for i in range(offset, offset + limit)]
    return products
Для более продвинутой пагинации можно реализовать возврат курсора или токена для следующей страницы:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class PaginatedResponse(BaseModel):
    items: List[Product]
    total: int
    next_cursor: str = None
 
@router.get("/products/cursor", response_model=PaginatedResponse)
async def get_products_cursor(
    cursor: str = None,
    limit: int = Query(10, ge=1, le=100)
):
    # Декодирование курсора для получения последнего ID
    last_id = 0 if not cursor else int(cursor)
    
    # Имитация получения данных из БД
    items = [{"id": i, "name": f"Продукт {i}", "price": float(i * 10)} 
             for i in range(last_id + 1, last_id + limit + 1)]
    
    next_cursor = str(last_id + limit) if items else None
    
    return {
        "items": items, 
        "total": 100,  # общее число элементов
        "next_cursor": next_cursor
    }

Верификация входящих запросов



В дополнение к базовой валидации, FastAPI позволяет создавать сложные схемы верификации с вложенными моделями и зависимостями:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional
 
class Address(BaseModel):
    street: str
    city: str
    zip_code: str = Field(..., regex=r"^\d{5}(-\d{4})?$")
    
class User(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    full_name: Optional[str] = None
    addresses: List[Address] = []
    
@app.post("/users/")
async def create_user(user: User):
    return {"message": "Пользователь создан", "user": user}

Контейнеризация с Docker



После создания микросервиса на FastAPI естественным шагом становится его упаковка в контейнер. Docker — идеальная технология для этой цели, обеспечивающая изоляцию, переносимость и согласованность развертывания. Давайте разберем, как эффективно контейнеризировать наш Python-микросервис.

Написание Dockerfile



Dockerfile — это текстовый файл с инструкциями по созданию Docker-образа. Для микросервиса на FastAPI базовый Dockerfile может выглядеть так:

Bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Используем официальный образ Python
FROM python:3.9-slim
 
# Устанавливаем рабочую директорию
WORKDIR /app
 
# Копируем файлы зависимостей
COPY pyproject.toml poetry.lock* /app/
 
# Устанавливаем Poetry и зависимости
RUN pip install poetry && \
    poetry config virtualenvs.create false && \
    poetry install --no-dev
 
# Копируем код приложения
COPY ./app /app/app
 
# Экспонируем порт, на котором работает приложение
EXPOSE 8000
 
# Запускаем приложение
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Несколько важных моментов в этом Dockerfile:
  • Использование тега slim для уменьшения размера базового образа.
  • Разделение установки зависимостей и копирования кода для эффективного использования кэша Docker.
  • Отключение создания виртуального окружения Poetry внутри контейнера.
  • Исключение зависимостей для разработки в продакшен-образе.

Собрать Docker-образ можно командой:

Bash
1
docker build -t my-fastapi-service:latest .

Настройка Docker Compose



Для микросервисов, работающих с другими компонентами (базы данных, кеш, очереди сообщений), целесообразно использовать Docker Compose. Он позволяет определить и запустить многоконтейнерные приложения с одним конфигурационным файлом:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
version: '3.8'
 
services:
  api:
    build: 
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/app
      - REDIS_URL=redis://cache:6379/0
    depends_on:
      - db
      - cache
 
  db:
    image: postgres:13
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=app
    ports:
      - "5432:5432"
 
  cache:
    image: redis:6
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
 
volumes:
  postgres_data:
  redis_data:
Этот файл определяет три сервиса:
api — наш FastAPI микросервис
db — база данных PostgreSQL
cache — Redis для кеширования

Запустить композицию контейнеров можно командой:

Bash
1
docker-compose up -d

Многоступенчатые сборки для уменьшения размера образов



Один из недостатков простых Dockerfile — избыточные размеры итоговых образов. Многоступенчатые сборки помогают создавать оптимизированные образы, включающие только необходимые компоненты:

Bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Этап сборки
FROM python:3.9-slim as builder
 
WORKDIR /build
 
# Устанавливаем инструменты сборки
RUN apt-get update && \
    apt-get install -y --no-install-recommends gcc 
 
COPY pyproject.toml poetry.lock* /build/
RUN pip install poetry && \
    poetry export -f requirements.txt > requirements.txt
 
# Создаем оптимизированные пакеты с Wheel
RUN pip wheel --no-cache-dir --wheel-dir /build/wheels -r requirements.txt
 
# Окончательный этап
FROM python:3.9-slim
 
WORKDIR /app
 
# Копируем только готовые wheel-пакеты и устанавливаем их
COPY --from=builder /build/wheels /wheels
RUN pip install --no-cache-dir --no-index --find-links=/wheels /wheels/* && \
    rm -rf /wheels
 
# Копируем код приложения
COPY ./app /app/app
 
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Такой подход значительно уменьшает размер итогового образа, так как в нём отсутствуют компоненты, необходимые только для сборки, и промежуточные файлы.

Оптимизация образов для производства



При подготовке Docker-образов для производственной среды учитывайте следующие моменты:

1. Безопасность:

Bash
1
2
3
4
5
6
7
8
9
# Создаем непривилегированного пользователя
RUN addgroup --system app && \
    adduser --system --group app
 
# Задаем правильные разрешения
COPY --chown=app:app ./app /app/app
 
# Переключаемся на непривилегированного пользователя
USER app
Запуск контейнера от имени непривилегированного пользователя снижает потенциальные риски безопасности.

2. Кэширование слоев:

Наиболее изменчивые файлы должны копироваться в последнюю очередь, чтобы максимально использовать кэширование слоев Docker:

Bash
1
2
3
4
5
# Редко меняющиеся файлы
COPY --chown=app:app pyproject.toml poetry.lock* /app/
 
# Часто меняющийся код копируется последним
COPY --chown=app:app ./app /app/app
3. Переменные окружения:

Используйте переменные окружения для конфигурации, избегая хардкода параметров:

Bash
1
2
3
4
5
ENV LOG_LEVEL=info \
    WORKERS=2 \
    MAX_REQUESTS=1000
 
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "${WORKERS}", "--log-level", "${LOG_LEVEL}"]
4. Файл .dockerignore:

Создание файла .dockerignore позволяет исключить ненужные файлы из контекста сборки:

Python
1
2
3
4
5
6
7
8
9
10
.git
.github
venv
__pycache__
*.pyc
.pytest_cache
.coverage
htmlcov
tests
docs

Стратегии непрерывной интеграции и развертывания



Эффективное CI/CD для контейнеризованных микросервисов включает несколько ключевых шагов:

1. Автоматизированная сборка образов

Для GitHub Actions типичный workflow выглядит так:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
name: Build and Deploy
 
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
 
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v1
    
    - name: Build and push
      uses: docker/build-push-action@v2
      with:
        context: .
        push: false
        tags: myapp:latest
        cache-from: type=local,src=/tmp/.buildx-cache
        cache-to: type=local,dest=/tmp/.buildx-cache
    
    - name: Run tests
      run: docker run myapp:latest pytest
2. Сканирование уязвимостей

Добавьте шаг сканирования образа на наличие уязвимостей:

YAML
1
2
3
4
5
6
7
name: Scan for vulnerabilities
  uses: aquasecurity/trivy-action@master
  with:
    image-ref: myapp:latest
    format: 'table'
    exit-code: '1'
    severity: 'CRITICAL'
3. Автоматическое развертывание

После успешного прохождения тестов можно настроить автоматическое развертывание на целевую инфраструктуру:

YAML
1
2
3
4
5
name: Deploy to Kubernetes
  uses: steebchen/kubectl@v2
  with:
    config: ${{ secrets.KUBE_CONFIG_DATA }}
    command: apply -f k8s/deployment.yaml
Эта стратегия обеспечивает:
  1. Автоматизацию рутинных операций.
  2. Единообразие развертывания.
  3. Раннее обнаружение проблем.
  4. Быстрое восстановление при сбоях.

При настройке CI/CD для микросервисов особое внимание стоит уделить использованию переменных среды для разных окружений и механизмам отката к предыдущим версиям при возникновении проблем.

Работа с Docker Registry и управление версиями образов



Для полноценной работы с контейнерами в производственной среде необходимо надежное хранилище образов. Docker Registry — это сервис, который позволяет хранить, распространять и управлять Docker-образами.
Для публикации образа в Docker Hub, самый популярный реестр, используйте команды:

Bash
1
2
3
4
5
6
7
8
# Аутентификация
docker login
 
# Тегирование образа с указанием версии
docker tag my-fastapi-service:latest username/my-fastapi-service:1.0.0
 
# Отправка в реестр
docker push username/my-fastapi-service:1.0.0
Семантическое версионирование (SemVer) — проверенная методика нумерации релизов, особенно полезная для микросервисов:

Bash
1
2
3
4
5
6
7
8
# Мажорная версия: несовместимые изменения API
docker tag my-service username/my-service:2.0.0
 
# Минорная версия: новый функционал с обратной совместимостью
docker tag my-service username/my-service:1.1.0
 
# Патч: исправление ошибок
docker tag my-service username/my-service:1.0.1
Для разработки можно настроить частный Docker Registry:

YAML
1
2
3
4
5
6
7
# В docker-compose.yml
registry:
  image: registry:2
  ports:
    - "5000:5000"
  volumes:
    - registry_data:/var/lib/registry
При отладке контейнеров полезно знать несколько приемов:

Bash
1
2
3
4
5
6
7
8
# Просмотр логов контейнера
docker logs container_id
 
# Интерактивная сессия внутри работающего контейнера
docker exec -it container_id bash
 
# Проверка использования ресурсов
docker stats
Оптимальная стратегия для микросервисов на Python — генерировать образы при каждом изменении ветки main, тегируя их как "latest" и номером версии для релизов. Такой подход обеспечивает четкую историю изменений и возможность быстрого отката к стабильным версиям при необходимости.

Расширение функциональности



Базовый микросервис на FastAPI и Docker даёт прочную основу, но для создания полноценного производственного решения требуется расширить его функциональность. Рассмотрим ключевые компоненты, которые превратят прототип в надёжное приложение.

Интеграция с базами данных



Большинство микросервисов требуют хранения данных. FastAPI отлично сочетается с различными базами данных, особенно с асинхронными клиентами.

SQLAlchemy для реляционных баз



SQLAlchemy — самое популярное ORM-решение для Python. С версии 1.4 она поддерживает асинхронные операции, что идеально подходит для FastAPI:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String, Float, Boolean
 
# Настройка подключения
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
 
# Определение модели данных
Base = declarative_base()
 
class Product(Base):
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String, nullable=True)
    price = Column(Float)
    available = Column(Boolean, default=True)
 
# Использование в FastAPI
@app.get("/products/{product_id}")
async def get_product(product_id: int):
    async with async_session() as session:
        query = select(Product).where(Product.id == product_id)
        result = await session.execute(query)
        product = result.scalars().first()
        
        if not product:
            raise HTTPException(status_code=404, detail="Продукт не найден")
        
        return product

MongoDB для NoSQL-данных



Для неструктурированных данных или схем с частыми изменениями MongoDB — отличный выбор:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from motor.motor_asyncio import AsyncIOMotorClient
 
# Подключение
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.product_db
collection = db.products
 
# Использование в FastAPI
@app.post("/products/")
async def create_product(product: ProductCreate):
    product_dict = product.dict()
    result = await collection.insert_one(product_dict)
    
    created_product = await collection.find_one({"_id": result.inserted_id})
    return created_product

Абстрагирование доступа к данным



Лучшая практика — создание репозиториев, которые абстрагируют логику доступа к данным:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ProductRepository:
    def __init__(self, session_factory):
        self.session_factory = session_factory
    
    async def get_by_id(self, product_id: int):
        async with self.session_factory() as session:
            query = select(Product).where(Product.id == product_id)
            result = await session.execute(query)
            return result.scalars().first()
    
    async def create(self, product_data: dict):
        product = Product(**product_data)
        async with self.session_factory() as session:
            session.add(product)
            await session.commit()
            await session.refresh(product)
            return product
Такой подход упрощает тестирование и позволяет легко менять бэкенды хранения данных.

Добавление асинхронных операций



FastAPI изначально ориентирован на асинхронное выполнение, что делает его отличным выбором для операций, требующих обработки множества одновременных запросов.

Параллельное выполнение нескольких асинхронных задач



Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
from fastapi import APIRouter
 
router = APIRouter()
 
async def fetch_product_details(product_id: int):
    # Имитация задержки при запросе данных
    await asyncio.sleep(1)
    return {"id": product_id, "name": f"Продукт {product_id}"}
 
async def fetch_product_reviews(product_id: int):
    # Имитация задержки при запросе отзывов
    await asyncio.sleep(0.5)
    return [
        {"user": "user1", "rating": 5, "comment": "Отличный продукт!"},
        {"user": "user2", "rating": 4, "comment": "Хороший выбор"}
    ]
 
@router.get("/products/{product_id}/full")
async def get_product_with_reviews(product_id: int):
    # Параллельное выполнение двух асинхронных операций
    product_details, reviews = await asyncio.gather(
        fetch_product_details(product_id),
        fetch_product_reviews(product_id)
    )
    
    return {
        "product": product_details,
        "reviews": reviews
    }

Асинхронные фоновые задачи



Для тяжелых операций, которые не должны блокировать ответ API:

Python
1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import BackgroundTasks
 
@app.post("/orders/")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
    # Быстрая операция - сохранение заказа
    order_id = await save_order(order)
    
    # Медленные операции выполняются в фоне
    background_tasks.add_task(process_payment, order_id, order.payment_details)
    background_tasks.add_task(send_order_confirmation, order.user_email, order_id)
    
    return {"order_id": order_id, "status": "processing"}
Для более сложных сценариев стоит рассмотреть Celery или другие системы управления очередями.

Тестирование микросервиса



Надёжные тесты критически важны для микросервисов, поскольку они часто развертываются независимо и должны функционировать автономно.

Модульное тестирование с pytest



Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# test_product_service.py
import pytest
from app.services.product_service import ProductService
from app.repositories.product_repository import ProductRepository
 
class MockProductRepository:
    async def get_by_id(self, product_id: int):
        if product_id == 1:
            return {"id": 1, "name": "Test Product", "price": 9.99}
        return None
 
@pytest.fixture
def product_service():
    repository = MockProductRepository()
    return ProductService(repository)
 
@pytest.mark.asyncio
async def test_get_product_returns_product_when_exists(product_service):
    # Действие
    product = await product_service.get_product(1)
    
    # Проверка
    assert product["id"] == 1
    assert product["name"] == "Test Product"
 
@pytest.mark.asyncio
async def test_get_product_raises_exception_when_not_exists(product_service):
    # Проверка исключения
    with pytest.raises(ValueError, match="Продукт не найден"):
        await product_service.get_product(999)

Интеграционное тестирование API



Для тестирования полного API-сервиса FastAPI предоставляет TestClient:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# test_api.py
from fastapi.testclient import TestClient
from app.main import app
 
client = TestClient(app)
 
def test_read_product():
    response = client.get("/products/1")
    assert response.status_code == 200
    assert response.json()["name"] == "Test Product"
 
def test_create_product():
    product_data = {
        "name": "New Product",
        "price": 29.99,
        "description": "Test description"
    }
    response = client.post("/products/", json=product_data)
    assert response.status_code == 201
    assert response.json()["name"] == "New Product"

Тестирование с моками баз данных



Для полноценного тестирования без реальной БД можно использовать обёртки:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pytest
from unittest.mock import AsyncMock
 
@pytest.fixture
async def mock_db_session():
    session = AsyncMock()
    session.__aenter__.return_value = session
    
    # Мок для запроса
    mock_result = AsyncMock()
    mock_result.scalars.return_value.first.return_value = {
        "id": 1,
        "name": "Test Product"
    }
    session.execute.return_value = mock_result
    
    yield session
 
@pytest.mark.asyncio
async def test_get_product_from_db(mock_db_session):
    repository = ProductRepository(lambda: mock_db_session)
    product = await repository.get_by_id(1)
    
    # Проверяем, что вызов execute был с правильными параметрами
    mock_db_session.execute.assert_called_once()
    assert product["id"] == 1
Хорошее тестирование гарантирует, что микросервис будет работать надежно даже при активной разработке и частом выпуске новых версий. Комбинация модульных, интеграционных и функциональных тестов обеспечивает высокую степень уверенности в работоспособности системы.

Внедрение мониторинга и логирования в микросервис



В производственной среде недостаточно просто запустить микросервис — необходимо постоянно следить за его состоянием и производительностью. Правильно настроенные мониторинг и логирование помогают быстро обнаруживать и устранять проблемы.

Настройка логирования



Python предлагает встроенную библиотеку logging, которую можно легко интегрировать с FastAPI:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import logging
from fastapi import FastAPI, Request
import time
from typing import Callable
import uuid
 
# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
 
logger = logging.getLogger(__name__)
 
app = FastAPI()
 
# Middleware для логирования запросов
@app.middleware("http")
async def log_requests(request: Request, call_next: Callable):
    request_id = str(uuid.uuid4())
    logger.info(f"Request {request_id} started: {request.method} {request.url}")
    
    start_time = time.time()
    try:
        response = await call_next(request)
        process_time = time.time() - start_time
        logger.info(f"Request {request_id} completed: {response.status_code} in {process_time:.3f}s")
        
        # Добавляем ID запроса в заголовки ответа для отслеживания
        response.headers["X-Request-ID"] = request_id
        return response
    except Exception as e:
        logger.error(f"Request {request_id} failed: {str(e)}")
        raise
Для более продвинутого логирования стоит рассмотреть структурированные логи в формате JSON, которые легче анализировать с помощью инструментов, таких как ELK Stack (Elasticsearch, Logstash, Kibana) или Graylog:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import json
import logging
import datetime
 
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": datetime.datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        
        if hasattr(record, 'request_id'):
            log_record["request_id"] = record.request_id
        
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
            
        return json.dumps(log_record)

Метрики производительности и мониторинг



Для отслеживания производительности микросервиса можно использовать Prometheus — популярную систему мониторинга, которая хорошо интегрируется с FastAPI:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from prometheus_client import Counter, Histogram
from prometheus_fastapi_instrumentator import Instrumentator
 
# Инициализация Instrumentator
instrumentator = Instrumentator().instrument(app)
 
# Настройка собственных метрик
http_requests_total = Counter(
    'http_requests_total', 
    'Total HTTP requests', 
    ['method', 'endpoint', 'status']
)
 
http_request_duration = Histogram(
    'http_request_duration_seconds', 
    'HTTP request duration in seconds',
    ['method', 'endpoint']
)
 
@app.middleware("http")
async def monitor_requests(request: Request, call_next: Callable):
    method = request.method
    path = request.url.path
    
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    status = response.status_code
    http_requests_total.labels(method=method, endpoint=path, status=status).inc()
    http_request_duration.labels(method=method, endpoint=path).observe(duration)
    
    return response
 
# Активация инструментации
@app.on_event("startup")
async def startup():
    instrumentator.expose(app)
Теперь метрики доступны по эндпоинту /metrics и могут быть собраны Prometheus.

Обеспечение безопасности микросервиса



Безопасность — критически важный аспект микросервисной архитектуры. FastAPI предоставляет различные инструменты для реализации аутентификации и авторизации.

OAuth2 с JWT-токенами



Для защиты API можно использовать OAuth2 с JSON Web Tokens (JWT):

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
from pydantic import BaseModel
 
# Настройки безопасности
SECRET_KEY = "ваш_секретный_ключ"  # В production берите из переменных окружения!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
 
# Модели
class Token(BaseModel):
    access_token: str
    token_type: str
 
class TokenData(BaseModel):
    username: str = None
 
class User(BaseModel):
    username: str
    email: str = None
    disabled: bool = False
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
# Функции для работы с JWT
def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Неверные учетные данные",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    
    # Здесь должна быть проверка пользователя в БД
    user = {"username": token_data.username, "disabled": False}
    return user
 
# Эндпоинт для аутентификации
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Проверка пользователя (заглушка)
    if form_data.username != "test" or form_data.password != "test":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверное имя пользователя или пароль",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
 
# Защищенный эндпоинт
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
Имея настроенные системы логирования, мониторинга и безопасности, ваш микросервис будет готов к использованию в реальной производственной среде. Эти компоненты являются фундаментальными для создания надежного и отказоустойчивого решения.

Имплементация кэширования для оптимизации производительности



Кэширование — мощный инструмент оптимизации, который может значительно ускорить работу микросервиса, особенно при обработке повторяющихся запросов. Для FastAPI-приложений существует несколько эффективных стратегий кэширования.
Redis — популярное решение для внедрения кэша в микросервисную архитектуру:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import redis
from fastapi import FastAPI, Depends
import json
import pickle
from functools import wraps
 
# Инициализация Redis-клиента
redis_client = redis.Redis(host='localhost', port=6379, db=0)
 
app = FastAPI()
 
# Декоратор для кэширования результатов функций
def cache(expire_seconds: int = 60):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Формирование ключа кэша из параметров функции
            key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
            
            # Проверка наличия данных в кэше
            cached_data = redis_client.get(key)
            if cached_data:
                return pickle.loads(cached_data)
            
            # Выполнение функции и сохранение результата в кэш
            result = await func(*args, **kwargs)
            redis_client.setex(key, expire_seconds, pickle.dumps(result))
            return result
        return wrapper
    return decorator
 
# Применение кэширования к эндпоинту
@app.get("/products/{product_id}")
@cache(expire_seconds=300)  # Кэш на 5 минут
async def get_product(product_id: int):
    # Имитация медленной операции получения данных
    # В реальном приложении здесь был бы запрос к БД
    await asyncio.sleep(1)
    return {"id": product_id, "name": f"Продукт {product_id}", "price": 100.0}
Помимо Redis, можно использовать in-memory кэширование для данных, которые не требуется синхронизировать между экземплярами микросервиса:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from cachetools import TTLCache
from datetime import timedelta
 
# Создание локального кэша с временем жизни элементов 5 минут
local_cache = TTLCache(maxsize=100, ttl=timedelta(minutes=5).total_seconds())
 
def get_cached_or_fetch(key, fetch_func):
    # Проверка наличия в кэше
    if key in local_cache:
        return local_cache[key]
    
    # Получение и сохранение в кэш
    result = fetch_func()
    local_cache[key] = result
    return result

Построение механизма взаимодействия между микросервисами



Суть микросервисной архитектуры в том, что различные компоненты системы работают как независимые единицы. Но независимость не означает изоляцию — микросервисы должны эффективно взаимодействовать друг с другом. Существует несколько проверенных паттернов коммуникации, каждый из которых имеет свои преимущества и недостатки.

Синхронная коммуникация через REST API



Наиболее распространённый и простой метод взаимодействия — прямые HTTP-запросы между сервисами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import httpx
 
async def get_user_orders(user_id: int):
  async with httpx.AsyncClient() as client:
      # Запрос к сервису заказов
      response = await client.get(f"http://order-service/api/orders?user_id={user_id}")
      response.raise_for_status()
      return response.json()
 
@app.get("/users/{user_id}/dashboard")
async def get_user_dashboard(user_id: int):
  # Получение данных пользователя из локальной БД
  user = await user_repository.get_by_id(user_id)
  if not user:
      raise HTTPException(status_code=404, detail="Пользователь не найден")
  
  # Получение заказов пользователя из другого сервиса
  orders = await get_user_orders(user_id)
  
  return {
      "user": user,
      "orders": orders,
      "orders_count": len(orders)
  }
Для работы с внешними API удобно создать клиентский класс с настраиваемыми параметрами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ServiceClient:
  def __init__(self, base_url: str, timeout: float = 5.0, retries: int = 3):
      self.base_url = base_url
      self.timeout = timeout
      self.retries = retries
  
  async def request(self, method: str, path: str, **kwargs):
      url = f"{self.base_url}{path}"
      
      # Настройка таймаута и повторных попыток
      kwargs.setdefault("timeout", self.timeout)
      
      # Применение паттерна Circuit Breaker для защиты от каскадных сбоев
      for attempt in range(self.retries):
          try:
              async with httpx.AsyncClient() as client:
                  response = await client.request(method, url, **kwargs)
                  response.raise_for_status()
                  return response.json()
          except (httpx.RequestError, httpx.HTTPStatusError) as e:
              if attempt == self.retries - 1:
                  # Последняя попытка, пробрасываем исключение выше
                  raise ServiceCommunicationError(f"Ошибка при обращении к сервису: {str(e)}")
              # Экспоненциальная задержка перед повторной попыткой
              await asyncio.sleep(0.1 * (2 ** attempt))
Синхронное взаимодействие просто в реализации, но имеет существенные недостатки:
  • Зависимость от доступности других сервисов.
  • Потенциальное увеличение времени ответа.
  • Риск каскадных отказов при недоступности одного из сервисов.

Асинхронная коммуникация через очереди сообщений



Для сценариев, где не требуется немедленный ответ, предпочтительнее асинхронное взаимодействие через очереди сообщений (message queues). Популярные решения включают RabbitMQ, Kafka, AWS SQS и Google Pub/Sub.
Пример использования RabbitMQ с библиотекой aio-pika:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import aio_pika
import json
 
# Настройка подключения к RabbitMQ
async def get_rabbitmq_connection():
  return await aio_pika.connect_robust("amqp://guest:guest@rabbitmq:5672/")
 
# Отправка сообщения
async def send_event(event_type: str, payload: dict):
  connection = await get_rabbitmq_connection()
  async with connection:
      channel = await connection.channel()
      
      # Объявление обменника для событий
      exchange = await channel.declare_exchange(
          "events", aio_pika.ExchangeType.TOPIC
      )
      
      # Создание и отправка сообщения
      message = aio_pika.Message(
          body=json.dumps(payload).encode(),
          content_type="application/json",
          headers={"event_type": event_type}
      )
      
      await exchange.publish(message, routing_key=event_type)
 
# Обработчик эндпоинта, отправляющий событие
@app.post("/products/")
async def create_product(product: ProductCreate):
  # Создание продукта в БД
  db_product = await product_repository.create(product.dict())
  
  # Отправка события о создании продукта
  await send_event(
      "product.created",
      {"product_id": db_product.id, "name": db_product.name}
  )
  
  return db_product
Для получения сообщений в другом микросервисе:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async def process_messages():
  connection = await get_rabbitmq_connection()
  async with connection:
      channel = await connection.channel()
      
      # Объявление обменника
      exchange = await channel.declare_exchange(
          "events", aio_pika.ExchangeType.TOPIC
      )
      
      # Создание очереди для данного сервиса
      queue = await channel.declare_queue("inventory-service-queue", durable=True)
      
      # Подписка на события определенного типа
      await queue.bind(exchange, routing_key="product.created")
      await queue.bind(exchange, routing_key="product.updated")
      
      # Начать получение сообщений
      async with queue.iterator() as queue_iter:
          async for message in queue_iter:
              async with message.process():
                  try:
                      body = json.loads(message.body.decode())
                      event_type = message.headers["event_type"]
                      
                      if event_type == "product.created":
                          await handle_product_created(body)
                      elif event_type == "product.updated":
                          await handle_product_updated(body)
                  except Exception as e:
                      # Логирование ошибки и возможное перенаправление в очередь ошибок
                      logging.error(f"Ошибка обработки сообщения: {str(e)}")
 
# Запуск обработчика сообщений при старте приложения
@app.on_event("startup")
async def startup_event_handlers():
  # Запуск фоновой задачи для обработки сообщений
  asyncio.create_task(process_messages())
Преимущества асинхронной коммуникации:
  1. Слабая связность между сервисами.
  2. Устойчивость к временной недоступности сервисов.
  3. Естественная балансировка нагрузки.
  4. Возможность масштабирования обработчиков независимо.

Гибридный подход: Event Sourcing и CQRS



Для сложных сценариев взаимодействия часто используется комбинация паттернов Event Sourcing (сохранение всех изменений как последовательности событий) и CQRS (разделение операций чтения и записи). В этом подходе:
1. Каждое изменение состояния в системе сохраняется как событие.
2. Сервисы публикуют события в общую шину сообщений.
3. Заинтересованные сервисы подписываются на нужные события.
4. Каждый сервис поддерживает свою оптимизированную модель данных.

Пример реализации Event Sourcing:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Модель события
class Event(BaseModel):
  id: UUID = Field(default_factory=uuid4)
  timestamp: datetime = Field(default_factory=datetime.utcnow)
  type: str
  data: dict
  metadata: dict = {}
 
# Сервис для работы с событиями
class EventStore:
  def __init__(self, mongo_client):
      self.db = mongo_client.get_database("event_store")
      self.events = self.db.get_collection("events")
  
  async def save_event(self, event: Event):
      await self.events.insert_one(event.dict())
      # Публикация события в шину сообщений
      await publish_event(event)
  
  async def get_events(self, event_type: str = None, start_date: datetime = None):
      query = {}
      if event_type:
          query["type"] = event_type
      if start_date:
          query["timestamp"] = {"$gte": start_date}
      
      cursor = self.events.find(query).sort("timestamp", 1)
      return [Event(**event) async for event in cursor]
 
# Использование в обработчике API
@app.post("/orders/")
async def create_order(order: OrderCreate):
  # Создание события
  event = Event(
      type="OrderCreated",
      data=order.dict()
  )
  
  # Сохранение события
  await event_store.save_event(event)
  
  return {"order_id": event.data["id"], "status": "processing"}
Такой подход позволяет создавать гибкие, масштабируемые и устойчивые к сбоям системы, но требует более сложной инфраструктуры и проработки механизмов согласованности данных.

Стратегии масштабирования микросервисов в production-среде



Одно из главных преимуществ микросервисной архитектуры — возможность гибкого масштабирования отдельных компонентов системы. Однако это требует правильной стратегии и инструментов.

Горизонтальное масштабирование с Kubernetes



Kubernetes (K8s) стал де-факто стандартом для оркестрации контейнеров в production. Он предоставляет мощные средства для масштабирования микросервисов. Базовый манифест для развертывания FastAPI-микросервиса в Kubernetes:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-service
  labels:
    app: product-service
spec:
  replicas: 3  # Начальное количество экземпляров
  selector:
    matchLabels:
      app: product-service
  template:
    metadata:
      labels:
        app: product-service
    spec:
      containers:
      - name: product-service
        image: registry.example.com/product-service:1.0.0
        ports:
        - containerPort: 8000
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 15
          periodSeconds: 20
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: product-service-secrets
              key: database-url
Для автоматического масштабирования в зависимости от нагрузки используется HorizontalPodAutoscaler:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: product-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: product-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
В этом примере количество экземпляров сервиса будет автоматически увеличиваться при достижении 70% утилизации CPU или 80% использования памяти.

Оптимизация производительности FastAPI-приложений



Перед масштабированием следует убедиться, что каждый экземпляр микросервиса работает эффективно:

1. Настройка количества воркеров Uvicorn

В production FastAPI-приложения обычно запускаются через Gunicorn с несколькими воркерами Uvicorn:

Bash
1
2
# В Dockerfile или команде запуска
CMD ["gunicorn", "app.main:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
Оптимальное число воркеров зависит от доступных CPU и характера приложения. Часто используется формула:
(2 × количество CPU) + 1

2. Эффективное использование соединений с БД

Для оптимизации работы с базами данных стоит использовать пулы соединений:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from databases import Database
from sqlalchemy.ext.declarative import declarative_base
 
DATABASE_URL = "postgresql://user:password@localhost/dbname"
database = Database(DATABASE_URL)
Base = declarative_base()
 
# В FastAPI приложении
app = FastAPI()
 
@app.on_event("startup")
async def startup():
  await database.connect()
 
@app.on_event("shutdown")
async def shutdown():
  await database.disconnect()
3. Разгрузка основного процесса

Тяжелые операции следует выносить в отдельные процессы или сервисы:

Python
1
2
3
4
5
6
7
8
9
10
11
from fastapi import BackgroundTasks
 
@app.post("/reports/generate")
async def generate_report(report_params: ReportParams, background_tasks: BackgroundTasks):
  # Быстрый ответ клиенту
  report_id = str(uuid4())
  
  # Запуск генерации отчета в фоне
  background_tasks.add_task(generate_report_task, report_id, report_params)
  
  return {"report_id": report_id, "status": "processing"}

Масштабирование с учетом состояния



Не все микросервисы одинаково легко масштабируются. Особые стратегии требуются для сервисов с состоянием (stateful services), таких как сервисы, управляющие сессиями пользователей или локальными кэшами.

1. Распределенные сессии

Вместо хранения сессий в памяти отдельного экземпляра их можно хранить в распределенном хранилище:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi_sessions.backends.redis import RedisBackend
from fastapi_sessions.session_verifier import SessionVerifier
from fastapi_sessions.frontends.cookie import CookieParameters, SessionCookie
from redis.asyncio import Redis
 
redis = Redis(host="redis", port=6379)
backend = RedisBackend(redis, prefix="sessions:")
 
cookie_params = CookieParameters()
cookie = SessionCookie(
  cookie_name="session",
  identifier="general_verifier",
  auto_error=True,
  secret_key="SECURE_KEY_HERE",
  cookie_params=cookie_params
)
 
# Определение данных сессии
class SessionData(BaseModel):
  user_id: int
 
# Верификатор сессии
class BasicVerifier(SessionVerifier[SessionData]):
  def __init__(self, backend: RedisBackend, name: str):
      self.backend = backend
      self.name = name
      
  async def read_data(self, session_id: str) -> SessionData:
      data = await self.backend.read(session_id)
      if not data:
          raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
      return SessionData(**data)
  
  async def verify_session(self, session_id: str) -> bool:
      exists = await self.backend.exists(session_id)
      return exists
 
verifier = BasicVerifier(backend=backend, name="general_verifier")
 
# Использование в API
@app.get("/users/me")
async def get_current_user(session_data: SessionData = Depends(cookie)):
  return {"user_id": session_data.user_id}
2. Распределенный кэш

Для микросервисов, использующих локальный кэш, переход на распределенное кэширование обеспечивает согласованность между экземплярами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import redis.asyncio as redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
 
@app.on_event("startup")
async def startup():
  redis_client = redis.Redis(host="redis", port=6379)
  FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache:")
 
@app.get("/products/{product_id}")
@cache(expire=60)  # Кэширование на 60 секунд
async def get_product(product_id: int):
  # Медленная операция получения данных
  return await product_repository.get_by_id(product_id)
3. Очереди заданий для распределения нагрузки

Для обработки асинхронных задач можно использовать распределенные очереди заданий, такие как Celery, которые автоматически балансируют нагрузку между воркерами:

Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from celery import Celery
from fastapi import FastAPI
 
# Инициализация Celery
celery_app = Celery(
  "tasks",
  broker="amqp://guest:guest@rabbitmq:5672//",
  backend="redis://redis:6379/0"
)
 
# Определение задачи
@celery_app.task
def process_order(order_id: str):
  # Длительная обработка заказа
  pass
 
app = FastAPI()
 
@app.post("/orders/")
async def create_order(order: OrderCreate):
  # Сохранение заказа в БД
  order_id = await save_order(order)
  
  # Запуск асинхронной обработки
  task = process_order.delay(order_id)
  
  return {"order_id": order_id, "task_id": task.id}
 
@app.get("/orders/{order_id}/status")
async def get_order_status(order_id: str, task_id: str):
  # Проверка статуса задачи
  task = process_order.AsyncResult(task_id)
  if task.state == 'PENDING':
      return {"status": "processing"}
  elif task.state == 'SUCCESS':
      return {"status": "completed"}
  else:
      return {"status": "failed", "error": str(task.result)}
Эффективное масштабирование микросервисов — комплексная задача, требующая внимания к деталям на всех уровнях: от архитектуры отдельных сервисов до инфраструктуры развертывания. Правильно спроектированные и реализованные с помощью FastAPI и Docker микросервисы могут обеспечить непревзойденную гибкость и масштабируемость для приложений любого размера.

Docker-compose push to Docker Hub
Всем привет! Я заготовил docker-compose.yml, но есть несколько зависимостей в папочках . ├──...

Разработка на fastapi с jinja + uvicorn + starlette -- это какая архитектра?)
Добрый вечер, сразу прошу прощение за возможно глупый и банальный вопрос. Если я разрабатываю...

Сколько оперативной памяти сервера нужно питону и среде для запуска fastapi?
Доброй ночи всем, кто с радостью набирает код на Питоне! Я пока не определился в пользу выбора...

Python FastAPI не отображается html страница
Добрый день! Подскажите, пожалуйста, почему у меня не отображается html файл, хотя всё работает без...

FastAPI и сериалайзеры
В моём проекте три таблицы. Нужно сформировать ответ который дергает инфу сразу из всех трёх...

FastAPI и pytest
Использовал инструкцию https://www.fastapitutorial.com/blog/unit-testing-in-fastapi/ но что-то не...

Django vs. Flask vs. FastAPI
Какой фреймворк выбрать начинающему? Какой проще, какой сложнее? Для какого больше дополнительных...

JSON поле в peewee + pydantic (FastAPI)
Доброго времени. Проблема следующая. Есть приложение на FastAPI, в котором описаны модели таблиц...

FastAPI и matplotlib
Напишите API сервис используя фреймворк FastAPI Он должен уметь принимать данные (На ваше...

Ошибка NameError при применении FastAPI
Всем добрый день :) Подскажите, пожалуйста: есть функция, в которую передаю аргумент через...

FastAPI выгрузить html код
возникла небольшая проблема я решил при помощи фраемворка выгрузить html код(так по себе он...

ImportError FastAPI
Всем, привет, прохожу сейчас курс по FastAPI. Остановился на аутентификации, из-за того, что при...

Размещено в Без категории
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Генераторы Python для эффективной обработки данных
AI_Generated 21.05.2025
В Python существует инструмент настолько мощный и в то же время недооценённый, что я часто сравниваю его с тайным оружием в арсенале программиста. Речь идёт о генераторах — одной из самых элегантных. . .
Чем заменить Swagger в .NET WebAPI
stackOverflow 21.05.2025
Если вы создавали Web API на . NET в последние несколько лет, то наверняка сталкивались с зелёным интерфейсом Swagger UI. Этот инструмент стал практически стандартом для документирования и. . .
Использование Linq2Db в проектах C# .NET
UnmanagedCoder 21.05.2025
Среди множества претендентов на корону "идеального ORM" особое место занимает Linq2Db — микро-ORM, балансирующий между мощью полноценных инструментов и легковесностью ручного написания SQL. Что. . .
Реализация Domain-Driven Design с Java
Javaican 20.05.2025
DDD — это настоящий спасательный круг для проектов со сложной бизнес-логикой. Подход, предложенный Эриком Эвансом, позволяет создавать элегантные решения, которые точно отражают реальную предметную. . .
Возможности и нововведения C# 14
stackOverflow 20.05.2025
Выход версии C# 14, который ожидается вместе с . NET 10, приносит ряд интересных нововведений, действительно упрощающих жизнь разработчиков. Вы уже хотите опробовать эти новшества? Не проблема! Просто. . .
Собеседование по Node.js - вопросы и ответы
Reangularity 20.05.2025
Каждому разработчику рано или поздно приходится сталкиватся с техническими собеседованиями - этим стрессовым испытанием, где решается судьба карьерного роста и зарплатных ожиданий. В этой статье я. . .
Cython и C (СИ) расширения Python для максимальной производительности
py-thonny 20.05.2025
Python невероятно дружелюбен к начинающим и одновременно мощный для профи. Но стоит лишь заикнуться о высокопроизводительных вычислениях — и энтузиазм быстро улетучивается. Да, Питон медлительнее. . .
Безопасное программирование в Java и предотвращение уязвимостей (SQL-инъекции, XSS и др.)
Javaican 19.05.2025
Самые распространёные векторы атак на Java-приложения за последний год выглядят как классический "топ-3 хакерских фаворитов": SQL-инъекции (31%), межсайтовый скриптинг или XSS (28%) и CSRF-атаки. . .
Введение в Q# - язык квантовых вычислений от Microsoft
EggHead 19.05.2025
Microsoft вошла в гонку технологических гигантов с собственным языком программирования Q#, специально созданным для разработки квантовых алгоритмов. Но прежде чем погружаться в синтаксические дебри. . .
Безопасность Kubernetes с Falco и обнаружение вторжений
Mr. Docker 18.05.2025
Переход организаций к микросервисной архитектуре и контейнерным технологиям сопровождается лавинообразным ростом векторов атак — от тривиальных попыток взлома до многоступенчатых кибератак, способных. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru