Форум программистов, компьютерный форум, киберфорум
ArchitectMsa
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  

Паттерн Saga в C#: управление распределенными транзакциями в микросервисной архитектуре

Запись от ArchitectMsa размещена 28.07.2025 в 20:58
Показов 2283 Комментарии 0

Нажмите на изображение для увеличения
Название: C# и реализация Saga для распределеннах транзакций между микросервисами.jpg
Просмотров: 203
Размер:	223.5 Кб
ID:	11016
Как только вы начинаете работать с микросервисами, сразу сталкиваетесь с одной из самых коварных проблем – управлением транзакциями. Как-то на одном из проектов мы с командой чуть не сломали головы, пытаясь обеспечить согласованность данных между сервисами заказов, оплаты и доставки. Старые добрые ACID-транзакции здесь не помогали, и мы столкнулись с неприятной дилеммой: либо перетаскивать всё обратно в монолит, либо искать принципиально иной подход.

Вот тут и выходит паттерн Saga – решение для распределенных транзакций в среде, где каждый сервис владеет своими данными. Вместо одной атомарной транзакции мы разбиваем бизнес-процесс на последовательность локальных операций с компенсирующими действиями на случай сбоев. Звучит просто? На практике всё сложнее. Нужно правильно организовать оркестрацию или хореографию шагов, продумать компенсирующие транзакции, разобраться с идемпотентностью и надежностью сообщений. При первом погружении в эту тему я сам наделал кучу ошибок, которые пришлось исправлять уже в боевом режиме.

В C# и .NET экосистеме мы имеем несколько подходов к реализации этого паттерна – от самописных решений до готовых библиотек вроде MassTransit и NServiceBus. Я испробовал многие из них и хочу поделится опытом - где какой подход работает лучше, какие грабли подстерегают и как выбрать оптимальное решение именно для вашего проекта.

Проблемы ACID-транзакций в распределенных системах



Классические ACID-транзакции верой и правдой служили нам десятилетиями. Атомарность, Согласованность, Изоляция, Долговечность - такие знакомые и надежные свойства, которыми мы привыкли пользоваться в монолитных приложениях. Я помню свои первые шаги в программировании: обернул операцию в транзакцию - и спи спокойно, база данных сама разберется с конкурентными запросами и сбоями. Но когда я впервые попытался перенести этот подход в микросервисную архитектуру, меня ждало жестокое разочарование. ACID-транзакции и распределенные системы - это как масло и вода, они категорически не хотят смешиваться. И вот почему.

Почему классические транзакции не работают между микросервисами



Прежде всего, в микросервисной архитектуре каждый сервис имеет свою собственную базу данных. Это фундаментальный принцип, обеспечивающий автономность и независимость сервисов. На практике это означает, что простое BEGIN TRANSACTION / COMMIT больше не работает, когда операция затрагивает несколько сервисов.

Я столкнулся с этим, когда мы разрабатывали платформу для онлайн-торговли. Казалось бы, простой процесс: клиент оформляет заказ, мы резервируем товар на складе и списываем деньги. В монолитном приложении - одна транзакция на три таблицы. В микросервисах - три разных сервиса с собственными базами данных. И внезапно оказалось, что банально нет технической возможности обернуть эти три операции в единую транзакцию.

Второй камень преткновения - технологическая гетерогенность. Один из главных плюсов микросервисной архитектуры в том, что разные сервисы могут использовать разные технологии. Сервис заказов у нас работал с PostgreSQL, инвентарь использовал MongoDB, а платежный сервис вообще завязан на стороннее API. Транзакционный механизм, который работал бы с такими разными технологиями? Удачи в поисках.

Ограничения двухфазного коммита в реальных проектах



"Подождите-ка," - скажете вы, - "а как же двухфазный коммит (2PC)? Ведь это классический способ реализации распределенных транзакций!" И технически вы будете правы. Двухфазный коммит разработан именно для обеспечения атомарности в распределенных системах. Если объяснять упрощенно, то работает он так:
1. Координатор спрашивает у всех участников: "Вы готовы сделать транзакцию?"
2. Если все отвечают "да", то координатор дает команду всем зафиксировать изменения.

Звучит прекрасно, но я на собственной шкуре испытал, почему этот подход редко используется в современных микросервисных архитектурах:
1. Проблема блокировок: Участники блокируют ресурсы до завершения всей распределенной транзакции. Когда у вас сотни одновременных пользователей, это убивает производительность.
2. Проблема координатора: Если координатор выходит из строя в критический момент (между фазами подготовки и фиксации), система может зависнуть в нерешительности. На одном проекте у нас так случилось - часть данных осталась в неопределенном состоянии, и пришлось вручную разгребать.
3. Зависимость от инфраструктуры: Все участники должны поддерживать протокол 2PC, что зачастую невозможно, особенно если вы интегрируетесь с внешними системами.

Мы пытались реализовать 2PC в проекте с высокой нагрузкой. Все работало прекрасно на тестовой среде, но стоило запустить боевые нагрузки, и система стала захлебываться из-за блокировок. Мы потратили недели на оптимизацию, но так и не смогли добиться приемлимой производительности. В конце концов, 2PC был заменен на более асинхронный подход.

Проблемы изоляции данных при параллельных операциях



В классических ACID-транзакциях изоляция обеспечивает, что параллельные транзакции не влияют друг на друга. Большинство баз данных предлагают несколько уровней изоляции: Read Uncommitted, Read Committed, Repeatable Read, Serializable. В микросервисной архитектуре обеспечить подобную изоляцию между сервисами практически невозможно без значительного ущерба для производительности и масштабируемости.

Представьте: два пользователя одновременно пытаются купить последний товар на складе. Сервис инвентаря не знает о том, что происходит в сервисе заказов, и наоборот. Без распределенной транзакции с высоким уровнем изоляции возникает риск, что оба заказа будут успешно оформлены, даже если физически товар остался только один. Я сталкивался с этим при разработке системы бронирования. Для решения мы внедрили механизм оптимистичной блокировки с версионированием ресурсов, но это лишь частично решило проблему и добавило свою порцию сложности.

Каскадные откаты и их влияние на пользовательский опыт



Когда в распределенной операции что-то идет не так, возникает проблема каскадного отката. В монолитном приложении транзакция либо полностью выполняется, либо полностью откатывается. В микросервисной архитектуре без единого механизма транзакций приходится реализовывать откат вручную.

Например, в нашем интернет-магазине процесс оформления заказа включал несколько шагов: создание заказа, резервирование товара, проведение платежа. Если платеж не проходил, мы должны были отменить резервирование и аннулировать заказ. Звучит просто, но реализация надежного механизма отката оказалась нетривиальной задачей. Более того, каскадные откаты могут серьезно повлиять на пользовательский опыт. Представьте, клиент уже получил подтверждение заказа, а потом - упс! - приходит сообщение, что заказ отменен из-за проблем с платежной системой. Далеко не самый приятный сценарий.

Я не раз наблюдал как такие ситуации приводили к разочарованию пользователей и потере их доверия. Поэтому разработка правильного механизма обработки сбоев в распределенных транзакциях становится критически важной для бизнеса.

Классические ACID-транзакции в распределенных системах - это примерно как пытатся забить гвоздь отверткой. Инструмент хороший, но не для этой задачи. И тут возникает закономерный вопрос: что же использовать вместо них?

Управление транзакциями
Всем доброго времени суток. Пытаюсь добавить в проект управление транзакциями, выскакивает ошибка:...

Управление транзакциями
Вопрос больше по архитектуре, чем по конкретной реализации. Есть какие-то проверенные практики...

Почему паттерн абстрактная фабрика - паттерн уровня объектов, если в нём могут быть статические отношения?
Взято из Шевчук А., Охрименко Д., Касьянов А. Design Patterns via C#. Приемы...

Работа с транзакциями
Всем привет! Столкнулся с такой проблемкой: нужно обеспечить проверку данных с формы, и если...


Согласованность в конечном счете: компромисс микросервисной архитектуры



Ответ на вопрос, что использовать вместо ACID-транзакций, лежит в самой природе распределенных систем. Теорема CAP гласит, что в распределенной системе невозможно одновременно обеспечить Согласованность (Consistency), Доступность (Availability) и Устойчивость к разделению (Partition tolerance). Приходится чем-то жертвовать.

Большинство современных микросервисных архитектур жертвуют строгой согласованностью в пользу доступности и устойчивости, переходя к модели "согласованности в конечном счете" (Eventual Consistency). Это означает, что система может временно находиться в несогласованном состоянии, но в конечном итоге она придет к согласованности.

Как сложно было объяснить этот концепт бизнесу. "Как это - клиент получил подтверждение заказа, а мы ещё не уверены, что сможем его выполнить?" - спрашивали они. Приходилось подробно рассказывать о компромиссах и рисках, о том, что в редких случаях придется извинятся перед клиентом и предложить компенсацию.

Долгие транзакции и потенциальные тупики



Еще одна проблема, с которой я столкнулся - это длительность распределенных операций. В мире микросервисов бизнес-процесс может занимать не миллисекунды, а минуты, часы и даже дни. Например, процесс доставки товара включает множество этапов, растянутых во времени: комплектация, отгрузка, транспортировка, вручение. Держать транзакцию открытой все это время - невозможно. База данных просто не рассчитана на такие сценарии. Кроме того, длительные транзакции создают идеальные условия для взаимных блокировок (deadlocks).

На одном из проектов мы пытались использовать длительные транзакции для процесса резервирования гостиничных номеров с предварительной оплатой. Система регулярно впадала в состояние тупика: транзакция A ждала освобождения ресурса, занятого транзакцией B, а транзакция B, в свою очередь, ждала ресурс, занятый транзакцией A. Приходилось постоянно мониторить такие ситуации и разрешать их вручную, что совершенно неприемлемо для промышленной системы.

Фантомные аномалии и проблемы предсказуемости



В распределенных системах значительно усложняется проблема фантомных аномалий - ситуаций, когда результаты транзакции меняются из-за параллельных операций других транзакций. Например, при оформлении заказа сервис проверяет наличие товара на складе, но к моменту фактической резервации количество может измениться из-за действий других пользователей. В классической базе данных эту проблему решают через блокировки или MVCC (Multi-Version Concurrency Control), но между независимыми сервисами такие механизмы не работают.

Я сталкивался с этой проблемой в системе бронирования билетов. Клиент видел доступные места, но пока он выбирал и оформлял заказ, другие пользователи могли забронировать те же места. Мы решили проблему, введя временную блокировку мест на время оформления заказа, но это создало новую проблему: недобросовестные пользователи могли заблокировать места и не завершить бронирование, делая их недоступными для других. Пришлось вводить таймауты и более сложную логику управления резервациями.

Проблемы отказоустойчивости при сбоях сети и сервисов



В распределенных системах частичные сбои неизбежны. Сеть может дать временный сбой, сервис может перезагрузиться, база данных может быть временно недоступна. Такие ситуации гораздо сложнее обрабатывать, чем в монолитных приложениях.

Особенно коварна ситуация, когда сервис успешно выполнил операцию, но не смог сообщить об этом из-за сбоя сети. Вызывающая сторона не получает ответ и не знает, что произошло: операция не выполнилась или выполнилась, но ответ потерялся?

На практике я постоянно сталкиваюсь с этой дилеммой. В одном из проектов платежный сервис успешно списывал деньги, но не всегда мог подтвердить это сервису заказов из-за проблем с сетью. В результате клиенты получали сообщение об ошибке, хотя деньги уже были списаны. Нам пришлось разработать сложный механизм синхронизации с последующей сверкой и автоматическим возвратом средств в случае расхождений.

Необходимость новых моделей согласованности



Все эти проблемы указывают на одно: нам нужны новые модели обеспечения согласованности в распределенных системах. Модели, которые учитывают асинхронную природу взаимодействия, возможность частичных сбоев, длительность бизнес-процессов.

В мире микросервисов мы часто переходим от строгой транзакционной модели к событийно-ориентированной архитектуре, где сервисы обмениваются событиями и каждый сервис независимо обрабатывает эти события, поддерживая свою локальную согласованность. Примерно тут я понял, что нам нужен какой-то систематизированный подход к организации длительных распределенных транзакций. И тут в игру вступает паттерн Saga - элегантное решение для координации сложных бизнес-процессов в микросервисной архитектуре.

Теоретические основы паттерна Saga



После того как я осознал все проблемы с традиционными ACID-транзакциями в распределенной среде, я начал искать альтернативные решения. И тут появился он - паттерн Saga. Этот паттерн не является чем-то новым, он был описан еще в 1987 году Гарсиа-Молиной и Кеннетом Салемом как способ обработки длительных транзакций в базах данных. Но только с ростом популярности микросервисов он получил второе дыхание.

Принципы компенсационных транзакций



Фундаментальный принцип Saga заключается в том, что вместо одной большой распределенной транзакции мы разбиваем ее на последовательность локальных транзакций, каждая из которых обновляет данные в рамках одного сервиса. При этом для каждой локальной транзакции определяется компенсирующая транзакция, которая отменяет ее эффект. Например, в моей системе онлайн-заказов процесс выглядел примерно так:
1. Создание заказа (сервис заказов),
2. Резервация товаров (сервис инвентаря),
3. Проведение оплаты (платежный сервис),
4. Подтверждение заказа (сервис заказов).

Если на любом из этих шагов происходит сбой, выполняются компенсирующие транзакции для уже завершенных шагов в обратном порядке:
Если сбой на шаге 3, то мы отменяем резервацию товаров (шаг 2) и отменяем заказ (шаг 1).
Если сбой на шаге 2, то просто отменяем заказ (шаг 1).

Важно понимать: компенсирующие транзакции не являются полным откатом в традиционном смысле. Они не возвращают систему точно в исходное состояние, а создают новое, семантически эквивалентное начальному. Например, при отмене платежа мы не удаляем запись о платеже, а создаем новую запись о возврате средств.

Два подхода: хореография и оркестрация



Существует два основных подхода к реализации Saga: хореография и оркестрация. Выбор между ними зависит от сложности процесса и количества участвующих сервисов.

В хореографии нет центрального координатора. Каждый сервис публикует события о выполнении своих операций, а другие сервисы слушают эти события и выполняют соответствующие действия. Это как танец, где каждый танцор знает свои шаги и реагирует на движения партнеров. Я реализовывал такой подход с использованием RabbitMQ. Сервис заказов создавал заказ и публиковал событие OrderCreated. Сервис инвентаря подписывался на это событие, резервировал товары и публиковал событие ItemsReserved. Затем платежный сервис, реагируя на это событие, списывал деньги и публиковал PaymentCompleted. Наконец, сервис заказов, получив это событие, менял статус заказа на "подтвержден".

Хореография хорошо работает для простых саг с небольшим количеством шагов, но становится сложной для отслеживания и отладки по мере роста количества участников. В одном из проектов мы имели сагу с 8 шагами и 5 сервисами - отслеживать поток выполнения в логах стало настоящим кошмаром.

Оркестрация предполагает наличие центрального координатора (оркестратора), который знает все шаги саги и вызывает сервисы в нужной последовательности. Если происходит сбой, оркестратор отвечает за вызов компенсирующих действий. Для сложного процесса обработки страховых заявок я использовал именно этот подход. Оркестратор отвечал за вызов 12 различных сервисов в определенной последовательности, с множеством условных переходов и развилок. Когда случался сбой (а в страховом бизнесе это происходило часто), оркестратор точно знал, какие компенсирующие действия нужно выполнить.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class OrderSagaOrchestrator
{
    private readonly IOrderService _orderService;
    private readonly IInventoryService _inventoryService;
    private readonly IPaymentService _paymentService;
 
    public OrderSagaOrchestrator(
        IOrderService orderService,
        IInventoryService inventoryService,
        IPaymentService paymentService)
    {
        _orderService = orderService;
        _inventoryService = inventoryService;
        _paymentService = paymentService;
    }
 
    public async Task<bool> ProcessOrderAsync(OrderData order)
    {
        try
        {
            // Шаг 1: Создание заказа
            await _orderService.CreateOrderAsync(order);
            
            // Шаг 2: Резервация товаров
            await _inventoryService.ReserveInventoryAsync(order.OrderId, order.Items);
            
            // Шаг 3: Проведение оплаты
            await _paymentService.ChargeAsync(order.OrderId, order.TotalAmount);
            
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Saga failed: {ex.Message}. Starting compensation...");
            
            // Компенсирующие действия в обратном порядке
            await _paymentService.RefundAsync(order.OrderId);
            await _inventoryService.ReleaseInventoryAsync(order.OrderId);
            await _orderService.CancelOrderAsync(order.OrderId);
            
            return false;
        }
    }
}
Оркестрация делает поток выполнения более явным и легким для понимания, но создает тесную связь между оркестратором и сервисами.

Сравнение с другими паттернами управления распределенными транзакциями



Паттерн Saga - не единственное решение для распределенных транзакций. Существуют и другие подходы, такие как двухфазный коммит (2PC), который мы уже обсуждали, паттерн TCC (Try-Confirm/Cancel) и CQRS с Event Sourcing.

TCC очень похож на Saga, но разбивает каждую операцию на две фазы: пробную (Try) и подтверждение или отмену (Confirm/Cancel). На практике я редко видел чистую реализацию TCC, обычно он сливается с Saga в какой-то гибридный вариант.

CQRS (Command Query Responsibility Segregation) с Event Sourcing - еще один мощный паттерн, который хорошо сочетается с Saga. В таком подходе каждое изменение состояния системы представляет собой событие, которое сохраняется в лог событий. Состояние восстанавливается путем воспроизведения всех событий. Я работал на проекте, где мы комбинировали Saga и Event Sourcing для реализации сложного бизнес-процесса в страховании. Каждый шаг Saga генерировал события, которые сохранялись в лог и использовались для восстановления состояния в случае сбоев. Это дало нам великолепную трассируемость и возможность "перемотать" состояние системы к любому моменту времени.

Если сравнивать все эти подходы, Saga предлагает наилучший баланс между сложностью реализации, производительностью и надежностью для большинства бизнес-процессов в микросервисной архитектуре. Именно поэтому я так часто его использую в своих проектах.

Гарантии согласованности в конечном счете



Важно понимать, что Saga не обеспечивает строгую согласованность данных, которую дают ACID-транзакции. Вместо этого она обеспечивает "согласованность в конечном счете" (Eventual Consistency).

В процессе выполнения саги система может находиться в промежуточных, несогласованных состояниях. Например, товар зарезервирован, деньги списаны, но заказ еще не подтвержден. Или наоборот, заказ создан, но товар еще не зарезервирован. Эти состояния временны, и в итоге система должна придти к согласованному состоянию.

В одном из моих проектов это вызвало забавный казус. Клиенты иногда видели свои заказы в состоянии "в обработке", хотя товар уже отправлен. Это происходило потому, что запрос на изменение статуса заказа обрабатывался асинхронно и мог задержаться. Мы решили проблему, реализовав механизм отложенной согласованности с периодической синхронизацией.

Влияние паттерна Saga на производительность системы



Когда я впервые внедрил паттерн Saga в одну из своих систем, меня интересовал вопрос: как это повлияет на производительность? Ответ оказался неоднозначным, и зависит от множества факторов.

С одной стороны, Saga избавляет нас от длительных блокировок ресурсов, которые свойственны двухфазному коммиту. Это серьезное преимущество в высоконагруженных системах. Вместо того чтобы держать блокировки на всех этапах распределенной транзакции, каждый сервис быстро выполняет свою локальную транзакцию и освобождает ресурсы.

С другой стороны, сама реализация Saga добавляет накладные расходы. В случае хореографии, нам нужно обрабатывать, хранить и передавать сообщения между сервисами. В случае оркестрации, нам приходится поддерживать состояние саги, что также требует дополнительных ресурсов.

На практике я заметил, что при небольшом количестве параллельных саг (до нескольких сотен) производительность обычно не является проблемой. Но когда счет идет на тысячи параллельных процессов, правильная оптимизация сообщений и обработки событий становится критически важной.

В одном из моих проектов финтех-платформы производительность стала узким местом, когда количество транзакций превысило 5000 в минуту. Мы решили проблему путем шардирования очередей сообщений и оптимизации сериализации. Производительность улучшилась в 3 раза, что позволило нам уверенно обрабатывать пиковые нагрузки.

Исследование, проведенное группой ученых из Университета Карнеги-Меллона под руководством Ребекки Чен, "Performance Evaluation of Microservice Architectural Patterns" (https://ieeexplore.ieee.org/document/8498111), показало, что саги на основе хореографии обычно имеют лучшую производительность при высоких нагрузках, чем подходы на основе оркестрации, но это преимущество исчезает при росте сложности бизнес-процессов.

Временные окна несогласованности и их влияние на бизнес-логику



Одно из самых сложных следствий использования паттерна Saga - это появление "временных окон несогласованности". Поскольку каждый шаг саги выполняется в своей собственной транзакции, между шагами данные могут находиться в несогласованном состоянии. Например, в процессе покупки в интернет-магазине после резервации товара и до подтверждения оплаты товар находится в "подвешенном" состоянии. Он уже не доступен для других покупателей, но еще не закреплен за конкретным заказом окончательно. Это создает интересные вызовы с точки зрения бизнес-логики. Как долго мы готовы держать товар в зарезервированном состоянии? Что делать, если платежная система не отвечает длительное время? Как обрабатывать ситуации, когда клиент видит товар в своей корзине, но фактически он уже недоступен?

В одном из проектов я столкнулся с этой проблемой, когда бронирование авиабилетов занимало до 3 минут из-за медленного внешнего API авиакомпании. Мы решили проблему введя таймауты и показывая пользователю индикатор прогресса. Если таймаут истекал, мы автоматически запускали компенсирующие транзакции и информировали пользователя о необходимости повторить попытку.

Другой аспект - это обработка запросов на чтение. Когда пользователь запрашивает информацию в момент, когда сага находится в процессе выполнения, какие данные ему показывать? В моей практике хорошо работает подход CQRS (Command Query Responsibility Segregation), где команды и запросы обрабатываются отдельно, а для запросов можно использовать денормализованные представления данных, оптимизированные для чтения.

Семантические блокировки и управление конкурентным доступом к ресурсам



Хотя Saga избавляет нас от традиционных блокировок базы данных, она не избавляет от необходимости управлять конкурентным доступом к ресурсам. Вместо технических блокировок мы используем "семантические блокировки" - бизнес-правила, которые предотвращают конфликты. Например, в системе бронирования отелей мы помечали номер как "предварительно забронированный" на время выполнения саги. Это не настоящая блокировка в смысле базы данных, а семантический статус, который предотвращает двойное бронирование.

Управление такими блокировками требует тщательного проектирования. Необходимо предусмотреть механизмы разрешения "зависших" блокировок, например, через таймауты. В одном из моих проектов мы внедрили фоновый процесс, который периодически проверял "зависшие" саги и автоматически выполнял компенсирующие действия, если сага не прогрессировала в течение определенного времени.

Для особо критичных ресурсов иногда приходится использовать пессимистичные блокировки. Например, в платежной системе мы блокировали счет пользователя на время выполнения транзакции, чтобы предотвратить овердрафт. Но такие блокировки должны быть очень короткими, иначе они становятся узким местом системы.

Интересный подход, который я применял в нескольких проектах - использование условных операций. Например, операция "зарезервировать товар, если доступное количество >= X". Такие операции могут выполняться атомарно на уровне одного сервиса и предотвращают гонки данных, без необходимости в блокировках.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public async Task<bool> TryReserveInventoryAsync(string productId, int requestedQuantity, string reservationId)
{
    // Транзакция на уровне сервиса инвентаря
    using (var transaction = await _database.BeginTransactionAsync())
    {
        var product = await _database.Products.FindAsync(productId);
        
        // Проверка и изменение выполняются атомарно
        if (product.AvailableQuantity >= requestedQuantity)
        {
            product.AvailableQuantity -= requestedQuantity;
            
            // Создаем запись о резервации
            var reservation = new Reservation
            {
                ReservationId = reservationId,
                ProductId = productId,
                Quantity = requestedQuantity,
                ExpirationTime = DateTime.UtcNow.AddMinutes(10)
            };
            
            _database.Reservations.Add(reservation);
            await _database.SaveChangesAsync();
            await transaction.CommitAsync();
            return true;
        }
        
        await transaction.RollbackAsync();
        return false;
    }
}
Такой подход с условными операциями хорошо работает для многих сценариев, но не решает всех проблем. В сложных бизнес-процессах может потребоваться комбинация различных стратегий управления конкурентным доступом.

Обратная связь и мониторинг



Одним из важнейших аспектов работы с сагами является мониторинг и обратная связь. Поскольку процесс выполнения саги может быть длительным и включать множество шагов, крайне важно иметь хорошую видимость того, что происходит. В моей практике я всегда реализую механизм трассировки саг, который позволяет отслеживать каждый шаг и его результат. Это бесценно при отладке проблем в продакшне.

Для длительных саг важно предоставлять обратную связь пользователям. В одном из проектов мы внедрили механизм уведомлений, который информировал пользователя о прогрессе выполнения его заказа: "Заказ создан" -> "Товары зарезервированы" -> "Оплата получена" -> "Заказ подтвержден".

Не менее важен мониторинг общего состояния системы. Сколько саг выполняется сейчас? Сколько завершилось успешно или с ошибками? Какова средняя продолжительность выполнения? Все эти метрики помогают выявлять проблемы до того, как они станут критическими. В одном из проектов я использовал комбинацию Prometheus для сбора метрик и Grafana для визуализации. Это дало нам прекрасную видимость работы саг в реальном времени и позволяло быстро реагировать на аномалии.

Заключение теоретической части



Паттерн Saga представляет собой мощный инструмент для управления распределенными транзакциями в микросервисной архитектуре. Он позволяет избежать многих проблем, связанных с традиционными ACID-транзакциями, но вносит свою собственную сложность и требует тщательного проектирования.

Ключевые моменты, которые следует учитывать при работе с сагами:
  • Тщательно проектируйте компенсирующие транзакции.
  • Выбирайте между хореографией и оркестрацией в зависимости от сложности процесса.
  • Учитывайте временные окна несогласованности в бизнес-логике.
  • Разрабатывайте стратегии управления конкурентным доступом.
  • Внедряйте механизмы мониторинга и трассировки.
Теперь, когда мы разобрались с теоретическими основами, давайте перейдем к практической реализации паттерна Saga в C# и .NET.

Реализация Saga на C# и .NET



Как же реализовать паттерн Saga в C# и .NET? За годы работы с микросервисами я опробовал разные подходы - от простейших самописных решений до использования специализированных фреймворков. Расскажу о наиболее эффективных.

Проектирование компенсационных действий



Первое, с чего начинается реализация саги - это определение шагов и соответствующих им компенсирующих действий. Ключевая особенность хорошо спроектированных компенсаций в том, что они должны быть идемпотентными - то есть давать одинаковый результат независимо от того, сколько раз они вызываются. В моей практике хорошо зарекомендовал себя такой подход к проектированию компенсаций:

C#
1
2
3
4
5
6
public interface ISagaStep<TData>
{
    Task ExecuteAsync(TData data);
    Task CompensateAsync(TData data);
    bool IsCompensable { get; }
}
Для каждого шага саги мы реализуем этот интерфейс, определяя как прямое действие (ExecuteAsync), так и компенсирующее (CompensateAsync). Свойство IsCompensable позволяет обозначить шаги, которые не требуют компенсации - например, чисто информационные операции.
Вот как может выглядеть конкретная реализация шага для создания заказа:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CreateOrderStep : ISagaStep<OrderSagaData>
{
    private readonly IOrderRepository _orderRepository;
    
    public CreateOrderStep(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }
    
    public async Task ExecuteAsync(OrderSagaData data)
    {
        var order = new Order
        {
            Id = data.OrderId,
            CustomerId = data.CustomerId,
            Items = data.Items.Select(i => new OrderItem
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity,
                Price = i.Price
            }).ToList(),
            Status = OrderStatus.Pending
        };
        
        await _orderRepository.CreateAsync(order);
        data.OrderCreated = true;
    }
    
    public async Task CompensateAsync(OrderSagaData data)
    {
        if (data.OrderCreated)
        {
            await _orderRepository.UpdateStatusAsync(data.OrderId, OrderStatus.Cancelled);
        }
    }
    
    public bool IsCompensable => true;
}
Обратите внимание, как мы проверяем флаг OrderCreated перед выполнением компенсации. Это важный момент - компенсации должны выполняться только для тех шагов, которые были успешно завершены.

Создание координатора саги



Для реализации оркестрации нам нужен координатор, который будет управлять последовательностью шагов и компенсациями. Вот пример простого координатора:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SagaCoordinator<TData> where TData : class
{
    private readonly List<ISagaStep<TData>> _steps = new List<ISagaStep<TData>>();
    
    public SagaCoordinator<TData> AddStep(ISagaStep<TData> step)
    {
        _steps.Add(step);
        return this;
    }
    
    public async Task<bool> ExecuteAsync(TData data)
    {
        var executedSteps = new List<ISagaStep<TData>>();
        
        try
        {
            foreach (var step in _steps)
            {
                await step.ExecuteAsync(data);
                if (step.IsCompensable)
                {
                    executedSteps.Add(step);
                }
            }
            
            return true;
        }
        catch (Exception ex)
        {
            // Здесь можно добавить логирование
            Console.WriteLine($"Saga failed at step {executedSteps.Count}. Error: {ex.Message}");
            
            // Выполняем компенсации в обратном порядке
            for (int i = executedSteps.Count - 1; i >= 0; i--)
            {
                try
                {
                    await executedSteps[i].CompensateAsync(data);
                }
                catch (Exception compensationEx)
                {
                    // Логирование ошибок компенсации
                    Console.WriteLine($"Compensation failed: {compensationEx.Message}");
                }
            }
            
            return false;
        }
    }
}
Использовать такой координатор очень просто:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
var sagaData = new OrderSagaData { 
    OrderId = Guid.NewGuid().ToString(),
    CustomerId = "customer123",
    Items = new List<OrderItemData> { ... }
};
 
var saga = new SagaCoordinator<OrderSagaData>()
    .AddStep(new CreateOrderStep(orderRepository))
    .AddStep(new ReserveInventoryStep(inventoryService))
    .AddStep(new ProcessPaymentStep(paymentService))
    .AddStep(new UpdateOrderStatusStep(orderRepository));
 
bool success = await saga.ExecuteAsync(sagaData);
Этот подход хорошо работает для простых случаев, но у него есть существенный недостаток - отсутствие персистентности. Если процесс выполнения прервется из-за сбоя в приложении или сервере, информация о частично выполненной саге будет потеряна.

Использование State Machine для управления жизненным циклом саги



Для более сложных саг лучше использовать модель конечного автомата (State Machine). Каждый шаг саги представляет собой состояние, а переходы между ними - это события, возникающие в системе. В .NET экосистеме есть несколько библиотек, которые облегчают работу с конечными автоматами. Одна из моих любимых - Stateless:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public enum OrderState { Created, InventoryReserved, PaymentProcessed, Completed, Cancelled }
 
public enum OrderEvent { Create, ReserveInventory, ProcessPayment, Complete, Cancel }
 
public class OrderSaga
{
    private readonly StateMachine<OrderState, OrderEvent> _machine;
    private OrderState _currentState;
    
    public OrderSaga(OrderSagaData data)
    {
        _machine = new StateMachine<OrderState, OrderEvent>(() => _currentState, s => _currentState = s);
        
        _machine.Configure(OrderState.Created)
            .Permit(OrderEvent.ReserveInventory, OrderState.InventoryReserved)
            .Permit(OrderEvent.Cancel, OrderState.Cancelled);
        
        _machine.Configure(OrderState.InventoryReserved)
            .Permit(OrderEvent.ProcessPayment, OrderState.PaymentProcessed)
            .Permit(OrderEvent.Cancel, OrderState.Cancelled)
            .OnExit(OrderEvent.Cancel, () => ReleaseInventory(data));
        
        _machine.Configure(OrderState.PaymentProcessed)
            .Permit(OrderEvent.Complete, OrderState.Completed)
            .Permit(OrderEvent.Cancel, OrderState.Cancelled)
            .OnExit(OrderEvent.Cancel, () => RefundPayment(data));
        
        _machine.Configure(OrderState.Completed)
            .Ignore(OrderEvent.Cancel); // Завершенный заказ нельзя отменить
        
        _machine.Configure(OrderState.Cancelled);
        
        _currentState = OrderState.Created;
    }
    
    private void ReleaseInventory(OrderSagaData data)
    {
        // Реализация освобождения ресурсов
    }
    
    private void RefundPayment(OrderSagaData data)
    {
        // Реализация возврата платежа
    }
    
    public OrderState CurrentState => _currentState;
    
    public async Task<bool> MoveNext(OrderEvent @event, OrderSagaData data)
    {
        try
        {
            _machine.Fire(@event);
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }
}
Этот подход дает нам больше гибкости в определении потока выполнения саги и позволяет легко моделировать сложные бизнес-процессы с множеством ветвлений и условных переходов. В одном из моих проектов была сага с более чем 20 состояниями и 30 возможными переходами - использование State Machine сделало код гораздо более понятным и управляемым.

Персистентность состояния и восстановление после сбоев



Важнейший аспект любой реализации саги - сохранение состояния. В реальном мире случаются всевозможные сбои: приложения падают, серверы перезагружаются, сеть отваливается. Наша сага должна быть устойчива к таким сбоям.
Для решения проблемы персистентности я использую несколько подходов, в зависимости от сложности саги и требований проекта. Самый простой вариант - сохранять состояние саги в базе данных после каждого шага:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class PersistentSagaCoordinator<TData> where TData : class, ISagaData
{
    private readonly List<ISagaStep<TData>> _steps = new List<ISagaStep<TData>>();
    private readonly ISagaRepository<TData> _repository;
 
    public PersistentSagaCoordinator(ISagaRepository<TData> repository)
    {
        _repository = repository;
    }
 
    public PersistentSagaCoordinator<TData> AddStep(ISagaStep<TData> step)
    {
        _steps.Add(step);
        return this;
    }
 
    public async Task<bool> ExecuteAsync(TData data)
    {
        // Загружаем существующую сагу или создаем новую
        var existingSaga = await _repository.GetByIdAsync(data.SagaId);
        if (existingSaga != null)
        {
            // Копируем состояние существующей саги
            data = existingSaga;
        }
        else
        {
            // Сохраняем начальное состояние
            await _repository.SaveAsync(data);
        }
 
        // Выполняем шаги, начиная с текущего
        for (int i = data.CurrentStep; i < _steps.Count; i++)
        {
            try
            {
                await _steps[i].ExecuteAsync(data);
                
                // Обновляем текущий шаг и сохраняем
                data.CurrentStep = i + 1;
                await _repository.SaveAsync(data);
            }
            catch (Exception ex)
            {
                // Логируем ошибку
                data.ErrorMessage = ex.Message;
                await _repository.SaveAsync(data);
 
                // Компенсируем выполненные шаги
                for (int j = i - 1; j >= 0; j--)
                {
                    if (_steps[j].IsCompensable)
                    {
                        await _steps[j].CompensateAsync(data);
                        data.CompensatedSteps.Add(j);
                        await _repository.SaveAsync(data);
                    }
                }
                
                return false;
            }
        }
 
        return true;
    }
}
 
public interface ISagaData
{
    Guid SagaId { get; set; }
    int CurrentStep { get; set; }
    List<int> CompensatedSteps { get; set; }
    string ErrorMessage { get; set; }
}
 
public interface ISagaRepository<TData> where TData : ISagaData
{
    Task<TData> GetByIdAsync(Guid sagaId);
    Task SaveAsync(TData data);
}
В реальных проектах я обычно использую Entity Framework Core для персистентности саг. Вот пример простого репозитория:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class EfSagaRepository<TData> : ISagaRepository<TData> 
    where TData : class, ISagaData
{
    private readonly DbContext _dbContext;
    
    public EfSagaRepository(DbContext dbContext)
    {
        _dbContext = dbContext;
    }
    
    public async Task<TData> GetByIdAsync(Guid sagaId)
    {
        return await _dbContext.Set<TData>()
            .FirstOrDefaultAsync(s => s.SagaId == sagaId);
    }
    
    public async Task SaveAsync(TData data)
    {
        var existing = await GetByIdAsync(data.SagaId);
        
        if (existing == null)
        {
            _dbContext.Set<TData>().Add(data);
        }
        else
        {
            _dbContext.Entry(existing).CurrentValues.SetValues(data);
        }
        
        await _dbContext.SaveChangesAsync();
    }
}

Обработка состояний и переходов



Для более сложных саг с множеством состояний и переходов я предпочитаю подход с явным моделированием состояний и событий. Вот пример с использованием машины состояний и персистентности:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class StateMachineSaga<TState, TEvent, TData>
    where TState : struct, Enum
    where TEvent : struct, Enum
    where TData : class, ISagaData
{
    private readonly StateMachine<TState, TEvent> _stateMachine;
    private readonly ISagaRepository<TData> _repository;
    private TData _data;
    
    public StateMachineSaga(
        TData data,
        ISagaRepository<TData> repository,
        Func<StateMachine<TState, TEvent>, TData, Task> configureStateMachine)
    {
        _data = data;
        _repository = repository;
        
        _stateMachine = new StateMachine<TState, TEvent>(
            () => (TState)Enum.Parse(typeof(TState), _data.CurrentState),
            state => 
            {
                _data.CurrentState = state.ToString();
                _repository.SaveAsync(_data).GetAwaiter().GetResult();
            });
        
        configureStateMachine(_stateMachine, _data).GetAwaiter().GetResult();
    }
    
    public async Task FireEventAsync(TEvent @event)
    {
        // Загружаем актуальное состояние из БД
        _data = await _repository.GetByIdAsync(_data.SagaId);
        
        // Проверяем, можно ли выполнить переход
        if (_stateMachine.CanFire(@event))
        {
            await Task.Run(() => _stateMachine.Fire(@event));
            return;
        }
        
        throw new InvalidOperationException(
            $"Cannot fire event {@event} in state {_data.CurrentState}");
    }
}
Использование этого класса выглядит примерно так:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Определение состояний и событий
public enum OrderState { New, InventoryReserved, PaymentProcessed, Completed, Cancelled }
public enum OrderEvent { ReserveInventory, ProcessPayment, Complete, Cancel }
 
// Создание и настройка саги
var sagaData = new OrderSagaData { SagaId = Guid.NewGuid(), CurrentState = OrderState.New.ToString() };
var saga = new StateMachineSaga<OrderState, OrderEvent, OrderSagaData>(
    sagaData,
    new EfSagaRepository<OrderSagaData>(dbContext),
    ConfigureStateMachine);
 
// Настройка машины состояний
async Task ConfigureStateMachine(
    StateMachine<OrderState, OrderEvent> machine, 
    OrderSagaData data)
{
    machine.Configure(OrderState.New)
        .Permit(OrderEvent.ReserveInventory, OrderState.InventoryReserved)
        .OnEntryAsync(() => LogStateChange(data, "New"))
        .OnExitAsync(() => LogStateChange(data, "Leaving New"));
        
    machine.Configure(OrderState.InventoryReserved)
        .Permit(OrderEvent.ProcessPayment, OrderState.PaymentProcessed)
        .Permit(OrderEvent.Cancel, OrderState.Cancelled)
        .OnEntryAsync(() => ReserveInventory(data))
        .OnExitAsync(OrderEvent.Cancel, () => ReleaseInventory(data));
    
    // И так далее...
}
 
// Использование
await saga.FireEventAsync(OrderEvent.ReserveInventory);
await saga.FireEventAsync(OrderEvent.ProcessPayment);
await saga.FireEventAsync(OrderEvent.Complete);

Интеграция с популярными ORM и механизмами персистентности



В большинстве проектов я использую Entity Framework Core как ORM для сохранения состояния саг. Модель данных обычно выглядит примерно так:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class OrderSagaData : ISagaData
{
    public Guid SagaId { get; set; }
    public string CurrentState { get; set; }
    public int CurrentStep { get; set; }
    public List<int> CompensatedSteps { get; set; } = new List<int>();
    public string ErrorMessage { get; set; }
    
    // Бизнес-данные
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public List<OrderItem> Items { get; set; } = new List<OrderItem>();
    public decimal TotalAmount { get; set; }
    public string PaymentTransactionId { get; set; }
    public bool InventoryReserved { get; set; }
    public bool PaymentProcessed { get; set; }
}
 
public class OrderItem
{
    public string ProductId { get; set; }
    public int Quantity { get; set; }
    public decimal Price { get; set; }
}
 
public class SagaDbContext : DbContext
{
    public SagaDbContext(DbContextOptions options) : base(options) { }
    
    public DbSet<OrderSagaData> OrderSagas { get; set; }
    
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OrderSagaData>()
            .HasKey(s => s.SagaId);
            
        modelBuilder.Entity<OrderSagaData>()
            .Property(s => s.CurrentState)
            .IsRequired();
            
        modelBuilder.Entity<OrderSagaData>()
            .Property(s => s.CompensatedSteps)
            .HasConversion(
                v => JsonSerializer.Serialize(v, null),
                v => JsonSerializer.Deserialize<List<int>>(v, null) ?? new List<int>());
                
        modelBuilder.Entity<OrderSagaData>()
            .Property(s => s.Items)
            .HasConversion(
                v => JsonSerializer.Serialize(v, null),
                v => JsonSerializer.Deserialize<List<OrderItem>>(v, null) ?? new List<OrderItem>());
    }
}
Для проектов с очень высокими требованиями к производительности я иногда использую NoSQL базы данных, такие как MongoDB, для хранения состояния саги. Это особенно удобно, когда структура данных саги сложная и содержит вложенные коллекции:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MongoSagaRepository<TData> : ISagaRepository<TData>
    where TData : class, ISagaData
{
    private readonly IMongoCollection<TData> _collection;
    
    public MongoSagaRepository(IMongoDatabase database, string collectionName)
    {
        _collection = database.GetCollection<TData>(collectionName);
    }
    
    public async Task<TData> GetByIdAsync(Guid sagaId)
    {
        return await _collection.Find(s => s.SagaId == sagaId)
            .FirstOrDefaultAsync();
    }
    
    public async Task SaveAsync(TData data)
    {
        await _collection.ReplaceOneAsync(
            s => s.SagaId == data.SagaId,
            data,
            new ReplaceOptions { IsUpsert = true });
    }
}
При выборе механизма персистентности важно учитывать требования к атомарности операций. В некоторых случаях может потребоваться транзакционное обновление нескольких коллекций или таблиц, что требует особого внимания при использовании NoSQL баз данных.

Практические примеры и код



Давайте теперь разберем реальное приложение, которое я создал для демонстрации паттерна Saga. Я выбрал самый распространенный пример - процесс оформления заказа в интернет-магазине, потому что он наглядно демонстрирует все аспекты распределенных транзакций.

Архитектура примера: выбор доменной области и декомпозиция на микросервисы



Для нашего примера я выделил следующие микросервисы:
OrderService - управляет заказами;
InventoryService - отвечает за товарные запасы;
PaymentService - обрабатывает платежи;
NotificationService - отправляет уведомления клиентам.

Каждый сервис имеет свою базу данных и свой API. Взаимодействие между сервисами происходит через сообщения.
Вот как выглядит простая модель данных для заказа:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Order
{
    public Guid Id { get; set; }
    public string CustomerId { get; set; }
    public OrderStatus Status { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
}
 
public enum OrderStatus
{
    Created,
    InventoryReserved,
    PaymentProcessed,
    Completed,
    Cancelled
}

Сага заказа с платежом и доставкой



Давайте реализуем сагу для процесса оформления заказа с использованием подхода оркестрации. Сначала определим данные саги:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class OrderProcessingSagaData : ISagaData
{
    public Guid SagaId { get; set; }
    public string CurrentState { get; set; }
    
    // Бизнес-данные
    public Guid OrderId { get; set; }
    public string CustomerId { get; set; }
    public List<OrderItemDto> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public string PaymentId { get; set; }
    public string InventoryReservationId { get; set; }
    public DateTime CreatedAt { get; set; }
    
    // Флаги состояния
    public bool OrderCreated { get; set; }
    public bool InventoryReserved { get; set; }
    public bool PaymentProcessed { get; set; }
    public bool NotificationSent { get; set; }
}
Теперь определим шаги саги. Вот пример шага для резервирования товаров:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ReserveInventoryStep : ISagaStep<OrderProcessingSagaData>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<ReserveInventoryStep> _logger;
 
    public ReserveInventoryStep(
        IInventoryService inventoryService,
        ILogger<ReserveInventoryStep> logger)
    {
        _inventoryService = inventoryService;
        _logger = logger;
    }
 
    public async Task ExecuteAsync(OrderProcessingSagaData data)
    {
        _logger.LogInformation("Reserving inventory for order {OrderId}", data.OrderId);
        
        var items = data.Items.Select(i => new InventoryItemDto
        {
            ProductId = i.ProductId,
            Quantity = i.Quantity
        }).ToList();
        
        var reservationId = await _inventoryService.ReserveInventoryAsync(
            data.OrderId, items);
            
        data.InventoryReservationId = reservationId;
        data.InventoryReserved = true;
        
        _logger.LogInformation("Inventory reserved successfully for order {OrderId}", data.OrderId);
    }
 
    public async Task CompensateAsync(OrderProcessingSagaData data)
    {
        if (data.InventoryReserved)
        {
            _logger.LogInformation("Releasing inventory for order {OrderId}", data.OrderId);
            await _inventoryService.ReleaseInventoryAsync(data.InventoryReservationId);
            data.InventoryReserved = false;
            _logger.LogInformation("Inventory released for order {OrderId}", data.OrderId);
        }
    }
 
    public bool IsCompensable => true;
}
Аналогично мы определяем шаги для создания заказа, обработки платежа и отправки уведомления. Затем собираем их в сагу:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var sagaCoordinator = new PersistentSagaCoordinator<OrderProcessingSagaData>(repository)
    .AddStep(new CreateOrderStep(orderService, logger))
    .AddStep(new ReserveInventoryStep(inventoryService, logger))
    .AddStep(new ProcessPaymentStep(paymentService, logger))
    .AddStep(new SendNotificationStep(notificationService, logger));
 
var sagaData = new OrderProcessingSagaData
{
    SagaId = Guid.NewGuid(),
    OrderId = Guid.NewGuid(),
    CustomerId = "customer123",
    Items = items,
    TotalAmount = items.Sum(i => i.Price * i.Quantity),
    CreatedAt = DateTime.UtcNow
};
 
bool success = await sagaCoordinator.ExecuteAsync(sagaData);

Интеграция с MassTransit и Rebus



В реальных проектах я часто использую специализированные библиотеки для работы с сагами. Одна из самых популярных - MassTransit. Вот как выглядит реализация той же саги с использованием MassTransit:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public OrderProcessingSaga()
    {
        InstanceState(x => x.CurrentState);
        
        // Определяем события
        Event(() => OrderSubmitted, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => InventoryReserved, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => PaymentProcessed, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderFaulted, x => x.CorrelateById(context => context.Message.OrderId));
        
        // Определяем состояния
        Initially(
            When(OrderSubmitted)
                .Then(context => {
                    context.Instance.OrderId = context.Data.OrderId;
                    context.Instance.CustomerId = context.Data.CustomerId;
                    context.Instance.TotalAmount = context.Data.TotalAmount;
                    context.Instance.Items = context.Data.Items;
                })
                .PublishAsync(context => context.Init<ReserveInventory>(new {
                    OrderId = context.Instance.OrderId,
                    Items = context.Instance.Items
                }))
                .TransitionTo(Reserving));
                
        During(Reserving,
            When(InventoryReserved)
                .PublishAsync(context => context.Init<ProcessPayment>(new {
                    OrderId = context.Instance.OrderId,
                    CustomerId = context.Instance.CustomerId,
                    Amount = context.Instance.TotalAmount
                }))
                .TransitionTo(ProcessingPayment),
            When(OrderFaulted)
                .TransitionTo(Faulted));
                
        // Продолжаем определять остальные переходы
    }
    
    // Определяем события и состояния
    public Event<OrderSubmittedEvent> OrderSubmitted { get; private set; }
    public Event<InventoryReservedEvent> InventoryReserved { get; private set; }
    public Event<PaymentProcessedEvent> PaymentProcessed { get; private set; }
    public Event<OrderCompletedEvent> OrderCompleted { get; private set; }
    public Event<OrderFaultedEvent> OrderFaulted { get; private set; }
    
    public State Reserving { get; private set; }
    public State ProcessingPayment { get; private set; }
    public State Completed { get; private set; }
    public State Faulted { get; private set; }
}
Для конфигурации MassTransit нужно добавить сагу в DI-контейнер и настроить шину сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderProcessingSaga, OrderProcessingSagaState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
            r.AddDbContext<DbContext, SagaDbContext>((provider, builder) =>
            {
                builder.UseSqlServer(connectionString);
            });
        });
        
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        
        cfg.ConfigureEndpoints(context);
    });
});
Другая популярная библиотека - Rebus. Вот пример саги с использованием Rebus:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class OrderProcessingSaga : Saga<OrderProcessingSagaData>,
    IAmInitiatedBy<OrderSubmittedEvent>,
    IHandleMessages<InventoryReservedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<OrderFaultedEvent>
{
    private readonly ILogger<OrderProcessingSaga> _logger;
    
    public OrderProcessingSaga(ILogger<OrderProcessingSaga> logger)
    {
        _logger = logger;
    }
    
    protected override void CorrelateMessages(ICorrelationConfig<OrderProcessingSagaData> config)
    {
        config.Correlate<OrderSubmittedEvent>(m => m.OrderId, d => d.OrderId);
        config.Correlate<InventoryReservedEvent>(m => m.OrderId, d => d.OrderId);
        config.Correlate<PaymentProcessedEvent>(m => m.OrderId, d => d.OrderId);
        config.Correlate<OrderFaultedEvent>(m => m.OrderId, d => d.OrderId);
    }
    
    public async Task Handle(OrderSubmittedEvent message)
    {
        _logger.LogInformation("Handling OrderSubmittedEvent for order {OrderId}", message.OrderId);
        
        Data.OrderId = message.OrderId;
        Data.CustomerId = message.CustomerId;
        Data.Items = message.Items;
        Data.TotalAmount = message.TotalAmount;
        Data.CreatedAt = DateTime.UtcNow;
        
        await bus.Send(new ReserveInventoryCommand
        {
            OrderId = Data.OrderId,
            Items = Data.Items
        });
    }
    
    // Реализация других обработчиков...
}
Настройка Rebus выглядит примерно так:

C#
1
2
3
4
5
6
7
8
services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost:5672", "order-processing"))
    .Routing(r => r.TypeBased()
        .Map<ReserveInventoryCommand>("inventory")
        .Map<ProcessPaymentCommand>("payment")
        .Map<SendNotificationCommand>("notification"))
    .Sagas(s => s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex"))
    .Options(o => o.SetNumberOfWorkers(5)));

Реализация распределенной саги через Azure Service Bus



В корпоративных проектах я часто использую Azure Service Bus для обеспечения надежной доставки сообщений между сервисами. Этот брокер сообщений отлично интегрируется с .NET и предоставляет все необходимые функции для реализации надежных саг.

Вот пример настройки саги с использованием Azure Service Bus и MassTransit:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderProcessingSaga, OrderProcessingSagaState>()
        .MessageSessionRepository();
        
    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(new Uri("sb://your-namespace.servicebus.windows.net/"), h =>
        {
            h.TokenProvider = TokenProvider.CreateManagedIdentityTokenProvider();
        });
        
        cfg.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("order", false));
        
        // Настройка топологии сообщений
        cfg.Message<OrderSubmittedEvent>(m => m.SetEntityName("order-events"));
        cfg.Message<InventoryReservedEvent>(m => m.SetEntityName("inventory-events"));
        cfg.Message<PaymentProcessedEvent>(m => m.SetEntityName("payment-events"));
    });
});
Для хранения состояния саги в Azure Service Bus я использую сессии сообщений. Это позволяет сохранять состояние между обработками сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AzureServiceBusSagaRepository<TSaga> : ISagaRepository<TSaga>
    where TSaga : class, ISagaData
{
    private readonly IMessageSession _messageSession;
    private readonly string _sessionId;
 
    public AzureServiceBusSagaRepository(IMessageSession messageSession, string sessionId)
    {
        _messageSession = messageSession;
        _sessionId = sessionId;
    }
 
    public async Task<TSaga> GetByIdAsync(Guid sagaId)
    {
        var session = await _messageSession.AcceptMessageSessionAsync(_sessionId);
        var state = await session.GetStateAsync();
        
        if (state == null)
            return null;
            
        return JsonSerializer.Deserialize<TSaga>(state);
    }
 
    public async Task SaveAsync(TSaga saga)
    {
        var session = await _messageSession.AcceptMessageSessionAsync(_sessionId);
        var state = JsonSerializer.SerializeToUtf8Bytes(saga);
        await session.SetStateAsync(state);
    }
}

Мониторинг и логирование выполнения



Мониторинг и логирование саг - жизненно важная часть любой распределенной системы. В моих проектах я использую многоуровневую стратегию логирования:
1. Структурированное логирование с использованием Serilog для записи всех событий саги.
2. Трассировка распределенных запросов с помощью OpenTelemetry для отслеживания полного пути выполнения.
3. Аудит саг - специальный лог, фиксирующий все изменения состояния сага для возможности восстановления и анализа.
Вот пример настройки логирования с Serilog:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .UseSerilog((context, services, configuration) => configuration
            .ReadFrom.Configuration(context.Configuration)
            .ReadFrom.Services(services)
            .Enrich.FromLogContext()
            .Enrich.WithProperty("Application", "OrderService")
            .WriteTo.Console()
            .WriteTo.Seq("http://seq-server:5341"))
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.UseStartup<Startup>();
        });
А вот как выглядит логирование в шаге саги:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public async Task ExecuteAsync(OrderProcessingSagaData data)
{
    _logger.LogInformation(
        "Starting payment processing for order {OrderId}, amount: {Amount}", 
        data.OrderId, 
        data.TotalAmount);
    
    using (_logger.BeginScope(new Dictionary<string, object>
    {
        ["SagaId"] = data.SagaId,
        ["OrderId"] = data.OrderId,
        ["CustomerId"] = data.CustomerId
    }))
    {
        try
        {
            var paymentId = await _paymentService.ProcessPaymentAsync(
                data.CustomerId, 
                data.OrderId,
                data.TotalAmount);
                
            data.PaymentId = paymentId;
            data.PaymentProcessed = true;
            
            _logger.LogInformation(
                "Payment processed successfully for order {OrderId}, paymentId: {PaymentId}", 
                data.OrderId, 
                paymentId);
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex, 
                "Failed to process payment for order {OrderId}", 
                data.OrderId);
                
            throw;
        }
    }
}

Использование Polly для реализации политик отказоустойчивости



В реальном мире сервисы могут временно не отвечать, быть перегруженными или давать сбои. Для повышения надежности саг я использую библиотеку Polly, которая предоставляет элегантный способ реализации политик повторных попыток, таймаутов и прерывателей цепи.
Вот пример шага саги с Polly для обработки временных сбоев:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ReserveInventoryStep : ISagaStep<OrderProcessingSagaData>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<ReserveInventoryStep> _logger;
    private readonly AsyncPolicy _retryPolicy;
 
    public ReserveInventoryStep(
        IInventoryService inventoryService,
        ILogger<ReserveInventoryStep> logger)
    {
        _inventoryService = inventoryService;
        _logger = logger;
        
        // Определяем политику повторных попыток
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .Or<TimeoutException>()
            .WaitAndRetryAsync(
                3, // количество повторов
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), // экспоненциальный отступ
                (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogWarning(
                        exception,
                        "Attempt {RetryCount} to reserve inventory failed. Retrying in {RetryTimeSpan}...",
                        retryCount,
                        timeSpan);
                });
    }
 
    public async Task ExecuteAsync(OrderProcessingSagaData data)
    {
        await _retryPolicy.ExecuteAsync(async () =>
        {
            _logger.LogInformation("Reserving inventory for order {OrderId}", data.OrderId);
            
            var items = data.Items.Select(i => new InventoryItemDto
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity
            }).ToList();
            
            var reservationId = await _inventoryService.ReserveInventoryAsync(
                data.OrderId, items);
                
            data.InventoryReservationId = reservationId;
            data.InventoryReserved = true;
            
            _logger.LogInformation("Inventory reserved successfully for order {OrderId}", data.OrderId);
        });
    }
 
    public async Task CompensateAsync(OrderProcessingSagaData data)
    {
        // Также применяем политику повторов к компенсации
        await _retryPolicy.ExecuteAsync(async () =>
        {
            if (data.InventoryReserved)
            {
                _logger.LogInformation("Releasing inventory for order {OrderId}", data.OrderId);
                await _inventoryService.ReleaseInventoryAsync(data.InventoryReservationId);
                data.InventoryReserved = false;
                _logger.LogInformation("Inventory released for order {OrderId}", data.OrderId);
            }
        });
    }
 
    public bool IsCompensable => true;
}
Для сложных сценариев я комбинирую несколько политик - таймауты, повторные попытки и прерыватели цепи:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Политика таймаута
var timeoutPolicy = Policy
    .TimeoutAsync(30, TimeoutStrategy.Pessimistic);
 
// Политика прерывателя цепи
var circuitBreakerPolicy = Policy
    .Handle<Exception>()
    .CircuitBreakerAsync(
        exceptionsAllowedBeforeBreaking: 5,
        durationOfBreak: TimeSpan.FromMinutes(1),
        onBreak: (ex, breakDelay) => 
            _logger.LogWarning(
                ex, 
                "Circuit broken for {BreakDelay}!", 
                breakDelay),
        onReset: () => 
            _logger.LogInformation("Circuit reset!"),
        onHalfOpen: () => 
            _logger.LogInformation("Circuit half-open"));
 
// Комбинируем политики
var resiliencePolicy = Policy.WrapAsync(
    circuitBreakerPolicy,
    _retryPolicy,
    timeoutPolicy);

Подводные камни и решения проблем



Когда я только начинал внедрять паттерн Saga в своих проектах, то часто наступал на одни и те же грабли. Теоретически все выглядит просто и красиво, но на практике возникает масса нюансов, которые могут превратить элегантное решение в источник новых проблем. Давайте разберем основные подводные камни и способы их обхода.

Обработка частичных сбоев



Самая коварная проблема распределенных систем - частичные сбои. В монолитных приложениях все либо работает, либо падает целиком. В микросервисной архитектуре возможны ситуации, когда часть операций выполнилась успешно, а часть - нет. Особенно неприятен сценарий, когда сервис выполнил операцию, но не смог отправить ответ из-за сетевого сбоя. Вызывающая сторона думает, что операция не выполнилась, и повторяет запрос, что может привести к дублированию данных.

Я столкнулся с этим в проекте платежной системы, когда из-за временной недоступности сети платеж списывался дважды. Клиенты были, мягко говоря, недовольны. Решение оказалось в реализации идемпотентных операций.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public async Task<PaymentResult> ProcessPaymentAsync(string paymentId, decimal amount)
{
    // Проверяем, был ли уже обработан этот платеж
    var existingPayment = await _paymentRepository.GetByIdAsync(paymentId);
    if (existingPayment != null)
    {
        _logger.LogWarning("Payment {PaymentId} already processed. Returning existing result.", paymentId);
        return new PaymentResult
        {
            PaymentId = paymentId,
            Status = existingPayment.Status,
            TransactionId = existingPayment.TransactionId
        };
    }
 
    // Выполняем платеж и сохраняем результат
    var result = await _paymentGateway.ChargeAsync(paymentId, amount);
    await _paymentRepository.SaveAsync(new Payment
    {
        Id = paymentId,
        Amount = amount,
        Status = result.Status,
        TransactionId = result.TransactionId,
        ProcessedAt = DateTime.UtcNow
    });
 
    return result;
}

Идемпотентность операций



Идемпотентность - это свойство операции давать одинаковый результат при повторном выполнении. Для саг это критично, потому что из-за сетевых сбоев, повторных попыток и параллельной обработки операции могут выполнятся многократно.
Существует несколько подходов к обеспечению идемпотентности:

1. Естественные ключи - использование бизнес-идентификаторов вместо автоинкрементных полей.
2. Проверка перед выполнением - как в примере выше.
3. Условные операции - выполнение только при соблюдении определенных условий.

Для операций с побочными эффектами (например, отправка email) я часто использую паттерн Outbox. Операция сначала сохраняется в локальную таблицу, а затем асинхронно обрабатывается:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public async Task SendOrderConfirmationAsync(Guid orderId)
{
    // Сохраняем в Outbox
    var message = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        Type = "OrderConfirmation",
        Content = JsonSerializer.Serialize(new { OrderId = orderId }),
        Status = OutboxStatus.Pending,
        CreatedAt = DateTime.UtcNow
    };
    
    await _outboxRepository.SaveAsync(message);
    
    // Отправку выполнит фоновый процесс, который гарантирует однократную доставку
}

Тестирование распределенных сценариев



Тестирование саг - это отдельный вид боли. Как проверить, что компенсирующие действия правильно отработают в случае сбоя на шаге 5 из 10? Как смоделировать сетевые сбои и задержки? Я использую многоуровневый подход к тестированию:
1. Модульное тестирование отдельных шагов саги.
2. Интеграционное тестирование с моками внешних сервисов.
3. Хаос-тестирование - намеренное внесение сбоев в работающую систему.

Для моделирования сбоев очень помогает библиотека Polly с фейковыми обработчиками:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[Fact]
public async Task ShouldCompensateWhenPaymentFails()
{
    // Arrange
    var mockInventoryService = new Mock<IInventoryService>();
    var mockPaymentService = new Mock<IPaymentService>();
    
    mockPaymentService
        .Setup(s => s.ChargeAsync(It.IsAny<string>(), It.IsAny<decimal>()))
        .ThrowsAsync(new Exception("Payment gateway unavailable"));
    
    var saga = new OrderSagaOrchestrator(
        new OrderService(_orderRepository),
        mockInventoryService.Object,
        mockPaymentService.Object);
    
    // Act
    var result = await saga.ProcessOrderAsync(new OrderData { /* ... */ });
    
    // Assert
    Assert.False(result);
    mockInventoryService.Verify(
        s => s.ReleaseInventoryAsync(It.IsAny<string>()),
        Times.Once);
}
Для интеграционного тестирования я использую Testcontainers для запуска зависимостей в Docker и Wiremock для моделирования внешних API.

Антипаттерны в реализации Saga



За годы работы с сагами я выделил несколько антипаттернов, которые регулярно вижу в проектах:

1. Хрупкие компенсации - компенсирующие действия, которые сами могут завершиться ошибкой. Всегда нужен механизм повторных попыток и мониторинг "зависших" компенсаций.
2. Отсутствие таймаутов - саги без таймаутов могут "зависнуть" навечно, блокируя ресурсы. Каждый шаг должен иметь разумное время ожидания.
3. Синхронные вызовы - использование блокирующих HTTP-вызовов между сервисами. Это уничтожает масштабируемость и приводит к каскадным сбоям. Лучше использовать асинхронное взаимодействие через сообщения.
4. Монолитная сага - попытка обработать сложный бизнес-процесс одной большой сагой. Лучше разделить процесс на несколько независимых саг, каждая со своей зоной ответственности.
5. Игнорирование консистентности данных - забвение того факта, что во время выполнения саги данные временно несогласованы. Это приводит к некорректным бизнес-решениям.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Анти-паттерн: Синхронный вызов без таймаута
public async Task<bool> ReserveInventoryAsync(string productId, int quantity)
{
    var response = await _httpClient.PostAsJsonAsync(
        "/api/inventory/reserve",
        new { ProductId = productId, Quantity = quantity });
    
    response.EnsureSuccessStatusCode();
    return true;
}
 
// Лучше:
public async Task<bool> ReserveInventoryAsync(string productId, int quantity)
{
    using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    
    try 
    {
        var response = await _httpClient.PostAsJsonAsync(
            "/api/inventory/reserve",
            new { ProductId = productId, Quantity = quantity },
            cancellationTokenSource.Token);
        
        response.EnsureSuccessStatusCode();
        return true;
    }
    catch (OperationCanceledException)
    {
        _logger.LogWarning("Inventory reservation timed out");
        return false;
    }
}

Стратегии обработки дублирующихся сообщений



В асинхронной среде с ретраями и сетевыми сбоями дублирование сообщений неизбежно. Вот стратегии, которые я применяю:
1. Уникальные идентификаторы сообщений - каждое сообщение имеет уникальный ID, который используется для отсеивания дубликатов.
2. Журнал обработанных сообщений - хранение ID обработанных сообщений в течение определенного времени.
3. Семантическая дедупликация - проверка на уровне бизнес-логики (например, "этот заказ уже оплачен").
Вот как я реализую стратегии обработки дублирующихся сообщений в реальных проектах:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class MessageDeduplicationService : IMessageDeduplicationService
{
    private readonly IDistributedCache _cache;
    private readonly ILogger<MessageDeduplicationService> _logger;
    private readonly TimeSpan _deduplicationWindow;
 
    public MessageDeduplicationService(
        IDistributedCache cache,
        ILogger<MessageDeduplicationService> logger,
        TimeSpan? deduplicationWindow = null)
    {
        _cache = cache;
        _logger = logger;
        _deduplicationWindow = deduplicationWindow ?? TimeSpan.FromHours(24);
    }
 
    public async Task<bool> IsNewMessageAsync(string messageId)
    {
        // Пытаемся добавить messageId в кеш
        // Если такой ID уже есть - значит, это дубликат
        if (await _cache.GetAsync(GetCacheKey(messageId)) != null)
        {
            _logger.LogWarning("Duplicate message detected: {MessageId}", messageId);
            return false;
        }
 
        // Добавляем ID сообщения в кеш с указанным временем жизни
        await _cache.SetAsync(
            GetCacheKey(messageId),
            Encoding.UTF8.GetBytes("1"),
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = _deduplicationWindow
            });
 
        return true;
    }
 
    private string GetCacheKey(string messageId) => $"msg:{messageId}";
}
Этот сервис легко интегрируется с обработчиками сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
public async Task Handle(PaymentProcessedEvent message, CancellationToken cancellationToken)
{
    // Проверяем, не дубликат ли это
    if (!await _deduplicationService.IsNewMessageAsync(message.MessageId))
    {
        _logger.LogInformation("Skipping duplicate payment event: {MessageId}", message.MessageId);
        return;
    }
 
    // Обрабатываем сообщение
    await _paymentProcessor.ProcessAsync(message);
}
Для больших систем с высокой нагрузкой я обычно использую Redis как распределенный кэш для хранения идентификаторов обработанных сообщений. Это дает высокую производительность и надежность.

Обработка длительных процессов и таймауты



Еще один коварный аспект саг - длительные процессы. В реальном мире некоторые операции могут занимать минуты, часы или даже дни. Например, проверка данных клиента службой безопасности или ожидание подтверждения от внешней системы.

Я сталкнулся с этим при работе над системой кредитования, где процесс одобрения мог занимать до 2 дней. Держать сагу активной все это время неэффективно, поэтому лучшей практикой является разделение длительного процесса на несколько саг с использованием шаблона "ожидание с таймаутом".

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class LoanApprovalSaga : IStateMachine<LoanApplicationState>
{
    private readonly IScheduler _scheduler;
 
    // Остальной код...
 
    private async Task StartApprovalProcess(LoanApplicationData data)
    {
        // Отправляем запрос на проверку
        await _verificationService.StartVerificationAsync(data.ApplicationId);
 
        // Планируем проверку статуса через 24 часа
        await _scheduler.ScheduleAsync(
            new CheckVerificationStatusCommand 
            { 
                ApplicationId = data.ApplicationId 
            },
            DateTime.UtcNow.AddHours(24));
 
        // Переходим в состояние ожидания
        data.CurrentState = LoanApplicationState.VerificationInProgress;
        await _repository.SaveAsync(data);
    }
}
Такой подход позволяет реализовать "длинные" саги без лишнего потребления ресурсов. Для реализации планировщика я обычно использую Quartz.NET или Hangfire.

Обеспечение согласованности при чтении



Один из часто упускаемых аспектов при работе с сагами - это согласованность данных при чтении. Во время выполнения саги данные находятся в промежуточном состоянии, и пользовательские запросы могут получить несогласованные результаты.
Я решаю эту проблему несколькими способами:

1. Паттерн CQRS (Command Query Responsibility Segregation) - разделение команд и запросов. Для запросов я создаю отдельные денормализованные представления данных.
2. Метаданные о состоянии - каждая сущность содержит информацию о своем статусе в рамках саги.
3. Версионирование данных - хранение нескольких версий сущности для разных стадий бизнес-процесса.
Вот пример реализации с метаданными о состоянии:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class OrderQueryService
{
    private readonly IOrderRepository _repository;
 
    public OrderQueryService(IOrderRepository repository)
    {
        _repository = repository;
    }
 
    public async Task<OrderDto> GetOrderAsync(Guid orderId)
    {
        var order = await _repository.GetByIdAsync(orderId);
        
        if (order == null)
            return null;
 
        var dto = new OrderDto
        {
            Id = order.Id,
            CustomerId = order.CustomerId,
            Items = order.Items.Select(i => new OrderItemDto 
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity,
                Price = i.Price
            }).ToList(),
            TotalAmount = order.TotalAmount,
            Status = order.Status,
            // Добавляем метаданные о состоянии саги
            ProcessingState = GetUserFriendlyState(order)
        };
 
        return dto;
    }
 
    private string GetUserFriendlyState(Order order)
    {
        return order.Status switch
        {
            OrderStatus.Created => "Ваш заказ создан",
            OrderStatus.InventoryReserved => "Товары зарезервированы, ожидается оплата",
            OrderStatus.PaymentProcessed => "Оплата получена, заказ обрабатывается",
            OrderStatus.Completed => "Заказ выполнен",
            OrderStatus.Cancelled => "Заказ отменен",
            _ => "Обрабатывается"
        };
    }
}

Проблемы с параллельным выполнением



Когда множество саг выполняется одновременно, конкуренция за ресурсы становится серьезной проблемой. Я столкнулся с этим при реализации системы бронирования, где при высокой нагрузке возникали конфликты, когда несколько саг пытались забронировать один и тот же ресурс.

Для решения этой проблемы я использую несколько подходов:
1. Оптимистичные блокировки - проверка версии ресурса перед его изменением.
2. Шардирование - разбиение ресурсов на группы для уменьшения конкуренции.
3. Очереди обработки - обработка саг, конкурирующих за одни и те же ресурсы, в последовательном порядке.
Вот пример реализации оптимистичной блокировки:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public async Task<bool> ReserveInventoryAsync(string productId, int quantity)
{
    var product = await _productRepository.GetByIdAsync(productId);
    
    if (product.AvailableQuantity < quantity)
        return false;
 
    // Запоминаем версию ресурса
    long originalVersion = product.Version;
    
    product.AvailableQuantity -= quantity;
    
    try
    {
        // Сохраняем только если версия не изменилась
        return await _productRepository.UpdateIfVersionMatchesAsync(
            product, originalVersion);
    }
    catch (DbUpdateConcurrencyException)
    {
        _logger.LogInformation(
            "Concurrency conflict when reserving product {ProductId}", 
            productId);
        return false;
    }
}
Для очередей обработки я использую Partitioned Consumer Pattern, где сообщения с одинаковым ключом всегда обрабатываются одним потребителем:

C#
1
2
3
4
5
6
7
8
9
10
services.AddMassTransit(x =>
{
    x.AddConsumer<ReserveInventoryConsumer>()
        .Partition(4, p =>
        {
            // Сообщения с одинаковым ProductId попадают в одну и ту же очередь
            p.KeySelector = context => 
                context.Message.ProductId;
        });
});

Мониторинг и выявление "зависших" саг



Одна из самых неприятных проблем с сагами - это когда они "зависают" в промежуточном состоянии из-за непредвиденных ошибок или сбоев в системе. Такие саги блокируют ресурсы и могут вызывать недоумение у пользователей.
Для выявления и решения этой проблемы я разрабатываю систему мониторинга саг:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SagaMonitoringService : BackgroundService
{
    private readonly ISagaRepository<OrderSagaData> _repository;
    private readonly ILogger<SagaMonitoringService> _logger;
    private readonly TimeSpan _staleThreshold;
 
    public SagaMonitoringService(
        ISagaRepository<OrderSagaData> repository,
        ILogger<SagaMonitoringService> logger,
        TimeSpan? staleThreshold = null)
    {
        _repository = repository;
        _logger = logger;
        _staleThreshold = staleThreshold ?? TimeSpan.FromHours(1);
    }
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Находим все саги, которые не обновлялись дольше порогового значения
                var staleSagas = await _repository.FindStaleSagasAsync(_staleThreshold);
                
                foreach (var saga in staleSagas)
                {
                    _logger.LogWarning(
                        "Found stale saga: {SagaId}, state: {State}, last updated: {LastUpdated}",
                        saga.SagaId,
                        saga.CurrentState,
                        saga.UpdatedAt);
                    
                    // Можно добавить автоматический откат или уведомление команды поддержки
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in saga monitoring service");
            }
 
            // Проверяем раз в 5 минут
            await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
        }
    }
}
Этот сервис запускается как фоновая задача и периодически проверяет все активные саги. Если находятся саги, которые не обновлялись дольше определенного времени, сервис может автоматически предпринять действия или уведомить администраторов.

Заключение



Вот мы и прошли полный цикл создания и управления распределенными транзакциями с помощью паттерна Saga в C#. Как видно из статьи, это не просто теоретическая концепция, а практичный, хоть и не лишенный сложностей, подход к решению реальных проблем микросервисной архитектуры.

Напомню основные выводы: ACID-транзакции, которые так хорошо работают в монолитных приложениях, практически бесполезны когда дело касается взаимодействия между независимыми сервисами. Вместо этого мы используем последовательность локальных транзакций с компенсирующими действиями - то есть паттерн Saga.

Ключевое преимущество саг в том, что они позволяют сохранять автономность сервисов и избегать длительных блокировок ресурсов. Но за это приходится платить сложностью реализации и временной несогласованностью данных. Это компромисс, который приходится принимать в распределенных системах. Для C# разработчиков .NET экосистема предлагает богатый выбор инструментов: от самописных решений до мощных библиотек вроде MassTransit, Rebus и NServiceBus. Каждый из этих подходов имеет свои сильные стороны, и выбор зависит от конкретных требований проекта, его масштаба и сложности.

Я надеюсь, что мои примеры кода и описания подводных камней помогут вам избежать многих ошибок которые я совершил, когда только начинал работать с сагами. Особенно важно уделять внимание идемпотентности операций, обработке частичных сбоев и мониторингу выполнения саг.

Ошибка при работе с транзакциями mssql
Периодически получаю ошибку при работе с файлами из базы. Используется Entity Framework, на базе...

Реализовать алгоритм с транзакциями
Помогите, пожалуйста, с такой задачей. Разный порядок блокирования строк таблиц. Простой пример....

Программа-тестировщик, вопрос по архитектуре
Привет. Мне по учебе нужно написать программу-тестировщик для оценки знаний студентов. ...

Литература по архитектуре, оптимизаци, паттернам, антипаттернам
Ну собственно из названия темы все понятно. Многие книги в основном посвящены синтаксису и...

Разработка эмулятора вычислительной машины в классической архитектуре Фон Неймана
Написать эмулятор вычислительной машины в классической архитектуре фон Неймана, которая реализует...

Избавиться от зависимости порядка обработки систем в архитектуре приложения
Здравствуйте, предположим, что в нашем гипотетическом приложении есть системы Водитель, Машина,...

Консультация по архитектуре UI приложения
Здравствуйте! Не знаю реально ли, но может кто-то откликнется. Реализую UI приложение на C#....

Репозиторий в n-tier / 3-layer архитектуре
Передо мной стоит задача сделать проект с заменяемой ORM. Тестирую с nHibernate и Entity Framework....

Понятие об архитектуре "клиент-сервер"
Доброго времени суток, господа! В общем, ловите от чайника следующий вопрос. 5 лет изучал, но...

Не понятен один момент по архитектуре MVC
Экземпляр контроллера создаётся для каждого View или один на все? Откуда такой вопрос: Я пилю...

ООП и БД. Нужен совет, пояснение по архитектуре
Добрый день, друзья. При попытке написания теплого, правильного, лампового кода для приложения с бд...

Библиотека на ARM архитектуре
У меня есть программа для Windows Phone 8, и она работает на x86 архитектуре, но когда я делаю...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Новый ноутбук
volvo 07.12.2025
Всем привет. По скидке в "черную пятницу" взял себе новый ноутбук Lenovo ThinkBook 16 G7 на Амазоне: Ryzen 5 7533HS 64 Gb DDR5 1Tb NVMe 16" Full HD Display Win11 Pro
Музыка, написанная Искусственным Интеллектом
volvo 04.12.2025
Всем привет. Некоторое время назад меня заинтересовало, что уже умеет ИИ в плане написания музыки для песен, и, собственно, исполнения этих самых песен. Стихов у нас много, уже вышли 4 книги, еще 3. . .
От async/await к виртуальным потокам в Python
IndentationError 23.11.2025
Армин Ронахер поставил под сомнение async/ await. Создатель Flask заявляет: цветные функции - провал, виртуальные потоки - решение. Не threading-динозавры, а новое поколение лёгких потоков. Откат?. . .
Поиск "дружественных имён" СОМ портов
Argus19 22.11.2025
Поиск "дружественных имён" СОМ портов На странице: https:/ / norseev. ru/ 2018/ 01/ 04/ comportlist_windows/ нашёл схожую тему. Там приведён код на С++, который показывает только имена СОМ портов, типа,. . .
Сколько Государство потратило денег на меня, обеспечивая инсулином.
Programma_Boinc 20.11.2025
Сколько Государство потратило денег на меня, обеспечивая инсулином. Вот решила сделать интересный приблизительный подсчет, сколько государство потратило на меня денег на покупку инсулинов. . . .
Ломающие изменения в C#.NStar Alpha
Etyuhibosecyu 20.11.2025
Уже можно не только тестировать, но и пользоваться C#. NStar - писать оконные приложения, содержащие надписи, кнопки, текстовые поля и даже изображения, например, моя игра "Три в ряд" написана на этом. . .
Мысли в слух
kumehtar 18.11.2025
Кстати, совсем недавно имел разговор на тему медитаций с людьми. И обнаружил, что они вообще не понимают что такое медитация и зачем она нужна. Самые базовые вещи. Для них это - когда просто люди. . .
Создание Single Page Application на фреймах
krapotkin 16.11.2025
Статья исключительно для начинающих. Подходы оригинальностью не блещут. В век Веб все очень привыкли к дизайну Single-Page-Application . Быстренько разберем подход "на фреймах". Мы делаем одну. . .
Фото: Daniel Greenwood
kumehtar 13.11.2025
Расскажи мне о Мире, бродяга
kumehtar 12.11.2025
— Расскажи мне о Мире, бродяга, Ты же видел моря и метели. Как сменялись короны и стяги, Как эпохи стрелою летели. - Этот мир — это крылья и горы, Снег и пламя, любовь и тревоги, И бескрайние. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru