Переход от монолитной архитектуры к микросервисам принес множество преимуществ: гибкость разработки, независимость развертывания и масштабирования отдельных компонентов. Однако этот переход создал и новые вызовы, среди которых особое место занимает проблема управления транзакциями, охватывающими несколько сервисов. Ситуация, которая легко решалась в монолите с помощью классических ACID-транзакций превратилась в комплексную головоломку.
Проблематика распределенных транзакций
Типичный сценарий онлайн-магазина: пользователь размещает заказ, система должна проверить наличие товара на складе, зарезервировать его, проверить платежные данные клиента, авторизовать оплату и создать заказ. В монолите это выполнялось в рамках одной транзакции:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| using (var transaction = new TransactionScope())
{
var stockService = new StockService();
var paymentService = new PaymentService();
var orderService = new OrderService();
// Проверка и резервирование товара
stockService.ReserveItem(itemId, quantity);
// Проверка и обработка платежа
paymentService.ProcessPayment(userId, amount);
// Создание заказа
orderService.CreateOrder(userId, itemId, quantity);
transaction.Complete();
} |
|
В микросервисной архитектуре каждый сервис управляет своими данными, что делает такой подход невозможным. Эта проблема имеет множество граней, которые необходимо осознать перед поиском решения.
Сложности поддержания целостности данных в микросервисах
В микросервисной архитектуре данные распределены между разными сервисами, каждый из которых имеет собственное хранилище. Принцип "Одна база данных на сервис" стал почти догмой. Он обеспечивает независимость и инкапсуляцию, но создает проблему: как гарантировать целостность данных при операциях, затрагивающих несколько сервисов? Сложность усугубляется, когда сервисы используют разные типы баз данных: реляционные, документоориентированные, графовые или ключ-значение. Универсального механизма для управления транзакциями между столь разными технологиями просто не существует.
Ограничения классической модели ACID-транзакций
ACID-свойства (атомарность, согласованность, изолированность, долговечность) транзакций, которые так ценны в монолитных приложениях, практически недоступны в распределенной среде. Особенно критично отсутствие атомарности: либо все операции выполняются успешно, либо ни одна. Традиционный подход к распределенным транзакциям - двухфазный коммит (2PC) - имеет существенные ограничения:
1. Высокая связность — все участники транзакции должны быть доступны для её завершения.
2. Снижение производительности — система блокирует ресурсы до завершения всей транзакции.
3. Технологические ограничения — многие современные NoSQL базы данных (MongoDB, Cassandra) и брокеры сообщений (RabbitMQ, Kafka) просто не поддерживают протокол 2PC.
Примечательно, что использование двухфазного коммита также приводит к снижению доступности системы в целом. Если транзакция охватывает два сервиса с доступностью 99.5% каждый, то общая доступность упадет до 99%.
Исторический контекст возникновения проблемы
Проблема распределенных транзакций была известна задолго до микросервисов. Еще в 1988 году Джим Грей и Андреас Ройтер описали модель распределенной транзакции X/Open DTP, которая впоследствии стала стандартом.
Но настоящий всплеск интереса к этой проблеме возник с популяризацией микросервисной архитектуры в 2010-х годах. Сервисы стали меньше, их стало больше, и потребность в надежных механизмах координации между ними резко возросла.
Современные архитектурные паттерны и их влияние
Современная микросервисная архитектура делает акцент на слабую связность и высокую когезию. Это означает, что сервисы должны быть максимально независимы друг от друга, но при этом каждый сервис должен полностью отвечать за свою предметную область. Такой подход приводит к появлению "границ согласованности" (consistency boundaries), которые совпадают с границами сервисов. Внутри сервиса данные согласованы по принципам ACID, но между сервисами применяется принцип "согласованности в конечном счете" (eventual consistency). Это заметно влияет на дизайн системы. Например вместо нормализованной модели данных, распределенной между сервисами, часто используется денормализация и дублирование данных:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| // Сервис пользователей имеет полную информацию о пользователе
public class User
{
public Guid Id { get; set; }
public string Email { get; set; }
public string Name { get; set; }
public Address Address { get; set; }
// ...
}
// Сервис заказов хранит необходимые данные о пользователе
public class OrderCustomer
{
public Guid UserId { get; set; }
public string Email { get; set; }
public string Name { get; set; }
// Минимально необходимая информация
} |
|
Влияние распределенных транзакций на производительность
Распределенные транзакции оказывают значительное влияние на производительность системы. Традиционный двухфазный коммит требует дополнительных сетевых взаимодействий между участниками транзакции, что увеличивает задержки. В высоконагруженных системах даже небольшое увеличение времени отклика может привести к существенному снижению пропускной способности. Например, если среднее время обработки запроса составляет 100 мс, а распределенная транзакция добавляет еще 50 мс, то максимальная пропускная способность системы падает на треть. Кроме того, длительные блокировки ресурсов во время выполнения распределенной транзакции могут привести к каскадным проблемам производительности: другие запросы ждут освобождения заблокированных ресурсов, а очередь запросов растет.
С учетом всех этих сложностей становится очевидным, что классический подход к транзакциям через двухфазный коммит часто неприменим в современной микросервисной архитектуре. Необходимы альтернативные решения, и одним из наиболее перспективных является паттерн SAGA.
Почему паттерн абстрактная фабрика - паттерн уровня объектов, если в нём могут быть статические отношения? Взято из Шевчук А., Охрименко Д., Касьянов А. Design Patterns via C#. Приемы... Распределенные вычисления Итак. надо написать простенькую прожку аля-калькулятор, но таким образом что бы в вычислениях было... Моделирование случайных величин, Равномерно распределенные случайные величины 1. Моделирование случайных величин.
Подсчитать относительную частоту появления каждого из чисел... Моделирование случайных величин. Равномерно распределенные случайные величины Помогите пожалуйста с задачкой, очень нужно!
Создать программу, в которой реализовать генерацию...
Паттерн SAGA как альтернатива
Учитывая ограничения классических распределенных транзакций, разработчики микросервисных архитектур нуждались в альтернативном подходе, который позволил бы обеспечить согласованность данных без жертвования автономностью сервисов. Так на передний план вышел паттерн SAGA – механизм поддержания целостности данных в распределенных системах без использования двухфазного коммита.
Определение и основные концепции
SAGA – это паттерн проектирования, представляющий собой последовательность локальных транзакций, где каждая транзакция обновляет данные только в рамках одного сервиса, используя привычные ACID-механизмы. При этом завершение одной локальной транзакции инициирует следующую, формируя цепочку взаимосвязанных операций. Ключевая особенность SAG – отсутствие изоляции между локальными транзакциями. Это означает, что промежуточные результаты становятся видимыми для остальной системы сразу после завершения каждой локальной транзакции, а не после окончания всей цепочки. Фактически, паттерн SAGA реализует модель ACD (атомарность, согласованность, долговечность), отказываясь от свойства изоляции.
Рассмотрим классический пример создания заказа в электронной коммерции с использованием SAGA:
1. Сервис заказов создает заказ в статусе "ожидает подтверждения".
2. Сервис клиентов проверяет, может ли клиент разместить заказ.
3. Сервис склада проверяет наличие товаров и создает резервирование.
4. Сервис платежей авторизует кредитную карту клиента.
5. Сервис склада изменяет статус резервирования на "ожидает отправки".
6. Сервис заказов меняет статус заказа на "подтвержден".
Каждый шаг – отдельная транзакция, выполняемая в контексте своего сервиса. Проблема возникает при сбое на любом из этапов – например, если авторизация карты не проходит на четвертом шаге. В этом случае необходимо отменить все уже выполненные шаги, для чего SAGA использует компенсирующие транзакции.
Хореография vs оркестрация
Существует два основных подхода к организации взаимодействия между участниками SAGA: хореография и оркестрация.
При хореографическом подходе сервисы обмениваются событиями (events), и каждый участник "подписывается" на события, которые его касаются. Решения о следующих шагах распределены между участниками, и нет центральной точки координации. Каждый сервис публикует события о своих действиях, которые могут инициировать действия других сервисов.
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // Пример публикации события в сервисе заказов
public async Task CreateOrder(CreateOrderCommand command)
{
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = command.CustomerId,
Items = command.Items,
Status = OrderStatus.Pending
};
await _orderRepository.AddAsync(order);
// Публикация события для других сервисов
await _eventBus.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Items = order.Items
});
} |
|
При оркестрационном подходе существует центральный координатор (оркестратор), который знает все шаги процесса и направляет участников через команды. Оркестратор сохраняет состояние SAGA и определяет, какие действия выполнять в зависимости от успеха или неудачи предыдущих шагов.
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| // Пример оркестратора для создания заказа
public class CreateOrderSaga : ISaga<CreateOrderSagaState>
{
public Task<CreateOrderSagaState> InitiateAsync(CreateOrderCommand command)
{
return Task.FromResult(new CreateOrderSagaState
{
OrderId = Guid.NewGuid(),
CustomerId = command.CustomerId,
Items = command.Items,
Step = CreateOrderSagaStep.Started
});
}
public Task<CreateOrderSagaState> HandleAsync(CreateOrderSagaState state, object message)
{
switch (state.Step)
{
case CreateOrderSagaStep.Started:
return HandleStarted(state, message as OrderCreatedEvent);
case CreateOrderSagaStep.CustomerVerified:
return HandleCustomerVerified(state, message as CustomerVerifiedEvent);
// Другие шаги...
}
throw new InvalidOperationException($"Unknown step: {state.Step}");
}
// Методы обработки для каждого шага...
} |
|
У каждого подхода есть свои преимущества:
Хореография:- Простота – сервисы просто публикуют события при изменении своего состояния.
- Слабая связность – участники не знают напрямую друг о друге.
- Естественное распределение ответственности без централизованного контроля.
Оркестрация:- Более ясная логика процесса, сосредоточенная в одном месте.
- Отсутствие циклических зависимостей между сервисами.
- Упрощенная отладка и мониторинг процесса.
- Четкое разделение бизнес-логики и координации.
Связь SAGA с другими паттернами проектирования микросервисной архитектуры
SAGA редко используется изолированно – обычно она интегрируется с другими паттернами микросервисной архитектуры, формируя комплексное решение. Особенно тесно SAGA взаимодействует с Event Sourcing и CQRS (Command Query Responsibility Segregation).
Event Sourcing хранит состояние приложения как последовательность событий, что естественным образом сочетается с SAGA, которая также опирается на события для координации действий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // Пример интеграции с Event Sourcing
public class OrderAggregate : EventSourcedAggregate
{
private OrderState _state;
public void Process(CreateOrderCommand command)
{
// Проверка возможности создания заказа
if (_state.Status != OrderStatus.None)
throw new InvalidOperationException("Order already exists");
// Применение события, которое будет сохранено в event store
ApplyEvent(new OrderCreatedEvent(command.OrderId, command.CustomerId));
}
// Обработчик события
private void Apply(OrderCreatedEvent evt)
{
_state.Id = evt.OrderId;
_state.CustomerId = evt.CustomerId;
_state.Status = OrderStatus.Pending;
}
} |
|
CQRS разделяет операции чтения и записи, что помогает справиться с проблемой различных консистентностных требований к операциям в SAGA. Команды обрабатываются через SAGA с сильными гарантиями согласованности, а запросы могут работать с денормализованными представлениями данных:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| // Пример команды, запускающей SAGA
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
private readonly ISagaCoordinator _sagaCoordinator;
public async Task HandleAsync(CreateOrderCommand command)
{
// Запуск SAGA через координатор
await _sagaCoordinator.StartSagaAsync<CreateOrderSaga>(command);
}
}
// Пример запроса, использующего денормализованное представление
public class GetOrderQueryHandler : IQueryHandler<GetOrderQuery, OrderDto>
{
private readonly IReadDbContext _readDb;
public async Task<OrderDto> HandleAsync(GetOrderQuery query)
{
// Чтение из оптимизированной для запросов базы
return await _readDb.Orders
.Include(o => o.Items)
.Where(o => o.Id == query.OrderId)
.ProjectTo<OrderDto>()
.FirstOrDefaultAsync();
}
} |
|
Не менее важна интеграция SAGA с паттерном Outbox, который обеспечивает надежную публикацию событий даже при сбоях. Этот паттерн особенно критичен для хореографического подхода, где события – основной механизм координации.
Преимущества и недостатки паттерна SAGA по сравнению с двухфазным коммитом
SAGA предлагает компромисс между строгой согласованностью и автономностью сервисов. В отличие от двухфазного коммита, который предоставляет ACID-гарантии ценой высокой связности и снижения доступности, SAGA жертвует изоляцией ради независимости сервисов.
Преимущества SAGA:
1. Поддержка гетерогенной инфраструктуры – можно комбинировать различные типы баз данных и сервисов.
2. Повышенная доступность – отдельный сервис может продолжать работу даже если другие сервисы недоступны.
3. Улучшенная масштабируемость – отсутствие блокировок на уровне базы данных позволяет эффективнее обрабатывать параллельные запросы.
4. Согласованность бизнес-процессов – SAGA естественным образом моделирует бизнес-транзакции, которые часто не требуют строгой изоляции.
Недостатки SAGA:
1. Усложнение бизнес-логики – необходимость писать компенсирующие транзакции для всех операций.
2. Отсутствие изоляции – промежуточные результаты видны сразу, что может привести к аномалиям.
3. Сложность отладки – распределенная природа SAGA затрудняет отслеживание проблем.
4. Повышенные требования к идемпотентности – операции должны быть идемпотентными для корректной работы при повторах.
Сравнивая эти два подхода на практическом примере:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| // Двухфазный коммит (при условии поддержки XA)
public void ProcessOrder_2PC(Order order)
{
using (var scope = new TransactionScope(
TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.Serializable }))
{
_orderService.CreateOrder(order);
_inventoryService.ReserveInventory(order);
_paymentService.ProcessPayment(order);
scope.Complete();
} // При ошибке автоматически выполняется откат
}
// SAGA (упрощенно)
public async Task ProcessOrder_Saga(Order order)
{
var orderCreated = await _orderService.CreateOrderAsync(order);
if (!orderCreated.Success)
return; // Первый шаг не выполнен
var inventoryReserved = await _inventoryService.ReserveInventoryAsync(order);
if (!inventoryReserved.Success)
{
// Компенсирующая транзакция
await _orderService.CancelOrderAsync(order.Id);
return;
}
var paymentProcessed = await _paymentService.ProcessPaymentAsync(order);
if (!paymentProcessed.Success)
{
// Компенсирующие транзакции
await _inventoryService.ReleaseInventoryAsync(order);
await _orderService.CancelOrderAsync(order.Id);
return;
}
// Все шаги успешно выполнены
} |
|
Масштабирование SAGA при увеличении количества микросервисов
По мере роста системы и увеличения числа микросервисов, SAGA сталкивается с новыми вызовами. Наиболее очевидная проблема – рост сложности при добавлении новых шагов и участников в процесс. При хореографическом подходе добавление нового сервиса, который должен участвовать в транзакции, требует модификации существующих сервисов для отправки и обработки новых событий. Это может привести к "эффекту домино", когда изменение одного сервиса вызывает необходимость изменений в нескольких других. Оркестрационный подход лучше справляется с такими сценариями, поскольку изменения локализуются в оркестраторе, но и он может стать сложным при большом количестве шагов и условных переходов.
Несколько стратегий помогают справиться с проблемами масштабирования SAGA:
1. Декомпозиция сложных SAGA на несколько более простых – например, разделение процесса оформления заказа на несколько подпроцессов: проверка клиента, резервирование товара, обработка платежа
2. Использование иерархии оркестраторов – родительский оркестратор вызывает дочерние оркестраторы для обработки подпроцессов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| public class OrderProcessingSaga : Saga<OrderSagaData>,
IAmStartedByMessages<StartOrderProcessing>
{
private readonly IServiceBus _bus;
public OrderProcessingSaga(IServiceBus bus)
{
_bus = bus;
}
public async Task Handle(StartOrderProcessing message)
{
Data.OrderId = message.OrderId;
// Запуск дочерней SAGA для проверки клиента
await _bus.SendAsync(new StartCustomerVerification {
OrderId = message.OrderId,
CustomerId = message.CustomerId
});
// Основная SAGA ждет завершения дочерних процессов
}
// Обработчики завершения дочерних SAGA...
} |
|
3. Применение паттерна Process Manager – разновидность оркестратора, который поддерживает более сложную бизнес-логику и может динамически определять следующие шаги на основе результатов предыдущих
4. Использование специализированных фреймворков – инструменты вроде MassTransit, NServiceBus или Automatonymous предоставляют функционал для управления состоянием саги, обработки ошибок и повторов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| // Пример использования MassTransit для создания SAGA
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Initially(
When(OrderSubmitted)
.Then(context => {
context.Instance.OrderId = context.Data.OrderId;
context.Instance.CustomerId = context.Data.CustomerId;
})
.TransitionTo(Submitted)
.Send(context => new VerifyCustomer {
OrderId = context.Instance.OrderId,
CustomerId = context.Instance.CustomerId
})
);
During(Submitted,
When(CustomerVerified)
.TransitionTo(Verified)
.Send(context => new ReserveInventory {
OrderId = context.Instance.OrderId,
Items = context.Data.Items
})
);
// Другие состояния и переходы...
}
public State Submitted { get; private set; }
public State Verified { get; private set; }
public Event<OrderSubmittedEvent> OrderSubmitted { get; private set; }
public Event<CustomerVerifiedEvent> CustomerVerified { get; private set; }
} |
|
Растущие системы также могут сталкиваться с проблемами производительности при обработке большого количества SAGA-транзакций. В таких случаях применяются стратегии горизонтального масштабирования с шардированием оркестраторов по идентификатору транзакции и использованием распределенных брокеров сообщений.
Реализация SAGA на C#
Теоретическое понимание паттерна SAGA важно, но истинная ценность появляется при переходе к практике. Рассмотрим конкретные реализации SAGA на C# – как хореографического, так и оркестрационного подходов, с рабочими примерами кода и анализом архитектурных решений.
Пример реализации хореографического подхода
При хореографическом подходе участники SAGA взаимодействуют через события, без центрального координатора. Этот подход естественно вписывается в архитектуру, ориентированную на события (Event-Driven Architecture).
Начнем с определения базовых интерфейсов для событий и их обработчиков:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public interface IEvent
{
Guid CorrelationId { get; }
DateTime Timestamp { get; }
}
public interface IEventHandler<in TEvent> where TEvent : IEvent
{
Task HandleAsync(TEvent @event);
}
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
void Subscribe<TEvent, THandler>()
where TEvent : IEvent
where THandler : IEventHandler<TEvent>;
} |
|
Далее определим конкретные события для нашего процесса создания заказа:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public class OrderCreatedEvent : IEvent
{
public Guid CorrelationId { get; set; } // Совпадает с OrderId
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public Guid CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public decimal TotalAmount { get; set; }
}
public class CustomerVerifiedEvent : IEvent
{
public Guid CorrelationId { get; set; }
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public Guid CustomerId { get; set; }
public bool IsVerified { get; set; }
public string RejectionReason { get; set; }
}
// Аналогично для других событий процесса |
|
Ключевым элементом в событиях является CorrelationId – идентификатор, связывающий все события одной SAGA транзакции. Обычно этот идентификатор совпадает с Id основного бизнес-объекта (в нашем случае – OrderId).
Теперь реализуем обработчики событий для каждого сервиса:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| // В сервисе клиентов
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
private readonly ICustomerRepository _customerRepository;
private readonly IEventBus _eventBus;
public OrderCreatedEventHandler(
ICustomerRepository customerRepository,
IEventBus eventBus)
{
_customerRepository = customerRepository;
_eventBus = eventBus;
}
public async Task HandleAsync(OrderCreatedEvent @event)
{
var customer = await _customerRepository.GetByIdAsync(@event.CustomerId);
var isVerified = customer != null && customer.Status == CustomerStatus.Active;
var rejectionReason = !isVerified ? "Customer not found or inactive" : null;
await _eventBus.PublishAsync(new CustomerVerifiedEvent
{
CorrelationId = @event.CorrelationId,
CustomerId = @event.CustomerId,
IsVerified = isVerified,
RejectionReason = rejectionReason
});
}
} |
|
Важно обеспечить надежную доставку и обработку сообщений. В реальных системах это обычно достигается с помощью брокеров сообщений, таких как RabbitMQ или Kafka и паттерна Outbox:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| public class OutboxEventBus : IEventBus
{
private readonly IOutboxRepository _outboxRepository;
private readonly IServiceProvider _serviceProvider;
public OutboxEventBus(
IOutboxRepository outboxRepository,
IServiceProvider serviceProvider)
{
_outboxRepository = outboxRepository;
_serviceProvider = serviceProvider;
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
// Сохраняем событие в таблицу Outbox в той же транзакции,
// что и основные изменения в базе данных
await _outboxRepository.AddAsync(new OutboxMessage
{
Id = Guid.NewGuid(),
CorrelationId = @event.CorrelationId,
Type = typeof(TEvent).FullName,
Data = JsonSerializer.Serialize(@event),
CreatedAt = DateTime.UtcNow
});
// Отправка выполняется асинхронно фоновым процессом
}
// Метод подписки...
} |
|
Полный поток хореографической SAGA можно представить диаграммой последовательности действий:
1. OrderService создает заказ и публикует OrderCreatedEvent.
2. CustomerService получает событие, проверяет клиента и публикует CustomerVerifiedEvent.
3. InventoryService получает OrderCreatedEvent, проверяет наличие товаров и публикует InventoryReservedEvent.
4. PaymentService получает CustomerVerifiedEvent и InventoryReservedEvent, обрабатывает платеж и публикует PaymentProcessedEvent.
5. InventoryService получает PaymentProcessedEvent и подтверждает резервирование.
6. OrderService получает PaymentProcessedEvent и подтверждает заказ.
При сбое на любом этапе публикуется соответствующее событие об ошибке, которое запускает компенсирующие действия.
Пример реализации оркестрации с использованием MediatR
Оркестрационный подход имеет центрального координатора – оркестратор SAGA, который управляет всем процессом. В C# для реализации этого подхода удобно использовать библиотеку MediatR, которая упрощает отправку команд и запросов между компонентами. Начнем с определения состояния SAGA и команд для взаимодействия:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| public class CreateOrderSagaState
{
public Guid Id { get; set; }
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public decimal TotalAmount { get; set; }
public CreateOrderSagaStatus Status { get; set; }
public string ErrorMessage { get; set; }
// Дополнительные данные для каждого шага
public bool IsCustomerVerified { get; set; }
public bool IsInventoryReserved { get; set; }
public bool IsPaymentProcessed { get; set; }
}
public enum CreateOrderSagaStatus
{
NotStarted,
OrderCreated,
CustomerVerified,
CustomerVerificationFailed,
InventoryReserved,
InventoryReservationFailed,
PaymentProcessed,
PaymentFailed,
OrderCompleted,
OrderFailed
} |
|
Оркестратор SAGA на базе MediatR может выглядеть так:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
| public class CreateOrderSaga :
IRequestHandler<StartCreateOrderSagaCommand, Guid>,
INotificationHandler<CustomerVerifiedEvent>,
INotificationHandler<InventoryReservedEvent>,
INotificationHandler<PaymentProcessedEvent>,
INotificationHandler<SagaFailedEvent>
{
private readonly ISagaRepository<CreateOrderSagaState> _sagaRepository;
private readonly IMediator _mediator;
public CreateOrderSaga(
ISagaRepository<CreateOrderSagaState> sagaRepository,
IMediator mediator)
{
_sagaRepository = sagaRepository;
_mediator = mediator;
}
public async Task<Guid> Handle(StartCreateOrderSagaCommand request, CancellationToken cancellationToken)
{
// Создание состояния SAGA и заказа
var state = new CreateOrderSagaState
{
Id = Guid.NewGuid(),
OrderId = Guid.NewGuid(),
CustomerId = request.CustomerId,
Items = request.Items,
TotalAmount = request.Items.Sum(i => i.Price * i.Quantity),
Status = CreateOrderSagaStatus.NotStarted
};
await _sagaRepository.SaveAsync(state);
// Создание заказа
await _mediator.Send(new CreateOrderCommand
{
OrderId = state.OrderId,
CustomerId = state.CustomerId,
Items = state.Items,
TotalAmount = state.TotalAmount,
SagaId = state.Id
});
return state.Id;
}
public async Task Handle(CustomerVerifiedEvent notification, CancellationToken cancellationToken)
{
var state = await _sagaRepository.GetByIdAsync(notification.SagaId);
if (notification.IsVerified)
{
state.Status = CreateOrderSagaStatus.CustomerVerified;
state.IsCustomerVerified = true;
// Следующий шаг - резервирование товаров
await _mediator.Send(new ReserveInventoryCommand
{
OrderId = state.OrderId,
Items = state.Items,
SagaId = state.Id
});
}
else
{
state.Status = CreateOrderSagaStatus.CustomerVerificationFailed;
state.ErrorMessage = notification.RejectionReason;
// Компенсация - отмена заказа
await _mediator.Send(new CancelOrderCommand
{
OrderId = state.OrderId,
Reason = notification.RejectionReason,
SagaId = state.Id
});
}
await _sagaRepository.SaveAsync(state);
}
// Другие обработчики событий...
} |
|
Для сохранения состояния SAGA обычно используется выделенное хранилище – реляционная база данных или документоориентированная. Это критически важно для восстановления при сбоях:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public class SagaRepository<TSaga> : ISagaRepository<TSaga> where TSaga : class
{
private readonly SagaDbContext _dbContext;
public SagaRepository(SagaDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<TSaga> GetByIdAsync(Guid id)
{
return await _dbContext.Set<TSaga>().FindAsync(id);
}
public async Task SaveAsync(TSaga saga)
{
var entry = _dbContext.Entry(saga);
if (entry.State == EntityState.Detached)
await _dbContext.Set<TSaga>().AddAsync(saga);
await _dbContext.SaveChangesAsync();
}
} |
|
Важная часть оркестрации – обработка ошибок и таймаутов. Для этого можно использовать механизм повторных попыток с экспоненциальной задержкой через библиотеку Polly:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| public class SagaRetryPolicy<TCommand>
{
private readonly IMediator _mediator;
private readonly ISagaRepository<CreateOrderSagaState> _repository;
public SagaRetryPolicy(
IMediator mediator,
ISagaRepository<CreateOrderSagaState> repository)
{
_mediator = mediator;
_repository = repository;
}
public async Task ExecuteWithRetryAsync(TCommand command, Guid sagaId)
{
var policy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
var state = await _repository.GetByIdAsync(sagaId);
state.ErrorMessage = $"Retry {retryCount}: {exception.Message}";
await _repository.SaveAsync(state);
});
await policy.ExecuteAsync(() => _mediator.Send(command));
}
} |
|
Оркестрационный подход обеспечивает более централизованное управление процессом, что упрощает мониторинг и отладку. Однако он создает зависимость между оркестратором и всеми участниками SAGA, что может снизить автономность сервисов.
Интеграция SAGA с брокерами сообщений (RabbitMQ, Kafka) на C#
Для эффективной реализации SAGA в распределенной среде необходим надежный механизм обмена сообщениями между сервисами. Брокеры сообщений, такие как RabbitMQ и Kafka, предоставляют необходимую инфраструктуру для асинхронной коммуникации. RabbitMQ отлично подходит для реализации SAGA благодаря поддержке различных моделей обмена сообщениями. Рассмотрим пример интеграции с использованием клиентской библиотеки MassTransit:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| public class CreateOrderSagaRabbitMqSetup : IHostedService
{
private readonly IBusControl _busControl;
private readonly IServiceProvider _serviceProvider;
public CreateOrderSagaRabbitMqSetup(
IBusControl busControl,
IServiceProvider serviceProvider)
{
_busControl = busControl;
_serviceProvider = serviceProvider;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// Настройка маршрутизации сообщений
_busControl.ConnectReceiveEndpoint("create-order-saga", endpoint =>
{
// Регистрация сообщений для оркестратора SAGA
endpoint.Instance(_serviceProvider.GetRequiredService<CreateOrderSaga>());
// Определение стратегии повторов
endpoint.UseMessageRetry(r => r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)));
// Автоматический перемещение ошибочных сообщений в очередь ошибок
endpoint.UseDelayedRedelivery(r => r.Intervals(
TimeSpan.FromMinutes(5),
TimeSpan.FromMinutes(15),
TimeSpan.FromMinutes(30)
));
});
await _busControl.StartAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _busControl.StopAsync(cancellationToken);
}
} |
|
В случае Kafka, который ориентирован на потоковую обработку данных с возможностью воспроизведения сообщений, интеграция может выглядеть так:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| public class KafkaSagaEventConsumer : BackgroundService
{
private readonly IConsumer<Ignore, string> _consumer;
private readonly IMediator _mediator;
private readonly ILogger<KafkaSagaEventConsumer> _logger;
public KafkaSagaEventConsumer(
IConsumer<Ignore, string> consumer,
IMediator mediator,
ILogger<KafkaSagaEventConsumer> logger)
{
_consumer = consumer;
_mediator = mediator;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("order-saga-events");
try
{
while (!stoppingToken.IsCancellationRequested)
{
var result = _consumer.Consume(stoppingToken);
var eventJson = result.Message.Value;
try
{
// Десериализация события
var baseEvent = JsonSerializer.Deserialize<BaseEvent>(eventJson);
// Динамическая маршрутизация события на основе его типа
Type eventType = Type.GetType(baseEvent.EventType);
var typedEvent = JsonSerializer.Deserialize(eventJson, eventType);
// Отправка события через MediatR
await _mediator.Publish(typedEvent, stoppingToken);
// Подтверждение обработки
_consumer.Commit(result);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing saga event: {EventJson}", eventJson);
}
}
}
finally
{
_consumer.Close();
}
}
} |
|
При работе с брокерами сообщений важно учитывать свойства доставки сообщений. Для SAGA критичны гарантии "at-least-once delivery" (доставка хотя бы один раз), что требует идемпотентной обработки сообщений для защиты от дубликатов.
Мониторинг выполнения SAGA-транзакций в реальном времени
Мониторинг распределенных транзакций – нетривиальная задача из-за их асинхронной и распределенной природы. Для этого можно создать специализированный сервис мониторинга, который отслеживает состояние SAGA и предоставляет актуальную информацию:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| public class SagaMonitoringService
{
private readonly IDatabase _redisDb;
private readonly IServiceScopeFactory _scopeFactory;
public SagaMonitoringService(
IConnectionMultiplexer redis,
IServiceScopeFactory scopeFactory)
{
_redisDb = redis.GetDatabase();
_scopeFactory = scopeFactory;
}
public async Task TrackSagaProgressAsync<TState>(Guid sagaId, TState state, string currentStep)
{
var sagaInfo = new SagaProgressInfo
{
SagaId = sagaId,
SagaType = typeof(TState).Name,
CurrentStep = currentStep,
Status = GetStatusFromState(state),
LastUpdated = DateTime.UtcNow,
StateSnapshot = JsonSerializer.Serialize(state)
};
// Сохранение в Redis для быстрого доступа
await _redisDb.HashSetAsync(
$"saga:{sagaId}",
new HashEntry[]
{
new HashEntry("status", sagaInfo.Status),
new HashEntry("currentStep", sagaInfo.CurrentStep),
new HashEntry("lastUpdated", sagaInfo.LastUpdated.ToString("o")),
new HashEntry("stateSnapshot", sagaInfo.StateSnapshot)
});
// Публикация события для real-time обновлений через SignalR
await _redisDb.PublishAsync("saga-progress-channel", JsonSerializer.Serialize(sagaInfo));
// Сохранение в постоянное хранилище для истории
using var scope = _scopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<ISagaMonitoringRepository>();
await repository.SaveProgressAsync(sagaInfo);
}
private string GetStatusFromState<TState>(TState state)
{
// Извлечение статуса из свойств объекта состояния
var statusProperty = typeof(TState).GetProperty("Status");
return statusProperty?.GetValue(state)?.ToString() ?? "Unknown";
}
} |
|
Для визуализации можно использовать SignalR для отправки обновлений в реальном времени на клиентский дашборд:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public class SagaMonitoringHub : Hub
{
private readonly IRedisSubscriber _redisSubscriber;
public SagaMonitoringHub(IRedisSubscriber redisSubscriber)
{
_redisSubscriber = redisSubscriber;
}
public override async Task OnConnectedAsync()
{
// Подписка на канал обновлений SAGA
_redisSubscriber.Subscribe("saga-progress-channel", async (message) =>
{
var progress = JsonSerializer.Deserialize<SagaProgressInfo>(message);
await Clients.All.SendAsync("SagaProgressUpdated", progress);
});
await base.OnConnectedAsync();
}
} |
|
Такой подход позволяет создать интерактивный дашборд, отображающий текущий статус всех активных SAGA-транзакций, что упрощает отладку и мониторинг системы.
Обработка долгоживущих транзакций в SAGA с использованием Polly для C#
Некоторые бизнес-процессы могут выполняться продолжительное время – часы или даже дни. Для таких случаев SAGA должна поддерживать механизмы долгоживущих транзакций. Библиотека Polly предоставляет гибкие механизмы для определения политик повторов и таймаутов, что делает её идеальным инструментом для работы с долгоживущими SAGA:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
| public class LongRunningSagaWithPolly<TState> where TState : class, ISagaState
{
private readonly IServiceProvider _serviceProvider;
private readonly ISagaRepository<TState> _repository;
private readonly ILogger<LongRunningSagaWithPolly<TState>> _logger;
public LongRunningSagaWithPolly(
IServiceProvider serviceProvider,
ISagaRepository<TState> repository,
ILogger<LongRunningSagaWithPolly<TState>> logger)
{
_serviceProvider = serviceProvider;
_repository = repository;
_logger = logger;
}
public async Task ExecuteWithTimeoutAndRetryAsync(
Guid sagaId,
Func<TState, Task<bool>> action,
TimeSpan timeout,
int maxRetries = 3)
{
// Политика таймаута
var timeoutPolicy = Policy.TimeoutAsync(timeout);
// Политика повторов с экспоненциальной задержкой
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
maxRetries,
attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
async (ex, delay, attempt, ctx) =>
{
_logger.LogWarning(ex,
"Error executing saga step (Attempt {Attempt}/{MaxRetries}). Retrying in {Delay}s",
attempt, maxRetries, delay.TotalSeconds);
// Обновление состояния SAGA с информацией о повторе
var state = await _repository.GetByIdAsync(sagaId);
state.LastError = ex.Message;
state.RetryCount = attempt;
state.NextRetryAt = DateTime.UtcNow.Add(delay);
await _repository.SaveAsync(state);
});
// Комбинирование политик
var combinedPolicy = Policy.WrapAsync(retryPolicy, timeoutPolicy);
try
{
await combinedPolicy.ExecuteAsync(async (ctx) =>
{
var state = await _repository.GetByIdAsync(sagaId);
bool isCompleted = await action(state);
// Если шаг не завершен, имитируем исключение для активации повтора
if (!isCompleted)
throw new SagaStepIncompleteException("Step not completed, will retry later");
return isCompleted;
}, new Context($"Saga_{sagaId}"));
}
catch (TimeoutRejectedException)
{
// Обработка превышения общего таймаута
var state = await _repository.GetByIdAsync(sagaId);
state.Status = SagaStatus.TimedOut;
state.LastError = "Saga execution timed out";
await _repository.SaveAsync(state);
// Запуск компенсационной логики
await InitiateCompensationAsync(sagaId);
}
catch (Exception ex) when (ex is not SagaStepIncompleteException)
{
// Обработка других ошибок
_logger.LogError(ex, "Unhandled error in saga execution");
var state = await _repository.GetByIdAsync(sagaId);
state.Status = SagaStatus.Failed;
state.LastError = ex.Message;
await _repository.SaveAsync(state);
// Запуск компенсационной логики
await InitiateCompensationAsync(sagaId);
}
}
private async Task InitiateCompensationAsync(Guid sagaId)
{
// Логика запуска компенсационных действий
// ...
}
} |
|
Для долгоживущих SAGA также полезно реализовать механизм сохранения контрольных точек (checkpoints), позволяющий возобновить выполнение с определенного места после сбоя:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public async Task ResumeFromCheckpointAsync(Guid sagaId)
{
var state = await _repository.GetByIdAsync(sagaId);
// Определение следующего шага на основе текущего состояния
switch (state.CurrentCheckpoint)
{
case "OrderCreated":
await ExecuteWithTimeoutAndRetryAsync(
sagaId,
s => VerifyCustomerAsync(s),
TimeSpan.FromMinutes(5));
break;
case "CustomerVerified":
await ExecuteWithTimeoutAndRetryAsync(
sagaId,
s => ReserveInventoryAsync(s),
TimeSpan.FromMinutes(10));
break;
// Другие шаги...
}
} |
|
Реализация SAGA с применением Azure Service Bus в .NET приложениях
Azure Service Bus предоставляет надежную инфраструктуру для обмена сообщениями в облаке и хорошо подходит для реализации SAGA. Его ключевые возможности – гарантированная доставка, поддержка сессий и планирование сообщений – делают его мощным инструментом для оркестрации распределенных транзакций.
Рассмотрим пример реализации SAGA с использованием Azure Service Bus:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
| public class AzureServiceBusSagaOrchestrator : ISagaOrchestrator
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _commandSender;
private readonly ISagaRepository<OrderSagaState> _repository;
public AzureServiceBusSagaOrchestrator(
ServiceBusClient client,
ISagaRepository<OrderSagaState> repository)
{
_client = client;
_commandSender = client.CreateSender("saga-commands");
_repository = repository;
}
public async Task<Guid> StartSagaAsync(StartOrderSagaCommand command)
{
// Создание состояния SAGA
var state = new OrderSagaState
{
Id = Guid.NewGuid(),
OrderId = Guid.NewGuid(),
CustomerId = command.CustomerId,
Items = command.Items,
Status = SagaStatus.Started,
CreatedAt = DateTime.UtcNow
};
await _repository.SaveAsync(state);
// Отправка команды создания заказа с указанием sessionId для корреляции
var createOrderCommand = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(
new CreateOrderCommand
{
SagaId = state.Id,
OrderId = state.OrderId,
CustomerId = state.CustomerId,
Items = state.Items
}))
{
SessionId = state.Id.ToString(), // Использование SagaId как идентификатора сессии
MessageId = Guid.NewGuid().ToString(),
Subject = "CreateOrder"
};
await _commandSender.SendMessageAsync(createOrderCommand);
return state.Id;
}
public async Task HandleResponseAsync(ServiceBusReceivedMessage message)
{
var sagaId = Guid.Parse(message.SessionId);
var state = await _repository.GetByIdAsync(sagaId);
// Обработка ответа в зависимости от темы сообщения
switch (message.Subject)
{
case "OrderCreated":
await HandleOrderCreatedAsync(state, message);
break;
case "CustomerVerified":
await HandleCustomerVerifiedAsync(state, message);
break;
// Другие обработчики...
}
}
private async Task HandleOrderCreatedAsync(OrderSagaState state, ServiceBusReceivedMessage message)
{
// Обновление состояния
state.Status = SagaStatus.OrderCreated;
await _repository.SaveAsync(state);
// Отправка следующей команды
var verifyCustomerCommand = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(
new VerifyCustomerCommand
{
SagaId = state.Id,
CustomerId = state.CustomerId
}))
{
SessionId = state.Id.ToString(),
MessageId = Guid.NewGuid().ToString(),
Subject = "VerifyCustomer"
};
await _commandSender.SendMessageAsync(verifyCustomerCommand);
}
} |
|
Для обработки сообщений можно использовать обработчик с поддержкой сессий:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| public class AzureServiceBusSagaProcessor : BackgroundService
{
private readonly ServiceBusClient _client;
private readonly ISagaOrchestrator _orchestrator;
private readonly ILogger<AzureServiceBusSagaProcessor> _logger;
private ServiceBusSessionProcessor _processor;
public AzureServiceBusSagaProcessor(
ServiceBusClient client,
ISagaOrchestrator orchestrator,
ILogger<AzureServiceBusSagaProcessor> logger)
{
_client = client;
_orchestrator = orchestrator;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Создание процессора сообщений с поддержкой сессий
_processor = _client.CreateSessionProcessor(
"saga-responses",
new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 100,
MaxConcurrentCallsPerSession = 1,
AutoCompleteMessages = false
});
// Настройка обработчиков
_processor.ProcessMessageAsync += ProcessMessageAsync;
_processor.ProcessErrorAsync += ProcessErrorAsync;
await _processor.StartProcessingAsync(stoppingToken);
try
{
await Task.Delay(Timeout.Infinite, stoppingToken);
}
catch (OperationCanceledException)
{
// Ожидаемое исключение при остановке сервиса
}
finally
{
await _processor.StopProcessingAsync();
}
}
private async Task ProcessMessageAsync(ProcessSessionMessageEventArgs args)
{
try
{
await _orchestrator.HandleResponseAsync(args.Message);
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing saga response");
// В зависимости от типа ошибки решаем, откладывать сообщение или отправлять в dead-letter
if (IsTransientError(ex))
await args.AbandonMessageAsync(args.Message);
else
await args.DeadLetterMessageAsync(args.Message, ex.Message);
}
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Error in service bus processor");
return Task.CompletedTask;
}
private bool IsTransientError(Exception ex)
{
// Определение, является ли ошибка временной
return ex is TimeoutException || ex is ServiceBusException sbEx && sbEx.IsTransient;
}
} |
|
Azure Service Bus также позволяет реализовать планирование отложенных действий, что полезно для долгоживущих SAGA или реализации тайм-аутов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public async Task ScheduleTimeoutAsync(Guid sagaId, string timeoutName, TimeSpan delay)
{
var timeoutMessage = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(
new SagaTimeoutMessage
{
SagaId = sagaId,
TimeoutName = timeoutName
}))
{
SessionId = sagaId.ToString(),
Subject = "SagaTimeout",
MessageId = Guid.NewGuid().ToString()
};
// Планирование сообщения на будущее время
timeoutMessage.ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(delay);
await _commandSender.SendMessageAsync(timeoutMessage);
} |
|
Обработка сбоев
Одна из ключевых сложностей при работе с SAGA-транзакциями – обработка сбоев. В отличие от классических ACID-транзакций, SAGA не может просто выполнить ROLLBACK при возникновении ошибки. Поскольку каждый шаг SAGA фиксирует изменения в локальной базе данных, необходим специальный механизм для отката частично выполненной транзакции.
Компенсирующие транзакции
Компенсирующие транзакции – это операции, которые отменяют эффект предыдущих успешно выполненных шагов SAGA. Они выполняются в обратном порядке по отношению к основным транзакциям при возникновении сбоя.
Рассмотрим пример компенсирующих транзакций для нашего сценария создания заказа:
1. Если авторизация платежа не удалась, необходимо отменить резервирование товара.
2. Если резервирование товара не удалось, необходимо отклонить заказ.
3. Если проверка клиента не прошла, необходимо отклонить заказ.
Код обработки сбоя в оркестрационном подходе может выглядеть так:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| public async Task Handle(PaymentFailedEvent notification, CancellationToken cancellationToken)
{
var state = await _sagaRepository.GetByIdAsync(notification.SagaId);
state.Status = CreateOrderSagaStatus.PaymentFailed;
state.ErrorMessage = notification.FailureReason;
// Компенсирующая транзакция для отмены резервирования товара
if (state.IsInventoryReserved)
{
await _mediator.Send(new CancelInventoryReservationCommand
{
OrderId = state.OrderId,
Reason = "Payment failed: " + notification.FailureReason,
SagaId = state.Id
});
}
// Компенсирующая транзакция для отмены заказа
await _mediator.Send(new RejectOrderCommand
{
OrderId = state.OrderId,
Reason = "Payment failed: " + notification.FailureReason,
SagaId = state.Id
});
await _sagaRepository.SaveAsync(state);
} |
|
В хореографическом подходе компенсирующие действия инициируются событиями об ошибках:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| // В сервисе инвентаризации
public class PaymentFailedEventHandler : IEventHandler<PaymentFailedEvent>
{
private readonly IInventoryRepository _inventoryRepository;
private readonly IEventBus _eventBus;
public async Task HandleAsync(PaymentFailedEvent @event)
{
// Найти резервирование по идентификатору заказа
var reservation = await _inventoryRepository.GetByOrderIdAsync(@event.OrderId);
if (reservation != null)
{
// Отменить резервирование
reservation.Status = ReservationStatus.Cancelled;
reservation.CancellationReason = "Payment failed: " + @event.FailureReason;
await _inventoryRepository.UpdateAsync(reservation);
// Опубликовать событие об отмене резервирования
await _eventBus.PublishAsync(new InventoryReservationCancelledEvent
{
CorrelationId = @event.CorrelationId,
OrderId = @event.OrderId,
Reason = reservation.CancellationReason
});
}
}
} |
|
Важно отметить, что компенсирующие транзакции должны быть тщательно спроектированы с учетом специфики предметной области. Не все операции можно легко отменить – например, если уже инициирован перевод денег через внешнюю платежную систему, может потребоваться не отмена, а возврат средств.
Идемпотентность операций
Идемпотентность – ключевое свойство для надежного выполнения SAGA. Операция считается идемпотентной, если её многократное выполнение даёт тот же результат, что и однократное. В асинхронной распределенной среде сообщения могут доставляться повторно из-за сбоев сети или перезапуска сервисов. Если операции не идемпотентны, это может привести к непредсказуемым результатам. Существует несколько подходов к обеспечению идемпотентности:
1. Использование уникальных идентификаторов операций:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| public async Task<bool> ProcessPaymentIdempotent(ProcessPaymentCommand command)
{
// Проверка, была ли эта операция уже выполнена
var existingTransaction = await _transactionRepository
.GetByIdempotencyKeyAsync(command.IdempotencyKey);
if (existingTransaction != null)
{
// Операция уже была выполнена, возвращаем предыдущий результат
return existingTransaction.IsSuccessful;
}
// Выполнение операции и сохранение результата
bool isSuccessful;
try
{
// Основная логика обработки платежа
await _paymentGateway.ChargeAsync(command.PaymentInfo);
isSuccessful = true;
}
catch (Exception)
{
isSuccessful = false;
}
// Сохранение результата операции с идентификатором идемпотентности
await _transactionRepository.SaveAsync(new PaymentTransaction
{
IdempotencyKey = command.IdempotencyKey,
OrderId = command.OrderId,
Amount = command.Amount,
IsSuccessful = isSuccessful,
Timestamp = DateTime.UtcNow
});
return isSuccessful;
} |
|
2. Проверка состояния перед выполнением операции:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public async Task ReserveInventoryIdempotent(ReserveInventoryCommand command)
{
// Проверка текущего состояния
var existingReservation = await _inventoryRepository
.GetByOrderIdAsync(command.OrderId);
// Если резервирование уже существует, не создаем новое
if (existingReservation != null)
return;
// Создание нового резервирования
var reservation = new InventoryReservation
{
Id = Guid.NewGuid(),
OrderId = command.OrderId,
Items = command.Items,
Status = ReservationStatus.Pending,
CreatedAt = DateTime.UtcNow
};
await _inventoryRepository.AddAsync(reservation);
} |
|
3. Использование условных операций в базе данных:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public async Task UpdateOrderStatusIdempotent(Guid orderId, OrderStatus newStatus, OrderStatus expectedCurrentStatus)
{
// Выполнение условного обновления
int updatedRows = await _dbContext.Orders
.Where(o => o.Id == orderId && o.Status == expectedCurrentStatus)
.ExecuteUpdateAsync(o => o.SetProperty(x => x.Status, newStatus));
// Если обновлено 0 строк, значит условие не выполнилось
if (updatedRows == 0)
{
// Проверяем, не было ли обновление уже выполнено
var currentOrder = await _dbContext.Orders.FindAsync(orderId);
if (currentOrder.Status != newStatus)
{
throw new ConcurrencyException($"Cannot update order status. Expected {expectedCurrentStatus}, but found {currentOrder.Status}");
}
}
} |
|
Идемпотентность играет критическую роль не только для основных операций SAGA, но и для компенсирующих транзакций. Компенсирующие действия должны быть спроектированы так, чтобы они могли быть безопасно выполнены несколько раз.
Стратегии восстановления после частичного сбоя
Когда SAGA сталкивается с частичным сбоем, существует несколько стратегий восстановления:
1. Полный откат – выполнение всех компенсирующих транзакций для отмены эффекта успешно выполненных шагов. Это наиболее распространенный подход.
2. Пропуск сбойных шагов – в некоторых сценариях допустимо пропустить неудачный шаг и продолжить выполнение SAGA. Это подходит для необязательных операций, которые не влияют на целостность бизнес-процесса.
3. Повторные попытки – автоматический повтор неудачного шага с экспоненциальной задержкой.
Вот пример стратегии восстановления с автоматическими повторными попытками:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| public async Task ExecuteStepWithRetryAsync(
Guid sagaId,
Func<Task<bool>> stepAction,
Func<Task> compensationAction,
int maxRetries = 3)
{
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
maxRetries,
attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
async (ex, timeSpan, retryCount, context) =>
{
_logger.LogWarning(ex,
"Step failed. Retry {RetryCount}/{MaxRetries} in {Delay}s",
retryCount, maxRetries, timeSpan.TotalSeconds);
// Обновление метаданных SAGA
await UpdateSagaMetadataAsync(sagaId, ex.Message, retryCount);
});
try
{
await retryPolicy.ExecuteAsync(stepAction);
}
catch (Exception ex)
{
_logger.LogError(ex, "Step failed after all retries. Executing compensation");
// Если все попытки исчерпаны, выполняем компенсацию
await compensationAction();
// Обновление статуса SAGA
await SetSagaStatusToFailedAsync(sagaId, ex.Message);
}
} |
|
Важную роль в восстановлении играет состояние SAGA. Оно должно быть надежно сохранено и доступно для анализа даже после серьезных сбоев системы:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| public class SagaStateManager<TState> where TState : class, ISagaState
{
private readonly ISagaRepository<TState> _repository;
private readonly ILogger<SagaStateManager<TState>> _logger;
public async Task<SagaRecoveryDecision> DetermineRecoveryActionAsync(Guid sagaId)
{
var state = await _repository.GetByIdAsync(sagaId);
if (state == null)
return SagaRecoveryDecision.Abort;
// Анализ состояния для определения стратегии восстановления
if (state.RetryCount >= state.MaxRetries)
return SagaRecoveryDecision.Compensate;
if (state.LastExecutedStepIndex < 0)
return SagaRecoveryDecision.Restart;
if (IsTimeoutExceeded(state.StartedAt))
return SagaRecoveryDecision.Compensate;
return SagaRecoveryDecision.RetryFromLastStep;
}
private bool IsTimeoutExceeded(DateTime startedAt)
{
var sagaDuration = DateTime.UtcNow - startedAt;
return sagaDuration > TimeSpan.FromHours(1); // Настраиваемый максимальный срок
}
}
public enum SagaRecoveryDecision
{
Restart,
RetryFromLastStep,
Compensate,
Abort
} |
|
Применение Outbox Pattern для гарантированной доставки событий
Надежное взаимодействие между сервисами — критическая часть SAGA. Проблема возникает, когда сервис должен атомарно обновить свою базу данных и отправить сообщение. Если сервис сначала обновляет базу, а затем отправляет сообщение, возможна ситуация, когда после обновления, но до отправки сообщения произойдет сбой. В этом случае данные изменились, но следующий шаг SAGA не запустился. Паттерн Outbox решает эту проблему, гарантируя доставку сообщений даже при сбоях:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| public class OrderService
{
private readonly OrderDbContext _dbContext;
private readonly IOutboxMessageRepository _outboxRepository;
public async Task CreateOrderAsync(CreateOrderCommand command)
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
// 1. Создание заказа
var order = new Order
{
Id = command.OrderId,
CustomerId = command.CustomerId,
Status = OrderStatus.Pending,
Items = command.Items.Select(i => new OrderItem { /* ... */ }).ToList()
};
_dbContext.Orders.Add(order);
// 2. Сохранение события в таблицу Outbox (в той же транзакции)
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
Type = "OrderCreated",
Content = JsonSerializer.Serialize(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Items = order.Items.Select(i => new OrderItemDto { /* ... */ }).ToList()
}),
CreatedAt = DateTime.UtcNow,
ProcessedAt = null
};
_outboxRepository.Add(outboxMessage);
// 3. Фиксация транзакции
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
} |
|
Для передачи сообщений из Outbox в брокер сообщений используется отдельный фоновый процесс:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| public class OutboxProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxProcessor> _logger;
private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(5);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _scopeFactory.CreateScope();
var outboxService = scope.ServiceProvider.GetRequiredService<IOutboxService>();
// Обработка сообщений из Outbox
await outboxService.ProcessPendingMessagesAsync(stoppingToken);
// Ожидание перед следующей итерацией
await Task.Delay(_pollingInterval, stoppingToken);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "Error processing outbox messages");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
} |
|
Такой подход гарантирует, что события будут доставлены в нужном порядке и даже если сервис временно выйдет из строя, сообщения всё равно будут обработаны после восстановления.
Механизмы логирования и аудита распределенных транзакций
Из-за распределенной природы SAGA трудно понять, что пошло не так при сбое. Комплексная система логирования и аудита играет важную роль в диагностике и устранении проблем. Эффективная система логирования для SAGA должна:
1. Связывать все логи одной транзакции через корреляционный идентификатор.
2. Фиксировать переходы состояний каждого шага.
3. Записывать контекст выполнения для облегчения диагностики.
Пример реализации централизовного логирования для SAGA:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
| public class SagaLogger<TSaga> where TSaga : class
{
private readonly ISagaRepository _repository;
private readonly ILogger _logger;
public async Task LogTransitionAsync(
Guid sagaId,
string fromState,
string toState,
object context = null)
{
// Сохранение в журнал аудита
var logEntry = new SagaLogEntry
{
SagaId = sagaId,
SagaType = typeof(TSaga).Name,
FromState = fromState,
ToState = toState,
Timestamp = DateTime.UtcNow,
Context = context != null ? JsonSerializer.Serialize(context) : null
};
await _repository.SaveLogEntryAsync(logEntry);
// Структурированное логирование
using (_logger.BeginScope(new Dictionary<string, object>
{
["SagaId"] = sagaId,
["SagaType"] = typeof(TSaga).Name
}))
{
_logger.LogInformation(
"Saga transition: {FromState} -> {ToState}",
fromState, toState);
}
}
public async Task LogErrorAsync(
Guid sagaId,
string state,
Exception exception,
object context = null)
{
// Запись ошибки в журнал аудита
var errorEntry = new SagaErrorEntry
{
SagaId = sagaId,
SagaType = typeof(TSaga).Name,
State = state,
ErrorType = exception.GetType().Name,
ErrorMessage = exception.Message,
StackTrace = exception.StackTrace,
Timestamp = DateTime.UtcNow,
Context = context != null ? JsonSerializer.Serialize(context) : null
};
await _repository.SaveErrorEntryAsync(errorEntry);
// Структурированное логирование ошибки
using (_logger.BeginScope(new Dictionary<string, object>
{
["SagaId"] = sagaId,
["SagaType"] = typeof(TSaga).Name,
["State"] = state
}))
{
_logger.LogError(exception, "Saga error occurred");
}
}
} |
|
Для углубленного анализа SAGA-транзакций полезно реализовать инструменты визуализации процесса:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public class SagaVisualizerService
{
private readonly ISagaLogRepository _logRepository;
public async Task<SagaVisualizationDto> GenerateSagaFlowDiagramAsync(Guid sagaId)
{
// Получение всех записей лога для данной SAGA
var logs = await _logRepository.GetBySagaIdAsync(sagaId);
// Построение временной шкалы выполнения
var timeline = logs
.OrderBy(l => l.Timestamp)
.Select(l => new SagaTimelineItemDto
{
Timestamp = l.Timestamp,
State = l.ToState,
Duration = l.DurationMs,
IsError = l is SagaErrorEntry
})
.ToList();
// Формирование графа состояний
var stateGraph = BuildStateGraph(logs);
return new SagaVisualizationDto
{
SagaId = sagaId,
StartedAt = logs.Min(l => l.Timestamp),
CompletedAt = logs.Max(l => l.Timestamp),
CurrentState = logs.OrderByDescending(l => l.Timestamp).First().ToState,
Timeline = timeline,
StateGraph = stateGraph
};
}
private List<StateTransitionDto> BuildStateGraph(IEnumerable<SagaLogEntry> logs)
{
// Логика построения графа состояний
// ...
return new List<StateTransitionDto>();
}
} |
|
Обработка параллельных SAGA-транзакций и предотвращение конфликтов
В распределенной системе часто возникают ситуации, когда несколько SAGA-транзакций одновременно работают с одними и теми же данными. Из-за отсутствия изоляции в SAGA это может привести к аномалиям и конфликтам. Существует несколько стратегий для предотвращения таких конфликтов:
1. Семантические блокировки — добавление флага блокировки в записи, которые обрабатываются в рамках SAGA:
C# | 1
2
3
4
5
6
7
8
9
10
11
| public async Task<bool> TryAcquireOrderLockAsync(Guid orderId, Guid lockOwnerId)
{
// Попытка установить блокировку атомарной операцией
int updatedRows = await _dbContext.Orders
.Where(o => o.Id == orderId && (o.LockOwnerId == null || o.LockExpiresAt < DateTime.UtcNow))
.ExecuteUpdateAsync(o => o
.SetProperty(x => x.LockOwnerId, lockOwnerId)
.SetProperty(x => x.LockExpiresAt, DateTime.UtcNow.AddMinutes(5)));
return updatedRows > 0;
} |
|
2. Оптимистическая блокировка — использование версионности для выявления конфликтов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public async Task UpdateInventoryWithOptimisticLockAsync(
UpdateInventoryCommand command)
{
var inventory = await _dbContext.InventoryItems
.FindAsync(command.ProductId);
if (inventory == null)
throw new NotFoundException($"Product {command.ProductId} not found");
if (inventory.Version != command.ExpectedVersion)
throw new ConcurrencyException("Inventory has been modified by another transaction");
inventory.Quantity -= command.QuantityToReserve;
inventory.Version++; // Увеличение версии
await _dbContext.SaveChangesAsync();
} |
|
3. Перечитывание значений — проверка актуальности данных перед их модификацией:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public async Task<bool> ApproveOrderWithRereadAsync(Guid orderId)
{
// Чтение заказа
var order = await _dbContext.Orders.FindAsync(orderId);
if (order == null)
return false;
// Проверка, что заказ всё ещё в ожидании и не был отменен другой транзакцией
if (order.Status != OrderStatus.Pending)
return false;
// Обновление статуса
order.Status = OrderStatus.Approved;
await _dbContext.SaveChangesAsync();
return true;
} |
|
4. Файл версий — сохранение истории операций для возможности их реупорядочивания:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public async Task RecordPaymentOperationAsync(PaymentOperation operation)
{
// Добавление операции в историю
await _dbContext.PaymentOperations.AddAsync(new PaymentOperationEntity
{
Id = Guid.NewGuid(),
AccountId = operation.AccountId,
OperationType = operation.Type,
Amount = operation.Amount,
Timestamp = DateTime.UtcNow,
ReferenceId = operation.ReferenceId,
ProcessedOrder = await GetNextOperationOrderAsync(operation.AccountId)
});
await _dbContext.SaveChangesAsync();
// Асинхронная обработка операций в правильном порядке
_ = Task.Run(() => ReprocessOperationsAsync(operation.AccountId));
} |
|
Этот подход особенно полезен для систем, где порядок операций критически важен, например, в финансовых приложениях.
При проектировании SAGA важно заранее проанализировать потенциальные конфликты между параллельными транзакциями и выбрать соответствующие механизмы их разрешения. Универсального решения не существует — выбор зависит от конкретных бизнес-требований и характеристик системы.
Практические рекомендации
Внедрение паттерна SAGA требует тщательного планирования и осторожности. Опыт показывает, что успешная реализация зависит от глубокого понимания как технических, так и бизнес-аспектов задачи. Рассмотрим ключевые практические рекомендации, которые помогут избежать распространенных ошибок.
Потенциальные проблемы и узкие места
При использовании SAGA разработчики сталкиваются с несколькими типичными проблемами:
1. Сложность отладки — в распределенной среде трудно проследить поток выполнения. Ситуация усугубляется асинхронной природой взаимодействий. Решением может стать внедрение централизованной системы трассировки с использованием инструментов вроде OpenTelemetry:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public async Task ExecuteSagaStepAsync(Guid sagaId, string stepName, Func<Task> action)
{
using var activity = _activitySource.StartActivity($"SagaStep:{stepName}");
activity?.SetTag("saga.id", sagaId);
try
{
await action();
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
} |
|
2. Проблемы с согласованностью данных — временные несоответствия между сервисами могут привести к аномалиям. Важно четко документировать потенциальные сценарии и обучать пользователей системы особенностям её работы.
3. Сложность разработки компенсирующих транзакций — не все операции легко обратимы. Например, отправка уведомления клиенту не может быть полностью отменена. В таких случаях следует проектировать компенсирующие действия как бизнес-операции, а не технические отмены.
Когда SAGA не подходит
Несмотря на гибкость, SAGA не является универсальным решением:
1. Требуется строгая изоляция — если бизнес-требования предписывают полную изоляцию транзакций, SAGA не подойдет из-за принципиального отсутствия изоляции между шагами.
2. Критически важная целостность данных — для финансовых операций, где потеря согласованности недопустима даже на короткое время, традиционные транзакции могут быть предпочтительнее.
3. Простые операции — для простых обновлений, затрагивающих один сервис, SAGA излишне усложняет систему без явной выгоды.
В таких случаях стоит рассмотреть альтернативы: API Composition (для запросов), выделение ограниченного контекста в отдельный сервис или даже использование монолитной архитектуры для критичных компонентов.
Инструменты тестирования распределенных транзакций
Тестирование SAGA требует особого подхода из-за распределенной природы и асинхронного взаимодействия. Эффективными инструментами являются:
1. Интеграционное тестирование с TestContainers — позволяет тестировать взаимодействие с реальными зависимостями:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| public class OrderSagaIntegrationTests : IClassFixture<TestContainersFixture>
{
private readonly TestContainersFixture _fixture;
public OrderSagaIntegrationTests(TestContainersFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task CompleteOrderSaga_WhenAllStepsSucceed_OrderApproved()
{
// Arrange
var sagaId = await _fixture.OrderApi.StartCreateOrderSagaAsync(
new CreateOrderRequest { /* ... */ });
// Act
await _fixture.WaitForSagaCompletionAsync(sagaId, TimeSpan.FromSeconds(30));
// Assert
var sagaState = await _fixture.SagaRepository.GetByIdAsync(sagaId);
Assert.Equal(SagaStatus.Completed, sagaState.Status);
var order = await _fixture.OrderRepository.GetByIdAsync(sagaState.OrderId);
Assert.Equal(OrderStatus.Approved, order.Status);
}
} |
|
2. Симуляция сбоев с Chaos Engineering — проверка системы на устойчивость к различным видам отказов:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| [Fact]
public async Task OrderSaga_WhenPaymentServiceUnavailable_RetryAndEventuallySucceed()
{
// Arrange
await _fixture.ChaosProxy.ConfigureAsync(new ChaosConfiguration
{
TargetService = "payment-service",
FailureMode = FailureMode.TemporaryUnavailable,
Duration = TimeSpan.FromSeconds(10)
});
var sagaId = await _fixture.OrderApi.StartCreateOrderSagaAsync(
new CreateOrderRequest { /* ... */ });
// Act
await _fixture.WaitForSagaCompletionAsync(sagaId, TimeSpan.FromMinutes(2));
// Assert
var sagaState = await _fixture.SagaRepository.GetByIdAsync(sagaId);
Assert.Equal(SagaStatus.Completed, sagaState.Status);
} |
|
Сравнительный анализ производительности
При выборе между хореографией и оркестрацией важно учитывать характеристики производительности:
Хореография обычно демонстрирует меньшую задержку при небольшом количестве участников, но масштабируется хуже с ростом их числа из-за увеличения сетевого трафика (O(n²) сообщений).
Оркестрация имеет стабильную сложность взаимодействия (O(n) сообщений), но страдает от потенциального узкого места в виде оркестратора.
Измерения показывают, что при количестве шагов менее 5, разница в производительности минимальна. При более сложных процессах оркестрация обеспечивает лучшую масштабируемость и прозрачность.
Оптимизация SAGA для отказоустойчивости
Для повышения отказоустойчивости SAGA рекомендуется:
1. Сегментирование сложных SAGA — разбиение длинных цепочек на несколько меньших, что снижает вероятность полного отказа.
2. Применение паттерна Circuit Breaker — для предотвращения каскадных сбоев:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public async Task ExecuteSagaStepWithCircuitBreakerAsync(string serviceName, Func<Task> action)
{
var circuitBreaker = _registry.GetCircuitBreaker(serviceName);
await circuitBreaker.ExecuteAsync(async () =>
{
try
{
await action();
}
catch (Exception ex)
{
_logger.LogError(ex, "Service {ServiceName} failed", serviceName);
throw;
}
});
} |
|
3. Использование специализированных очередей для компенсаций — выделение отдельной инфраструктуры для обработки компенсирующих транзакций, чтобы они не конкурировали с основным потоком операций.
Миграция с монолитной архитектуры к микросервисам с использованием паттерна SAGA
Переход от монолита к микросервисам — испытание, требующее не только технической трансформации, но и изменения мышления команды. Внедрение паттерна SAGA в процессе такой миграции позволяет постепенно декомпозировать сложные бизнес-процессы, сохраняя их целостность.
Стратегия "удушающей лозы" (strangler fig pattern) наиболее эффективна при такой миграции. Вместо рискованного полного переписывания, команда постепенно выделяет отдельные домены в микросервисы, используя SAGA для обеспечения согласованности между новыми сервисами и монолитом:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // Шлюз, перенаправляющий запросы между монолитом и микросервисами
public class StranglerFacade : IOrderProcessor
{
private readonly ILegacyOrderSystem _monolith;
private readonly IOrderService _microservice;
private readonly IFeatureToggleManager _featureToggles;
public async Task<OrderResult> ProcessOrderAsync(CreateOrderCommand command)
{
// Проверка флага переключения
if (_featureToggles.IsEnabled("UseNewOrderService"))
{
// Запуск SAGA в новом микросервисе
return await _microservice.StartOrderSagaAsync(command);
}
// Использование старой логики монолита
return await _monolith.CreateOrderAsync(command);
}
} |
|
При выборе компонентов для миграции полезен анализ зависимостей в существующем монолите. Компоненты с минимальными зависимостями лучше мигрировать первыми, что снижает сложность взаимодействия. Инструменты типа NDepend для .NET помогают визуализировать и анализировать эти зависимости.
Пошаговая стратегия миграции с внедрением SAGA
1. Идентификация границ предметных областей (bounded contexts) — ключевой этап перед началом миграции. Эффективное разделение монолита на независимые домены минимизирует количество необходимых распределенных транзакций.
2. Создание API-шлюза и анти-коррупционного слоя — обеспечивает совместимость между микросервисами и монолитом:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| public class AntiCorruptionLayer : IOrderRepository
{
private readonly LegacyOrderDatabase _legacyDb;
private readonly IEventBus _eventBus;
public async Task<Order> GetOrderAsync(Guid orderId)
{
// Получение данных из монолита
var legacyOrder = await _legacyDb.GetOrderAsync(orderId);
// Преобразование в новую модель
return MapToNewModel(legacyOrder);
}
public async Task UpdateOrderAsync(Order order)
{
// Обновление в монолите
await _legacyDb.UpdateOrderAsync(MapToLegacyModel(order));
// Публикация события для микросервисов
await _eventBus.PublishAsync(new OrderUpdatedEvent
{
OrderId = order.Id,
NewStatus = order.Status,
UpdatedBy = "AntiCorruptionLayer"
});
}
} |
|
3. Внедрение Event Sourcing и CQRS — обеспечивает постепенный переход от прямых API-вызовов к асинхронному обмену событиями:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // В монолите добавляем публикацию событий
public class LegacyOrderService
{
private readonly IEventBus _eventBus;
public void ProcessOrder(OrderData data)
{
// Существующая логика
var orderResult = _legacyProcessingLogic.Process(data);
// Дополнительно публикуем события для микросервисов
_eventBus.Publish(new LegacyOrderProcessedEvent
{
OrderId = orderResult.Id,
Status = orderResult.Status,
// Другие данные...
});
}
} |
|
4. Постепенное внедрение SAGA — начиная с хореографического подхода и при необходимости эволюционируя к оркестрации для более сложных процессов.
Очень частая ошибка — попытка одновременно мигрировать все бизнес-процессы. Для минимизации рисков рекомендуется начинать с некритичных процессов, чтобы команда могла адаптироваться к новой парадигме. Кандидатами для первой миграции могут быть процессы создания отчетов или аналитические функции — они требуют минимальной транзакционной согласованности и могут работать с данными, которые допускают эвентуальную согласованность.
Гибридный подход при миграции
Интересное решение — использование гибридного подхода к транзакциям во время переходного периода. Некоторые части бизнес-процесса могут оставаться в монолите и использовать ACID-транзакции, в то время как другие части, перенесенные в микросервисы, будут взаимодействовать через SAGA:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| public class HybridOrderSaga : ISagaOrchestrator
{
private readonly MonolithTransactionProxy _monolithProxy;
private readonly ICustomerService _customerService;
private readonly IPaymentService _paymentService;
public async Task<Guid> StartOrderProcessingAsync(CreateOrderCommand command)
{
// Создание заказа и проверка запасов в монолите через транзакцию
var monolithResult = await _monolithProxy.ExecuteInTransactionAsync(async () => {
var orderResult = await _monolithProxy.CreateOrderAsync(command);
var inventoryResult = await _monolithProxy.CheckInventoryAsync(command.Items);
return new { OrderId = orderResult.OrderId, IsInventoryAvailable = inventoryResult.IsAvailable };
});
if (!monolithResult.IsInventoryAvailable)
throw new InsufficientInventoryException();
// Создание состояния SAGA
var sagaState = new OrderSagaState
{
Id = Guid.NewGuid(),
OrderId = monolithResult.OrderId,
CustomerId = command.CustomerId,
Status = SagaStatus.Started
};
await _repository.SaveAsync(sagaState);
// Запуск SAGA для обработки оставшихся шагов в микросервисах
await _customerService.VerifyCustomerAsync(new VerifyCustomerCommand {
SagaId = sagaState.Id,
CustomerId = command.CustomerId
});
return sagaState.Id;
}
// Остальные обработчики SAGA...
} |
|
Такой гибридный подход позволяет использовать преимущества уже существующего монолита, минимизируя риски и постепенно наращивая экспертизу команды в области микросервисов и распределенных транзакций.
Мониторинг и измерение прогресса миграции
Определение ключевых метрик успеха — критическая часть миграции. Отслеживание не только технических показателей (время отклика, доступность), но и бизнес-метрик (успешность выполнения транзакций, количество компенсационных действий) помогает оценить прогресс и своевременно корректировать стратегию:
C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public class MigrationMetricsCollector
{
private readonly IMetricsRegistry _registry;
public void TrackTransactionExecution(string transactionType, bool isMonolith, TimeSpan duration, bool isSuccessful)
{
// Запись метрик выполнения транзакции
_registry.RecordTimer($"transaction.{transactionType}.duration", duration);
_registry.RecordCounter($"transaction.{transactionType}.count", 1);
_registry.RecordCounter($"transaction.{transactionType}.{(isSuccessful ? "success" : "failure")}", 1);
// Метрики для сравнения монолита и микросервисов
var systemType = isMonolith ? "monolith" : "microservice";
_registry.RecordTimer($"system.{systemType}.{transactionType}.duration", duration);
_registry.RecordCounter($"system.{systemType}.{transactionType}.count", 1);
}
} |
|
Опыт показывает, что наибольшую сложность при миграции с использованием SAGA вызывает обеспечение правильной обработки граничных случаев и мониторинг выполнения распределенных транзакций. Для сложных процессов может быть полезно временно дублировать выполнение операций в обеих системах (монолите и микросервисах) с последующим сравнением результатов и переключением только после подтверждения корректности работы новой реализации.
Распределенные вычисления Кто-нибудь пробовал ?
Нужно вычислить интеграл, при этом код вычислений записан на одной машине.... Простой пример клиент-серверного приложения использующего распределенные вычисления Ребят, хелп! Помогите найти доступный для понимания пример распределенных вычислений. То бишь -... Генерировать случайные числа х, распределенные в диапазоне от-5 до 5 и вычислять для чисел > 0 Генерировать случайные числа х, распределенные в диапазоне от-5 до 5 и вычислять для чисел > 0... Распределенные БД, соединиться к Access и выполнить транзакцию Соединение с MS SQL Server я сделал, нужно соединится еще к серверу MySQL или Access и сделать там... распределенные и параллельные вычисления Подскажите пример приложения которое использует эти вычисления Сформировать матрицу элементами которой являются вещественные случайные числа, равномерно распределенные на отрезке как это сделать
Даны вещественные числа a,b (a<b). Сформировать матрицу XY(17,20),
элементами... Программа-тестировщик, вопрос по архитектуре Привет.
Мне по учебе нужно написать программу-тестировщик для оценки знаний студентов. ... Литература по архитектуре, оптимизаци, паттернам, антипаттернам Ну собственно из названия темы все понятно. Многие книги в основном посвящены синтаксису и... Разработка эмулятора вычислительной машины в классической архитектуре Фон Неймана Написать эмулятор вычислительной машины в классической архитектуре фон Неймана, которая реализует... Избавиться от зависимости порядка обработки систем в архитектуре приложения Здравствуйте, предположим, что в нашем гипотетическом приложении есть системы Водитель, Машина,... Консультация по архитектуре UI приложения Здравствуйте!
Не знаю реально ли, но может кто-то откликнется.
Реализую UI приложение на C#.... Репозиторий в n-tier / 3-layer архитектуре Передо мной стоит задача сделать проект с заменяемой ORM. Тестирую с nHibernate и Entity Framework....
|