За последние несколько лет сложность корпоративных приложений выросла в геометрической прогрессии. Простые монолитные системы уступили место распределенным микросервисам, а нагрузка на корпоративные приложения достигла невиданных ранее масштабов. Трациционные подходы к проектированию, основаные на CRUD-операциях, часто не справляются с такими требованиями. В ответ на эти вызовы появились архитектурные подходы CQRS и Event Sourcing, позволяющие создавать системы нового поколения.
Основы CQRS
CQRS (Command Query Responsibility Segregation) - это архитектурный паттерн, который в корне меняет наш подход к взаимодействию с данными. Если упростить до предела, суть его в следующем: разделяй операции чтения и записи. Звучит банально, но последствия такого разделения могут быть поразительными.
Классическая модель, к которой мы привыкли, подразумевает единую модель данных для чтения и записи. Мы создаем репозиторий, который умеет и сохранять, и читать одни и те же сущности. CQRS же предлагает радикально иной подход - использовать разные модели для операций чтения (queries) и записи (commands). Команды в CQRS описывают намерение изменить состояние системы. Это не просто DTO для передачи данных, а объекты, которые несут в себе смысловую нагрузку: "ЗарегистрироватьПользователя", "ОформитьЗаказ", "ОтменитьПодписку". Команды обрабатываются специальными обработчиками (command handlers), которые содержат в себе бизнес-логику.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public class CreateOrderCommand
{
public Guid CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public string ShippingAddress { get; set; }
}
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
private readonly IOrderRepository _repository;
public CreateOrderCommandHandler(IOrderRepository repository)
{
_repository = repository;
}
public async Task Handle(CreateOrderCommand command)
{
var order = new Order(command.CustomerId, command.Items, command.ShippingAddress);
await _repository.SaveAsync(order);
}
} |
|
Запросы (queries) не меняют состояние системы, Они лишь извлекают данные в форме, максимально удобной для конкретного сценария использования. И тут начинается самое интересное - модели для чтения могут быть полностью денормализованными, оптимизированными под конкретные запросы.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public class GetOrderDetailsQuery
{
public Guid OrderId { get; set; }
}
public class GetOrderDetailsQueryHandler : IQueryHandler<GetOrderDetailsQuery, OrderDetailsDto>
{
private readonly IReadDbContext _readDb;
public GetOrderDetailsQueryHandler(IReadDbContext readDb)
{
_readDb = readDb;
}
public async Task<OrderDetailsDto> Handle(GetOrderDetailsQuery query)
{
return await _readDb.OrderDetails
.Where(o => o.Id == query.OrderId)
.FirstOrDefaultAsync();
}
} |
|
Такое разделение даёт невероятную гибкость. Модели чтения можно оптимизировать под конкретные запросы, делая их максимально плоскими и денормализованными. Вы можете забыть о сложных JOIN-ах и многоуровневых выборках. Это просто не нужно, когда ваша модель для чтения уже содержит все необходимые данные в нужной структуре. Кроме того, CQRS позволяет нам применять разные стратегии масштабирования для операций чтения и записи. Обычно чтений в системе на порядки больше, чем записей. Поэтому мы можем горизонтально масштабировать именно чтение, разворачивая дополнительные экземпляры баз данных, оптимизированных для быстрого поиска.
Но за эти преимущества приходится платить. Основной недостаток CQRS - повышенная сложность. Вместо одной модели данных вам придется поддерживать как минимум две, а часто и больше. К тому же, возникает проблема синхронизации между моделями записи и чтения. Когда вы изменяете данные через команду, эти изменения должны отразиться в модели чтения. И тут возникает интересный момент - эта синхронизация не обязательно должна быть мгновенной. Да, я не ошибся. В большинстве реальных систем с CQRS синхронизация между моделями записи и чтения происходит с некоторой задержкой, то есть асинхронно. Это называется "eventual consistency" (итоговая согласованность). После выполнения команды может пройти некоторое время, прежде чем изменения отразятся в модели для чтения. Это нормально и даже полезно с точки зрения производительности и масштабируемости.
Не все готовы к такому компромиссу. Помню случай, когда заказчик требовал, чтобы при создании заказа пользователь сразу видел его в списке заказов. Но при асинхронной синхронизации может возникнуть ситуация, когда пользователь создал заказ, но на странице списка заказов его еще нет. Пришлось изобретать обходные пути - например, оптимистично добавлять заказ на фронтенде, пока не придет подтверждение с сервера.
Реализация CQRS в .NET Core значительно упрощается благодаря встроенной поддержке внедрения зависимостей. Вот как выглядит регистрация обработчиков команд и запросов:
| C# | 1
2
3
4
5
6
7
8
9
10
| public void ConfigureServices(IServiceCollection services)
{
// Регистрация обработчиков команд
services.AddTransient<ICommandHandler<CreateOrderCommand>, CreateOrderCommandHandler>();
services.AddTransient<ICommandHandler<CancelOrderCommand>, CancelOrderCommandHandler>();
// Регистрация обработчиков запросов
services.AddTransient<IQueryHandler<GetOrderDetailsQuery, OrderDetailsDto>, GetOrderDetailsQueryHandler>();
services.AddTransient<IQueryHandler<GetCustomerOrdersQuery, List<OrderSummaryDto>>, GetCustomerOrdersQueryHandler>();
} |
|
Конечно, при большом количестве команд и запросов такой подход становится утомительным. Поэтому часто используют библиотеки вроде MediatR, которые автоматизируют регистрацию обработчиков:
| C# | 1
2
3
4
| public void ConfigureServices(IServiceCollection services)
{
services.AddMediatR(typeof(Startup).Assembly);
} |
|
При анализе производительности CQRS-систем я часто замечаю, что узким местом становится не сама обработка команд или запросов, а передача данных между разными моделями. Особенно это заметно, когда команды вызывают сложные бизнес-процессы, которые в свою очередь порождают множество событий, требующих обновления нескольких проекций для чтения. Существует несколько подходов к решению этой проблемы. Один из них - использование очередей сообщений для асинхронной обработки событий. Другой - оптимизация структуры моделей для чтения, чтобы минимизировать количество обновляемых записей.
Говоря о типичных ошибках при внедрении CQRS, часто встречаю попытки применить паттерн ко всей системе целиком, даже к тем частям, где он не нужен. CQRS - это мощный инструмент, но не серебряная пуля. Используйте его там, где действительно есть значительное расхождение между операциями чтения и записи, или где требуется особая гибкость в плане масштабирования. Не менее распространенная ошибка - чрезмерное усложнение команд. Команды должны быть атомарными и выражать одно конкретное намерение. Если вы создаете команды наподобие "УправлятьВсейСистемой", значит, что-то идет не так.
CQRS Добрый день. Подскажите, где можно почитать материал по этой теме, для написания курсовой. Нужна информация по Event Sourcing DDD, CQRS и Event Sourcing - это три термина, вокруг которых крутится моя дипломная работа. Ни об... WebBrowser не поддерживает Event MouseDown и Event MouseUp Здравствуйте, у меня имеется WebBrowser control в windowsFormApp, но он не поддерживает Event... Создание event на PageLoad на стороне клдиента на javaScript Я хочу создать функцию у UserControl на стороне клиента, чтобы на PageLoad она запускалась....
Event Sourcing: события как источник истины
Event Sourcing - это совершенно иной подход к хранению данных, который идеально дополняет CQRS. Если традиционные базы данных хранят текущее состояние системы, то Event Sourcing сохраняет историю всех изменений, которые привели к этому состоянию. Проще говоря, вместо того чтобы хранить факт "баланс пользователя равен 100 рублей", мы храним последовательность событий: "пользователь пополнил счет на 150 рублей", "пользователь потратил 50 рублей".
Когда я впервые столкнулся с этим паттерном, мне казалось, что это излишне сложно. Зачем хранить всю историю, если нам важно только текущее состояние? Но практика показала обратное - такой подход дает колоссальные преимущества.
Во-первых, мы получаем бесплатный аудит-лог. Все изменения системы автоматически документируются в виде событий. Это бесценно для отладки, расследования инцидентов и соответствия регуляторным требованиям.
Во-вторых, мы можем восстановить состояние системы на любой момент времени. Хотите узнать, каким был баланс пользователя неделю назад? Просто "проиграйте" все события до нужной точки. Это открывает массу возможностей для аналитики и отчетности.
Вот как выглядит простейшая реализация хранилища событий:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| public class EventStore : IEventStore
{
private readonly DbContext _dbContext;
public EventStore(DbContext dbContext)
{
_dbContext = dbContext;
}
public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<IDomainEvent> events, int expectedVersion)
{
var eventEntities = events.Select(e => new EventEntity
{
AggregateId = aggregateId,
Data = JsonSerializer.Serialize(e),
Type = e.GetType().AssemblyQualifiedName,
Version = ++expectedVersion,
Timestamp = DateTime.UtcNow
}).ToList();
await _dbContext.Events.AddRangeAsync(eventEntities);
await _dbContext.SaveChangesAsync();
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId)
{
var events = await _dbContext.Events
.Where(e => e.AggregateId == aggregateId)
.OrderBy(e => e.Version)
.ToListAsync();
return events.Select(e =>
{
var type = Type.GetType(e.Type);
return (IDomainEvent)JsonSerializer.Deserialize(e.Data, type);
});
}
} |
|
Интеграция Event Sourcing с Entity Framework или другими ORM требует нестандартного подхода. Традиционные ORM предполагают работу с текущим состоянием, а не с историей событий. Я обычно использую отдельную таблицу для хранения событий, как показано в примере выше.
Конечно, восстанавливать состояние объекта каждый раз, проигрывая все события с начала времен, неэффективно. Для решения этой проблемы используют снэпшоты - периодические "снимки" состояния. Например, мы можем сохранять снэпшот каждые 100 событий. Тогда для восстановления текущего состояния нам нужно загрузить последний снэпшот и применить только те события, которые произошли после него.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public async Task<T> LoadAggregateAsync<T>(Guid aggregateId) where T : AggregateRoot, new()
{
var snapshot = await GetLatestSnapshotAsync(aggregateId);
var aggregate = new T();
if (snapshot != null)
{
aggregate.LoadFromSnapshot(snapshot);
var events = await _eventStore.GetEventsAsync(aggregateId, snapshot.Version + 1);
aggregate.ApplyEvents(events);
}
else
{
var events = await _eventStore.GetEventsAsync(aggregateId);
aggregate.ApplyEvents(events);
}
return aggregate;
} |
|
Еще одна проблема, с которой я столкнулся - версионирование событий. Со временем структура событий меняется, появляются новые поля, старые переименовываются или удаляются. Как быть, если в хранилище есть события старой версии? Один из подходов - использовать "upcasting", когда старые события преобразуются в новый формат при загрузке.
Важно понимать, что Event Sourcing не подходит для всех частей системы. Я использую его только там, где важна история изменений или где бизнес-логика настолько сложна, что выразить ее в терминах состояний становится неудобно.
Одна из самых интересных проблем Event Sourcing - консистентность данных. В традиционной модели мы применяем транзакции, чтобы гарантировать атомарность операций. Но в мире событий все не так просто. События должны сохраняться строго в порядке их возникновения, иначе мы можем получить некорректное состояние.
Вспоминаю проект для финтех-компании, где мы использовали Event Sourcing для учета финансовых операций. Мы столкнулись с ситуацией, когда из-за сбоя в сети события пришли в неправильном порядке. В результате система временно показывала отрицательный баланс там, где его быть не могло. Решили проблему введением строгой очередности обработки событий для каждого агрегата и добавлением версионирования.
| C# | 1
2
3
4
5
6
7
8
9
10
11
| public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<IDomainEvent> events, int expectedVersion)
{
var dbVersion = await GetCurrentVersionAsync(aggregateId);
if (expectedVersion != -1 && dbVersion != expectedVersion)
{
throw new ConcurrencyException($"Агрегат с ID {aggregateId} был изменен. Ожидаемая версия: {expectedVersion}, текущая: {dbVersion}");
}
// Остальной код сохранения событий...
} |
|
Для эффективной работы с Event Sourcing критически важны агрегаты. Агрегат - это кластер связанных объектов, которые рассматриваются как единое целое с точки зрения изменения данных. Границы агрегата определяют транзакционную консистентность. Изменения внутри одного агрегата атомарны, а взаимодействие между агрегатами асинхронно.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| public abstract class AggregateRoot
{
private readonly List<IDomainEvent> _uncommittedEvents = new();
public Guid Id { get; protected set; }
public int Version { get; private set; } = -1;
protected void ApplyEvent(IDomainEvent @event)
{
// Метод применяет изменения к состоянию агрегата
((dynamic)this).Apply((dynamic)@event);
// И сохраняет событие для последующей фиксации
_uncommittedEvents.Add(@event);
}
public void ApplyEvents(IEnumerable<IDomainEvent> events)
{
foreach (var @event in events)
{
// Применяем событие к состоянию
((dynamic)this).Apply((dynamic)@event);
// Обновляем версию
Version++;
}
}
public IEnumerable<IDomainEvent> GetUncommittedEvents()
{
return _uncommittedEvents;
}
public void ClearUncommittedEvents()
{
_uncommittedEvents.Clear();
}
} |
|
Внедрение Event Sourcing требует серезной перестройки мышления. Я месяцами ломал голову над тем, как эффективно проектировать системы, основанные на событиях. Ключевой момент - определение правильного уровня гранулярности агрегатов. Слишком большие агрегаты становятся узким местом по производительности, слишком маленькие усложняют обеспечение консистентности между связанными данными.
Для высоконагруженных систем еще одно важное соображение - партиционирование хранилища событий. Можно разделить события по агрегатам, по типам событий или по временным интервалам. В одном проекте мы использовали шардирование по идентификатору агрегата, что позволило масштабировать систему горизонтально.
Вы наверное задаетесь вопросом: а что если событие нужно "отменить"? Ведь в Event Sourcing мы не удаляем данные. Ответ прост - мы создаем компенсирующее событие. Например, если произошла ошибочная оплата, мы не удаляем событие "Оплата произведена", а создаем новое событие "Оплата возвращена".
Совместная работа паттернов
CQRS и Event Sourcing — это два мощных паттерна, которые сами по себе уже способны значительно улучшить архитектуру приложения. Но настоящее волшебство происходит, когда они работают в тандеме. Я всегда говорю своим менти: "CQRS — это структура, Event Sourcing — это память системы". Вместе они образуют полноценный мозг приложения.
Когда мы интегрируем эти два паттерна, события из Event Sourcing становятся связующим звеном между командной и запросной сторонами CQRS. События, порождаемые при обработке команд, не только сохраняются в хранилище событий, но и используются для обновления моделей чтения. Получается элегантный поток данных, при котором каждое изменение состояния автоматически распространяется по всей системе.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public async Task Handle(CreateOrderCommand command)
{
// Создаем агрегат и применяем бизнес-логику
var order = new Order(command.CustomerId);
foreach (var item in command.Items)
{
order.AddItem(item.ProductId, item.Quantity, item.Price);
}
order.SetShippingAddress(command.ShippingAddress);
// Сохраняем события в хранилище
var events = order.GetUncommittedEvents();
await _eventStore.SaveEventsAsync(order.Id, events, -1);
// Публикуем события для обновления проекций
foreach (var @event in events)
{
await _eventBus.PublishAsync(@event);
}
} |
|
Для эффективного взаимодействия между компонентами системы важен механизм передачи сообщений. Message Bus (шина сообщений) становится нервной системой приложения, позволяя различным компонентам общаться асинхронно, без прямых зависимостей.
В своих проектах я часто использую RabbitMQ или Azure Service Bus в качестве транспорта для событий. Это позволяет достичь высокой масштабируемости и надежности. Шина сообщений обеспечивает буферизацию, что особенно важно при пиковых нагрузках, когда количество команд может превышать пропускную способность системы.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| public class RabbitMqEventBus : IEventBus
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _exchangeName = "domain_events";
public RabbitMqEventBus(ConnectionFactory connectionFactory)
{
_connection = connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout);
}
public Task PublishAsync<T>(T @event) where T : IDomainEvent
{
var eventType = @event.GetType().Name;
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(
exchange: _exchangeName,
routingKey: eventType,
basicProperties: null,
body: body);
return Task.CompletedTask;
}
} |
|
Один из самых сложных аспектов распределенных систем — обеспечение согласованности данных при выполнении операций, затрагивающих несколько агрегатов. Традиционные ACID-транзакции здесь не работают. На помощь приходит паттерн Saga.
Saga представляет собой длительную транзакцию, состоящую из нескольких шагов. Каждый шаг либо успешно завершается, либо вызывает компенсирующую операцию для отмены эффекта предыдущих шагов. Это обеспечивает согласованность данных в конечном счете (eventual consistency). Вот как выглядит простая реализация Saga для процесса оформления заказа:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| public class OrderProcessSaga :
IHandleEvent<OrderCreated>,
IHandleEvent<PaymentProcessed>,
IHandleEvent<PaymentFailed>,
IHandleEvent<InventoryReserved>,
IHandleEvent<InventoryReservationFailed>
{
private readonly ICommandBus _commandBus;
public OrderProcessSaga(ICommandBus commandBus)
{
_commandBus = commandBus;
}
public async Task Handle(OrderCreated @event)
{
await _commandBus.SendAsync(new ProcessPaymentCommand
{
OrderId = @event.OrderId,
Amount = @event.TotalAmount
});
}
public async Task Handle(PaymentProcessed @event)
{
await _commandBus.SendAsync(new ReserveInventoryCommand
{
OrderId = @event.OrderId,
Items = @event.Items
});
}
public async Task Handle(PaymentFailed @event)
{
await _commandBus.SendAsync(new CancelOrderCommand
{
OrderId = @event.OrderId,
Reason = "Платеж не прошел"
});
}
public async Task Handle(InventoryReserved @event)
{
await _commandBus.SendAsync(new CompleteOrderCommand
{
OrderId = @event.OrderId
});
}
public async Task Handle(InventoryReservationFailed @event)
{
await _commandBus.SendAsync(new RefundPaymentCommand
{
OrderId = @event.OrderId,
Amount = @event.Amount
});
await _commandBus.SendAsync(new CancelOrderCommand
{
OrderId = @event.OrderId,
Reason = "Товар отсутствует на складе"
});
}
} |
|
Важно понимать, что Saga не обеспечивает изоляцию, как это делают традиционные транзакции. Промежуточные состояния видны другим частям системы. Поэтому при проектировании Saga нужно тщательно продумывать, как система будет вести себя при сбоях на разных этапах.
В одном из моих проектов мы столкнулись с проблемой "потерянных" сообщений. Иногда событие отправлялось в шину, но не доходило до получателя из-за сбоев в сети. Это приводило к "зависанию" Saga. Решили проблему введением таймаутов и механизма проверки состояния процесса. Если Saga не завершалась за отведенное время, система автоматически инициировала процедуру восстановления. Еще одна распространенная проблема в распределенных системах — конкурентный доступ к ресурсам. Представте ситуацию: два пользователя одновременно пытаются зарезервировать последний билет на концерт. Без правильной обработки конкурентности мы можем столкнуться с избыточным бронированием.
В Event Sourcing конкурентность обычно решается через механизм версионирования агрегатов. Каждый агрегат имеет версию, которая увеличивается при каждом изменении. Когда мы сохраняем события, мы указываем ожидаемую версию агрегата. Если актуальная версия в базе отличается, значит кто-то уже внес изменения, и мы должны обработать этот конфликт.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| public void ApplyChanges(Order order, int expectedVersion)
{
// Проверяем версию перед сохранением
if (order.Version != expectedVersion)
throw new ConcurrencyException($"Ожидалась версия {expectedVersion}, но текущая версия {order.Version}");
// Логика сохранения изменений
var uncommittedEvents = order.GetUncommittedEvents();
_eventStore.SaveEvents(order.Id, uncommittedEvents, expectedVersion);
// Публикуем события
foreach (var evt in uncommittedEvents)
_eventPublisher.Publish(evt);
} |
|
На практике я реализую две стратегии обработки конфликтов: оптимистичную и пессимистичную. При оптимистичном подходе мы предполагаем, что конфликты редки, и просто выбрасываем исключение, если они возникают. Клиентский код должен перезагрузить агрегат и повторить операцию. Пессимистичный подход предполагает блокировку агрегата на время операции, что исключает параллельные изменения, но снижает производительность. Я однажды работал над системой бронирования отелей, где мы использовали оба подхода в разных частях системы. Для просмотра доступности номеров — оптимистичный, а для финального подтверждения бронирования — пессимистичный с блокировкой. Это дало оптимальный баланс между производительностью и надежностью.
При построении систем с CQRS и Event Sourcing часто возникает дилемма между консистентностью и доступностью. Согласно теореме CAP, распределенная система не может одновременно обеспечивать консистентность, доступность и устойчивость к разделению сети. Приходится жертвовать чем-то одним. В большинстве бизнес-приложений я выбираю "AP" из CAP, жертвуя немедленной консистентностью в пользу доступности. Это означает, что система продолжает функционировать даже при частичных сбоях, но данные могут временно быть несогласованными. Такой подход обычно приемлем, если бизнес-процессы допускают некоторую задержку в обновлении информации.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // Обработчик события, обновляющий проекцию для чтения
public class OrderCreatedProjection : IEventHandler<OrderCreated>
{
private readonly IReadDbContext _readDb;
public OrderCreatedProjection(IReadDbContext readDb)
{
_readDb = readDb;
}
public async Task Handle(OrderCreated @event)
{
// Создаем объект для проекции чтения
var orderSummary = new OrderSummaryReadModel
{
Id = @event.OrderId,
CustomerId = @event.CustomerId,
TotalAmount = @event.TotalAmount,
OrderDate = @event.Timestamp,
Status = "Created"
};
await _readDb.OrderSummaries.AddAsync(orderSummary);
await _readDb.SaveChangesAsync();
}
} |
|
Тестирование систем с CQRS и Event Sourcing имеет свои особенности. Я предпочитаю подход, основанный на поведении (behavior-driven development). Для агрегатов это выглядит примерно так:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| [Test]
public void WhenAddingItemToOrder_ThenOrderItemAddedEventIsGenerated()
{
// Arrange
var order = new Order(Guid.NewGuid(), "customer@example.com");
// Act
order.AddItem("product-1", 2, 10.0m);
// Assert
var uncommittedEvents = order.GetUncommittedEvents();
Assert.That(uncommittedEvents, Has.Exactly(1).Items);
Assert.That(uncommittedEvents.First(), Is.TypeOf<OrderItemAdded>());
var @event = (OrderItemAdded)uncommittedEvents.First();
Assert.That(@event.ProductId, Is.EqualTo("product-1"));
Assert.That(@event.Quantity, Is.EqualTo(2));
Assert.That(@event.Price, Is.EqualTo(10.0m));
} |
|
Отдельная головная боль — мониторинг и отладка распределенных систем. Когда операция проходит через множество сервисов, обрабатывается асинхронно, локализовать проблему становится сложно. Я использую распределенную трассировку с помощью OpenTelemetry, чтобы отслеживать путь запроса через все компоненты системы.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public async Task Handle(ProcessOrderCommand command)
{
using var activity = _activitySource.StartActivity("ProcessOrder");
activity?.SetTag("orderId", command.OrderId);
try
{
// Логика обработки команды
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
} |
|
Для восстановления состояния системы после сбоев Event Sourcing предоставляет уникальные возможности. Вместо традиционного бэкапа базы данных мы можем восстановить состояние, просто "проиграв" все события заново. Но для больших систем с миллионами событий это может занять много времени.
Стратегия, которую я обычно применяю — это комбинация снэпшотов и событий. Мы периодически создаем снэпшоты состояния для каждого агрегата, а при восстановлении загружаем последний снэпшот и применяем только события, произошедшие после его создания.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public async Task<T> LoadAggregate<T>(Guid id) where T : AggregateRoot, new()
{
var snapshot = await _snapshotStore.GetLatestSnapshot<T>(id);
var aggregate = new T();
if (snapshot != null)
{
aggregate.LoadFromSnapshot(snapshot);
var events = await _eventStore.GetEventsAfterVersion(id, snapshot.Version);
aggregate.ApplyEvents(events);
}
else
{
var events = await _eventStore.GetAllEvents(id);
aggregate.ApplyEvents(events);
}
return aggregate;
} |
|
Эффективное кэширование проекций — еще один важный аспект оптимизации CQRS-систем. Поскольку проекции обновляются асинхронно, возникает риск возврата устаревших данных. Я использую кэширование с таймаутом и механизмом инвалидации на основе событий.
При реализации кэширования в CQRS-системах я обычно использую два основных подхода: time-to-live (TTL) кэширование и инвалидацию на основе событий. TTL прост в реализации, но может привести к возврату устаревших данных. Инвалидация на основе событий требует больше усилий, но дает более точные результаты.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| public class CachedOrderQueryService : IOrderQueryService
{
private readonly IOrderQueryService _innerService;
private readonly IMemoryCache _cache;
private readonly IEventBus _eventBus;
private readonly TimeSpan _defaultTtl = TimeSpan.FromMinutes(10);
public CachedOrderQueryService(
IOrderQueryService innerService,
IMemoryCache cache,
IEventBus eventBus)
{
_innerService = innerService;
_cache = cache;
_eventBus = eventBus;
// Подписываемся на события, которые инвалидируют кэш
_eventBus.Subscribe<OrderStatusChanged>(InvalidateOrderCache);
_eventBus.Subscribe<OrderItemAdded>(InvalidateOrderCache);
}
public async Task<OrderDetailsDto> GetOrderDetailsAsync(Guid orderId)
{
var cacheKey = $"Order:{orderId}";
if (!_cache.TryGetValue(cacheKey, out OrderDetailsDto result))
{
result = await _innerService.GetOrderDetailsAsync(orderId);
_cache.Set(cacheKey, result, _defaultTtl);
}
return result;
}
private Task InvalidateOrderCache(OrderStatusChanged @event)
{
var cacheKey = $"Order:{@event.OrderId}";
_cache.Remove(cacheKey);
return Task.CompletedTask;
}
private Task InvalidateOrderCache(OrderItemAdded @event)
{
var cacheKey = $"Order:{@event.OrderId}";
_cache.Remove(cacheKey);
return Task.CompletedTask;
}
} |
|
Для высоконагруженных систем я часто использую распределенный кэш на базе Redis. Это позволяет синхронизировать инвалидацию кэша между разными экземплярами сервиса.
Однако за годы практики я убедился, что самый сложный аспект совместного использования CQRS и Event Sourcing — управление версиями событий. В долгоживущих системах неизбежно возникает необходимость изменить структуру событий. Но что делать со старыми событиями, которые уже сохранены в хранилище? Я использую несколько стратегий:
1. Восходящее преобразование (Upcasting) — при загрузке старые события преобразуются в новый формат на лету:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| public class EventUpcastingDecorator : IEventStore
{
private readonly IEventStore _inner;
private readonly Dictionary<Type, Func<object, object>> _upcasters;
public EventUpcastingDecorator(IEventStore inner)
{
_inner = inner;
_upcasters = new Dictionary<Type, Func<object, object>>
{
{ typeof(OldOrderCreatedEvent), e => UpcastOrderCreatedEvent((OldOrderCreatedEvent)e) }
};
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId)
{
var events = await _inner.GetEventsAsync(aggregateId);
return events.Select(UpcastIfNeeded);
}
private IDomainEvent UpcastIfNeeded(IDomainEvent @event)
{
var eventType = @event.GetType();
if (_upcasters.TryGetValue(eventType, out var upcaster))
{
return (IDomainEvent)upcaster(@event);
}
return @event;
}
private OrderCreatedEvent UpcastOrderCreatedEvent(OldOrderCreatedEvent old)
{
return new OrderCreatedEvent
{
OrderId = old.OrderId,
CustomerId = old.CustomerId,
TotalAmount = old.TotalAmount,
Items = old.Items.Select(i => new OrderItemDto
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price,
// Новое поле, которого не было в старой версии
DiscountAmount = 0
}).ToList(),
Timestamp = old.Timestamp
};
}
// Остальные методы интерфейса IEventStore...
} |
|
2. Схемы событий с версиями — каждая версия события имеет свою собственную схему и идентификатор версии:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| [EventVersion("OrderCreated", 1)]
public class OrderCreatedEventV1 : IDomainEvent
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
// Поля версии 1
}
[EventVersion("OrderCreated", 2)]
public class OrderCreatedEventV2 : IDomainEvent
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
// Поля версии 1
// + дополнительные поля версии 2
} |
|
3. Снэпшоты состояния — периодическое сохранение полного состояния агрегата, что позволяет не зависеть от старых событий.
Помню случай, когда меня вызвали консультировать проект в крупном банке. Они начали внедрять Event Sourcing и через полгода столкнулись с необходимостью изменить структуру нескольких ключевых событий. Без стратегии версионирования они оказались в сложной ситуации — им пришлось написать отдельный инструмент для миграции всех событий в базе на новый формат. Это был болезненный опыт, который можно было избежать с помощью правильного планирования.
При работе с большими объемами событий производительность становится критическим фактором. В одном из проектов мы достигли точки, когда загрузка агрегата с историей в тысячи событий занимала неприемлемое время. Вот несколько стратегий оптимизации, которые я применяю:
1. Снэпшоты через фиксированные интервалы — сохранять снэпшот каждые N событий:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
| public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<IDomainEvent> events, int expectedVersion)
{
// Сохраняем события
await _innerEventStore.SaveEventsAsync(aggregateId, events, expectedVersion);
// Проверяем, нужно ли создать снэпшот
var currentVersion = expectedVersion + events.Count();
if (currentVersion % SnapshotFrequency == 0)
{
var aggregate = await _aggregateLoader.LoadAggregateAsync<T>(aggregateId);
await _snapshotStore.SaveSnapshotAsync(aggregate);
}
} |
|
2. Оптимизация десериализации — используйте эффективные сериализаторы типа Protocol Buffers или MessagePack вместо JSON:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId)
{
var eventRecords = await _dbContext.Events
.Where(e => e.AggregateId == aggregateId)
.OrderBy(e => e.Version)
.ToListAsync();
return eventRecords.Select(record =>
{
// Используем MessagePack для десериализации
return MessagePackSerializer.Deserialize(record.Data,
Type.GetType(record.EventType)) as IDomainEvent;
});
} |
|
3. Параллельная обработка проекций — обрабатывайте разные проекции параллельно:
| C# | 1
2
3
4
5
6
7
8
| public async Task Handle(IDomainEvent @event)
{
var projectionTasks = _projections
.Where(p => p.CanHandle(@event.GetType()))
.Select(p => p.HandleAsync(@event));
await Task.WhenAll(projectionTasks);
} |
|
Правильное проектирование границ агрегатов — это еще одно ключевое решение при использовании Event Sourcing. Слишком большие агрегаты страдают от конфликтов конкурентности и проблем с производительностью. Слишком маленькие требуют сложных механизмов обеспечения консистентности между собой.
Реализация CQRS с использованием MediatR и современных C# фич
За годы работы с CQRS я перепробовал десятки подходов к его реализации, начиная от самописных решений и заканчивая тяжеловесными фреймворками. В итоге пришел к выводу, что наиболее гибким и удобным инструментом для .NET-разработчика является библиотека MediatR от Джимми Богарда. Она проста, элегантна и при этом достаточно функциональна для большинства сценариев.
MediatR реализует паттерн посредника (Mediator), который отлично подходит для CQRS, поскольку позволяет разделить отправителей и получателей сообщений. В контексте CQRS эти сообщения — наши команды и запросы.
Подключение MediatR в проект .NET 7+ предельно просто:
| C# | 1
2
3
4
| public void ConfigureServices(IServiceCollection services)
{
services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Startup).Assembly));
} |
|
Для команд в MediatR используется интерфейс IRequest, а для запросов — IRequest<T>, где T — тип возвращаемого результата. Вот как выглядит типичная команда:
| C# | 1
2
3
4
| public record CreateOrderCommand(
Guid CustomerId,
List<OrderItemDto> Items,
string ShippingAddress) : IRequest; |
|
Обратите внимание на использование record-типов — это одна из моих любимых фич современного C#. Records идеально подходят для команд и запросов, так как они иммутабельны (что важно для предсказуемости) и имеют встроенное сравнение по значениям (что удобно для тестирования). Обработчик команды выглядит так:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand>
{
private readonly IOrderRepository _repository;
private readonly IEventStore _eventStore;
public CreateOrderCommandHandler(IOrderRepository repository, IEventStore eventStore)
{
_repository = repository;
_eventStore = eventStore;
}
public async Task Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
var order = new Order(
Guid.NewGuid(),
request.CustomerId,
request.ShippingAddress);
foreach (var item in request.Items)
{
order.AddItem(item.ProductId, item.Quantity, item.Price);
}
await _repository.SaveAsync(order, cancellationToken);
// Сохраняем события
var events = order.GetUncommittedEvents();
await _eventStore.SaveEventsAsync(order.Id, events, -1, cancellationToken);
}
} |
|
Один из важнейших аспектов реализации CQRS — разделение моделей. С MediatR мы легко можем определить отдельные запросы и DTO для чтения:
| C# | 1
2
3
4
5
6
7
8
9
10
11
| public record GetOrderDetailsQuery(Guid OrderId) : IRequest<OrderDetailsDto>;
public class OrderDetailsDto
{
public Guid Id { get; init; }
public string CustomerName { get; init; }
public decimal TotalAmount { get; init; }
public string Status { get; init; }
public List<OrderItemDto> Items { get; init; }
// Другие поля, необходимые для UI
} |
|
Здесь я использую еще одну удобную возможность C# — init-only свойства. Они позволяют создать объект, который нельзя изменить после инициализации, но при этом более гибки, чем конструктор с большим количеством параметров.
Обработчик запроса будет работать с оптимизированной для чтения моделью:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| public class GetOrderDetailsQueryHandler : IRequestHandler<GetOrderDetailsQuery, OrderDetailsDto>
{
private readonly IReadDbContext _readDb;
public GetOrderDetailsQueryHandler(IReadDbContext readDb)
{
_readDb = readDb;
}
public async Task<OrderDetailsDto> Handle(GetOrderDetailsQuery request, CancellationToken cancellationToken)
{
return await _readDb.OrderDetails
.AsNoTracking() // Важно для производительности!
.Where(o => o.Id == request.OrderId)
.Select(o => new OrderDetailsDto
{
Id = o.Id,
CustomerName = o.CustomerName,
TotalAmount = o.TotalAmount,
Status = o.Status,
Items = o.Items.Select(i => new OrderItemDto
{
ProductId = i.ProductId,
ProductName = i.ProductName,
Quantity = i.Quantity,
Price = i.Price
}).ToList()
})
.FirstOrDefaultAsync(cancellationToken)
?? throw new OrderNotFoundException(request.OrderId);
}
} |
|
Особое внимание обратите на последнюю строку с использованием null-coalescing оператора (??) и выбрасыванием исключения. Это паттерн, который я часто использую в обработчиках запросов: если объект не найден, лучше явно выбросить доменное исключение, чем возвращать null.
Одно из мощнейших преимуществ MediatR — пайплайны обработки. Они позволяют добавлять сквозную функциональность ко всем командам и запросам без изменения их обработчиков. Например, вот как реализовать валидацию команд с помощью FluentValidation:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
{
_validators = validators;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
if (!_validators.Any()) return await next();
var context = new ValidationContext<TRequest>(request);
var validationResults = await Task.WhenAll(
_validators.Select(v => v.ValidateAsync(context, cancellationToken)));
var failures = validationResults
.SelectMany(r => r.Errors)
.Where(f => f != null)
.ToList();
if (failures.Any())
throw new ValidationException(failures);
return await next();
}
} |
|
И регистрация этого пайплайна:
| C# | 1
2
3
4
| services.AddMediatR(cfg => {
cfg.RegisterServicesFromAssembly(typeof(Startup).Assembly);
cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
}); |
|
Интеграция с системами внешних уведомлений и аналитикой
Одно из мощнейших преимуществ Event Sourcing в связке с CQRS — это возможность легко интегрироваться с внешними системами. События, которые являются первоклассными гражданами в нашей архитектуре, становятся идеальным источником данных для уведомлений, аналитики и интеграций. Когда я впервые осознал эту возможность, для меня это было почти откровением. Ведь вместо того, чтобы встраивать логику уведомлений непосредственно в код обработчиков команд, мы можем подписаться на события и реагировать на них независимо. Этот подход следует принципу единственной ответственности и делает систему намного более гибкой. Вот как я обычно реализую отправку email-уведомлений:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public class OrderConfirmationEmailSender : IEventHandler<OrderConfirmed>
{
private readonly IEmailService _emailService;
private readonly ITemplateEngine _templateEngine;
public OrderConfirmationEmailSender(IEmailService emailService, ITemplateEngine templateEngine)
{
_emailService = emailService;
_templateEngine = templateEngine;
}
public async Task Handle(OrderConfirmed @event, CancellationToken cancellationToken)
{
var emailTemplate = await _templateEngine.RenderAsync(
"OrderConfirmation",
new { OrderId = @event.OrderId, CustomerEmail = @event.CustomerEmail });
await _emailService.SendEmailAsync(
@event.CustomerEmail,
"Ваш заказ подтвержден",
emailTemplate,
cancellationToken);
}
} |
|
Аналогичным образом можно интегрироваться с SMS-сервисами, push-уведомлениями или даже мессенджерами. Главное преимущество — все эти интеграции работают независимо от основной бизнес-логики. В одном проекте для финтех-компании мы настроили интеграцию с Telegram-ботом, который уведомлял клиентов о крупных транзакциях. Реализация была до смешного простой — отдельный обработчик события TransactionCompleted, который фильтровал транзакции выше определенной суммы и отправлял сообщение через Telegram API. Когда бизнес-требования изменились и потребовалось отправлять уведомления только в рабочее время, нам пришлось изменить только этот обработчик, не трогая основную логику обработки транзакций.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| public class LargeTransactionNotifier : IEventHandler<TransactionCompleted>
{
private readonly ITelegramService _telegramService;
private readonly decimal _largeTransactionThreshold;
public LargeTransactionNotifier(ITelegramService telegramService, IConfiguration config)
{
_telegramService = telegramService;
_largeTransactionThreshold = config.GetValue<decimal>("Notifications:LargeTransactionThreshold");
}
public async Task Handle(TransactionCompleted @event, CancellationToken cancellationToken)
{
if (@event.Amount < _largeTransactionThreshold)
return;
// Проверяем, рабочее ли сейчас время
var now = DateTime.Now;
var isWorkingHours = now.Hour >= 9 && now.Hour < 18 && now.DayOfWeek != DayOfWeek.Saturday && now.DayOfWeek != DayOfWeek.Sunday;
if (!isWorkingHours)
return;
await _telegramService.SendMessageAsync(
@event.UserId,
$"Уведомление: выполнена транзакция на сумму {@event.Amount} руб.",
cancellationToken);
}
} |
|
Интеграция с аналитическими системами следует тому же принципу. События, происходящие в нашей системе, являются богатым источником данных для аналитики. Мы можем отправлять их в специализированные хранилища или потоковые системы обработки данных. В одном из моих проектов мы интегрировались с Elasticsearch для аналитики пользовательского поведения. Каждое событие, связанное с действиями пользователя, дублировалось в Elasticsearch, что позволяло строить сложные аналитические запросы и визуализации в Kibana.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| public class ElasticsearchEventSink : IEventHandler<IDomainEvent>
{
private readonly IElasticClient _elasticClient;
private readonly ILogger<ElasticsearchEventSink> _logger;
public ElasticsearchEventSink(IElasticClient elasticClient, ILogger<ElasticsearchEventSink> logger)
{
_elasticClient = elasticClient;
_logger = logger;
}
public async Task Handle(IDomainEvent @event, CancellationToken cancellationToken)
{
try
{
// Обогащаем событие дополнительными метаданными
var eventWrapper = new EventWrapper
{
EventId = Guid.NewGuid(),
EventType = @event.GetType().Name,
Timestamp = DateTime.UtcNow,
Data = @event
};
// Индекс именуется по типу события и дате
var indexName = $"events-{@event.GetType().Name.ToLower()}-{DateTime.UtcNow:yyyy-MM}";
var response = await _elasticClient.IndexDocumentAsync(eventWrapper, cancellationToken);
if (!response.IsValid)
{
_logger.LogError("Ошибка при индексации события: {Error}", response.DebugInformation);
}
}
catch (Exception ex)
{
// Важно: ошибки в аналитике не должны влиять на основной поток выполнения
_logger.LogError(ex, "Ошибка при отправке события в Elasticsearch");
}
}
} |
|
Особое внимание стоит обратить на обработку ошибок. Интеграции с внешними системами не должны нарушать работу основного приложения. Если сервис уведомлений недоступен, пользователь все равно должен иметь возможность совершить заказ. Поэтому все интеграционные обработчики должны быть устойчивы к сбоям и не пробрасывать исключения наверх.
Один из подходов, который я часто использую, — это модель "отложенных задач" для критически важных уведомлений. Если отправка уведомления завершается неудачей, мы сохраняем задачу в отдельное хранилище и периодически пытаемся выполнить ее снова.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| public async Task Handle(OrderConfirmed @event, CancellationToken cancellationToken)
{
try
{
var email = ComposeEmail(@event);
await _emailService.SendAsync(email, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Не удалось отправить уведомление о заказе {OrderId}", @event.OrderId);
// Сохраняем задачу для повторной попытки
await _retryTaskRepository.SaveAsync(new RetryTask
{
Id = Guid.NewGuid(),
Type = "SendOrderConfirmationEmail",
Data = JsonSerializer.Serialize(new { OrderId = @event.OrderId, CustomerEmail = @event.CustomerEmail }),
CreatedAt = DateTime.UtcNow,
RetryCount = 0,
NextRetryAt = DateTime.UtcNow.AddMinutes(5)
}, cancellationToken);
}
} |
|
Практическая реализация
В этом разделе я хочу показать вам скелет реального приложения с CQRS и Event Sourcing, которое можно использовать как стартовую точку для своих проектов. Разрабатывая архитектуру, я обычно выделяю несколько ключевых проектов в решении:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| Solution 'EShop'
├── Domain
│ ├── Aggregates
│ ├── Commands
│ ├── Events
│ └── Exceptions
├── Application
│ ├── CommandHandlers
│ ├── QueryHandlers
│ ├── EventHandlers
│ └── Services
├── Infrastructure
│ ├── Persistence
│ ├── EventStore
│ ├── ReadModels
│ └── ExternalServices
└── API
├── Controllers
├── Filters
└── Startup |
|
Такая структура хорошо масштабируется и позволяет новым разработчикам быстро разобраться в проекте. Домен содержит только бизнес-логику, не зависит от инфраструктуры и фреймворков. Приложение оркестрирует взаимодействие между доменом и внешним миром. Инфраструктура реализует интерфейсы, определенные в домене и приложении.
Для хранения событий я часто использую комбинированный подход. События хранятся в двух местах: в реляционной базе данных (для надежности и атомарного сохранения) и в специализированной базе данных для событий (для эффективного чтения и обработки).
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| public class HybridEventStore : IEventStore
{
private readonly SqlEventStore _sqlStore;
private readonly EventStoreDb _eventStoreDb;
private readonly ILogger<HybridEventStore> _logger;
public HybridEventStore(
SqlEventStore sqlStore,
EventStoreDb eventStoreDb,
ILogger<HybridEventStore> logger)
{
_sqlStore = sqlStore;
_eventStoreDb = eventStoreDb;
_logger = logger;
}
public async Task SaveEventsAsync(Guid aggregateId, IEnumerable<IDomainEvent> events, int expectedVersion)
{
// Сначала сохраняем в SQL для атомарности
await _sqlStore.SaveEventsAsync(aggregateId, events, expectedVersion);
try
{
// Затем в EventStoreDb для эффективного чтения
await _eventStoreDb.SaveEventsAsync(aggregateId, events, expectedVersion);
}
catch (Exception ex)
{
// Если не удалось сохранить в EventStoreDb, логируем ошибку,
// но не фейлим операцию, т.к. данные уже сохранены в SQL
_logger.LogError(ex, "Не удалось сохранить события в EventStoreDb");
}
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId)
{
try
{
// Сначала пытаемся получить из EventStoreDb
return await _eventStoreDb.GetEventsAsync(aggregateId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Не удалось получить события из EventStoreDb, пробуем из SQL");
// Если не получилось, берем из SQL
return await _sqlStore.GetEventsAsync(aggregateId);
}
}
} |
|
Такой подход дает нам "лучшее из обоих миров": надежность реляционных баз данных и производительность специализированых event store.
Для доступа к моделям чтения я обычно использую Dapper - он дает нам почти сырую производительность SQL, но при этом избавляет от большей части рутины:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| public class DapperOrderQueries : IOrderQueries
{
private readonly IDbConnection _connection;
public DapperOrderQueries(string connectionString)
{
_connection = new SqlConnection(connectionString);
}
public async Task<OrderDetailsDto> GetOrderDetailsAsync(Guid orderId)
{
var sql = @"
SELECT o.Id, o.CustomerName, o.TotalAmount, o.Status,
i.Id as ItemId, i.ProductName, i.Quantity, i.Price
FROM Orders o
LEFT JOIN OrderItems i ON o.Id = i.OrderId
WHERE o.Id = @OrderId";
OrderDetailsDto order = null;
var items = new List<OrderItemDto>();
await _connection.QueryAsync<OrderDetailsDto, OrderItemDto, OrderDetailsDto>(
sql,
(o, i) => {
if (order == null)
order = o;
if (i != null)
items.Add(i);
return order;
},
new { OrderId = orderId },
splitOn: "ItemId");
if (order != null)
order.Items = items;
return order;
}
} |
|
Для обновления моделей чтения я использую проекции. Это обработчики событий, которые преобразуют события в записи в базе данных, оптимизированной для чтения. Вот пример такого обработчика:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| public class OrderProjection :
IEventHandler<OrderCreated>,
IEventHandler<OrderItemAdded>,
IEventHandler<OrderConfirmed>
{
private readonly IReadDbContext _readDb;
public OrderProjection(IReadDbContext readDb)
{
_readDb = readDb;
}
public async Task Handle(OrderCreated @event, CancellationToken cancellationToken)
{
var orderReadModel = new OrderReadModel
{
Id = @event.OrderId,
CustomerId = @event.CustomerId,
CustomerName = @event.CustomerName,
TotalAmount = 0, // Начальная сумма
Status = "Created",
CreatedAt = @event.Timestamp
};
await _readDb.Orders.AddAsync(orderReadModel, cancellationToken);
await _readDb.SaveChangesAsync(cancellationToken);
}
public async Task Handle(OrderItemAdded @event, CancellationToken cancellationToken)
{
var order = await _readDb.Orders.FindAsync(new object[] { @event.OrderId }, cancellationToken);
if (order == null) return; // Защита от отсутствия проекции
var itemReadModel = new OrderItemReadModel
{
Id = Guid.NewGuid(),
OrderId = @event.OrderId,
ProductId = @event.ProductId,
ProductName = @event.ProductName,
Quantity = @event.Quantity,
Price = @event.Price
};
await _readDb.OrderItems.AddAsync(itemReadModel, cancellationToken);
// Обновляем общую сумму заказа
order.TotalAmount += @event.Price * @event.Quantity;
await _readDb.SaveChangesAsync(cancellationToken);
}
public async Task Handle(OrderConfirmed @event, CancellationToken cancellationToken)
{
var order = await _readDb.Orders.FindAsync(new object[] { @event.OrderId }, cancellationToken);
if (order == null) return;
order.Status = "Confirmed";
order.ConfirmedAt = @event.Timestamp;
await _readDb.SaveChangesAsync(cancellationToken);
}
} |
|
Я думаю, важно также показать, как реализовать API-контроллеры для CQRS-архитектуры. В ASP.NET Core это выглядит достаточно элегантно:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| [ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
private readonly IMediator _mediator;
public OrdersController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)
{
var orderId = await _mediator.Send(command);
return CreatedAtAction(nameof(GetOrder), new { id = orderId }, null);
}
[HttpGet("{id}")]
public async Task<ActionResult<OrderDetailsDto>> GetOrder(Guid id)
{
var query = new GetOrderDetailsQuery(id);
var result = await _mediator.Send(query);
return result;
}
} |
|
Использование MediatR делает контроллеры предельно простыми. Они просто принимают команды или запросы и передают их медиатору. Вся логика содержится в обработчиках.
Поверьте моему опыту, такой подход радикально упрощает сопровождение системы. Когда нужно изменить логику заказа, вы точно знаете, где искать - в соответствующем обработчике команды или проекции.
Как форсировать event для контрола ? Всем привет!
вопрос такой.
Есть MainWindow, в нем есть TabControl, в котором есть несколько... Странное поведение event-ов Всем доброго дня!
Подскажите, кто знает, в чем может быть проблема.
Есть структура такого... Как изменить свойство компонента, не вызывая связаный Event? А если конкретно - как установить для ItemMenu значение Checked, не вызвав при этом CheckedChanged? Переопределение event Привет всем. Подскажите плз не как не въеду как переопределить события базового класса в... переопределение event Привет всем. Подскажите плз не как не въеду как переопределить события базового класса в... Event в userControl Я сделал UserControl и поместил туда label там же и есть свойсто Text. Не могу добавить событие... WCF + event Доброго времени суток.
Есть WCF web сервис и есть клиент. используется... Parse event Здравствуйте,
подскажите для чего используется событие Parse event в C# . Искал на яндакск в... Динамичное добавление linkbutton & event Всем привет
Значит я динамично добавляю при загрузке страницы linkbuttons, все получатся все... ProgressChanged event Создал я класс. Один из методов по сути зализит на страницу по указанному адресу, парсит её и... Собственный Event с Delegate пример(нужен совет) Начал разбираться в событиях. Хочу написать пример но ничего в голову не приходит, написал простой... Не срабатывает event MouseDown на левую кнопку! Обнаружил странное поведение MouseDown в контроле Button. (Может и в других так, но пока не...
|