CQRS — Command Query Responsibility Segregation, или разделение ответственности команд и запросов. Суть этого архитектурного паттерна проста: операции чтения данных (запросы) отделяются от операций изменения данных (команды). Многим эта идея покажется странной – зачем вообще что-то усложнять? В традиционных системах обычно используется одна модель данных для всех операций. Такой подход работает нормально для небольших приложений, но по мере роста системы эта единая модель превращается в монстра: громоздкая, сложная, неповоротливая. Она пытается угодить сразу и чтению, и записи, но в итоге не справляется ни с тем, ни с другим.
CQRS в Java: современный подход к архитектуре приложений
Исторически CQRS выросло из Domain-Driven Design (DDD) и является своеобразным ответом на ограничения CRUD-модели. Появившись примерно в 2010-х, идеи CQRS нашли благодатную почву в работах Грега Янга и Удо Домбровски. Эти ребята не просто предложили разделение моделей, но связали его с событийно-ориентированным подходом, что дало новый импульс развитию архитектурных стилей. Если взглянуть на CQRS с высоты птичьего полета, можно выделить три ключевых компонента:
1. Модель записи (Write Model) – оптимизирована для изменения состояния системы, содержит все бизнес-правила и ограничения, защищает консистентность данных.
2. Модель чтения (Read Model) – оптимизована для быстрого и удобного получения данных, часто денормализована и заточена под конкретные представления.
3. Механизм синхронизации – обеспечивает обновление модели чтения при изменениях в модели записи, обычно через события.
Когда же стоит применять CQRS? Точно не для простого CRUD-приложения с несколькими табличками и десятком пользователей. CQRS раскрывает свой потенциал, когда:
- чтение и запись данных имеют существенно разные характеристики и требования;
- система должна обрабатывать сложную бизнес-логику;
- требуется высокая масштабируемость операций чтения;
- нужна возможность независимого развития компонентов чтения и записи.
CQRS решает ряд практических проблем современных систем. В век больших данных и высоких нагрузок зачастую операции чтения требуют совсем другой оптимизации, чем операции записи. Например, электронная коммерция: пользователи намного чаще просматривают товары, чем делают покупки. Отделение модели чтения позволяет оптимизировать эти сценарии независимо.
Другое преимущество – более четкое разделение ответсвенности между командами разработчиков. Когда модели чтения и записи разделены, разные команды могут работать над ними параллельно, не мешая друг другу. Это ускоряет разработку и снижает риск конфликтов. Однако, Разделение моделей вводит дополнительную сложность, требует механизмов синхронизации и может создать проблемы с консистентностью данных. Поэтому решение о внедрении CQRS должно опираться на трезвую оценку требований проекта и готовность команды работать с более сложной архитектурой.
Error answering callback query: [400] Bad Request: query is too old and response timeout expired or query ID is invalid Я понимаю, что это бывает тогда, когда бот не запущен, а кто то нажимает на инлайн кнопку.... Что такое CQRS? Говорят для мобильных приложений используют шаблон CQRS. Вот понять бы просто что это вообще? Реализация паттерна проектирования цепочка обязаностей (chain of responsibility) Привет.
Есть небольшой набросок паттерна проектирования цепочки обязанностей, написан на... The Single Responsibility Principle Есть класс Weather, в котором, на данный момент, есть методы getMaxTemperature(),...
Теоретические основы CQRS
Вообще, чтобы по-настоящему понять CQRS, нужно вначале разобраться в его внутренних механизмах. Базовый принцип работы этого паттерна заключается в том, что каждое действие в системе относится либо к командам (изменяющим состояние), либо к запросам (возвращающим данные). В отличии от традиционного подхода, где одна модель обслуживает оба типа операций, CQRS четко разграничивает эти зоны ответственности.
Команды в CQRS — это объекты, содержащие всю необходимую информацию для выполнения бизнес-операции. Они именуются в повелительном наклонении: CreateUserCommand , UpdateOrderCommand и т.д. После создания команда передается обработчику, который извлекает из неё данные, применяет бизнес-логику и изменяет состояние системы. Самое важное тут — команды описывают намерение, а не механизм его реализации.
Запросы предназначены исключительно для получения данных. Они не должны изменять состояние системы и могут быть оптимизированы под конкретные сценарии использования. Например, страница товара в интернет-магазине может требовать данных из нескольких таблиц: информацию о товаре, отзывы, похожие товары. В традиционной архитектуре это потребовало бы множественных запросов или сложных JOIN'ов. В CQRS можно создать специализированную денормализованную модель чтения именно для этого представления.
Сравнивая CQRS с традиционной CRUD-архитектурой, можно заметить фундаментальное различие в подходе к моделированию данных. CRUD использует единую модель, где каждая сущность соответствует таблице в базе данных, а операции прямо работают с этими сущностями. Изменения происходят путем загрузки объекта, модификации его свойств и сохранения обратно. Это просто и интуитивно понятно для простых задач. CQRS, в свою очередь, разделяет ответственность. Модель команд отражает доменную логику и правила, она заточена под безопасное и консистентное изменение состояния. Модель запросов ориетирована на удобство получения данных и может содержать избыточную информацию для ускорения выборки. Команды и запросы работают с разными репозиториями, а иногда даже с разными базами данных. Я работал с проектом, где модели записи хранились в PostgreSQL с нормализованной схемой, а модели чтения — в MongoDB, оптимизированные под конкретные представления UI. Такое разделение позволяло нам независимо масштабировать системы чтения и записи, что критично для высоконагруженных приложений.
Ключевым элементом архитектуры CQRS является событийная модель взаимодействия. События — это факты, произошедшие в системе: "Пользователь зарегистрирован", "Заказ оплачен", "Товар добавлен в корзину". После обработки команды генерируется событие, которое затем используется для обновления модели чтения. Этот механизм естественно сочетается с Event Sourcing, но об этом подробнее поговорим позже.
События служат "мостом" между разделенными моделями и обеспечивают их эвентуальную согласованность. Эвентуальная согласованность означает, что в какой-то момент после изменения данных в модели записи соответствующие изменения отразятся и в модели чтения. Временной зазор между этими изменениями может быть от миллисекунд до минут, в зависимости от реализации.
При проектировании доменной модели для CQRS критично выделять агрегаты — группы взаимосвязаных объектов, которые обеспечивают консистентность данных. Агрегат должен быть максимально самодостаточен и инкапсулировать все бизнес-правила, относящиеся к входящим в него объектам. Он имеет корень (aggregate root) — главную сущность, через которую происходит взаимодействие с агрегатом. Например, в системе управления заказами агрегатом может быть "Заказ" со всеми связанными позициями. Корень агрегата — сущность Заказ, а позиции заказа недоступны напрямую, только через Заказ. Это гарантирует, что все изменения в заказе будут консистентными.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| public class Order {
private OrderId id;
private CustomerId customerId;
private List<OrderLine> orderLines;
private OrderStatus status;
// Конструкторы, геттеры
public void addProduct(Product product, int quantity) {
// Бизнес-логика добавления товара
if (status != OrderStatus.DRAFT) {
throw new IllegalStateException("Can only modify draft orders");
}
OrderLine existingLine = findOrderLineByProduct(product);
if (existingLine != null) {
existingLine.increaseQuantity(quantity);
} else {
orderLines.add(new OrderLine(product, quantity));
}
}
public void submit() {
// Валидация и изменение статуса
if (orderLines.isEmpty()) {
throw new IllegalStateException("Cannot submit empty order");
}
status = OrderStatus.SUBMITTED;
}
// Другие методы
} |
|
В этом примере Order — корень агрегата, инкапсулирующий бизнес-логику работы с заказом. Команда SubmitOrderCommand будет обрабатываться методом submit() , который проверит все необходимые условия.
Для синхронизации данных между моделями команд и запросов существует несколько подходов. Самый простой — прямое обновление моделей чтения в транзакции обработки команды. Но это нарушает принцип разделения ответственности и создает сильную связанность между компонентами. Более эффективный подход к синхронизации — использование событий. После успешной обработки команды и изменения модели записи генерируется событие (или несколько), которое асинхронно обрабатывается для обновления модели чтения. Это естественно вписывается в событийно-ориентированную архитектуру и обеспечивает слабую связанность компонентов.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // Обработчик команды
public void handle(SubmitOrderCommand command) {
Order order = orderRepository.findById(command.getOrderId());
order.submit();
orderRepository.save(order);
// Публикация события
eventBus.publish(new OrderSubmittedEvent(order.getId(), order.getCustomerId()));
}
// Обработчик события для обновления модели чтения
public void on(OrderSubmittedEvent event) {
OrderView orderView = orderViewRepository.findById(event.getOrderId());
orderView.setStatus("SUBMITTED");
orderView.setSubmissionDate(LocalDateTime.now());
orderViewRepository.save(orderView);
} |
|
В примере выше команда обрабатывается, изменяя состояние агрегата Order, затем публикуется событие, которое асинхронно обновляет модель чтения. Это решение позволяет масштабировать подсистемы чтения и записи независимо друг от друга.
Конечно, у асинхронной синхронизации есть и обратная сторона — временное расхождение данных между моделями. Пользователь может отправить команду и сразу же попробовать прочитать обновленные данные, но они еще не отразились в модели чтения. Существует несколько стратегий решения этой проблемы:
1. Оптимистичное обновление UI на стороне клиента.
2. Отложенная консистентность с индикацией для пользователя.
3. Запрос непосредственно к модели записи для критичных операций.
4. Использование механизмов отслеживания завершения обработки событий.
Особое внимание при проектировании CQRS-систем нужно уделять моделям чтения. Они должны быть максимально оптимизированы под конкретные сценарии использования. Часто это приводит к созданию множества специализированых представлений данных. Для одной доменной сущности может существовать несколько моделей чтения, каждая из которых заточена под конкретный UseCase.
Например, для сущности "Товар" в интернет-магазине могут понадобиться разные модели чтения:- КраткаяКарточкаТовара — для списка товаров в категории.
- ДетальнаяКарточкаТовара — для страницы товара.
- ТоварВКорзине — для отображения в корзине.
- ТоварВПанелиАдминистратора — для управления каталогом.
Каждая из этих моделей содержит только те данные, которые необходимы для конкретного представления, что ускоряет выборку и обработку информации. Из своего опыта могу сказать, что проектирование моделей чтения нужно начинать с анализа UI и пользовательских сценариев. Я обычно собираю все экраны приложения и для каждого определяю, какие данные нужны и в каком формате. Затем группирую похожие запросы и проектирую соответствующие модели чтения и интерфейсы репозитория.
Для работы с CQRS существует ряд паттернов и практик. Одна из них — использование шины команд и запросов для маршрутизации к соответствующим обработчикам. Это добавляет гибкость и облегчает тестирование.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // Шина команд
public class CommandBus {
private Map<Class<? extends Command>, CommandHandler> handlers = new HashMap<>();
public <T extends Command> void register(Class<T> commandClass, CommandHandler<T> handler) {
handlers.put(commandClass, handler);
}
public void dispatch(Command command) {
CommandHandler handler = handlers.get(command.getClass());
if (handler == null) {
throw new IllegalStateException("No handler for " + command.getClass());
}
handler.handle(command);
}
} |
|
Аналогичная шина может быть реализована и для запросов. В более сложной реализации можно добавить валидацию, логирование, обработку ошибок и другую инфраструктурную функциональность. Помимо шин, часто используются паттерны Command Handler, Query Handler, Event Handler для обработки соответствующих сообщений. Они инкапсулируют логику обработки и делают код более модульным и тестируемым.
Вот что важно понимать: CQRS — это не "все или ничего". Можно начать с частичного внедрения для тех частей системы, где разделение моделей чтения и записи даст наибольший эффект. В одном из проектов мы применили CQRS только для модуля отчетности, где требовалось агрегировать данные из разных источников. Это позволило значительно улучшить производительность без полной переработки системы. В итоге CQRS — мощный инструмент в арсенале архитектора, который при правильном применении может значительно улучшить масштабируемость, производительность и поддерживаемость системы. Однако нужно трезво оценивать его необходимость и готовность команды работать с более сложной архитектурой.
Практическая реализация на Java
Структура проекта с CQRS обычно выглядит немного иначе, чем у традиционных приложений. Давайте создадим базовую структуру для нашего примера:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| com.example.cqrs
├── command
│ ├── model
│ ├── handler
│ └── repository
├── query
│ ├── model
│ ├── handler
│ └── repository
├── event
│ ├── model
│ └── handler
├── domain
│ └── model
└── infrastructure
├── bus
├── config
└── persistence |
|
Такая организация явно разделяет код для команд и запросов, что сразу дает понять новым разработчикам, что проект использует CQRS. В реальном мире структура может быть сложнее и адаптирована под конкретную команду, но этот шаблон хорошо работает как отправная точка.
Начнем с определения команды. Команда — это простой объект (DTO), содержащий данные, необходимые для выполнения операции.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package com.example.cqrs.command.model;
public class CreateProductCommand {
private final String productId;
private final String name;
private final BigDecimal price;
private final String category;
// Конструктор, геттеры
public CreateProductCommand(String productId, String name,
BigDecimal price, String category) {
this.productId = productId;
this.name = name;
this.price = price;
this.category = category;
}
// Геттеры опущены для краткости
} |
|
Заметьте — команда неизменяемая (immutable), содержит только необходимые данные и не имеет никакой логики. Второй важный момент — команда именуется в повелительном наклонении, чтобы отразить намерение.
Теперь определим обработчик команды, который будет содержать бизнес-логику:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| package com.example.cqrs.command.handler;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class CreateProductCommandHandler {
private final ProductRepository productRepository;
private final EventBus eventBus;
public CreateProductCommandHandler(ProductRepository productRepository,
EventBus eventBus) {
this.productRepository = productRepository;
this.eventBus = eventBus;
}
@Transactional
public void handle(CreateProductCommand command) {
// Проверка бизнес-правил
if (productRepository.existsByName(command.getName())) {
throw new DuplicateProductException("Product with name already exists");
}
// Создание доменной модели
Product product = new Product(
new ProductId(command.getProductId()),
command.getName(),
command.getPrice(),
command.getCategory()
);
// Сохранение
productRepository.save(product);
// Публикация события
eventBus.publish(new ProductCreatedEvent(
product.getId().getValue(),
product.getName(),
product.getPrice(),
product.getCategory()
));
}
} |
|
Обрабочик получает команду, выполняет необходимые проверки, создает доменную модель, сохраняет её и публикует событие. Опытная команда могла заметить, что я использовал Spring для внедрения зависимостей и управления транзакциями — это стандартная практика в Java-мире.
Интересная деталь: вместо передачи обработчику сырых строк и чисел, мы создаем доменные объекты, такие как ProductId . Это сразу предотвращает множество ошибок, связанных с неправильными типами данных и валидацией.
Теперь переходим к модели запроса:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
| package com.example.cqrs.query.model;
public class GetProductQuery {
private final String productId;
public GetProductQuery(String productId) {
this.productId = productId;
}
public String getProductId() {
return productId;
}
} |
|
И соответствующий обработчик запроса:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| package com.example.cqrs.query.handler;
import org.springframework.stereotype.Service;
@Service
public class GetProductQueryHandler {
private final ProductViewRepository repository;
public GetProductQueryHandler(ProductViewRepository repository) {
this.repository = repository;
}
public ProductView handle(GetProductQuery query) {
return repository.findById(query.getProductId())
.orElseThrow(() -> new ProductNotFoundException(query.getProductId()));
}
} |
|
Обратите внимание на различие: обработчик команды ничего не возвращает, а вот обработчик запроса возврощает данные. Кроме того, они работают с разными репозиториями и моделями! `ProductRepository` хранит доменную модель, а ProductViewRepository — модель представления.
Для связывания всех компонентов используется шина команд или запросов. В Spring это можно реализовать через ApplicationEventPublisher или использовать библиотеки вроде Axon Framework. Вот простая реализация шины команд:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| package com.example.cqrs.infrastructure.bus;
import org.springframework.stereotype.Component;
@Component
public class CommandBus {
private final Map<Class<? extends Command>, CommandHandler<? extends Command>> handlers =
new HashMap<>();
@SuppressWarnings("unchecked")
public <T extends Command> void register(Class<T> commandType, CommandHandler<T> handler) {
handlers.put(commandType, handler);
}
@SuppressWarnings("unchecked")
public <T extends Command> void dispatch(T command) {
CommandHandler<T> handler = (CommandHandler<T>) handlers.get(command.getClass());
if (handler == null) {
throw new NoHandlerForCommandException(command.getClass());
}
handler.handle(command);
}
} |
|
Для синхронизации моделей чтения и записи мы используем события. Когда обработчик команды публикует событие, оно перехватывается обработчиком событий, который обновляет модель чтения:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| package com.example.cqrs.event.handler;
import org.springframework.stereotype.Service;
@Service
public class ProductEventHandler {
private final ProductViewRepository viewRepository;
public ProductEventHandler(ProductViewRepository viewRepository) {
this.viewRepository = viewRepository;
}
@EventListener
public void on(ProductCreatedEvent event) {
ProductView view = new ProductView(
event.getProductId(),
event.getName(),
event.getPrice(),
event.getCategory()
);
viewRepository.save(view);
}
@EventListener
public void on(ProductUpdatedEvent event) {
ProductView view = viewRepository.findById(event.getProductId())
.orElseThrow();
view.setName(event.getName());
view.setPrice(event.getPrice());
view.setCategory(event.getCategory());
viewRepository.save(view);
}
} |
|
Для большей гибкости в Spring Boot можно использовать интерфейс ApplicationEventPublisher и аннотацию @EventListener , что дает встроенную шину событий без дополнительной инфраструктуры. В реальных проетках часто используются более продвинутые очереди сообщений, такие как RabbitMQ или Kafka, особенно если нужна надежная доставка и горизонтальное масштабирование.
Важный аспект реализации CQRS — это отладка и тестирование. Изолированные модели чтения и записи проще тестировать, но интеграционное тестирование становится сложнее из-за асинхронного характера обновлений. Для модульных тестов каждого обработчика можно использовать моки и Mockito:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @Test
public void testCreateProductCommandHandler() {
// Arrange
ProductRepository repository = mock(ProductRepository.class);
EventBus eventBus = mock(EventBus.class);
when(repository.existsByName("Test Product")).thenReturn(false);
CreateProductCommandHandler handler =
new CreateProductCommandHandler(repository, eventBus);
CreateProductCommand command = new CreateProductCommand(
"123", "Test Product", BigDecimal.TEN, "Electronics"
);
// Act
handler.handle(command);
// Assert
verify(repository).save(any(Product.class));
verify(eventBus).publish(any(ProductCreatedEvent.class));
} |
|
Для интеграционного тестирования CQRS систем можно использовать такой подход: запускать полную последовательность команда → событие → обновление модели чтения, а затем проверять, что модель чтения обновилась корректно. Поскольку обновление может быть асинхронным, иногда приходится добавлять ожидание или использовать библиотеки вроде Awaitility:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| @SpringBootTest
public class ProductIntegrationTest {
@Autowired private CommandBus commandBus;
@Autowired private QueryBus queryBus;
@Test
public void createProductShouldUpdateReadModel() {
// Arrange
String productId = UUID.randomUUID().toString();
CreateProductCommand command = new CreateProductCommand(
productId, "Test Product", BigDecimal.valueOf(99.99), "Test Category"
);
// Act
commandBus.dispatch(command);
// Assert with waiting
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
ProductView product = queryBus.dispatch(new GetProductQuery(productId));
assertNotNull(product);
assertEquals("Test Product", product.getName());
assertEquals(BigDecimal.valueOf(99.99), product.getPrice());
});
}
} |
|
Другой интересный аспект, которой часто упускают при реализации CQRS — очереди сообщений для команд. В высоконагруженных системах обработка команд может занимать время, и синхронное выполнение будет блокировать потоки. Решение — асинхронная обработка через очереди:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| @Service
public class AsyncCommandBus implements CommandBus {
private final JmsTemplate jmsTemplate;
public AsyncCommandBus(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Override
public void dispatch(Command command) {
jmsTemplate.convertAndSend("command-queue", command);
}
}
@Component
@JmsListener(destination = "command-queue")
public class CommandMessageListener {
private final Map<Class<? extends Command>, CommandHandler<? extends Command>> handlers;
// ... инициализация обработчиков ...
@SuppressWarnings("unchecked")
public void onMessage(Command command) {
CommandHandler<Command> handler =
(CommandHandler<Command>) handlers.get(command.getClass());
if (handler != null) {
handler.handle(command);
} else {
throw new NoHandlerForCommandException(command.getClass());
}
}
} |
|
Мой опыт показывает, что реализация CQRS не обязательно требует сложной инфраструктуры, особенно на начальных этапах. Spring Boot предоставляет достаточно возможностей для построения работающего прототипа.
При реализации модели чтения важно помнить о проекциях — представлениях данных, оптимизированных под конкретные запросы. Вот пример реализации проекции для категории с продуктами:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Entity
@Table(name = "category_with_products_view")
public class CategoryWithProductsView {
@Id
private String categoryId;
private String categoryName;
@ElementCollection
@CollectionTable(name = "category_product_view")
private List<ProductSummary> products = new ArrayList<>();
// Конструкторы, геттеры, сеттеры
}
@Embeddable
public class ProductSummary {
private String productId;
private String name;
private BigDecimal price;
private String imageUrl;
// Конструкторы, геттеры, сеттеры
} |
|
Такая денормализованная структура позволяет загрузить категорию со всеми продуктами одним запросом, что критично для производительности UI.
Обработка конкуретных обновлений — ещё одна важная тема в CQRS. Когда несколько команд одновременно модефицируют один агрегат, может возникнуть состояние гонки. Классическое решение — использование оптимистичной блокировки:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
| @Entity
public class Product {
@Id
private String id;
private String name;
private BigDecimal price;
private String category;
@Version
private Long version;
// Методы
} |
|
Аннотация @Version указывает JPA использовать оптимистичные блокировки. При попытке сохранить сущность с устаревшей версией будет выброшено исключение OptimisticLockException , которое можно обработать и предпринять нужные действия.
В больших системах модели чтения и записи часто хранятся в разных базах данных. Например, доменная модель может использовать реляционную БД для обеспечения ACID-транзакций, а модели чтения — NoSQL для лучшей производительности и масштабирования. Конфигурация PostgreSQL для модели записи и MongoDB для модели чтения может выглядеть так:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @Configuration
@EnableJpaRepositories(
basePackages = "com.example.cqrs.command.repository",
entityManagerFactoryRef = "commandEntityManagerFactory"
)
public class CommandDbConfig {
// Конфигурация источника данных для команд
}
@Configuration
@EnableMongoRepositories(
basePackages = "com.example.cqrs.query.repository",
mongoTemplateRef = "queryMongoTemplate"
)
public class QueryDbConfig {
// Конфигурация MongoDB для запросов
} |
|
На практике я встречал много случаев, когда команды пытались внедрить CQRS "потому что это круто", не понимая реальных потребностей проекта. Это приводило к излишней сложности и проблемам поддержки. Мой совет: начинайте с простой архитектуры и внедряйте CQRS постепенно, в тех частях системы, где разделение моделей действительно даст ощутимую пользу. Например, в одном из моих проектов мы вначале рализовали CQRS только для процесса оформления заказа, где была сложная бизнес-логика и высокие требования к консистентности. Модели чтения для каталога товаров и аналетической отчётности добавили позже, когда стало понятно, что они действительно нужны.
Для эффективного использованея CQRS в Java крайне полезно использовать фреймворки, специально предназначенные для этого. Axon Framework — один из наиболее зрелых вариантов, предоставляющий готовую инфраструктуру для команд, запросов, событий и даже Event Sourcing. Spring Cloud Stream также хорошо подходит для построения систем, основанных на событиях.
Хочу поделиться ещё одной техникой, которую я использую в реальных проектах — разделение API для команд и запросов на уровне контроллеров REST:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| @RestController
@RequestMapping("/products/commands")
public class ProductCommandController {
private final CommandBus commandBus;
@PostMapping
public ResponseEntity<Void> createProduct(@RequestBody CreateProductDTO dto) {
String productId = UUID.randomUUID().toString();
commandBus.dispatch(new CreateProductCommand(
productId, dto.getName(), dto.getPrice(), dto.getCategory()
));
return ResponseEntity.accepted().build();
}
}
@RestController
@RequestMapping("/products")
public class ProductQueryController {
private final QueryBus queryBus;
@GetMapping("/{id}")
public ResponseEntity<ProductView> getProduct(@PathVariable String id) {
try {
ProductView product = queryBus.dispatch(new GetProductQuery(id));
return ResponseEntity.ok(product);
} catch (ProductNotFoundException e) {
return ResponseEntity.notFound().build();
}
}
} |
|
Интеграция с Event Sourcing
Event Sourcing — это подход к хранению данных, который идеально дополняет CQRS. Если обычные системы хранят текущее состояние объектов, то Event Sourcing сохраняет историю всех событий, которые привели к этому состоянию. Представьте банковский счёт: традиционный подход сохраняет только текущий баланс, а Event Sourcing — всю историю транзакций, по которой в любой момент можно восстановить баланс. Когда я впервые столкнулся с этим паттерном, он показался мне крайне странным. Зачем усложнять? Но потом пришло озарение — Event Sourcing решает целый класс проблем, которые трудно преодолеть в традиционной архитектуре. Сочетание CQRS и Event Sourcing даёт синергию: CQRS разделяет модели чтения и записи, а Event Sourcing обеспечивает надежный источник правды для обеих моделей. События выступают естественным "клеем", связывающим эти компоненты.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| // Агрегат с Event Sourcing
public class ProductAggregate {
private String id;
private String name;
private BigDecimal price;
private List<DomainEvent> changes = new ArrayList<>();
// Конструктор загружает агрегат из истории событий
public ProductAggregate(List<DomainEvent> events) {
// Применяем события к агрегату для воссоздания текущего состояния
events.forEach(this::apply);
}
// Обработка команды
public void createProduct(CreateProductCommand command) {
// Валидация
if (name != null) {
throw new IllegalStateException("Product already exists");
}
// Создаем и применяем событие
ProductCreatedEvent event = new ProductCreatedEvent(
command.getProductId(),
command.getName(),
command.getPrice(),
command.getCategory()
);
apply(event);
changes.add(event);
}
// Применение события к состоянию агрегата
private void apply(DomainEvent event) {
if (event instanceof ProductCreatedEvent) {
this.id = ((ProductCreatedEvent) event).getProductId();
this.name = ((ProductCreatedEvent) event).getName();
this.price = ((ProductCreatedEvent) event).getPrice();
} else if (event instanceof ProductPriceChangedEvent) {
this.price = ((ProductPriceChangedEvent) event).getNewPrice();
}
// ... другие типы событий
}
// Метод для получения непримененных изменений
public List<DomainEvent> getUncommittedChanges() {
return new ArrayList<>(changes);
}
// Метод для фиксации изменений
public void markChangesAsCommitted() {
changes.clear();
}
} |
|
В этом примере агрегат восстанавливается из истории событий и генерирует новые события при обработке команд. Такой подход обеспечивает полную аудируемость — мы точно знаем, когда и почему изменилось состояние системы.
Для реализации Event Sourcing в Java существует несколько фреймворков, но Axon Framework, пожалуй, самый популярный. Он предоставляет готовую инфраструктуру для работы с CQRS и Event Sourcing, включая хранение событий, управление агрегатами и обработку событий. Вот пример агрегата в Axon:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| @Aggregate
public class Product {
@AggregateIdentifier
private String id;
private String name;
private BigDecimal price;
@CommandHandler
public Product(CreateProductCommand command) {
apply(new ProductCreatedEvent(
command.getProductId(),
command.getName(),
command.getPrice(),
command.getCategory()
));
}
@CommandHandler
public void changePrice(ChangePriceCommand command) {
apply(new ProductPriceChangedEvent(id, command.getNewPrice()));
}
@EventSourcingHandler
protected void on(ProductCreatedEvent event) {
this.id = event.getProductId();
this.name = event.getName();
this.price = event.getPrice();
}
@EventSourcingHandler
protected void on(ProductPriceChangedEvent event) {
this.price = event.getNewPrice();
}
} |
|
Axon значительно упрощает работу с CQRS и Event Sourcing благодаря аннотациям и конвенциям. Фреймворк автоматически обрабатывает восстановление агрегатов из событий и сохранение новых событий.
Обработка ошибок — один из самых сложных аспектов Event Sourcing. Что если произошла ошибка после того, как событие сохранено, но до того, как обновлена модель чтения? Или если событие содержит некорректные данные? В этих случаях возникает расхождение между моделями. В моей практике хорошо работает стратегия компенсирующих событий. Вместо изменения уже сохраненных событий (это нарушает принцип Event Sourcing) мы создаем новое событие, которое корректирует ситуацию:
Java | 1
2
3
4
5
6
7
8
9
| // Компенсирующее событие
public class ProductCorrectionEvent extends DomainEvent {
private final String productId;
private final String correctName;
private final BigDecimal correctPrice;
private final String reason;
// ... конструктор, геттеры
} |
|
Для обработки ошибок синхронизации между командной моделью и моделью запросов часто применяют механизм отслеживания и синхронизации. Например, периодическая задача проверяет целостность данных и восстанавливает проекции при необходимости:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @Component
@Scheduled(fixedRate = 60000) // Раз в минуту
public class ReadModelIntegrityChecker {
private final EventStore eventStore;
private final ProductViewRepository viewRepo;
public void checkAndRepair() {
// Получаем все ID продуктов
List<String> productIds = eventStore.getAllProductIds();
// Проверяем каждый продукт
for (String id : productIds) {
// Проверяем наличие в модели чтения
if (!viewRepo.existsById(id)) {
// Восстанавливаем из событий
List<DomainEvent> events = eventStore.getEventsForProduct(id);
createOrUpdateProductView(events);
}
}
}
} |
|
С ростом числа событий возникает проблема производительности — восстановление состояния из тысяч событий может занять много времени. Решение — использовать снимки состояния (snapshots), которые сохраняют текущее состояние агрегата после определённого количества событий:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| @Component
public class SnapshotStrategy {
private final EventStore eventStore;
private final SnapshotRepository snapshotRepo;
private final int threshhold = 100; // Создавать снимок каждые 100 событий
public void handleNewEvents(String aggregateId, long eventCount) {
if (eventCount % threshhold == 0) {
// Запрашиваем все события
List<DomainEvent> events = eventStore.getEventsForAggregate(aggregateId);
// Восстанавливаем состояние
ProductAggregate aggregate = new ProductAggregate(events);
// Сохраняем снимок
snapshotRepo.save(new ProductSnapshot(aggregateId, aggregate.getState()));
}
}
} |
|
При загрузке агрегата сначала загружается последний снимок, затем применяются только те события, которые произошли после создания снимка.
Event Sourcing открывает мощные возможности для отладки и мониторинга. Имея полную историю событий, можно воспроизвести любую проблему в изолированной среде, определить точный момент, когда что-то пошло не так, и даже "переиграть" историю с исправленной логикой. Для мониторинга и отладки событий мы использовали такой подход:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| @Aspect
@Component
public class EventMonitor {
private final Logger logger = LoggerFactory.getLogger(EventMonitor.class);
private final MetricRegistry metrics;
@Pointcut("execution(* com.example.cqrs.event.EventStore.save(..))")
public void eventSaved() {}
@Before("eventSaved() && args(event)")
public void beforeEventSaved(DomainEvent event) {
// Инкрементируем счетчик по типу события
metrics.counter("events." + event.getClass().getSimpleName()).inc();
// Логируем событие
logger.info("Saving event: {}", event);
}
} |
|
Event Sourcing — не просто механизм хранения, это принципиально иной взгляд на моделирование данных. Вместо изменяемого состояния мы фокусируемся на бизнес-событиях и их последствиях. Это сближает код с бизнес-процессами и делает систему более понятной для предметных экспертов.
Эффективный дизайн проекций в Event Sourcing
Одной из самых сложных задач при работе с Event Sourcing является правильное проектирование проекций. Проекции – это оптимизированные для чтения представления данных, которые создаются на основе потока событий. В отличие от традиционных моделей, проекции в Event Sourcing часто денормализованы и заточены под конкретные сценарии использования. Есть несколько стратегий создания проекций:
1. Синхронные проекции – обновляются в той же транзакции, что и события. Это обеспечивает немедленную согласованность, но может замедлять обработку команд.
2. Асинхронные проекции – обновляются обработчиками событий вне транзакции записи. Обеспечивают лучшую производительность, но вводят временные окна несогласованности.
3. Вычисляемые проекции – создаются "на лету" при запросе путем применения всех релевантных событий. Это даёт максимальную гибкость, но может быть медленным.
4. Кэшированные проекции – гибрид, где состояние сохраняется, но при необходимости может быть пересчитано из событий.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| @Component
public class ProductCatalogProjection {
private final ProductViewRepository viewRepository;
@EventHandler
public void on(ProductCreatedEvent event) {
ProductView view = new ProductView();
view.setId(event.getAggregateId());
view.setName(event.getName());
view.setPrice(event.getPrice());
view.setCategory(event.getCategory());
view.setCreatedAt(event.getTimestamp());
view.setUpdatedAt(event.getTimestamp());
viewRepository.save(view);
}
@EventHandler
public void on(ProductPriceChangedEvent event) {
ProductView view = viewRepository.findById(event.getAggregateId())
.orElseThrow();
view.setPrice(event.getNewPrice());
view.setUpdatedAt(event.getTimestamp());
// Если это специальное предложение, добавляем метку
if (event.isSpecialOffer()) {
view.setSpecialOffer(true);
view.setSpecialOfferEndsAt(event.getSpecialOfferEndsAt());
}
viewRepository.save(view);
}
// Другие обработчики событий
} |
|
Нестандартный пример – построение временных рядов на основе событий. Например, для аналитики продаж можно создать проекцию, которая агрегирует данные о продажах по часам, дням, месяцам:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| @Component
public class SalesStatisticsProjection {
private final StatisticsRepository repository;
private final Clock clock;
@EventHandler
public void on(OrderPlacedEvent event) {
// Обновляем статистику за час
LocalDateTime hour = event.getTimestamp().truncatedTo(ChronoUnit.HOURS);
SalesStatistics hourlyStats = repository
.findByPeriodAndGranularity(hour, Granularity.HOURLY)
.orElse(new SalesStatistics(hour, Granularity.HOURLY));
hourlyStats.incrementOrdersCount();
hourlyStats.addToTotalSales(event.getTotalAmount());
// Обновляем статистику товаров
for (OrderItem item : event.getItems()) {
hourlyStats.addProductSold(item.getProductId(), item.getQuantity());
}
repository.save(hourlyStats);
// Аналогичным образом обновляем дневную и месячную статистику
updateDailyStatistics(event);
updateMonthlyStatistics(event);
}
// ... другие методы
} |
|
Такая проекция позволяет мгновенно получать аналитические данные без необходимости агрегировать их при каждом запросе.
Особый кейс – поисковые проекции. Для полнотекстового поиска удобно создавать специальные проекции в ElasticSearch или Solr:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| @Component
public class ProductSearchProjection {
private final ElasticsearchOperations elasticsearchOperations;
@EventHandler
public void on(ProductCreatedEvent event) {
ProductDocument document = new ProductDocument();
document.setId(event.getAggregateId());
document.setName(event.getName());
document.setCategory(event.getCategory());
document.setPrice(event.getPrice().doubleValue());
document.setTags(extractTags(event.getName(), event.getCategory()));
elasticsearchOperations.save(document);
}
private List<String> extractTags(String name, String category) {
// Извлекаем ключевые слова из названия и категории
// для улучшения поиска
// ...
}
// ... другие обработчики
} |
|
С такой проекцией легко реализовать продвинутый поиск товаров по ключевым словам, фильтрам и фасетам, при этом сохраняя основную логику в CQRS/Event Sourcing системе.
Я должен признать, что в работе с большими объемами событий есть подводные камни. В одном из моих проектов мы столкнулись с проблемой "горячих" агрегатов – объектов, которые обновляются очень часто и генерируют тысячи событий. Например, популярный товар или активный пользователь. Для таких случаев пришлось разработать стратегию "уплотнения историй" – периодического создания сводных событий, которые заменяли бы собой длинные последовательности простых событий.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| @Service
public class EventCompactionService {
@Scheduled(cron = "0 0 2 * * *") // Запуск каждый день в 2 часа ночи
public void compactEventStreams() {
// Находим агрегаты с чрезмерным количеством событий
List<String> hotAggregates = eventStore.findHotAggregates(1000);
for (String aggregateId : hotAggregates) {
compactAggregateEvents(aggregateId);
}
}
private void compactAggregateEvents(String aggregateId) {
// Получаем все события агрегата
List<DomainEvent> events = eventStore.getEventsForAggregate(aggregateId);
// Создаем снапшот состояния
Object aggregate = aggregateService.loadAggregate(aggregateId);
long latestVersion = events.get(events.size() - 1).getVersion();
// Сохраняем сводное событие
CompactionEvent compactionEvent = new CompactionEvent(
aggregateId,
latestVersion,
serialize(aggregate)
);
// Архивируем старые события и сохраняем компактное
eventStore.archiveEvents(aggregateId, 0, latestVersion - 1);
eventStore.save(compactionEvent);
}
} |
|
Такой подход помогает поддерживать производительность систем с Event Sourcing даже при высоких нагрузках и длинных историях событий.
При работе с Event Sourcing важно также учитывать пропускную способность инфраструктуры для обработки событий. В высоконагруженных системах количество генерируемых событий может быть огромным. Для решения этой проблемы полезно использовать шаблон "EventProcessor", который контролирует скорость обработки и группировку событий:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| @Component
public class BatchingEventProcessor {
private final List<DomainEvent> eventBatch = new ArrayList<>();
private final int BATCH_SIZE = 100;
private final EventHandler eventHandler;
@Scheduled(fixedRate = 100) // Проверяем каждые 100 мс
public void processBatch() {
List<DomainEvent> currentBatch;
synchronized (eventBatch) {
if (eventBatch.isEmpty()) {
return;
}
currentBatch = new ArrayList<>(eventBatch);
eventBatch.clear();
}
// Группируем события по типам для оптимизации обработки
Map<Class<?>, List<DomainEvent>> eventsByType = currentBatch.stream()
.collect(Collectors.groupingBy(DomainEvent::getClass));
// Обрабатываем группами
for (Map.Entry<Class<?>, List<DomainEvent>> entry : eventsByType.entrySet()) {
eventHandler.handleBatch(entry.getValue());
}
}
public void addEvent(DomainEvent event) {
synchronized (eventBatch) {
eventBatch.add(event);
// Если достигли размера пакета, запускаем обработку
if (eventBatch.size() >= BATCH_SIZE) {
processBatch();
}
}
}
} |
|
Фреймворки типа Axon также предоставляют продвинутые механизмы обработки событий, включая трекинг-процессоры, которые запоминают позицию в потоке событий и могут продолжать обработку с того места, где остановились, даже после перезапуска приложения. Кстати, о моём опыте: мы однажды столкнулись с интересной проблемой при внедрении Event Sourcing – как тестировать такие системы? Традиционные подходы с моками и заглушками не всегда работают хорошо, когда вы имеете дело с потоками событий. Мы разработали специальный фреймворк для тестирования, который позволял создавать сценарии в виде "дано - когда - тогда", где "дано" – это начальная последовательность событий, "когда" – команда, а "тогда" – ожидаемые новые события.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @Test
public void shouldCreateProductWhenValidCommand() {
fixture.given() // Пустая история для нового продукта
.when(new CreateProductCommand("prod-1", "Test Product",
BigDecimal.TEN, "Test"))
.expectEvents(new ProductCreatedEvent("prod-1", "Test Product",
BigDecimal.TEN, "Test"));
}
@Test
public void shouldUpdatePriceWhenPriceChangeCommand() {
fixture.given(new ProductCreatedEvent("prod-1", "Test Product",
BigDecimal.TEN, "Test"))
.when(new ChangePriceCommand("prod-1", BigDecimal.valueOf(15.0)))
.expectEvents(new ProductPriceChangedEvent("prod-1",
BigDecimal.valueOf(15.0)));
} |
|
Такой подход делает тесты более четкими и сфокусированными на бизнес-логике, а не на технических деталях.
Оптимизация производительности и масштабирование
Теперь, когда мы разобрались с Event Sourcing и проекциями, пора поговорить о производительности и масштабировании CQRS-систем. В высоконагруженных системах чтение данных обычно происходит гораздо чаще, чем запись. Представьте себе популярный интернет-магазин: на одну покупку приходятся сотни просмотров карточек товаров. Благодаря CQRS мы можем оптимизировать эти операции отдельно друг от друга.
Начну с кэширования. В традиционных системах кэширование часто приводит к головной боли из-за необходимости инвалидировать кэш при изменении данных. В CQRS всё элегантнее: модели чтения можно рассматривать как большой специализированый кэш! Они уже оптимизированы под конкретные запросы и обновляются через обработку событий.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Component
public class ProductCacheManager {
private final Cache<String, ProductView> productCache;
public ProductCacheManager() {
// Настраиваем Caffeine с TTL и максимальным размером
this.productCache = Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.maximumSize(10_000)
.build();
}
@EventHandler
public void on(ProductUpdatedEvent event) {
// Инвалидируем кэш при обновлении товара
productCache.invalidate(event.getProductId());
}
public ProductView getProduct(String productId, Supplier<ProductView> loader) {
return productCache.get(productId, key -> loader.get());
}
} |
|
В этом примере мы используем библиотеку Caffeine для кэширования товаров. Обратите внимание на интересный момент: мы инвалидируем кэш на основе событий! Это гораздо эффективнее традиционного подхода с отслеживанием изменений в БД. Для действительно высоких нагрузок полезно применять многоуровневое кэширование:
1. Локальный кэш в памяти (Caffeine, Guava).
2. Распределенный кэш (Redis, Hazelcast).
3. CDN для статических данных и изображений.
Я однажды работал над системой с 50 миллионов посещений в месяц, и многоуровневое кэширование снизило нагрузку на базу данных почти на 95%! Но важно помнить: кэш — это не серебряная пуля, и он вводит новые проблемы с согласованностью данных.
Поговорим об асинхронной обработке. В CQRS она становится естественной частью архитектуры, особенно при использовании событий для обновления моделей чтения. Ключевой компонент здесь — система обмена сообщениями типа Kafka или RabbitMQ.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| @Configuration
public class EventProcessingConfig {
@Bean
public Consumer<ProductEvent> processProductEvents() {
return event -> {
if (event instanceof ProductCreatedEvent) {
handleProductCreated((ProductCreatedEvent) event);
} else if (event instanceof ProductUpdatedEvent) {
handleProductUpdated((ProductUpdatedEvent) event);
}
};
}
private void handleProductCreated(ProductCreatedEvent event) {
// Асинхронное обновление модели чтения
// ...
}
private void handleProductUpdated(ProductUpdatedEvent event) {
// Асинхронное обновление модели чтения
// ...
}
} |
|
Подобная конфигурация с Spring Cloud Stream позволяет асинхронно обрабатывать события, что разгружает основной поток обработки команд. Мы можем масштабировать число обработчиков событий горизонтально, запуская несколько экземпляров. Но асинхронность привносит свои сложности. Помню случай, когда система клиент->сервер с асинхронным обновлением моделей чтения вызвала путаницу среди пользователей: они выполняли действие, но не видели результат сразу. Пришлось придумать хитрую систему "оптимистичного UI", где фронтенд локально применял изменения и затем сверял их с сервером.
Что касается согласованости данных... В распределенных системах достичь строгой консистентности сложно и дорого. Вместо этого часто используют эвентуальную согласованность (eventual consistency) — гарантию, что в какой-то момент в будущем все реплики данных придут к единому состоянию. CQRS изначально хорошо сочетается с этой концепцией. Модель записи может обеспечивать строгую консистентность внутри агрегатов, а модели чтения могут обновляться асинхронно. При этом важно обеспечить гарантии доставки событий. Если событие потеряется, модель чтения останется в несогласованном состоянии. Решения:
1. Отслеживание позиции в потоке событий (как в Kafka).
2. Идемпотентность обработчиков событий.
3. Периодическая сверка моделей и исправление расхождений.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| @Component
@Scheduled(fixedRate = 3600000) // Каждый час
public class DataConsistencyChecker {
private final EventStore eventStore;
private final ProductViewRepository viewRepository;
private final ProductEventHandler eventHandler;
public void checkConsistency() {
log.info("Starting consistency check...");
// Получаем все ID продуктов из хранилища событий
Set<String> eventProductIds = eventStore.getAllProductIds();
// Получаем все ID из модели чтения
Set<String> viewProductIds = viewRepository.findAllIds();
// Находим отсутствующие в модели чтения
Set<String> missing = new HashSet<>(eventProductIds);
missing.removeAll(viewProductIds);
// Перестраиваем отсутствующие проекции
for (String productId : missing) {
List<DomainEvent> events = eventStore.getEventsForAggregate(productId);
for (DomainEvent event : events) {
eventHandler.handle(event);
}
}
// Находим лишние записи в модели чтения
Set<String> extra = new HashSet<>(viewProductIds);
extra.removeAll(eventProductIds);
// Удаляем лишние проекции
for (String productId : extra) {
viewRepository.deleteById(productId);
}
log.info("Consistency check completed. Fixed {} missing and removed {} extra records",
missing.size(), extra.size());
}
} |
|
Когда дело доходит до работы в микросервисной архитектуре, CQRS становится ещё более привлекательным. Микросервисный подход предполагает декомпозицию системы на небольшие независимые сервисы, каждый из которых отвечает за свою область функциональности. Проблема тут в том, что иногда нужны данные из разных сервисов. Например, страница товара может требовать информацию о самом товаре, отзывах, наличии на складе, рекомендациях — а всё это могут быть отдельные сервисы. В классической архитектуре пришлось бы делать кучу синхронных запросов между сервисами, что увеличивает латентность и снижает надежность. CQRS предлагает элегантное решение: каждый сервис публикует события об изменениях в своей доменной модели, а другие сервисы могут подписываться на эти события и создавать собственные проекции данных. Так сервис товаров может иметь локальную копию отзывов, обновляемую на основе событий от сервиса отзывов.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| // В сервисе отзывов
@Service
public class ReviewService {
private final ReviewRepository repository;
private final EventPublisher eventPublisher;
@Transactional
public void addReview(String productId, String userId, int rating, String comment) {
Review review = new Review(UUID.randomUUID().toString(), productId, userId, rating, comment);
repository.save(review);
// Публикуем событие для других сервисов
eventPublisher.publish(new ReviewAddedEvent(
review.getId(),
review.getProductId(),
review.getUserId(),
review.getRating(),
review.getComment()
));
}
}
// В сервисе товаров
@Service
@KafkaListener(topics = "reviews")
public class ReviewEventHandler {
private final ProductViewRepository repository;
public void handleReviewEvent(ReviewAddedEvent event) {
// Обновляем проекцию товара с информацией об отзыве
ProductView product = repository.findById(event.getProductId())
.orElseThrow();
product.getReviews().add(new ReviewSummary(
event.getReviewId(),
event.getUserId(),
event.getRating(),
event.getComment()
));
product.recalculateAverageRating();
repository.save(product);
}
} |
|
Такая архитектура даёт нам преимущества:
1. Устойчивость к сбоям — если сервис отзывов недоступен, пользователи всё равно могут видеть старые отзывы.
2. Улучшенная производительность — нет необходимости в межсервисных запросах при загрузке страницы.
3. Независимость команд разработки — каждая команда может развивать свой сервис автономно.
Однако, есть и подводные камни. Один из самых коварных — проблема "раздвоения идентичности" данных. Когда одни и те же данные дублируются в нескольких сервисах, легко получить расхождения. Я помню один проект, где цена товара отображалась по-разному в каталоге и корзине из-за задержек в распространении событий. Пользователи были... скажем так, не в восторге.
Для решения подобных проблем в распределенных CQRS-системах можно использовать:
1. Версионирование данных — каждое событие содержит версию измененного объекта.
2. Временные метки — события имеют точное время создания.
3. Последовательное применение — события обрабатываются в строгом порядке их возникновения.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| public abstract class VersionedEvent {
private final String aggregateId;
private final long version;
private final LocalDateTime timestamp;
// ... геттеры, конструкторы
public boolean isNewerThan(VersionedEvent other) {
if (!aggregateId.equals(other.aggregateId)) {
throw new IllegalArgumentException("Cannot compare events of different aggregates");
}
return this.version > other.version;
}
}
@Service
public class VersionAwareEventHandler {
private final Map<String, Long> processedVersions = new ConcurrentHashMap<>();
public void handle(VersionedEvent event) {
// Проверяем, не обрабатывали ли мы уже более новую версию
Long currentVersion = processedVersions.get(event.getAggregateId());
if (currentVersion != null && currentVersion >= event.getVersion()) {
log.info("Skipping obsolete event version {} for aggregate {}, current version is {}",
event.getVersion(), event.getAggregateId(), currentVersion);
return;
}
// Обработка события...
// Запоминаем версию
processedVersions.put(event.getAggregateId(), event.getVersion());
}
} |
|
В распределенных системах также нужно помнить о CAP-теореме: невозможно одновременно гарантировать консистентность (Consistency), доступность (Availability) и устойчивость к разделению (Partition Tolerance). В условиях сетевых разделений приходится выбирать между доступностью и консистентностью. CQRS позволяет сделать этот выбор осознанно: модель записи может жертвовать доступностью ради консистентности (CP-система), а модели чтения могут жертвовать консистентностью ради доступности (AP-система). Это даёт уникальную гибкость в проектировании распределенных систем.
Интересный приём, который я использовал в одном проекте — "согласованное кэширование". Суть в том, что при обработке команды мы рассчитываем, какие проекции будут затронуты, и помечаем их как "ожидающие обновления". UI может проверять этот статус и показывать индикатор загрузки или версию данных.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| @Service
public class ProductCommandService {
private final CommandBus commandBus;
private final ProjectionUpdateTracker updateTracker;
@Transactional
public void updateProduct(UpdateProductCommand command) {
// Отправляем команду
commandBus.dispatch(command);
// Помечаем проекции как ожидающие обновления
updateTracker.markPending("product-details-" + command.getProductId());
updateTracker.markPending("product-catalog");
// Возвращаем клиенту идентификатор обновления
return updateTracker.getCurrentUpdateId();
}
}
// На клиенте
async function updateProduct(product) {
const updateId = await api.updateProduct(product);
// Ожидаем обновления проекций
await api.waitForProjectionUpdate(updateId);
// Теперь можно загружать актуальные данные
const updatedProduct = await api.getProduct(product.id);
// Обновляем UI
renderProduct(updatedProduct);
} |
|
Если приложение должно быть устойчиво к сетевым задержкам, имеет смысл хранить журнал неподтвержденных команд на клиенте и применять локальные изменения оптимистично, а затем синхронизировать их с сервером, когда соединение станет доступным. Этот подход особенно полезен для мобильных приложений.
Ещё один важный аспект масштабирования CQRS-систем — шардирование данных. Модели чтения и записи можно шардировать по разным ключам, в зависимости от паттернов доступа. Например, модель записи может шардироваться по ID пользователя (владельца), а модель чтения — по категории или геолокации.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| @Configuration
public class ShardingConfig {
@Bean
@Profile("command-node")
public DataSource commandDataSource() {
ShardedDataSource dataSource = new ShardedDataSource();
// Настраиваем шардирование по customer_id
dataSource.setShardingStrategy(new CustomerShardingStrategy());
// Добавляем шарды
dataSource.addShard("shard1", createDataSource("jdbc:mysql://shard1:3306/orders"));
dataSource.addShard("shard2", createDataSource("jdbc:mysql://shard2:3306/orders"));
return dataSource;
}
@Bean
@Profile("query-node")
public DataSource queryDataSource() {
ShardedDataSource dataSource = new ShardedDataSource();
// Настраиваем шардирование по product_category
dataSource.setShardingStrategy(new CategoryShardingStrategy());
// Добавляем шарды
dataSource.addShard("electronics", createDataSource("jdbc:mysql://read1:3306/products"));
dataSource.addShard("clothing", createDataSource("jdbc:mysql://read2:3306/products"));
return dataSource;
}
// ... вспомогательные методы
} |
|
Тут, конечно, я немного упростил — в реальных проектах обычно используются готовые решения для шардирования вроде Vitess для MySQL или шарднигов в MongoDB и Cassandra.
Финальный аккорд в оптимизации CQRS-систем — мониторинг и наблюдаемость. Отдельные модели чтения и записи, асинхронные обновления, распределенные данные — всё это делает отладку сложнее. В одном из проектов я внедрил систему распределенной трассировки на базе Jaeger, которая позволяла отслеживать путь команды через всю систему: от API-запроса через обработку команды, генерацию события, до обновления всех моделей чтения.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| @Aspect
@Component
public class CommandTracingAspect {
private final Tracer tracer;
@Around("execution(* com.example.cqrs.command.handler.*.*(..))")
public Object traceCommand(ProceedingJoinPoint pjp) throws Throwable {
String commandName = pjp.getArgs()[0].getClass().getSimpleName();
// Создаем спан трассировки
Span span = tracer.buildSpan("CommandHandler: " + commandName)
.withTag("command.type", commandName)
.start();
try (Scope scope = tracer.activateSpan(span)) {
return pjp.proceed();
} catch (Exception e) {
span.setTag("error", true);
span.log(Map.of(
"event", "error",
"error.kind", e.getClass().getName(),
"message", e.getMessage()
));
throw e;
} finally {
span.finish();
}
}
} |
|
Калькулятор используя паттерн проектирования Цепочка обязанностей (Chain of Responsibility) Всем привет, подскажите пожалуйста может кто в курсе как можно реализовать Калькулятор при помощи... Query java почему не работает метод findAll (classCastException cannot be cust to util.list ) и как написать... QraphQL java query на куда писать url? Всем привет!
Есть сайт на strapi, мне надо данные получить через http://localhost:1337/graphql... Конвертеры на Java для: Java->PDF, DBF->Java Буду признателен за любые ссылки по сабжу.
Заранее благодарен. Ошибка reference to List is ambiguous; both interface java.util.List in package java.util and class java.awt.List in... Почему кгда я загружаю пакеты awt, utill вместе в одной проге при обьявлении елемента List я ловлю... Какую версию Java поддерживает .Net Java# И какую VS6.0 Java++ ? Какую версию Java поддерживает .Net Java# И какую VS6.0 Java++ ?
Ответье, плиз, новичку, по MSDN... java + jni. считывание значений из java кода и работа с ним в c++ с дальнейшим возвращением значения в java Работаю в eclipse с android sdk/ndk.
как импортировать в java файл c++ уже разобрался, не могу... Exception in thread "main" java.lang.IllegalArgumentException: illegal component position at java.desktop/java.awt.Cont import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import... Паттерн Command Добрый вечер. Наткнулся на такой паттерн, говорят его нужно использовать если в классе много... Установка jr30l.sh, ошибка: uncompress: command not found После запуска установочного скрипта jr30l.sh, получаем uncompress: command not found... Как с этим... Gradle command line есть консольное приложение
public class Main {
public static void main(String args) throws... [Command pattern & Web App] Как в MVC фреймворках реализованы понятные пользователю URL? Когда я писал на Spring MVC, я просто в методе соотвествующего контроллера мапил url к методу....
|