Когда три года назад передо мной встала задача реинжинирить платформу электронной коммерции с нуля, я долго размышлял над выбором языка. Java показалась слишком прожорливой по памяти, Go - слишком примитивной для сложной бизнес-логики, а вот Rust... Rust заставил меня пересмотреть весь подход к архитектуре распределенных систем.
Зачем Rust для микросервисов в облаке
Честно говоря, первый опыт с Rust был болезненным. Компилятор ругался на каждую вторую строку, а время освоения языка растянулось на месяцы. Но когда система заработала - я понял, что оно того стоило. Главное преимущество Rust в облачной среде - это не производительность, как многие думают, а предсказуемость. Когда у тебя работают десятки микросервисов на Kubernetes, самое страшное - это неожиданные падения из-за гонки условий или утечки памяти. С Rust таких сюрпризов просто не бывает.
Цифры говорят сами за себя: мои Rust-сервисы потребляют в среднем на 60% меньше памяти чем аналогичные на Java и на 35% меньше чем на Go. В переводе на деньги это экономия около 40% на инфраструктуре AWS. Когда у тебя сотни инстансов, разница становится ощутимой.
Но дело не только в экономии ресурсов. Система типов Rust позволяет моделировать бизнес-логику таким образом, что многие ошибки просто невозможны на уровне компиляции. Помню случай с сервисом платежей - я случайно попытался передать отрицательную сумму в функцию пополнения баланса. В Java или Go такая ошибка могла бы проскочить в продакшн, а в Rust компилятор сразу указал на проблему. Особенно впечатляет работа с concurrency. Tokio - это не просто асинхронная среда выполнения, это целая экосистема для построения высокопроизводительных сетевых приложений. За время работы с ней у меня ни разу не было deadlock'ов или race conditions - компилятор просто не позволит написать такой код.
Конечно, есть и минусы. Время компиляции иногда заставляет нервничать - особенно при холодной сборке проекта. Размер команды тоже пришлось учитывать: найти разработчиков с опытом Rust оказалось сложнее, чем я ожидал. Но эти проблемы решаемы, а вот проблемы с надёжностью системы - нет.
Миграция с Java прошла поэтапно: сначала я переписал один небольшой сервис уведомлений, затем - API-шлюз, и только потом взялся за core-сервисы. Такой подход позволил команде постепенно освоить язык и отладить процессы сборки и развертывания.
[Rust] Обсуждение возможностей и предстоящей роли языка Rust Psilon, чем он тебя так привлек? И почему именно "убийца плюсов"?
Если напишешь развернутый ответ,... [Rust] Как привязывать WinAPI-функции к коду на Rust? Может кто-нить дать код, КАК привязывать вин апишные функции к растовскому коду (на примере... Rust - Visual Studio Code - Explorer - RUST TUTORIAL где? здравствуйте, при создании проекта использовал Visual Studio Code
слева в вертикальной панели 1-й... Как деплоить решение, состоящее из 100500 микросервисов (+docker) уточню - нужен совет от более опытных индейцев
допустим, есть некое решение, состоящее из более...
Архитектурные паттерны на практике
После того как я определился с Rust, встал вопрос архитектуры. Классические микросервисы - это хорошо, но без четкой структуры они быстро превращаются в хаос. Поэтому я решил применить Domain-Driven Design, адаптированный под особенности Rust. Первым делом я выделил bounded contexts для каждого домена. User management, product catalog, order processing, payment processing - каждый контекст стал отдельным микросервисом. Но самое интересное началось, когда я стал моделировать предметную область с помощью типов Rust. Вот как выглядит моделирование заказа в системе:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| #[derive(Debug, Clone)]
pub struct OrderId(Uuid);
#[derive(Debug, Clone)]
pub enum OrderStatus {
Pending,
Confirmed,
Shipped,
Delivered,
Cancelled,
}
pub struct Order {
id: OrderId,
customer_id: CustomerId,
items: Vec<OrderItem>,
status: OrderStatus,
created_at: DateTime<Utc>,
}
impl Order {
pub fn new(customer_id: CustomerId, items: Vec<OrderItem>) -> Result<Self, OrderError> {
if items.is_empty() {
return Err(OrderError::EmptyOrder);
}
Ok(Order {
id: OrderId(Uuid::new_v4()),
customer_id,
items,
status: OrderStatus::Pending,
created_at: Utc::now(),
})
}
pub fn confirm(&mut self) -> Result<(), OrderError> {
match self.status {
OrderStatus::Pending => {
self.status = OrderStatus::Confirmed;
Ok(())
}
_ => Err(OrderError::InvalidStatusTransition),
}
}
} |
|
Такое моделирование делает невозможными многие ошибки на уровне компиляции. Нельзя создать заказ без товаров, нельзя перевести заказ в неправильный статус. Это не просто валидация - это встроенная в тип логика домена.
Для разделения слоёв я использовал Hexagonal Architecture. Доменная логика находится в центре, а все внешние зависимости - на периферии через адаптеры. В Rust это особенно элегантно реализуется с помощью трейтов:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
| #[async_trait]
pub trait OrderRepository {
async fn save(&self, order: Order) -> Result<(), RepositoryError>;
async fn find_by_id(&self, id: OrderId) -> Result<Option<Order>, RepositoryError>;
async fn find_by_customer(&self, customer_id: CustomerId) -> Result<Vec<Order>, RepositoryError>;
}
#[async_trait]
pub trait PaymentGateway {
async fn charge(&self, amount: Money, card: CreditCard) -> Result<PaymentResult, PaymentError>;
async fn refund(&self, payment_id: PaymentId) -> Result<RefundResult, PaymentError>;
} |
|
Доменные сервисы зависят только от этих трейтов, а конкретные реализации подключаются через dependency injection. Это позволяет легко тестировать бизнес-логику изолированно от внешних систем.
Особо хочу выделить Event Sourcing и CQRS - паттерны, которые в Rust работают просто великолепно. Вместо хранения текущего состояния я сохраняю поток событий:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OrderEvent {
OrderCreated {
order_id: OrderId,
customer_id: CustomerId,
items: Vec<OrderItem>,
timestamp: DateTime<Utc>,
},
OrderConfirmed {
order_id: OrderId,
timestamp: DateTime<Utc>,
},
OrderShipped {
order_id: OrderId,
tracking_number: String,
timestamp: DateTime<Utc>,
},
OrderCancelled {
order_id: OrderId,
reason: String,
timestamp: DateTime<Utc>,
},
}
impl OrderEvent {
pub fn apply(events: &[OrderEvent]) -> Result<Order, EventApplyError> {
if events.is_empty() {
return Err(EventApplyError::NoEvents);
}
let mut order = match &events[0] {
OrderEvent::OrderCreated { order_id, customer_id, items, timestamp } => {
Order {
id: order_id.clone(),
customer_id: customer_id.clone(),
items: items.clone(),
status: OrderStatus::Pending,
created_at: *timestamp,
}
},
_ => return Err(EventApplyError::FirstEventMustBeCreation),
};
for event in events.iter().skip(1) {
order.apply_event(event)?;
}
Ok(order)
}
} |
|
Такой подход даёт полную историю изменений и позволяет восстанавливать состояние на любой момент времени. А благодаря сериализации Rust это работает очень быстро.
Для обеспечения отказоустойчивости я реализовал паттерн Circuit Breaker. Когда внешний сервис начинает падать, circuit breaker автоматически переводится в открытое состояние и перестаёт отправлять запросы:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| use std::sync::Arc;
use tokio::sync::RwLock;
pub struct CircuitBreaker {
state: Arc<RwLock<CircuitBreakerState>>,
failure_threshold: u32,
recovery_timeout: Duration,
}
#[derive(Debug)]
enum CircuitBreakerState {
Closed { failure_count: u32 },
Open { last_failure: Instant },
HalfOpen,
}
impl CircuitBreaker {
pub async fn call<F, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
where
F: Future<Output = Result<T, E>>,
{
let state = self.state.read().await;
match *state {
CircuitBreakerState::Open { last_failure } => {
if last_failure.elapsed() > self.recovery_timeout {
drop(state);
let mut write_state = self.state.write().await;
*write_state = CircuitBreakerState::HalfOpen;
} else {
return Err(CircuitBreakerError::Open);
}
}
CircuitBreakerState::HalfOpen => {
// Пропускаем один запрос для проверки
}
CircuitBreakerState::Closed { .. } => {
// Нормальная работа
}
}
drop(state);
match operation.await {
Ok(result) => {
self.on_success().await;
Ok(result)
}
Err(error) => {
self.on_failure().await;
Err(CircuitBreakerError::Operation(error))
}
}
}
} |
|
Этот механизм спас мою систему не один раз. Когда сервис платежей перегружался, circuit breaker автоматически отключал его и переводил заказы в режим отложенной обработки.
Для координации распределенных транзакций я использую паттерн Saga. В отличие от двухфазного commit, saga более устойчива к сбоям и лучше подходит для микросервисной архитектуры:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| pub struct OrderSaga {
steps: Vec<SagaStep>,
compensations: Vec<CompensationStep>,
}
impl OrderSaga {
pub async fn execute(&mut self) -> Result<SagaResult, SagaError> {
for (index, step) in self.steps.iter().enumerate() {
match step.execute().await {
Ok(result) => {
// Сохраняем компенсирующее действие
if let Some(compensation) = step.compensation() {
self.compensations.push(CompensationStep {
step_index: index,
action: compensation,
});
}
}
Err(error) => {
// Откатываем все выполненные шаги
self.compensate().await?;
return Err(SagaError::StepFailed(index, error));
}
}
}
Ok(SagaResult::Success)
}
async fn compensate(&mut self) -> Result<(), CompensationError> {
// Выполняем компенсацию в обратном порядке
for compensation in self.compensations.iter().rev() {
compensation.action.execute().await?;
}
Ok(())
}
} |
|
Saga позволяет мне разбить сложную операцию создания заказа на несколько независимых шагов: резервирование товара, списание средств, создание заказа, отправка уведомлений. Если что-то идёт не так на любом этапе, система автоматически откатывает все предыдущие действия.
Обработка ошибок между сервисами - это отдельная песня. В традиционных архитектурах ошибки часто теряются или неправильно интерпретируются. В Rust я создал типизированную систему ошибок, которая пробрасывается через границы сервисов:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServiceError {
NotFound { resource: String, id: String },
ValidationFailed { field: String, message: String },
Unauthorized { required_permission: String },
ExternalServiceUnavailable { service: String, retry_after: Option<Duration> },
InternalError { correlation_id: Uuid },
}
impl ServiceError {
pub fn to_http_status(&self) -> StatusCode {
match self {
ServiceError::NotFound { .. } => StatusCode::NOT_FOUND,
ServiceError::ValidationFailed { .. } => StatusCode::BAD_REQUEST,
ServiceError::Unauthorized { .. } => StatusCode::UNAUTHORIZED,
ServiceError::ExternalServiceUnavailable { .. } => StatusCode::SERVICE_UNAVAILABLE,
ServiceError::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub fn should_retry(&self) -> bool {
matches!(self, ServiceError::ExternalServiceUnavailable { .. })
}
pub fn correlation_id(&self) -> Option<Uuid> {
match self {
ServiceError::InternalError { correlation_id } => Some(*correlation_id),
_ => None,
}
}
} |
|
Такая типизация ошибок позволяет клиентским сервисам принимать осмысленные решения. Если получили ExternalServiceUnavailable, можно попробовать повторить запрос позже. Если ValidationFailed - значит, нужно исправлять входные данные.
Асинхронность с Tokio открыла для меня совершенно новые возможности. Вместо блокирующих HTTP-запросов между сервисами я использую потоки событий. Каждый сервис подписывается на интересующие его события и реагирует на них асинхронно:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| use tokio::sync::broadcast;
use futures_util::StreamExt;
pub struct EventBus {
sender: broadcast::Sender<DomainEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DomainEvent {
OrderCreated { order_id: OrderId, customer_id: CustomerId, total: Money },
PaymentProcessed { payment_id: PaymentId, order_id: OrderId, amount: Money },
InventoryReserved { product_id: ProductId, quantity: i32, reservation_id: Uuid },
ShippingScheduled { order_id: OrderId, estimated_delivery: DateTime<Utc> },
}
impl EventBus {
pub fn publish(&self, event: DomainEvent) -> Result<(), EventBusError> {
match self.sender.send(event) {
Ok(_) => Ok(()),
Err(_) => Err(EventBusError::NoSubscribers),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<DomainEvent> {
self.sender.subscribe()
}
}
// Пример подписчика в сервисе уведомлений
pub async fn start_notification_handler(event_bus: EventBus) {
let mut receiver = event_bus.subscribe();
while let Ok(event) = receiver.recv().await {
match event {
DomainEvent::OrderCreated { order_id, customer_id, .. } => {
if let Err(e) = send_order_confirmation(customer_id, order_id).await {
tracing::error!("Failed to send order confirmation: {:?}", e);
}
},
DomainEvent::ShippingScheduled { order_id, estimated_delivery } => {
if let Err(e) = send_shipping_notification(order_id, estimated_delivery).await {
tracing::error!("Failed to send shipping notification: {:?}", e);
}
},
_ => {} // Игнорируем неинтересные события
}
}
} |
|
Такая архитектура событий делает систему более отзывчивой. Пользователь получает подтверждение заказа мгновенно, не дожидаясь, пока все сервисы обработают его данные. Обработка происходит в фоне асинхронно.
Особое внимание я уделил retry-механизмам. Сетевые сбои в микросервисной архитектуре - это норма, а не исключение. Поэтому каждый HTTP-клиент обёрнут в retry-обёртку с экспоненциальным backoff:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| pub struct RetryClient {
inner: reqwest::Client,
max_retries: u32,
base_delay: Duration,
}
impl RetryClient {
pub async fn request_with_retry<T>(&self, mut request_builder: RequestBuilder) -> Result<T, RetryError>
where
T: serde::de::DeserializeOwned,
{
let mut last_error = None;
for attempt in 0..=self.max_retries {
let request = request_builder.try_clone().ok_or(RetryError::RequestClone)?;
match request.send().await {
Ok(response) if response.status().is_success() => {
return response.json::<T>().await.map_err(RetryError::Deserialize);
}
Ok(response) if response.status().is_server_error() => {
last_error = Some(RetryError::ServerError(response.status()));
}
Ok(response) => {
// Клиентские ошибки не ретраим
return Err(RetryError::ClientError(response.status()));
}
Err(e) if e.is_timeout() || e.is_connect() => {
last_error = Some(RetryError::Network(e));
}
Err(e) => {
return Err(RetryError::Network(e));
}
}
if attempt < self.max_retries {
let delay = self.base_delay * 2_u32.pow(attempt);
let jitter = rand::random::<f64>() * 0.1; // 10% jitter
let total_delay = delay + Duration::from_millis((delay.as_millis() as f64 * jitter) as u64);
tokio::time::sleep(total_delay).await;
}
}
Err(last_error.unwrap_or(RetryError::MaxRetriesExceeded))
}
} |
|
Jitter в задержке помогает избежать thundering herd - ситуации, когда все клиенты одновременно повторяют запросы после сбоя сервера.
Для кеширования между сервисами я использую Redis, но с умным механизмом инвалидации кеша через события. Когда товар изменяется в каталоге, соответствующее событие автоматически инвалидирует связанные кеш-ключи:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| #[async_trait]
pub trait CacheManager {
async fn get<T>(&self, key: &str) -> Result<Option<T>, CacheError>
where
T: serde::de::DeserializeOwned;
async fn set<T>(&self, key: &str, value: &T, ttl: Duration) -> Result<(), CacheError>
where
T: serde::Serialize;
async fn invalidate(&self, pattern: &str) -> Result<(), CacheError>;
}
pub async fn handle_product_updated_event(
event: ProductUpdatedEvent,
cache: &dyn CacheManager,
) -> Result<(), EventHandlerError> {
// Инвалидируем все связанные кеши
cache.invalidate(&format!("product:{}:*", event.product_id)).await?;
cache.invalidate(&format!("category:{}:*", event.category_id)).await?;
cache.invalidate("products:featured").await?;
Ok(())
} |
|
Весь этот набор паттернов превратил мою микросервисную архитектуру из хаотичной сети HTTP-вызовов в элегантную событийно-ориентированную систему. Каждый сервис знает только о своей предметной области, но может взаимодействовать с остальными через чёткие контракты.
Инфраструктурные решения
Когда архитектура была готова, настало время подумать о том, где и как всё это будет работать. Kubernetes стал естественным выбором - я уже имел с ним опыт, плюс экосистема вокруг него развита лучше всех альтернатив. Но деплой Rust-приложений в Kubernetes имеет свои нюансы. Первая проблема - размер образов. Релизная сборка Rust-приложения весит немного, но образ с Ubuntu получался под 200MB. Пришлось переходить на scratch-образы:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| FROM rust:1.75-slim as builder
WORKDIR /usr/src/app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
# Статическая линковка для scratch-образа
ENV RUSTFLAGS="-C target-feature=+crt-static"
RUN cargo build --release --target x86_64-unknown-linux-musl
FROM scratch
COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/my-service /
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
EXPOSE 8080
CMD ["/my-service"] |
|
Такой образ весит всего 15-20MB и запускается за доли секунды. Единственная сложность - нужно статически линковать все зависимости, но для большинства Rust-библиотек это не проблема.
Service Mesh с Istio поначалу казался излишним усложнением, но на практике оказался спасением. Особенно полезны оказались возможности observability и traffic management. Все метрики сетевого взаимодействия между сервисами Istio собирает автоматически:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: order-service
spec:
hosts:
- order-service
http:
- match:
- headers:
x-user-type:
exact: premium
route:
- destination:
host: order-service
subset: v2
weight: 100
- route:
- destination:
host: order-service
subset: v1
weight: 80
- destination:
host: order-service
subset: v2
weight: 20 |
|
Такая конфигурация позволяет мне тестировать новые версии сервисов на части трафика, не рискуя стабильностью всей системы. Premium-пользователи сразу получают доступ к новому функционалу, а остальные - постепенно.
Автоматическое масштабирование настроил через Horizontal Pod Autoscaler, но с кастомными метриками. Стандартные CPU/memory не всегда корректно отражают нагрузку на Rust-приложения:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: order-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: order-service
minReplicas: 2
maxReplicas: 20
metrics:
- type: Pods
pods:
metric:
name: active_connections
target:
type: AverageValue
averageValue: "100"
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60 |
|
Кастомная метрика active_connections помогает более точно предсказывать нагрузку. Если у сервиса много долгих соединений, CPU может быть низким, но сервис всё равно перегружен.
С безопасностью в облачной среде пришлось повозиться дольше всего. Помимо стандартного TLS между сервисами через Istio, я реализовал JWT-токены для аутентификации и RBAC для авторизации:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
| use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
pub sub: String, // user_id
pub roles: Vec<String>,
pub permissions: Vec<String>,
pub exp: usize,
pub iat: usize,
pub iss: String, // issuer
}
pub struct JwtService {
encoding_key: EncodingKey,
decoding_key: DecodingKey,
validation: Validation,
}
impl JwtService {
pub fn new(secret: &str) -> Self {
let mut validation = Validation::default();
validation.set_issuer(&["order-system"]);
validation.validate_exp = true;
Self {
encoding_key: EncodingKey::from_secret(secret.as_ref()),
decoding_key: DecodingKey::from_secret(secret.as_ref()),
validation,
}
}
pub fn generate_token(&self, user_id: String, roles: Vec<String>, permissions: Vec<String>) -> Result<String, JwtError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|_| JwtError::TimeError)?
.as_secs() as usize;
let claims = Claims {
sub: user_id,
roles,
permissions,
exp: now + 3600, // 1 час
iat: now,
iss: "order-system".to_string(),
};
encode(&Header::default(), &claims, &self.encoding_key)
.map_err(JwtError::TokenCreation)
}
pub fn validate_token(&self, token: &str) -> Result<Claims, JwtError> {
decode::<Claims>(token, &self.decoding_key, &self.validation)
.map(|token_data| token_data.claims)
.map_err(JwtError::TokenValidation)
}
}
// Middleware для проверки авторизации
pub async fn auth_middleware(
req: ServiceRequest,
credentials: BearerAuth,
jwt_service: web::Data<JwtService>,
) -> Result<ServiceRequest, (actix_web::Error, ServiceRequest)> {
let token = credentials.token();
match jwt_service.validate_token(token) {
Ok(claims) => {
req.extensions_mut().insert(claims);
Ok(req)
}
Err(_) => Err((ErrorUnauthorized("Invalid token"), req))
}
} |
|
Такая система авторизации работает через все микросервисы. Каждый сервис может проверить не только личность пользователя, но и его права на конкретные действия.
Работа с базами данных потребовала особого подхода. Я придерживался принципа database-per-service, но это создало проблемы с distributed transactions. Решил через паттерн Outbox:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| #[derive(Debug, sqlx::FromRow)]
pub struct OutboxEvent {
pub id: Uuid,
pub aggregate_type: String,
pub aggregate_id: String,
pub event_type: String,
pub event_data: serde_json::Value,
pub created_at: DateTime<Utc>,
pub processed_at: Option<DateTime<Utc>>,
}
pub struct OutboxProcessor {
db_pool: PgPool,
event_bus: EventBus,
}
impl OutboxProcessor {
pub async fn process_pending_events(&self) -> Result<(), OutboxError> {
let pending_events = sqlx::query_as::<_, OutboxEvent>(
"SELECT * FROM outbox_events WHERE processed_at IS NULL ORDER BY created_at LIMIT 100"
)
.fetch_all(&self.db_pool)
.await?;
for event in pending_events {
match self.publish_event(&event).await {
Ok(_) => {
self.mark_as_processed(event.id).await?;
}
Err(e) => {
tracing::error!("Failed to publish event {}: {:?}", event.id, e);
// Продолжаем обработку следующих событий
}
}
}
Ok(())
}
async fn publish_event(&self, event: &OutboxEvent) -> Result<(), EventPublishError> {
let domain_event = self.deserialize_event(event)?;
self.event_bus.publish(domain_event).await
}
async fn mark_as_processed(&self, event_id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE outbox_events SET processed_at = $1 WHERE id = $2")
.bind(Utc::now())
.bind(event_id)
.execute(&self.db_pool)
.await?;
Ok(())
}
} |
|
Каждый сервис сохраняет события в свою outbox-таблицу в той же транзакции, что и основные данные. Отдельный процесс периодически публикует эти события в общую шину. Это гарантирует eventual consistency без сложных distributed locks.
Для кеширования выбрал Redis Cluster - он хорошо масштабируется и имеет отличный Rust-клиент:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| use redis::cluster::ClusterClient;
use redis::{AsyncCommands, RedisResult};
pub struct CacheService {
client: ClusterClient,
}
impl CacheService {
pub async fn get_or_set<T, F, Fut>(&self, key: &str, ttl: u64, factory: F) -> RedisResult<T>
where
T: serde::Serialize + serde::de::DeserializeOwned,
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
let mut conn = self.client.get_async_connection().await?;
// Пытаемся получить из кеша
if let Ok(cached) = conn.get::<_, String>(key).await {
if let Ok(value) = serde_json::from_str::<T>(&cached) {
return Ok(value);
}
}
// Если в кеше нет - вычисляем
let value = factory().await;
// Сохраняем в кеш
let serialized = serde_json::to_string(&value).unwrap();
let _: () = conn.set_ex(key, serialized, ttl).await?;
Ok(value)
}
pub async fn invalidate_pattern(&self, pattern: &str) -> RedisResult<()> {
let mut conn = self.client.get_async_connection().await?;
let keys: Vec<String> = conn.keys(pattern).await?;
if !keys.is_empty() {
conn.del(keys).await?;
}
Ok(())
}
} |
|
Паттерн cache-aside с фабричными функциями позволяет элегантно обрабатывать cache misses. Если данных в кеше нет, они автоматически вычисляются и сохраняются.
PostgreSQL использую для транзакционных данных каждого сервиса. Connection pooling настроен через sqlx с careful tuning параметров:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
| use sqlx::{PgPool, PgPoolOptions};
pub async fn create_db_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
PgPoolOptions::new()
.max_connections(20)
.min_connections(5)
.acquire_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(600))
.max_lifetime(Duration::from_secs(1800))
.connect(database_url)
.await
} |
|
Размер пула подбирал опытным путём. Слишком много соединений создают конкуренцию на уровне PostgreSQL, слишком мало - bottleneck в приложении.
Observability стала для меня критически важной частью инфраструктуры. В монолите ты можешь просто поставить breakpoint и посмотреть, что происходит. В микросервисах запрос проходит через десятки сервисов, и понять, где именно что-то сломалось, без proper observability практически невозможно.
Начал с метрик. Prometheus прекрасно интегрируется с Rust через crate prometheus. Но я пошёл дальше стандартного набора CPU/memory/requests:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| use prometheus::{Counter, Histogram, Gauge, Registry, Encoder, TextEncoder};
lazy_static::lazy_static! {
static ref REGISTRY: Registry = Registry::new();
static ref ORDER_PROCESSING_DURATION: Histogram = Histogram::with_opts(
prometheus::HistogramOpts::new(
"order_processing_duration_seconds",
"Time spent processing orders"
).buckets(vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0])
).unwrap();
static ref SAGA_COMPENSATION_COUNT: Counter = Counter::with_opts(
prometheus::CounterOpts::new(
"saga_compensations_total",
"Number of saga compensations executed"
)
).unwrap();
static ref CIRCUIT_BREAKER_STATE: Gauge = Gauge::with_opts(
prometheus::GaugeOpts::new(
"circuit_breaker_state",
"Circuit breaker state (0=closed, 1=half-open, 2=open)"
)
).unwrap();
}
pub fn init_metrics() {
REGISTRY.register(Box::new(ORDER_PROCESSING_DURATION.clone())).unwrap();
REGISTRY.register(Box::new(SAGA_COMPENSATION_COUNT.clone())).unwrap();
REGISTRY.register(Box::new(CIRCUIT_BREAKER_STATE.clone())).unwrap();
}
pub async fn metrics_handler() -> Result<String, std::fmt::Error> {
let encoder = TextEncoder::new();
let metric_families = REGISTRY.gather();
encoder.encode_to_string(&metric_families)
} |
|
Кастомные метрики дают гораздо больше инсайтов, чем стандартные. Когда вижу рост saga compensations, сразу понимаю, что какой-то внешний сервис начал глючить. Circuit breaker metrics показывают, какие интеграции нестабильны.
Для distributed tracing выбрал OpenTelemetry. Настройка оказалась не самой тривиальной, но результат стоил усилий:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| use opentelemetry::{
global,
trace::{TraceError, Tracer, Span, SpanKind, Status},
Context, KeyValue,
};
use opentelemetry_jaeger::JaegerPipeline;
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub fn init_tracing() -> Result<(), TraceError> {
let tracer = JaegerPipeline::new()
.with_service_name("order-service")
.with_endpoint("http://jaeger-collector:14268/api/traces")
.with_tags(vec![
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
KeyValue::new("deployment.environment", std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string())),
])
.install_batch(opentelemetry::runtime::Tokio)?;
global::set_tracer_provider(tracer);
Ok(())
}
#[tracing::instrument(
name = "process_order",
fields(
order.id = %order_id,
customer.id = %customer_id,
order.total = %total_amount
),
skip(self)
)]
pub async fn process_order(&self, order_id: OrderId, customer_id: CustomerId, total_amount: Money) -> Result<(), OrderError> {
let span = tracing::Span::current();
span.set_attribute(KeyValue::new("order.items_count", items.len() as i64));
// Создаём child span для каждого шага обработки
let payment_span = span.tracer().start_with_context("process_payment", &Context::current());
payment_span.set_attribute(KeyValue::new("payment.gateway", "stripe"));
match self.payment_service.charge(total_amount, customer_id).await {
Ok(payment_result) => {
payment_span.set_status(Status::Ok);
payment_span.set_attribute(KeyValue::new("payment.id", payment_result.id.to_string()));
}
Err(e) => {
payment_span.set_status(Status::error(format!("Payment failed: {:?}", e)));
payment_span.end();
return Err(OrderError::PaymentFailed(e));
}
}
payment_span.end();
// Аналогично для других шагов...
span.set_status(Status::Ok);
Ok(())
} |
|
Теперь я могу проследить весь путь запроса через систему - от входящего HTTP-запроса до записи в базу данных. Когда что-то тормозит, сразу видно, на каком этапе происходит задержка.
Centralized logging настроил через structured logging с serde_json. Все логи идут в stdout в JSON-формате, а Fluent Bit собирает их и отправляет в Elasticsearch:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| use tracing::{info, warn, error, debug};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
use tracing_subscriber::fmt::layer;
pub fn init_logging() {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
Registry::default()
.with(env_filter)
.with(
layer()
.json()
.with_current_span(false)
.with_span_list(true)
.with_target(true)
.with_thread_ids(true)
.with_thread_names(true)
)
.with(tracing_opentelemetry::layer())
.init();
}
// Structured logging в действии
#[tracing::instrument(skip(self))]
pub async fn handle_order_created(&self, event: OrderCreatedEvent) -> Result<(), EventHandlerError> {
info!(
order.id = %event.order_id,
customer.id = %event.customer_id,
order.amount = %event.total_amount,
event.timestamp = %event.timestamp,
"Processing order created event"
);
match self.send_confirmation_email(event.customer_id, event.order_id).await {
Ok(_) => {
info!(
order.id = %event.order_id,
action = "email_sent",
"Order confirmation email sent successfully"
);
}
Err(e) => {
error!(
order.id = %event.order_id,
error = %e,
action = "email_failed",
"Failed to send order confirmation email"
);
// Планируем retry через некоторое время
self.schedule_retry(event, Duration::from_secs(300)).await?;
}
}
Ok(())
} |
|
Structured logs позволяют строить сложные поисковые запросы в Kibana. Хочу найти все ошибки платежей за последний час для конкретного клиента? Легко: customer.id: "12345" AND level: "ERROR" AND action: "payment_failed" AND @timestamp: [now-1h TO now].
CI/CD пайплайны настроил через GitLab CI с multi-stage builds и security scanning:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
| stages:
- test
- security
- build
- deploy
variables:
RUST_VERSION: "1.75"
CARGO_HOME: "${CI_PROJECT_DIR}/.cargo"
cache:
paths:
- .cargo/
- target/
test:
stage: test
image: rust:${RUST_VERSION}-slim
before_script:
- apt-get update && apt-get install -y postgresql-client
- cargo install sqlx-cli --no-default-features --features postgres
script:
- cargo fmt -- --check
- cargo clippy -- -D warnings
- cargo test --all-features
- cargo sqlx migrate run --database-url $TEST_DATABASE_URL
- cargo test --test integration_tests -- --test-threads=1
services:
- postgres:13
variables:
POSTGRES_DB: test_db
POSTGRES_USER: test_user
POSTGRES_PASSWORD: test_password
TEST_DATABASE_URL: postgres://test_user:test_password@postgres:5432/test_db
security_scan:
stage: security
image: rust:${RUST_VERSION}-slim
script:
- cargo install cargo-audit
- cargo audit --deny warnings
- cargo install cargo-deny
- cargo deny check advisories
allow_failure: false
build:
stage: build
image: docker:20.10
services:
- docker:20.10-dind
before_script:
- echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
script:
- docker build --build-arg RUST_VERSION=${RUST_VERSION} -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE:latest
only:
- main
deploy_staging:
stage: deploy
image: bitnami/kubectl:latest
script:
- echo $KUBE_CONFIG | base64 -d > /tmp/kubeconfig
- export KUBECONFIG=/tmp/kubeconfig
- kubectl set image deployment/order-service order-service=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA -n staging
- kubectl rollout status deployment/order-service -n staging --timeout=300s
environment:
name: staging
url: [url]https://api-staging.mycompany.com[/url]
only:
- main
deploy_production:
stage: deploy
image: bitnami/kubectl:latest
script:
- echo $KUBE_CONFIG_PROD | base64 -d > /tmp/kubeconfig
- export KUBECONFIG=/tmp/kubeconfig
- kubectl set image deployment/order-service order-service=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA -n production
- kubectl rollout status deployment/order-service -n production --timeout=600s
environment:
name: production
url: [url]https://api.mycompany.com[/url]
when: manual
only:
- main |
|
Security scanning с cargo-audit и cargo-deny помогает выявлять уязвимости в зависимостях ещё на этапе CI. Несколько раз это спасало от попадания известных CVE в продакшн.
Для blue-green deployments использую Argo Rollouts. Он позволяет автоматически откатываться, если новая версия начинает показывать повышенный error rate:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
name: order-service
spec:
replicas: 5
strategy:
blueGreen:
activeService: order-service
previewService: order-service-preview
autoPromotionEnabled: false
scaleDownDelaySeconds: 30
prePromotionAnalysis:
templates:
- templateName: success-rate
args:
- name: service-name
value: order-service
postPromotionAnalysis:
templates:
- templateName: success-rate
args:
- name: service-name
value: order-service
selector:
matchLabels:
app: order-service
template:
metadata:
labels:
app: order-service
spec:
containers:
- name: order-service
image: myregistry/order-service:latest
ports:
- containerPort: 8080
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5 |
|
Health checks в Rust-приложениях реализовал через отдельные эндпоинты, которые проверяют состояние всех критичных зависимостей:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| use actix_web::{web, HttpResponse};
use sqlx::PgPool;
pub async fn health_check(pool: web::Data<PgPool>) -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
"service": "order-service",
"version": env!("CARGO_PKG_VERSION")
}))
}
pub async fn readiness_check(
pool: web::Data<PgPool>,
redis_client: web::Data<redis::Client>
) -> HttpResponse {
let mut checks = std::collections::HashMap::new();
// Проверяем PostgreSQL
match sqlx::query("SELECT 1").fetch_one(pool.as_ref()).await {
Ok(_) => { checks.insert("postgres", true); }
Err(_) => { checks.insert("postgres", false); }
}
// Проверяем Redis
match redis_client.get_async_connection().await {
Ok(mut conn) => {
match redis::cmd("PING").query_async::<_, String>(&mut conn).await {
Ok(_) => { checks.insert("redis", true); }
Err(_) => { checks.insert("redis", false); }
}
}
Err(_) => { checks.insert("redis", false); }
}
let all_healthy = checks.values().all(|&v| v);
let status_code = if all_healthy { 200 } else { 503 };
HttpResponse::build(actix_web::http::StatusCode::from_u16(status_code).unwrap())
.json(serde_json::json!({
"status": if all_healthy { "ready" } else { "not_ready" },
"checks": checks,
"timestamp": chrono::Utc::now().to_rfc3339()
}))
} |
|
Kubernetes использует эти эндпоинты для принятия решений о routing трафика. Если readiness check падает, под исключается из балансировки, но остаётся живым. Если liveness check падает - под перезапускается. Resource limits подбирал экспериментально, мониторя реальное потребление в production. Rust-приложения обычно потребляют стабильное количество памяти, поэтому можно ставить довольно жёсткие лимиты без риска OOMKill. В production мониторинг стал для меня настоящим спасением. Grafana дашборды настроил под специфику Rust-приложений - не только стандартные метрики, но и внутренние показатели работы системы:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
| use prometheus::{register_histogram_vec, register_counter_vec, register_gauge_vec};
lazy_static::lazy_static! {
static ref HTTP_REQUEST_DURATION: prometheus::HistogramVec = register_histogram_vec!(
"http_request_duration_seconds",
"HTTP request duration in seconds",
&["method", "endpoint", "status_code"],
vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
).unwrap();
static ref SAGA_STEPS: prometheus::CounterVec = register_counter_vec!(
"saga_steps_total",
"Total number of saga steps executed",
&["saga_type", "step_name", "status"]
).unwrap();
static ref ACTIVE_CONNECTIONS: prometheus::GaugeVec = register_gauge_vec!(
"active_connections",
"Number of active connections",
&["service", "connection_type"]
).unwrap();
}
pub struct MetricsMiddleware;
impl<S> Transform<S, ServiceRequest> for MetricsMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse, Error = Error> + 'static,
{
type Response = ServiceResponse;
type Error = Error;
type Transform = MetricsMiddlewareService<S>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(MetricsMiddlewareService { service }))
}
}
pub struct MetricsMiddlewareService<S> {
service: S,
}
impl<S> Service<ServiceRequest> for MetricsMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse, Error = Error>,
{
type Response = ServiceResponse;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let start_time = std::time::Instant::now();
let method = req.method().to_string();
let path = req.path().to_string();
let fut = self.service.call(req);
Box::pin(async move {
let result = fut.await;
let duration = start_time.elapsed().as_secs_f64();
match &result {
Ok(response) => {
HTTP_REQUEST_DURATION
.with_label_values(&[&method, &path, &response.status().as_str()])
.observe(duration);
}
Err(_) => {
HTTP_REQUEST_DURATION
.with_label_values(&[&method, &path, "500"])
.observe(duration);
}
}
result
})
}
} |
|
Distributed tracing через Jaeger показал мне узкие места, о которых я даже не подозревал. Оказалось, что основная задержка возникала не в обработке бизнес-логики, а в сериализации/десериализации JSON между сервисами. Пришлось оптимизировать структуры данных:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
// Оптимизированная структура для передачи между сервисами
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderSummary {
#[serde_as(as = "DisplayFromStr")]
pub id: Uuid,
#[serde_as(as = "DisplayFromStr")]
pub customer_id: Uuid,
pub status: OrderStatus,
#[serde(with = "rust_decimal::serde::float")]
pub total_amount: Decimal,
pub items_count: u32,
#[serde(with = "chrono::serde::ts_seconds")]
pub created_at: DateTime<Utc>,
}
// Используем бинарную сериализацию для внутренних вызовов
use bincode::{serialize, deserialize};
pub async fn send_internal_message<T: serde::Serialize>(
client: &reqwest::Client,
url: &str,
payload: &T
) -> Result<(), reqwest::Error> {
let binary_data = serialize(payload).unwrap();
client
.post(url)
.header("Content-Type", "application/octet-stream")
.body(binary_data)
.send()
.await?;
Ok(())
} |
|
Переход на bincode для внутренних коммуникаций дал ускорение сериализации в 3-4 раза по сравнению с JSON. Размер сообщений тоже уменьшился примерно на 40%.
Для backup и disaster recovery настроил автоматические снапшоты баз данных и реплицирование через PostgreSQL streaming replication. Но самое важное - регулярные восстановления из backup'ов в тестовой среде:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| use sqlx::{PgPool, Row};
use tokio_cron_scheduler::{JobScheduler, Job};
pub struct BackupManager {
primary_pool: PgPool,
replica_pool: PgPool,
}
impl BackupManager {
pub async fn schedule_health_checks(&self) -> Result<(), Box<dyn std::error::Error>> {
let sched = JobScheduler::new().await?;
let primary_pool = self.primary_pool.clone();
sched.add(
Job::new_async("0 */5 * * * *", move |_uuid, _l| {
let pool = primary_pool.clone();
Box::pin(async move {
if let Err(e) = check_database_health(&pool).await {
tracing::error!("Primary database health check failed: {:?}", e);
// Отправляем алерт в Slack/PagerDuty
send_alert("Primary DB unhealthy", &format!("{:?}", e)).await;
}
})
})?
).await?;
let replica_pool = self.replica_pool.clone();
sched.add(
Job::new_async("30 */5 * * * *", move |_uuid, _l| {
let pool = replica_pool.clone();
Box::pin(async move {
match check_replication_lag(&pool).await {
Ok(lag) if lag > Duration::from_secs(60) => {
tracing::warn!("Replication lag is {} seconds", lag.as_secs());
send_alert("High replication lag", &format!("Lag: {}s", lag.as_secs())).await;
}
Err(e) => {
tracing::error!("Failed to check replication lag: {:?}", e);
send_alert("Replica DB check failed", &format!("{:?}", e)).await;
}
_ => {}
}
})
})?
).await?;
sched.start().await?;
Ok(())
}
}
async fn check_database_health(pool: &PgPool) -> Result<(), sqlx::Error> {
// Проверяем простой SELECT
sqlx::query("SELECT 1").fetch_one(pool).await?;
// Проверяем операции записи
sqlx::query("INSERT INTO health_check (timestamp) VALUES ($1)")
.bind(chrono::Utc::now())
.execute(pool)
.await?;
// Очищаем тестовые записи старше часа
sqlx::query("DELETE FROM health_check WHERE timestamp < $1")
.bind(chrono::Utc::now() - chrono::Duration::hours(1))
.execute(pool)
.await?;
Ok(())
}
async fn check_replication_lag(replica_pool: &PgPool) -> Result<Duration, sqlx::Error> {
let row = sqlx::query(
"SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) as lag"
)
.fetch_one(replica_pool)
.await?;
let lag_seconds: f64 = row.get("lag");
Ok(Duration::from_secs_f64(lag_seconds))
} |
|
Несколько раз эти проверки помогали выявить проблемы с репликацией до того, как они превратились в серьёзные incidents. Особенно полезны оказались алерты по lag'у репликации - когда primary база перегружена, replica начинает отставать, и read-only запросы могут возвращать устаревшие данные.
Secret management решил через Kubernetes secrets с шифрованием at rest и Vault для более критичных данных:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| use std::env;
use reqwest::Client;
use serde_json::Value;
pub struct SecretManager {
vault_client: Client,
vault_addr: String,
vault_token: String,
}
impl SecretManager {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
let vault_addr = env::var("VAULT_ADDR")
.unwrap_or_else(|_| "http://vault:8200".to_string());
let vault_token = env::var("VAULT_TOKEN")
.expect("VAULT_TOKEN environment variable required");
Ok(Self {
vault_client: Client::new(),
vault_addr,
vault_token,
})
}
pub async fn get_secret(&self, path: &str) -> Result<Value, reqwest::Error> {
let url = format!("{}/v1/secret/data/{}", self.vault_addr, path);
let response = self.vault_client
.get(&url)
.header("X-Vault-Token", &self.vault_token)
.send()
.await?;
if !response.status().is_success() {
return Err(reqwest::Error::from(response.error_for_status().unwrap_err()));
}
let json: Value = response.json().await?;
Ok(json["data"]["data"].clone())
}
pub async fn get_database_url(&self) -> Result<String, Box<dyn std::error::Error>> {
let secrets = self.get_secret("database/postgres").await?;
let username = secrets["username"].as_str().unwrap();
let password = secrets["password"].as_str().unwrap();
let host = secrets["host"].as_str().unwrap();
let database = secrets["database"].as_str().unwrap();
Ok(format!("postgresql://{}:{}@{}/{}", username, password, host, database))
}
} |
|
Vault позволяет ротировать пароли от баз данных автоматически, не перезапуская сервисы. Приложение периодически обновляет connection pool с новыми credentials.
Настройка auto-scaling оказалась довольно специфичной для Rust-приложений. Стандартные метрики CPU плохо отражают реальную нагрузку - Rust настолько эффективен, что даже под высокой нагрузкой CPU может оставаться относительно низким. Пришлось настроить scaling по кастомным метрикам:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: order-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: order-service
minReplicas: 3
maxReplicas: 50
metrics:
- type: Pods
pods:
metric:
name: active_requests_per_pod
target:
type: AverageValue
averageValue: "50"
- type: Pods
pods:
metric:
name: saga_queue_length_per_pod
target:
type: AverageValue
averageValue: "100"
behavior:
scaleUp:
stabilizationWindowSeconds: 30
policies:
- type: Percent
value: 50
periodSeconds: 30
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Pods
value: 1
periodSeconds: 60 |
|
Метрика saga_queue_length особенно важна - когда очередь растёт, нужно быстро добавлять инстансы, иначе пользователи начнут получать timeouts. А active_requests_per_pod помогает предсказывать нагрузку до того, как она станет критичной.
Vertical Pod Autoscaler тоже пришлось настроить отдельно. Rust-приложения имеют предсказуемое потребление памяти, но могут сильно варьировать потребление CPU в зависимости от сложности обрабатываемых запросов:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: order-service-vpa
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: order-service
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: order-service
minAllowed:
cpu: 100m
memory: 128Mi
maxAllowed:
cpu: 2000m
memory: 1Gi
controlledResources: ["cpu", "memory"]
controlledValues: RequestsAndLimits |
|
VPA анализирует реальное потребление ресурсов и автоматически корректирует requests/limits. Это особенно полезно на начальной стадии, когда ещё не знаешь точно, сколько ресурсов нужно приложению под реальной нагрузкой.
Подводные камни и их решения
После полутора лет работы с Rust-микросервисами в продакшене я столкнулся с проблемами, которые не описывают в туториалах. Некоторые из них оказались настолько неожиданными, что заставили пересмотреть подход к разработке.
Самая болезненная проблема - время компиляции. В разработке это раздражает, но терпимо. А вот в CI/CD превращается в настоящий кошмар. Холодная сборка моего largest сервиса занимала 25 минут. При частых деплоях это убивало всю продуктивность команды.
Решение нашёл в многоэтапном кешировании. Сначала настроил sccache для кеширования скомпилированных артефактов:
| Rust | 1
2
3
4
5
6
| // .cargo/config.toml
[build]
rustc-wrapper = "sccache"
[target.x86_64-unknown-linux-musl]
linker = "rust-lld" |
|
Затем оптимизировал Dockerfile для максимального переиспользования слоёв:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| FROM rust:1.75-slim as deps-builder
WORKDIR /usr/src/app
# Копируем только файлы зависимостей
COPY Cargo.toml Cargo.lock ./
# Создаём фиктивный main.rs для сборки зависимостей
RUN mkdir src && echo "fn main() {}" > src/main.rs
RUN cargo build --release --target x86_64-unknown-linux-musl
RUN rm -rf src
FROM deps-builder as app-builder
COPY src ./src
# Пересобираем только изменённый код
RUN touch src/main.rs && cargo build --release --target x86_64-unknown-linux-musl
FROM scratch
COPY --from=app-builder /usr/src/app/target/x86_64-unknown-linux-musl/release/service /
CMD ["/service"] |
|
Время сборки сократилось до 3-5 минут для incremental builds. Но это всё равно медленнее Java или Go.
Размер Docker-образов тоже стал проблемой. Debug-версии весили по 150-200MB, что замедляло deployment в Kubernetes. Помимо перехода на scratch-образы, пришлось оптимизировать сами бинарники:
| Rust | 1
2
3
4
5
6
| [profile.release]
opt-level = "z" # Оптимизация по размеру
lto = true
codegen-units = 1
panic = "abort"
strip = "symbols" |
|
Такая конфигурация уменьшила размер бинарников на 40-50%. Правда, время компиляции увеличилось ещё больше.
Интеграция с legacy-системами оказалась сложнее, чем я предполагал. У нас была старая Java-система с SOAP API, и подключение к ней из Rust превратилось в квест. Rust-экосистема для SOAP довольно бедная, пришлось писать собственную обёртку:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| use reqwest::Client;
use serde_xml_rs::{from_str, to_string};
pub struct SoapClient {
client: Client,
endpoint: String,
}
impl SoapClient {
pub async fn call_legacy_service<T, R>(&self, request: T) -> Result<R, SoapError>
where
T: serde::Serialize,
R: serde::de::DeserializeOwned,
{
let soap_body = format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
{}
</soap:Body>
</soap:Envelope>"#,
to_string(&request)?
);
let response = self
.client
.post(&self.endpoint)
.header("Content-Type", "text/xml; charset=utf-8")
.header("SOAPAction", "processOrder")
.body(soap_body)
.send()
.await?;
if !response.status().is_success() {
return Err(SoapError::HttpError(response.status()));
}
let body = response.text().await?;
let parsed: R = from_str(&body)?;
Ok(parsed)
}
} |
|
Эта обёртка работает, но XML-сериализация в Rust намного медленнее JSON. Пришлось добавить агрессивное кеширование результатов.
Управление зависимостями в Rust оказалось палкой о двух концах. С одной стороны, Cargo.lock гарантирует reproducible builds. С другой - обновление зависимостей превращается в русскую рулетку. Несколько раз minor update какого-то crate ломал compilation. Решение нашёл в автоматизированном тестировании зависимостей:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| # .github/workflows/deps-update.yml
name: Dependencies Update
on:
schedule:
- cron: '0 2 * * 1' # Каждый понедельник в 2 утра
jobs:
update-deps:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Install cargo-edit
run: cargo install cargo-edit
- name: Update dependencies
run: |
cargo update
cargo upgrade --dry-run > upgrade-plan.txt
- name: Test updated dependencies
run: |
cargo test --all-features
cargo clippy -- -D warnings
- name: Create PR
if: success()
uses: peter-evans/create-pull-request@v4
with:
title: "Weekly dependencies update"
body: |
Automated dependencies update.
See upgrade-plan.txt for details.
branch: deps/weekly-update
add-paths: |
Cargo.toml
Cargo.lock
upgrade-plan.txt |
|
Такой подход позволяет регулярно обновлять зависимости контролируемо, не накапливая technical debt.
Memory leaks в Rust теоретически невозможны, но на практике можно создать циклические ссылки через Rc/RefCell или забыть drop'ать ресурсы. У меня был случай с WebSocket-соединениями - они накапливались и не освобождались:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| use std::collections::HashMap;
use tokio::sync::RwLock;
use std::sync::Arc;
pub struct ConnectionManager {
connections: Arc<RwLock<HashMap<Uuid, Connection>>>,
}
impl ConnectionManager {
pub async fn add_connection(&self, id: Uuid, conn: Connection) {
let mut connections = self.connections.write().await;
connections.insert(id, conn);
// Периодически чистим мёртвые соединения
tokio::spawn({
let connections = self.connections.clone();
async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let mut conns = connections.write().await;
conns.retain(|_, conn| !conn.is_closed());
}
}
});
}
} |
|
Без периодической очистки мёртвых соединений memory usage рос до тех пор, пока pod не убивался по OOM.
Graceful shutdown в микросервисной среде критично важен, но реализация в Rust требует аккуратности. Особенно когда есть long-running background tasks:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| use tokio::signal;
use tokio::sync::oneshot;
use tracing::{info, error};
pub async fn run_with_graceful_shutdown<F>(app_future: F)
where
F: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel();
// Обработчик сигналов
tokio::spawn(async move {
match signal::ctrl_c().await {
Ok(_) => {
info!("Received SIGINT, starting graceful shutdown");
let _ = shutdown_tx.send(());
}
Err(e) => {
error!("Failed to install SIGINT handler: {}", e);
}
}
});
// Основное приложение
tokio::select! {
result = app_future => {
if let Err(e) = result {
error!("Application error: {:?}", e);
}
}
_ = shutdown_rx => {
info!("Shutdown signal received, stopping application");
}
}
// Даём время на завершение активных requests
tokio::time::sleep(Duration::from_secs(10)).await;
info!("Graceful shutdown complete");
} |
|
Без правильного graceful shutdown Kubernetes может forcefully убить pod посреди обработки важного запроса.
Cross-compilation для разных архитектур оказался болезненным. Apple M1 машины требуют x86_64 образы для production, но кросс-компиляция постоянно падала с cryptic errors. Пришлось настроить dedicated CI runners для каждой архитектуры.
Error handling между сервисами - отдельная головная боль. Rust заставляет обрабатывать все ошибки явно, но в распределённой системе это приводит к verbose коду:
| Rust | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| pub async fn process_complex_operation(&self) -> Result<ProcessResult, ProcessError> {
let inventory_result = self.inventory_service
.reserve_items(&items)
.await
.map_err(ProcessError::InventoryError)?;
let payment_result = self.payment_service
.charge_customer(&customer, &amount)
.await
.map_err(|e| {
// Компенсируем резерв товара
let _ = self.inventory_service.release_reservation(inventory_result.reservation_id);
ProcessError::PaymentError(e)
})?;
let shipping_result = self.shipping_service
.schedule_delivery(&order)
.await
.map_err(|e| {
// Компенсируем и резерв, и платёж
let _ = self.inventory_service.release_reservation(inventory_result.reservation_id);
let _ = self.payment_service.refund(payment_result.transaction_id);
ProcessError::ShippingError(e)
})?;
Ok(ProcessResult {
inventory: inventory_result,
payment: payment_result,
shipping: shipping_result,
})
} |
|
Такой код быстро превращается в спагетти. Пришлось вынести компенсирующие действия в отдельную Saga, как я описывал ранее.
Отладка распределённых систем на Rust требует специфических навыков. gdb работает, но не так удобно как с C++. tokio-console помогает отлаживать async код, но setup довольно сложный:
| Rust | 1
2
3
4
5
| [dependencies]
console-subscriber = { version = "0.2", features = ["parking_lot"] }
[profile.dev]
debug = true # Нужно для tokio-console |
|
| Rust | 1
2
3
4
5
6
7
8
| fn main() {
console_subscriber::init();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async_main())
} |
|
Но даже с tokio-console понять, почему конкретный async task завис, часто непросто.
Несмотря на все сложности, я не жалею о выборе Rust для микросервисной архитектуры. Большинство проблем решаются tooling'ом и процессами. А надёжность и производительность в production компенсируют все трудности разработки.
Система защиты в облачных платформах Всем привет! Очень нужна помощь, пишу дипломную работу о проектировании систем защиты информации на... Плагин Rust Помогите с плагином! Суть плагина такова игрок стреляет с лука и часть стрел ему возвращается в... Есть ли у rust будущее? Вот вчера общался с 1 товарищем на тему перспектив С++, он меня убеждает что язык скоро будет... [Rust] Непонятно поведение Пытаюсь считать строку с клавы в качестве String, парсить её и получить целочисленное значение,... [Rust] UDP socket error Пытаюсь по udp попробовать передать что-нибудь, но возникает ошибка в err , с чем связано не... [Rust] Time Подскажите как узнать время в Rust.
//Rust
extern crate time;
fn main() {
let now =... Ошибка при первом запуске Rust в VisualStudio Скачал и установил дополнение с сайта студии, при запуске программы выдает:
Сам exeшник в debug... C++ снова хоронят: Rust - серебряная пуля или просто ещё один язык программирования? https://techcrunch.com/2017/07/16/death-to-c/
В общем, если совсем вкратце, чел говорит, что... Rust на linux ( ubuntu ) Доброго времени суток!
Помогите решить вопрос с игрой Rust.
Раньше играл нормально без... Rust+assembler Как связать язык rust и ассемблер не используя ассемблерные вставки(неудобно использовать их в... [Rust] impl для примитивного типа Привет всем!
Решаю задачку на codewars.com, а там, видимо, rust более ранней версии чем свежий,... Расскажите о своём опыте программирования на Rust Доброе утро!
Расскажите, пожалуйста, о своём опыте программирования на Rust. Можно в сравнении с...
|