Форум программистов, компьютерный форум, киберфорум
bytestream
Войти
Регистрация
Восстановить пароль
Оценить эту запись

Как написать микросервис на C# с Kafka, MediatR, Redis и GitLab CI/CD

Запись от bytestream размещена 15.01.2025 в 15:27. Обновил(-а) bytestream 15.01.2025 в 15:28
Показов 2194 Комментарии 0

Нажмите на изображение для увеличения
Название: 0c7f96e5-ddba-48f9-8a4b-ce7012cd5b8b.png
Просмотров: 28
Размер:	2.49 Мб
ID:	9208
В современной разработке программного обеспечения микросервисная архитектура стала стандартом де-факто для создания масштабируемых и гибких приложений. Этот подход позволяет разделить сложную систему на небольшие, независимые компоненты, каждый из которых отвечает за конкретную бизнес-функцию. В данной статье мы подробно рассмотрим процесс создания микросервиса с использованием современного технологического стека, включающего C#, Apache Kafka, Redis и GitLab CI/CD.

Выбранный технологический стек представляет собой мощное сочетание инструментов, каждый из которых привносит свои уникальные преимущества в разработку. C# как основной язык программирования обеспечивает надежную типизацию, высокую производительность и богатую экосистему библиотек. Платформа .NET предоставляет множество готовых решений для создания веб-приложений и микросервисов, включая встроенную поддержку REST API и механизмы dependency injection.

Apache Kafka выступает в роли распределенной системы обмена сообщениями, обеспечивая надежную асинхронную коммуникацию между микросервисами. Эта технология позволяет обрабатывать большие потоки данных в реальном времени, гарантируя их доставку и сохраняя порядок сообщений. Использование Kafka особенно важно в микросервисной архитектуре, где необходимо обеспечить слабую связанность компонентов и устойчивость к сбоям.

Redis как распределенное хранилище данных типа "ключ-значение" предоставляет высокопроизводительное решение для кэширования и временного хранения данных. Его использование позволяет значительно снизить нагрузку на основную базу данных и ускорить работу приложения за счет хранения часто запрашиваемых данных в оперативной памяти.

Автоматизация процессов разработки, тестирования и развертывания обеспечивается с помощью GitLab CI/CD. Эта платформа позволяет настроить непрерывную интеграцию и доставку кода, автоматизировать процессы тестирования и развертывания, что существенно ускоряет цикл разработки и повышает качество конечного продукта.

Для успешной работы с описанным стеком технологий необходимо подготовить соответствующее окружение разработки. Это включает установку .NET SDK, настройку IDE (рекомендуется использовать Visual Studio или JetBrains Rider), установку Docker для контейнеризации приложения и его зависимостей. Также потребуется настроить локальное окружение для работы с Kafka и Redis, что может быть реализовано как с помощью Docker-контейнеров, так и путем локальной установки этих сервисов.

В процессе разработки микросервиса особое внимание будет уделено принципам SOLID, чистой архитектуры и лучшим практикам разработки программного обеспечения. Это позволит создать легко поддерживаемый, масштабируемый и надежный сервис, готовый к промышленной эксплуатации. Реализация будет включать полный набор функциональности: от базовой структуры приложения до настройки мониторинга и логирования в продакшн-окружении.

Разработка базовой структуры микросервиса



Создание микросервиса начинается с организации правильной структуры проекта, которая обеспечит удобство разработки и поддержки кода в будущем. Для начала необходимо создать новый проект с использованием шаблона ASP.NET Core Web API. Это можно сделать через командную строку с помощью команды dotnet new webapi -n MicroserviceName или через интерфейс Visual Studio. Созданный проект будет содержать базовую структуру для разработки REST API.

После создания проекта следует организовать архитектуру в соответствии с принципами чистой архитектуры. Для этого создаются основные слои приложения: Domain, Application, Infrastructure и API. Каждый слой реализуется как отдельный проект в решении, что обеспечивает четкое разделение ответственности и соблюдение принципа инверсии зависимостей.

Слой Domain содержит бизнес-модели и интерфейсы основных сервисов. Здесь определяются сущности, которые представляют основные концепции предметной области. Например, если микросервис обрабатывает заказы, можно создать следующую модель:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Order
{
    public Guid Id { get; private set; }
    public string CustomerName { get; private set; }
    public decimal TotalAmount { get; private set; }
    public OrderStatus Status { get; private set; }
    public DateTime CreatedAt { get; private set; }
 
    public Order(string customerName, decimal totalAmount)
    {
        Id = Guid.NewGuid();
        CustomerName = customerName;
        TotalAmount = totalAmount;
        Status = OrderStatus.Created;
        CreatedAt = DateTime.UtcNow;
    }
 
    public void UpdateStatus(OrderStatus newStatus)
    {
        Status = newStatus;
    }
}
Слой Application содержит бизнес-логику приложения и определяет способы взаимодействия с внешним миром. Здесь реализуются сервисы, которые обрабатывают команды и запросы, используя паттерн CQRS (Command Query Responsibility Segregation). Для организации команд и запросов используется библиотека MediatR, которая позволяет реализовать слабосвязанную архитектуру:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CreateOrderCommand : IRequest<Guid>
{
    public string CustomerName { get; set; }
    public decimal TotalAmount { get; set; }
}
 
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, Guid>
{
    private readonly IOrderRepository _orderRepository;
 
    public CreateOrderCommandHandler(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }
 
    public async Task<Guid> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerName, request.TotalAmount);
        await _orderRepository.AddAsync(order, cancellationToken);
        return order.Id;
    }
}
Слой Infrastructure отвечает за взаимодействие с внешними системами и реализацию интерфейсов, определенных в слое Domain. Здесь размещаются реализации репозиториев, сервисов для работы с базой данных, кэшем и очередями сообщений. Важно соблюдать принцип инкапсуляции деталей реализации:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class OrderRepository : IOrderRepository
{
    private readonly ApplicationDbContext _context;
 
    public OrderRepository(ApplicationDbContext context)
    {
        _context = context;
    }
 
    public async Task<Order> GetByIdAsync(Guid id, CancellationToken cancellationToken)
    {
        return await _context.Orders.FindAsync(new object[] { id }, cancellationToken);
    }
 
    public async Task AddAsync(Order order, CancellationToken cancellationToken)
    {
        await _context.Orders.AddAsync(order, cancellationToken);
        await _context.SaveChangesAsync(cancellationToken);
    }
}
Для настройки dependency injection и конфигурации сервисов используется встроенный в ASP.NET Core механизм внедрения зависимостей. В файле Program.cs регистрируются все необходимые сервисы и настраивается конфигурация приложения:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var builder = WebApplication.CreateBuilder(args);
 
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
 
// Регистрация сервисов
builder.Services.AddScoped<IOrderRepository, OrderRepository>();
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(CreateOrderCommand).Assembly));
 
// Настройка DbContext
builder.Services.AddDbContext<ApplicationDbContext>(options =>
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));
 
var app = builder.Build();
После настройки базовой структуры проекта необходимо реализовать REST API endpoints, которые будут обрабатывать входящие HTTP-запросы. В ASP.NET Core контроллеры являются основным механизмом для обработки HTTP-запросов. Создадим контроллер для работы с заказами:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IMediator _mediator;
 
    public OrdersController(IMediator mediator)
    {
        _mediator = mediator;
    }
 
    [HttpPost]
    public async Task<ActionResult<Guid>> CreateOrder([FromBody] CreateOrderCommand command)
    {
        var orderId = await _mediator.Send(command);
        return CreatedAtAction(nameof(GetOrder), new { id = orderId }, orderId);
    }
 
    [HttpGet("{id}")]
    public async Task<ActionResult<OrderDto>> GetOrder(Guid id)
    {
        var query = new GetOrderByIdQuery { Id = id };
        var result = await _mediator.Send(query);
        return result != null ? Ok(result) : NotFound();
    }
}
Для обеспечения безопасности и валидации входящих запросов используется механизм модельной валидации. Создадим валидаторы с помощью библиотеки FluentValidation:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
    public CreateOrderCommandValidator()
    {
        RuleFor(x => x.CustomerName)
            .NotEmpty()
            .MinimumLength(2)
            .MaximumLength(100);
 
        RuleFor(x => x.TotalAmount)
            .GreaterThan(0)
            .LessThan(1000000);
    }
}
Важным аспектом разработки микросервиса является обработка ошибок. Создадим систему обработки исключений, которая будет преобразовывать различные типы ошибок в соответствующие HTTP-ответы:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ExceptionHandlingMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ILogger<ExceptionHandlingMiddleware> _logger;
 
    public ExceptionHandlingMiddleware(RequestDelegate next, ILogger<ExceptionHandlingMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }
 
    public async Task InvokeAsync(HttpContext context)
    {
        try
        {
            await _next(context);
        }
        catch (ValidationException ex)
        {
            _logger.LogWarning(ex, "Validation error occurred");
            context.Response.StatusCode = StatusCodes.Status400BadRequest;
            await HandleExceptionAsync(context, ex);
        }
        catch (NotFoundException ex)
        {
            _logger.LogWarning(ex, "Resource not found");
            context.Response.StatusCode = StatusCodes.Status404NotFound;
            await HandleExceptionAsync(context, ex);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unexpected error occurred");
            context.Response.StatusCode = StatusCodes.Status500InternalServerError;
            await HandleExceptionAsync(context, ex);
        }
    }
 
    private static Task HandleExceptionAsync(HttpContext context, Exception exception)
    {
        var result = JsonSerializer.Serialize(new
        {
            Status = context.Response.StatusCode,
            Message = exception.Message
        });
 
        context.Response.ContentType = "application/json";
        return context.Response.WriteAsync(result);
    }
}

Swagger



Для документирования API используется Swagger, который автоматически генерирует интерактивную документацию на основе кода. Настроим Swagger с дополнительной информацией о микросервисе:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
builder.Services.AddSwaggerGen(c =>
{
    c.SwaggerDoc("v1", new OpenApiInfo
    {
        Title = "Orders Microservice API",
        Version = "v1",
        Description = "API для управления заказами",
        Contact = new OpenApiContact
        {
            Name = "Team Name",
            Email = "team@example.com"
        }
    });
 
    c.AddSecurityDefinition("Bearer", new OpenApiSecurityScheme
    {
        Description = "JWT Authorization header using the Bearer scheme",
        Name = "Authorization",
        In = ParameterLocation.Header,
        Type = SecuritySchemeType.ApiKey,
        Scheme = "Bearer"
    });
 
    c.AddSecurityRequirement(new OpenApiSecurityRequirement
    {
        {
            new OpenApiSecurityScheme
            {
                Reference = new OpenApiReference
                {
                    Type = ReferenceType.SecurityScheme,
                    Id = "Bearer"
                }
            },
            Array.Empty<string>()
        }
    });
});

JWT-токены



Для обеспечения безопасности API реализуем аутентификацию и авторизацию с использованием JWT-токенов:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidateAudience = true,
            ValidateLifetime = true,
            ValidateIssuerSigningKey = true,
            ValidIssuer = builder.Configuration["Jwt:Issuer"],
            ValidAudience = builder.Configuration["Jwt:Audience"],
            IssuerSigningKey = new SymmetricSecurityKey(
                Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]))
        };
    });
 
builder.Services.AddAuthorization(options =>
{
    options.AddPolicy("RequireAdminRole", policy =>
        policy.RequireRole("Admin"));
});

Kafka - брокер сообщений
Доброго времени суток! Подскажите кто-то работал с Kafka? Можете пожалуйста подкинуть литературу и желательно примеры?)

Consumer apache kafka
Доброго времени суток уважаемые форумчане. С apache kafka работаю совсем недавно и столкнулся с неприятной проблемой. Работу с kafka осуществляю...

Получение нескольких сообщений потребителем Apache Kafka
Всем привет! Мой производитель отправляет много сообщений apache kafka, и я предполагал, что apache kafka объединит их в пакеты. Я предполагал, что...

Внесение и считывание данных из БД Redis
Здравствуйте! Есть задание - создать консольное приложение, подключающееся к БД Redis и добавляющее значения разных типов: string, int, byte,...


Интеграция с Apache Kafka



Интеграция микросервиса с Apache Kafka начинается с установки и настройки брокера сообщений. Для локальной разработки удобно использовать Docker-контейнер с Kafka. Создадим файл docker-compose.yml, который будет содержать конфигурацию для Kafka и необходимого ей ZooKeeper:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "orders:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
Для работы с Kafka в .NET проекте используется библиотека Confluent.Kafka. Добавим её через NuGet и создадим конфигурационный класс для настройки подключения:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class KafkaSettings
{
    public string BootstrapServers { get; set; }
    public string GroupId { get; set; }
    public bool EnableAutoCommit { get; set; }
    public string OrdersTopic { get; set; }
}
 
public static class KafkaExtensions
{
    public static IServiceCollection AddKafkaServices(this IServiceCollection services, IConfiguration configuration)
    {
        services.Configure<KafkaSettings>(configuration.GetSection("Kafka"));
        
        services.AddSingleton<IProducerFactory>(sp =>
        {
            var settings = sp.GetRequiredService<IOptions<KafkaSettings>>().Value;
            return new ProducerFactory(new ProducerConfig
            {
                BootstrapServers = settings.BootstrapServers,
                EnableDeliveryReports = true,
                ClientId = $"orders-producer-{Guid.NewGuid()}"
            });
        });
 
        return services;
    }
}
Реализуем сервис для публикации сообщений в Kafka. Важно обеспечить надежную доставку сообщений и обработку ошибок:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class KafkaProducerService : IMessageProducer
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaProducerService> _logger;
    private readonly string _topic;
 
    public KafkaProducerService(
        IProducerFactory producerFactory,
        IOptions<KafkaSettings> settings,
        ILogger<KafkaProducerService> logger)
    {
        _producer = producerFactory.CreateProducer<string, string>();
        _logger = logger;
        _topic = settings.Value.OrdersTopic;
    }
 
    public async Task ProduceMessageAsync<T>(string key, T message)
    {
        try
        {
            var serializedMessage = JsonSerializer.Serialize(message);
            var kafkaMessage = new Message<string, string>
            {
                Key = key,
                Value = serializedMessage,
                Timestamp = new Timestamp(DateTime.UtcNow)
            };
 
            var deliveryResult = await _producer.ProduceAsync(_topic, kafkaMessage);
            
            if (deliveryResult.Status == PersistenceStatus.Persisted)
            {
                _logger.LogInformation(
                    "Message successfully delivered to topic {Topic} at partition {Partition} with offset {Offset}",
                    deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset);
            }
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(ex, "Error producing message to Kafka");
            throw new MessagePublishException("Failed to publish message to Kafka", ex);
        }
    }
}
Для потребления сообщений из Kafka создадим базовый класс консьюмера, который будет обрабатывать сообщения в фоновом режиме:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public abstract class KafkaConsumerService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger _logger;
    private readonly string _topic;
 
    protected KafkaConsumerService(
        IOptions<KafkaSettings> settings,
        ILogger logger)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = settings.Value.BootstrapServers,
            GroupId = settings.Value.GroupId,
            EnableAutoCommit = settings.Value.EnableAutoCommit,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
 
        _consumer = new ConsumerBuilder<string, string>(config)
            .SetErrorHandler((_, e) => 
                logger.LogError("Error: {Error}", e.Reason))
            .Build();
 
        _topic = settings.Value.OrdersTopic;
        _logger = logger;
    }
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            _consumer.Subscribe(_topic);
 
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = _consumer.Consume(stoppingToken);
 
                    if (consumeResult?.Message == null) continue;
 
                    await ProcessMessageAsync(consumeResult.Message, stoppingToken);
 
                    if (!consumeResult.Message.Headers.TryGetLastBytes("RetryCount", out var retryCountBytes))
                    {
                        _consumer.Commit(consumeResult);
                        continue;
                    }
 
                    var retryCount = BitConverter.ToInt32(retryCountBytes);
                    if (retryCount >= 3)
                    {
                        await ProcessFailedMessageAsync(consumeResult.Message, stoppingToken);
                        _consumer.Commit(consumeResult);
                    }
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError(ex, "Error occurred during message consumption");
                    await Task.Delay(1000, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Consumer service is shutting down");
        }
        finally
        {
            _consumer.Close();
        }
    }
 
    protected abstract Task ProcessMessageAsync(Message<string, string> message, CancellationToken cancellationToken);
    protected abstract Task ProcessFailedMessageAsync(Message<string, string> message, CancellationToken cancellationToken);
}
При обработке сообщений важно учитывать возможные сбои и реализовать механизм повторных попыток. Создадим конкретную реализацию консьюмера для обработки заказов:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class OrdersConsumerService : KafkaConsumerService
{
    private readonly IOrderService _orderService;
    private readonly ILogger<OrdersConsumerService> _logger;
 
    public OrdersConsumerService(
        IOptions<KafkaSettings> settings,
        IOrderService orderService,
        ILogger<OrdersConsumerService> logger)
        : base(settings, logger)
    {
        _orderService = orderService;
        _logger = logger;
    }
 
    protected override async Task ProcessMessageAsync(Message<string, string> message, CancellationToken cancellationToken)
    {
        try
        {
            var order = JsonSerializer.Deserialize<Order>(message.Value);
            await _orderService.ProcessOrderAsync(order, cancellationToken);
            
            _logger.LogInformation(
                "Successfully processed order {OrderId} from message at offset {Offset}",
                order.Id, message.Offset);
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Failed to deserialize message");
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
            await RetryMessageAsync(message);
        }
    }
 
    private async Task RetryMessageAsync(Message<string, string> message)
    {
        var retryCount = GetRetryCount(message);
        var headers = new Headers();
        headers.Add("RetryCount", BitConverter.GetBytes(retryCount + 1));
        
        await Task.Delay(CalculateRetryDelay(retryCount));
    }
 
    private int GetRetryCount(Message<string, string> message)
    {
        if (!message.Headers.TryGetLastBytes("RetryCount", out var retryCountBytes))
            return 0;
 
        return BitConverter.ToInt32(retryCountBytes);
    }
 
    private static TimeSpan CalculateRetryDelay(int retryCount)
    {
        return TimeSpan.FromSeconds(Math.Pow(2, retryCount));
    }
 
    protected override async Task ProcessFailedMessageAsync(Message<string, string> message, CancellationToken cancellationToken)
    {
        _logger.LogError("Message processing failed after maximum retries");
        // Реализация обработки окончательно неудачных сообщений
        // Например, сохранение в dead letter queue или уведомление администратора
    }
}
Для обеспечения надежной обработки сообщений в Kafka важно реализовать механизм сериализации и десериализации данных. Создадим собственные сериализаторы, которые будут обрабатывать различные типы сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class JsonSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
    {
        if (data == null) return null;
        return JsonSerializer.SerializeToUtf8Bytes(data, typeof(T), new JsonSerializerOptions
        {
            WriteIndented = false,
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        });
    }
}
 
public class JsonDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull || data.IsEmpty) return default;
        return JsonSerializer.Deserialize<T>(data, new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        });
    }
}

Batch Processing



Для обработки больших объемов сообщений реализуем паттерн Batch Processing, который позволит эффективно обрабатывать группы сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class BatchMessageProcessor<T>
{
    private readonly ILogger<BatchMessageProcessor<T>> _logger;
    private readonly int _batchSize;
    private readonly TimeSpan _maxBatchWindow;
    private readonly List<T> _messages;
    private readonly SemaphoreSlim _semaphore;
    private readonly Func<IEnumerable<T>, Task> _processBatchAsync;
 
    public BatchMessageProcessor(
        ILogger<BatchMessageProcessor<T>> logger,
        int batchSize,
        TimeSpan maxBatchWindow,
        Func<IEnumerable<T>, Task> processBatchAsync)
    {
        _logger = logger;
        _batchSize = batchSize;
        _maxBatchWindow = maxBatchWindow;
        _messages = new List<T>();
        _semaphore = new SemaphoreSlim(1, 1);
        _processBatchAsync = processBatchAsync;
    }
 
    public async Task AddMessageAsync(T message)
    {
        await _semaphore.WaitAsync();
        try
        {
            _messages.Add(message);
            if (_messages.Count >= _batchSize)
            {
                await ProcessBatchAsync();
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }
 
    private async Task ProcessBatchAsync()
    {
        if (!_messages.Any()) return;
 
        var messagesToProcess = _messages.ToList();
        _messages.Clear();
 
        try
        {
            await _processBatchAsync(messagesToProcess);
            _logger.LogInformation("Successfully processed batch of {Count} messages", messagesToProcess.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing batch of {Count} messages", messagesToProcess.Count);
            throw;
        }
    }
}

Circuit Breaker



Для обеспечения отказоустойчивости реализуем механизм Circuit Breaker, который будет защищать систему от каскадных сбоев:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class KafkaCircuitBreaker
{
    private readonly ILogger<KafkaCircuitBreaker> _logger;
    private int _failureCount;
    private DateTime _lastFailureTime;
    private readonly int _threshold;
    private readonly TimeSpan _resetTimeout;
    private CircuitState _state;
 
    public KafkaCircuitBreaker(
        ILogger<KafkaCircuitBreaker> logger,
        int threshold = 5,
        TimeSpan? resetTimeout = null)
    {
        _logger = logger;
        _threshold = threshold;
        _resetTimeout = resetTimeout ?? TimeSpan.FromSeconds(60);
        _state = CircuitState.Closed;
    }
 
    public async Task ExecuteAsync(Func<Task> action)
    {
        if (_state == CircuitState.Open)
        {
            if (DateTime.UtcNow - _lastFailureTime > _resetTimeout)
            {
                _state = CircuitState.HalfOpen;
                _logger.LogInformation("Circuit breaker entering half-open state");
            }
            else
            {
                throw new CircuitBreakerOpenException("Circuit breaker is open");
            }
        }
 
        try
        {
            await action();
            if (_state == CircuitState.HalfOpen)
            {
                _state = CircuitState.Closed;
                _failureCount = 0;
                _logger.LogInformation("Circuit breaker reset to closed state");
            }
        }
        catch (Exception ex)
        {
            HandleFailure(ex);
            throw;
        }
    }
 
    private void HandleFailure(Exception ex)
    {
        _failureCount++;
        _lastFailureTime = DateTime.UtcNow;
 
        if (_failureCount >= _threshold)
        {
            _state = CircuitState.Open;
            _logger.LogWarning(ex, "Circuit breaker opened after {FailureCount} failures", _failureCount);
        }
    }
 
    private enum CircuitState
    {
        Closed,
        Open,
        HalfOpen
    }
}
Для мониторинга состояния Kafka и производительности обработки сообщений реализуем систему метрик:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class KafkaMetricsCollector
{
    private readonly IMetricFactory _metricFactory;
    private readonly Counter _messageProcessedCounter;
    private readonly Counter _messageFailedCounter;
    private readonly Histogram _messageProcessingDuration;
 
    public KafkaMetricsCollector(IMetricFactory metricFactory)
    {
        _metricFactory = metricFactory;
        
        _messageProcessedCounter = _metricFactory.CreateCounter(
            "kafka_messages_processed_total",
            "Total number of successfully processed Kafka messages");
            
        _messageFailedCounter = _metricFactory.CreateCounter(
            "kafka_messages_failed_total",
            "Total number of failed Kafka message processing attempts");
            
        _messageProcessingDuration = _metricFactory.CreateHistogram(
            "kafka_message_processing_duration_seconds",
            "Duration of message processing in seconds");
    }
 
    public void RecordMessageProcessed()
    {
        _messageProcessedCounter.Increment();
    }
 
    public void RecordMessageFailed()
    {
        _messageFailedCounter.Increment();
    }
 
    public IDisposable MeasureProcessingDuration()
    {
        return _messageProcessingDuration.NewTimer();
    }
}
Для управления партициями и репликами в Kafka реализуем административный сервис:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class KafkaAdminService
{
    private readonly IAdminClient _adminClient;
    private readonly ILogger<KafkaAdminService> _logger;
 
    public KafkaAdminService(
        AdminClientConfig config,
        ILogger<KafkaAdminService> logger)
    {
        _adminClient = new AdminClientBuilder(config).Build();
        _logger = logger;
    }
 
    public async Task CreateTopicAsync(string topicName, int numPartitions, short replicationFactor)
    {
        try
        {
            await _adminClient.CreateTopicsAsync(new TopicSpecification[]
            {
                new TopicSpecification
                {
                    Name = topicName,
                    NumPartitions = numPartitions,
                    ReplicationFactor = replicationFactor
                }
            });
            
            _logger.LogInformation(
                "Created topic {TopicName} with {NumPartitions} partitions and replication factor {ReplicationFactor}",
                topicName, numPartitions, replicationFactor);
        }
        catch (CreateTopicsException ex)
        {
            _logger.LogError(ex, "Failed to create topic {TopicName}", topicName);
            throw;
        }
    }
 
    public async Task DeleteTopicAsync(string topicName)
    {
        try
        {
            await _adminClient.DeleteTopicsAsync(new[] { topicName });
            _logger.LogInformation("Deleted topic {TopicName}", topicName);
        }
        catch (DeleteTopicsException ex)
        {
            _logger.LogError(ex, "Failed to delete topic {TopicName}", topicName);
            throw;
        }
    }
}

Работа с Redis



Redis играет ключевую роль в архитектуре микросервиса, обеспечивая высокопроизводительное кэширование данных и снижая нагрузку на основное хранилище. Для интеграции Redis в наш микросервис начнем с установки и настройки необходимых компонентов. В проект добавляются пакеты StackExchange.Redis для базового взаимодействия с Redis и Microsoft.Extensions.Caching.StackExchangeRedis для интеграции с системой кэширования ASP.NET Core.

Создадим конфигурационный класс для настройки подключения к Redis:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RedisSettings
{
    public string ConnectionString { get; set; }
    public string InstanceName { get; set; }
    public int DefaultDatabase { get; set; }
    public TimeSpan DefaultExpiration { get; set; }
}
 
public static class RedisExtensions
{
    public static IServiceCollection AddRedisServices(
        this IServiceCollection services, 
        IConfiguration configuration)
    {
        var redisSettings = configuration
            .GetSection("Redis")
            .Get<RedisSettings>();
 
        services.AddStackExchangeRedisCache(options =>
        {
            options.Configuration = redisSettings.ConnectionString;
            options.InstanceName = redisSettings.InstanceName;
        });
 
        services.AddSingleton<IConnectionMultiplexer>(sp =>
            ConnectionMultiplexer.Connect(redisSettings.ConnectionString));
 
        services.AddSingleton<ICacheService, RedisCacheService>();
        
        return services;
    }
}
Реализуем сервис кэширования, который будет абстрагировать работу с Redis и предоставлять удобный интерфейс для остальных компонентов системы:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public interface ICacheService
{
    Task<T> GetOrSetAsync<T>(
        string key,
        Func<Task<T>> factory,
        TimeSpan? expiration = null);
    Task<T> GetAsync<T>(string key);
    Task SetAsync<T>(string key, T value, TimeSpan? expiration = null);
    Task RemoveAsync(string key);
    Task<bool> ExistsAsync(string key);
}
 
public class RedisCacheService : ICacheService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisCacheService> _logger;
    private readonly RedisSettings _settings;
    private readonly IDatabase _database;
 
    public RedisCacheService(
        IConnectionMultiplexer redis,
        IOptions<RedisSettings> settings,
        ILogger<RedisCacheService> logger)
    {
        _redis = redis;
        _logger = logger;
        _settings = settings.Value;
        _database = _redis.GetDatabase(_settings.DefaultDatabase);
    }
 
    public async Task<T> GetOrSetAsync<T>(
        string key,
        Func<Task<T>> factory,
        TimeSpan? expiration = null)
    {
        var value = await GetAsync<T>(key);
        if (value != null)
        {
            _logger.LogDebug("Cache hit for key: {Key}", key);
            return value;
        }
 
        value = await factory();
        await SetAsync(key, value, expiration ?? _settings.DefaultExpiration);
        _logger.LogDebug("Cache miss for key: {Key}, value set", key);
        return value;
    }
 
    public async Task<T> GetAsync<T>(string key)
    {
        var value = await _database.StringGetAsync(key);
        if (!value.HasValue)
            return default;
 
        try
        {
            return JsonSerializer.Deserialize<T>(value);
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Failed to deserialize cached value for key: {Key}", key);
            await RemoveAsync(key);
            return default;
        }
    }
 
    public async Task SetAsync<T>(
        string key,
        T value,
        TimeSpan? expiration = null)
    {
        var serializedValue = JsonSerializer.Serialize(value);
        await _database.StringSetAsync(
            key,
            serializedValue,
            expiration ?? _settings.DefaultExpiration);
    }
 
    public async Task RemoveAsync(string key)
    {
        await _database.KeyDeleteAsync(key);
        _logger.LogDebug("Removed cache entry for key: {Key}", key);
    }
 
    public async Task<bool> ExistsAsync(string key)
    {
        return await _database.KeyExistsAsync(key);
    }
}
Для эффективного управления кэшированием данных заказов реализуем специализированный сервис:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class OrderCacheService
{
    private readonly ICacheService _cacheService;
    private readonly ILogger<OrderCacheService> _logger;
 
    public OrderCacheService(
        ICacheService cacheService,
        ILogger<OrderCacheService> logger)
    {
        _cacheService = cacheService;
        _logger = logger;
    }
 
    private static string GetOrderKey(Guid orderId) =>
        $"order:{orderId}";
 
    private static string GetCustomerOrdersKey(string customerId) =>
        $"customer:{customerId}:orders";
 
    public async Task<Order> GetOrderAsync(Guid orderId)
    {
        return await _cacheService.GetAsync<Order>(GetOrderKey(orderId));
    }
 
    public async Task SetOrderAsync(Order order)
    {
        var key = GetOrderKey(order.Id);
        await _cacheService.SetAsync(key, order);
 
        var customerKey = GetCustomerOrdersKey(order.CustomerName);
        var customerOrders = await _cacheService.GetAsync<List<Guid>>(customerKey)
            ?? new List<Guid>();
 
        if (!customerOrders.Contains(order.Id))
        {
            customerOrders.Add(order.Id);
            await _cacheService.SetAsync(customerKey, customerOrders);
        }
    }
 
    public async Task RemoveOrderAsync(Guid orderId)
    {
        var key = GetOrderKey(orderId);
        var order = await GetOrderAsync(orderId);
        
        if (order != null)
        {
            var customerKey = GetCustomerOrdersKey(order.CustomerName);
            var customerOrders = await _cacheService.GetAsync<List<Guid>>(customerKey);
            
            if (customerOrders?.Contains(orderId) == true)
            {
                customerOrders.Remove(orderId);
                await _cacheService.SetAsync(customerKey, customerOrders);
            }
        }
 
        await _cacheService.RemoveAsync(key);
    }
}
Для обработки распределенных блокировок, которые могут потребоваться при конкурентном доступе к данным, реализуем механизм с использованием Redis:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class RedisLockManager
{
    private readonly IDatabase _database;
    private readonly ILogger<RedisLockManager> _logger;
    private readonly TimeSpan _defaultLockTimeout = TimeSpan.FromSeconds(30);
 
    public RedisLockManager(
        IConnectionMultiplexer redis,
        ILogger<RedisLockManager> logger)
    {
        _database = redis.GetDatabase();
        _logger = logger;
    }
 
    public async Task<IDisposable> AcquireLockAsync(
        string resource,
        TimeSpan? timeout = null)
    {
        var lockId = Guid.NewGuid().ToString();
        var lockKey = $"lock:{resource}";
        var acquireTimeout = timeout ?? _defaultLockTimeout;
        var startTime = DateTime.UtcNow;
 
        while (DateTime.UtcNow - startTime < acquireTimeout)
        {
            if (await _database.LockTakeAsync(lockKey, lockId, _defaultLockTimeout))
            {
                _logger.LogDebug(
                    "Acquired lock for resource {Resource} with ID {LockId}",
                    resource, lockId);
 
                return new RedisLock(_database, lockKey, lockId);
            }
 
            await Task.Delay(100);
        }
 
        throw new TimeoutException($"Failed to acquire lock for resource {resource}");
    }
 
    private class RedisLock : IDisposable
    {
        private readonly IDatabase _database;
        private readonly string _lockKey;
        private readonly string _lockId;
        private bool _disposed;
 
        public RedisLock(IDatabase database, string lockKey, string lockId)
        {
            _database = database;
            _lockKey = lockKey;
            _lockId = lockId;
        }
 
        public void Dispose()
        {
            if (_disposed)
                return;
 
            _database.LockRelease(_lockKey, _lockId);
            _disposed = true;
        }
    }
}
Для реализации более сложных сценариев кэширования и оптимизации производительности создадим систему управления кэшем с поддержкой паттерна Cache-Aside и механизмом инвалидации:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class CacheManager<TKey, TValue>
{
    private readonly ICacheService _cacheService;
    private readonly Func<TKey, Task<TValue>> _dataProvider;
    private readonly ILogger _logger;
    private readonly string _prefix;
    private readonly TimeSpan _slidingExpiration;
    private readonly TimeSpan _absoluteExpiration;
 
    public CacheManager(
        ICacheService cacheService,
        Func<TKey, Task<TValue>> dataProvider,
        ILogger logger,
        string prefix,
        TimeSpan? slidingExpiration = null,
        TimeSpan? absoluteExpiration = null)
    {
        _cacheService = cacheService;
        _dataProvider = dataProvider;
        _logger = logger;
        _prefix = prefix;
        _slidingExpiration = slidingExpiration ?? TimeSpan.FromMinutes(10);
        _absoluteExpiration = absoluteExpiration ?? TimeSpan.FromHours(1);
    }
 
    private string GetCacheKey(TKey key) => $"{_prefix}:{key}";
 
    public async Task<TValue> GetAsync(TKey key)
    {
        var cacheKey = GetCacheKey(key);
        var value = await _cacheService.GetAsync<CacheEntry<TValue>>(cacheKey);
 
        if (value != null && !IsExpired(value))
        {
            await RefreshExpirationAsync(cacheKey, value);
            return value.Data;
        }
 
        var newData = await _dataProvider(key);
        await SetAsync(key, newData);
        return newData;
    }
 
    private async Task RefreshExpirationAsync(string key, CacheEntry<TValue> entry)
    {
        if (DateTime.UtcNow - entry.LastAccess > _slidingExpiration / 2)
        {
            entry.LastAccess = DateTime.UtcNow;
            await _cacheService.SetAsync(key, entry, _absoluteExpiration);
        }
    }
 
    private bool IsExpired(CacheEntry<TValue> entry)
    {
        return DateTime.UtcNow - entry.Created > _absoluteExpiration ||
               DateTime.UtcNow - entry.LastAccess > _slidingExpiration;
    }
 
    public async Task SetAsync(TKey key, TValue value)
    {
        var entry = new CacheEntry<TValue>
        {
            Data = value,
            Created = DateTime.UtcNow,
            LastAccess = DateTime.UtcNow
        };
 
        await _cacheService.SetAsync(GetCacheKey(key), entry, _absoluteExpiration);
    }
 
    private class CacheEntry<T>
    {
        public T Data { get; set; }
        public DateTime Created { get; set; }
        public DateTime LastAccess { get; set; }
    }
}
Для оптимизации производительности и уменьшения нагрузки на Redis реализуем механизм пакетной обработки операций:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class RedisBatchProcessor
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisBatchProcessor> _logger;
    private readonly int _batchSize;
    private readonly ConcurrentQueue<RedisOperation> _operations;
    private readonly Timer _processingTimer;
 
    public RedisBatchProcessor(
        IConnectionMultiplexer redis,
        ILogger<RedisBatchProcessor> logger,
        int batchSize = 100)
    {
        _redis = redis;
        _logger = logger;
        _batchSize = batchSize;
        _operations = new ConcurrentQueue<RedisOperation>();
        _processingTimer = new Timer(ProcessBatch, null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }
 
    public void EnqueueOperation(RedisOperation operation)
    {
        _operations.Enqueue(operation);
        if (_operations.Count >= _batchSize)
        {
            ProcessBatch(null);
        }
    }
 
    private void ProcessBatch(object state)
    {
        if (_operations.IsEmpty)
            return;
 
        var batch = _redis.GetDatabase().CreateBatch();
        var tasks = new List<Task>();
 
        while (_operations.Count > 0 && tasks.Count < _batchSize)
        {
            if (_operations.TryDequeue(out var operation))
            {
                tasks.Add(ExecuteOperation(batch, operation));
            }
        }
 
        batch.Execute();
        Task.WhenAll(tasks).Wait();
    }
 
    private async Task ExecuteOperation(IBatch batch, RedisOperation operation)
    {
        try
        {
            switch (operation.Type)
            {
                case OperationType.StringSet:
                    await batch.StringSetAsync(
                        operation.Key,
                        operation.Value,
                        operation.Expiry);
                    break;
                case OperationType.KeyDelete:
                    await batch.KeyDeleteAsync(operation.Key);
                    break;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error executing Redis operation for key {Key}", operation.Key);
        }
    }
 
    public class RedisOperation
    {
        public string Key { get; set; }
        public RedisValue Value { get; set; }
        public TimeSpan? Expiry { get; set; }
        public OperationType Type { get; set; }
    }
 
    public enum OperationType
    {
        StringSet,
        KeyDelete
    }
}
Для мониторинга состояния кэша и производительности операций реализуем систему метрик:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RedisCacheMetrics
{
    private readonly IMetricFactory _metrics;
    private readonly Counter _cacheHits;
    private readonly Counter _cacheMisses;
    private readonly Histogram _operationDuration;
    private readonly Gauge _cacheSize;
 
    public RedisCacheMetrics(IMetricFactory metrics)
    {
        _metrics = metrics;
        _cacheHits = metrics.CreateCounter(
            "redis_cache_hits_total",
            "Total number of cache hits");
        _cacheMisses = metrics.CreateCounter(
            "redis_cache_misses_total",
            "Total number of cache misses");
        _operationDuration = metrics.CreateHistogram(
            "redis_operation_duration_seconds",
            "Duration of Redis operations");
        _cacheSize = metrics.CreateGauge(
            "redis_cache_size_bytes",
            "Total size of cached data in bytes");
    }
 
    public void RecordHit()
    {
        _cacheHits.Increment();
    }
 
    public void RecordMiss()
    {
        _cacheMisses.Increment();
    }
 
    public IDisposable MeasureOperation()
    {
        return _operationDuration.NewTimer();
    }
 
    public void SetCacheSize(long bytes)
    {
        _cacheSize.Set(bytes);
    }
}
Для обеспечения отказоустойчивости и высокой доступности реализуем механизм переподключения и обработки сбоев:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class RedisConnectionManager
{
    private readonly ConfigurationOptions _config;
    private readonly ILogger<RedisConnectionManager> _logger;
    private IConnectionMultiplexer _connection;
    private readonly object _lock = new object();
    private readonly int _maxRetries;
    private readonly TimeSpan _retryDelay;
 
    public RedisConnectionManager(
        ConfigurationOptions config,
        ILogger<RedisConnectionManager> logger,
        int maxRetries = 3,
        TimeSpan? retryDelay = null)
    {
        _config = config;
        _logger = logger;
        _maxRetries = maxRetries;
        _retryDelay = retryDelay ?? TimeSpan.FromSeconds(2);
        
        _config.AbortOnConnectFail = false;
        _config.ConnectRetry = 3;
    }
 
    public IDatabase GetDatabase()
    {
        EnsureConnection();
        return _connection.GetDatabase();
    }
 
    private void EnsureConnection()
    {
        if (_connection?.IsConnected == true)
            return;
 
        lock (_lock)
        {
            if (_connection?.IsConnected == true)
                return;
 
            for (int i = 0; i < _maxRetries; i++)
            {
                try
                {
                    _connection = ConnectionMultiplexer.Connect(_config);
                    _connection.ConnectionFailed += OnConnectionFailed;
                    _connection.ConnectionRestored += OnConnectionRestored;
                    return;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to connect to Redis (attempt {Attempt}/{MaxRetries})",
                        i + 1, _maxRetries);
                    
                    if (i < _maxRetries - 1)
                        Thread.Sleep(_retryDelay);
                }
            }
 
            throw new RedisConnectionException("Failed to establish Redis connection after multiple attempts");
        }
    }
 
    private void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
    {
        _logger.LogError("Redis connection failed: {FailureType}, {Exception}",
            e.FailureType, e.Exception);
    }
 
    private void OnConnectionRestored(object sender, ConnectionFailedEventArgs e)
    {
        _logger.LogInformation("Redis connection restored");
    }
}

Настройка CI/CD в GitLab



Непрерывная интеграция и развертывание (CI/CD) являются ключевыми практиками современной разработки программного обеспечения. GitLab предоставляет мощные инструменты для автоматизации процессов сборки, тестирования и развертывания приложений. Настройка CI/CD для нашего микросервиса начинается с создания файла .gitlab-ci.yml, который определяет конфигурацию пайплайна.

Базовая структура файла .gitlab-ci.yml включает определение этапов и задач, которые должны быть выполнены в процессе сборки и развертывания:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
stages:
  - build
  - test
  - analyze
  - package
  - deploy
 
variables:
  DOCKER_REGISTRY: "registry.example.com"
  DOCKER_IMAGE: "$DOCKER_REGISTRY/$CI_PROJECT_PATH"
  NUGET_PATH: ".nuget"
  DOTNET_VERSION: "6.0"
 
default:
  image: mcr.microsoft.com/dotnet/sdk:${DOTNET_VERSION}
  before_script:
    - dotnet restore --packages $NUGET_PATH
На этапе сборки выполняется компиляция исходного кода и создание артефактов. Важно настроить кэширование зависимостей для оптимизации процесса:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
build:
  stage: build
  script:
    - dotnet build --configuration Release --no-restore
    - dotnet publish --configuration Release --no-build --output ./publish
  artifacts:
    paths:
      - publish/
    expire_in: 1 week
  cache:
    key: ${CI_COMMIT_REF_SLUG}
    paths:
      - $NUGET_PATH
Этап тестирования включает выполнение модульных и интеграционных тестов. Для повышения эффективности тесты выполняются параллельно:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
unit-tests:
  stage: test
  script:
    - dotnet test ./tests/UnitTests --no-restore --collect:"XPlat Code Coverage"
  coverage: '/Total.*?(\d+(?:\.\d+)?)%/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: ./tests/**/coverage.cobertura.xml
 
integration-tests:
  stage: test
  services:
    - name: redis:latest
      alias: redis
    - name: confluentinc/cp-kafka:latest
      alias: kafka
  variables:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
  script:
    - dotnet test ./tests/IntegrationTests --no-restore
На этапе анализа кода выполняется статический анализ и проверка качества кода:

YAML
1
2
3
4
5
6
7
8
9
10
code-quality:
  stage: analyze
  script:
    - dotnet tool install --global dotnet-sonarscanner
    - dotnet sonarscanner begin /k:"project-key" /d:sonar.host.url="$SONAR_HOST_URL" /d:sonar.login="$SONAR_TOKEN"
    - dotnet build --no-restore
    - dotnet sonarscanner end /d:sonar.login="$SONAR_TOKEN"
  only:
    - merge_requests
    - master
Этап создания пакета включает сборку Docker-образа и его публикацию в реестре:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
package:
  stage: package
  image: docker:latest
  services:
    - docker:dind
  variables:
    DOCKER_TLS_CERTDIR: "/certs"
  script:
    - docker build -t $DOCKER_IMAGE:$CI_COMMIT_SHA .
    - docker tag $DOCKER_IMAGE:$CI_COMMIT_SHA $DOCKER_IMAGE:latest
    - echo $DOCKER_PASSWORD | docker login $DOCKER_REGISTRY -u $DOCKER_USER --password-stdin
    - docker push $DOCKER_IMAGE:$CI_COMMIT_SHA
    - docker push $DOCKER_IMAGE:latest
Для безопасной работы с секретными данными используются переменные окружения GitLab:

YAML
1
2
3
4
5
.setup_env: &setup_env
  before_script:
    - echo "$KUBECONFIG_FILE" > kubeconfig.yml
    - export KUBECONFIG=kubeconfig.yml
    - echo "$DOCKER_AUTH_CONFIG" > /kaniko/.docker/config.json
Для обеспечения качества кода настраивается автоматическая проверка стиля и форматирования:

YAML
1
2
3
4
5
6
format-check:
  stage: analyze
  script:
    - dotnet tool install -g dotnet-format
    - dotnet format --verify-no-changes --verbosity diagnostic
  allow_failure: true
Важным аспектом CI/CD является настройка правил выполнения задач в зависимости от событий в репозитории:

YAML
1
2
3
4
5
.rules-common: &rules-common
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
    - if: '$CI_COMMIT_TAG =~ /^v\d+\.\d+\.\d+$/'
Для автоматизации процесса развертывания в различных окружениях используются специальные задачи деплоя:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
.deploy-template: &deploy-template
  stage: deploy
  image: bitnami/kubectl:latest
  <<: *setup_env
  script:
    - kubectl apply -f k8s/namespace.yml
    - kubectl apply -f k8s/configmap.yml
    - kubectl apply -f k8s/secret.yml
    - |
      envsubst < k8s/deployment.yml | kubectl apply -f -
    - kubectl rollout status deployment/$CI_PROJECT_NAME -n $KUBE_NAMESPACE
 
deploy-staging:
  <<: *deploy-template
  variables:
    KUBE_NAMESPACE: staging
  environment:
    name: staging
  only:
    - master
 
deploy-production:
  <<: *deploy-template
  variables:
    KUBE_NAMESPACE: production
  environment:
    name: production
  when: manual
  only:
    - tags
Для обеспечения безопасного развертывания реализуется стратегия канареечного релиза:

YAML
1
2
3
4
5
6
7
8
9
10
11
canary-deploy:
  stage: deploy
  script:
    - kubectl set image deployment/$CI_PROJECT_NAME $CI_PROJECT_NAME=$DOCKER_IMAGE:$CI_COMMIT_SHA
    - kubectl scale deployment/$CI_PROJECT_NAME --replicas=3
    - kubectl set image deployment/$CI_PROJECT_NAME-canary $CI_PROJECT_NAME=$DOCKER_IMAGE:$CI_COMMIT_SHA
    - kubectl scale deployment/$CI_PROJECT_NAME-canary --replicas=1
  environment:
    name: production
    url: [url]https://api.example.com[/url]
  when: manual
Для автоматизации миграций базы данных создается отдельная задача:

YAML
1
2
3
4
5
6
7
8
9
10
database-migration:
  stage: deploy
  script:
    - dotnet tool install --global dotnet-ef
    - export PATH="$PATH:$HOME/.dotnet/tools"
    - dotnet ef database update --configuration Release
  variables:
    ConnectionStrings__DefaultConnection: $DB_CONNECTION_STRING
  when: manual
  allow_failure: false
Для управления конфигурацией в различных окружениях используются переменные и конфигурационные файлы:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
.config-template: &config-template
  script:
    - |
      cat > appsettings.json << EOF
      {
        "Logging": {
          "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning"
          }
        },
        "AllowedHosts": "*",
        "ConnectionStrings": {
          "DefaultConnection": "${DB_CONNECTION_STRING}"
        },
        "Redis": {
          "ConnectionString": "${REDIS_CONNECTION_STRING}"
        },
        "Kafka": {
          "BootstrapServers": "${KAFKA_BOOTSTRAP_SERVERS}"
        }
      }
      EOF
Для мониторинга состояния развертывания реализуется система проверок:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
health-check:
  stage: deploy
  script:
    - |
      response_code=$(curl -s -o /dev/null -w "%{http_code}" ${APP_URL}/health)
      if [ $response_code -ne 200 ]; then
        echo "Health check failed with status code: $response_code"
        exit 1
      fi
  environment:
    name: staging
  needs:
    - deploy-staging
Для автоматизации создания тегов и релизов используется специальная задача:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create-release:
  stage: deploy
  script:
    - |
      if [[ $CI_COMMIT_TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
        echo "Creating release for tag $CI_COMMIT_TAG"
        release_notes="Release notes for version $CI_COMMIT_TAG"
        curl --request POST --header "PRIVATE-TOKEN: $GITLAB_API_TOKEN" \
          "${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/releases" \
          --form "name=$CI_COMMIT_TAG" \
          --form "tag_name=$CI_COMMIT_TAG" \
          --form "description=$release_notes"
      fi
  only:
    - tags
Для обеспечения безопасности и качества кода настраиваются дополнительные проверки:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
dependency-scan:
  stage: analyze
  script:
    - dotnet restore
    - dotnet list package --vulnerable --include-transitive
  allow_failure: true
  artifacts:
    reports:
      dependency_scanning: gl-dependency-scanning-report.json
 
container-scan:
  stage: analyze
  image: 
    name: aquasec/trivy
    entrypoint: [""]
  script:
    - trivy image $DOCKER_IMAGE:$CI_COMMIT_SHA
  allow_failure: true
Для оптимизации процесса сборки используется кэширование и параллельное выполнение задач:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
cache:
  key: ${CI_COMMIT_REF_SLUG}
  paths:
    - .nuget/
    - .npm/
    - node_modules/
  policy: pull-push
 
parallel:
  matrix:
    - DOTNET_VERSION: ["6.0", "7.0"]
    - TEST_SUITE: ["unit", "integration"]
Для управления временем жизни артефактов и их очисткой реализуется политика хранения:

YAML
1
2
3
4
5
6
7
8
9
cleanup:
  stage: .post
  script:
    - |
      # Очистка старых артефактов
      curl --request DELETE --header "PRIVATE-TOKEN: $GITLAB_API_TOKEN" \
        "${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/artifacts?keep_n=5"
  rules:
    - when: always
В конце каждого успешного деплоя выполняется нотификация в корпоративный мессенджер:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
notify:
  stage: .post
  script:
    - |
      MESSAGE="Deployment to ${CI_ENVIRONMENT_NAME} completed successfully
      Version: ${CI_COMMIT_SHA}
      Pipeline: ${CI_PIPELINE_URL}"
      curl -X POST -H 'Content-type: application/json' \
        --data "{"text":"${MESSAGE}"}" \
        ${SLACK_WEBHOOK_URL}
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
      when: on_success

Деплой и мониторинг



Развертывание микросервиса в производственной среде требует тщательного планирования и настройки инфраструктуры. Для организации надежного деплоя будем использовать Kubernetes как платформу оркестрации контейнеров. Начнем с создания необходимых манифестов для развертывания приложения.

Основной манифест deployment.yml определяет конфигурацию развертывания микросервиса:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orders-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: orders-service
  template:
    metadata:
      labels:
        app: orders-service
    spec:
      containers:
      - name: orders-service
        image: registry.example.com/orders-service:latest
        ports:
        - containerPort: 80
        resources:
          requests:
            memory: "256Mi"
            cpu: "200m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        readinessProbe:
          httpGet:
            path: /health
            port: 80
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 80
          initialDelaySeconds: 15
          periodSeconds: 20

Prometheus и Grafana



Для мониторинга состояния микросервиса используется Prometheus в сочетании с Grafana. Настроим экспорт метрик, добавив соответствующие аннотации в манифест:

YAML
1
2
3
4
5
metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/path: "/metrics"
    prometheus.io/port: "80"

ELK Stack



Для сбора логов настраивается ELK Stack (Elasticsearch, Logstash, Kibana). В приложении используется структурированное логирование через Serilog:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class LoggingExtensions
{
    public static IHostBuilder ConfigureLogging(this IHostBuilder builder)
    {
        return builder.UseSerilog((context, configuration) =>
        {
            configuration
                .WriteTo.Console(new JsonFormatter())
                .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(
                    new Uri(context.Configuration["Elasticsearch:Url"]))
                {
                    IndexFormat = $"orders-service-{context.HostingEnvironment.EnvironmentName}-{DateTime.UtcNow:yyyy-MM}",
                    AutoRegisterTemplate = true,
                    MinimumLogEventLevel = LogEventLevel.Information
                })
                .Enrich.WithEnvironmentName()
                .Enrich.WithMachineName()
                .Enrich.WithCorrelationId();
        });
    }
}
Важным аспектом мониторинга является отслеживание бизнес-метрик. Реализуем сервис для сбора метрик производительности:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MetricsService
{
    private readonly IMetricsRoot _metrics;
    private readonly Counter _orderProcessed;
    private readonly Histogram _orderProcessingTime;
    private readonly Gauge _activeOrders;
 
    public MetricsService(IMetricsRoot metrics)
    {
        _metrics = metrics;
        _orderProcessed = _metrics.CreateCounter("orders_processed_total", "Total number of processed orders");
        _orderProcessingTime = _metrics.CreateHistogram("order_processing_duration_seconds", "Order processing duration");
        _activeOrders = _metrics.CreateGauge("active_orders", "Number of currently active orders");
    }
 
    public void RecordOrderProcessed()
    {
        _orderProcessed.Increment();
    }
 
    public IDisposable MeasureOrderProcessing()
    {
        return _orderProcessingTime.NewTimer();
    }
 
    public void SetActiveOrders(int count)
    {
        _activeOrders.Set(count);
    }
}
Для обеспечения высокой доступности настраивается горизонтальное масштабирование с помощью HorizontalPodAutoscaler:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: orders-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: orders-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
Для отслеживания состояния приложения реализуем расширенный health check с подробной информацией о зависимостях:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class HealthCheck : IHealthCheck
{
    private readonly IKafkaClient _kafka;
    private readonly IConnectionMultiplexer _redis;
    private readonly ApplicationDbContext _dbContext;
 
    public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken)
    {
        var healthData = new Dictionary<string, object>();
        var status = HealthStatus.Healthy;
 
        try
        {
            // Проверка подключения к Kafka
            var kafkaStatus = await _kafka.CheckConnectionAsync();
            healthData.Add("kafka_status", kafkaStatus);
            
            // Проверка подключения к Redis
            var redisStatus = _redis.IsConnected;
            healthData.Add("redis_status", redisStatus);
            
            // Проверка подключения к базе данных
            var dbStatus = await _dbContext.Database.CanConnectAsync(cancellationToken);
            healthData.Add("database_status", dbStatus);
 
            if (!kafkaStatus || !redisStatus || !dbStatus)
            {
                status = HealthStatus.Degraded;
            }
        }
        catch (Exception ex)
        {
            status = HealthStatus.Unhealthy;
            healthData.Add("error", ex.Message);
        }
 
        return new HealthCheckResult(status, null, null, healthData);
    }
}
Для отслеживания производительности и выявления узких мест настраивается профилирование с помощью dotnet-trace:

YAML
1
2
3
4
5
6
7
8
apiVersion: v1
kind: ConfigMap
metadata:
  name: profiling-config
data:
  TraceSettings__Enabled: "true"
  TraceSettings__SamplingRate: "0.1"
  TraceSettings__BufferSize: "1024"
Для мониторинга бизнес-процессов и технических метрик создадим панель в Grafana, которая будет отображать ключевые показатели производительности микросервиса:

JSON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
  "dashboard": {
    "title": "Orders Service Dashboard",
    "panels": [
      {
        "title": "Order Processing Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(orders_processed_total[5m])",
            "legendFormat": "Orders/sec"
          }
        ]
      },
      {
        "title": "Processing Time Distribution",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(order_processing_duration_seconds_bucket[5m])",
            "format": "heatmap"
          }
        ]
      }
    ]
  }
}
Для оповещения о критических ситуациях настраиваем правила алертинга в Prometheus:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
groups:
- name: OrdersServiceAlerts
  rules:
  - alert: HighOrderProcessingTime
    expr: histogram_quantile(0.95, rate(order_processing_duration_seconds_bucket[5m])) > 1
    for: 5m
    labels:
      severity: warning
    annotations:
      description: "95th percentile of order processing time is above 1 second"
      
  - alert: HighErrorRate
    expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      description: "Error rate is above 5% for the last 5 minutes"

OpenTelemetry



Для анализа производительности приложения реализуем трейсинг с использованием OpenTelemetry:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static class TracingExtensions
{
    public static IServiceCollection AddTracing(this IServiceCollection services)
    {
        services.AddOpenTelemetryTracing(builder =>
        {
            builder
                .SetResourceBuilder(ResourceBuilder.CreateDefault()
                    .AddService("OrdersService"))
                .AddAspNetCoreInstrumentation()
                .AddHttpClientInstrumentation()
                .AddEntityFrameworkCoreInstrumentation()
                .AddSource("OrdersService.Processing")
                .SetSampler(new AlwaysOnSampler())
                .AddJaegerExporter(options =>
                {
                    options.AgentHost = "jaeger-agent";
                    options.AgentPort = 6831;
                });
        });
 
        return services;
    }
}
Для отслеживания использования ресурсов настраиваем лимиты и мониторинг на уровне контейнеров:

YAML
1
2
3
4
5
6
7
8
9
10
apiVersion: v1
kind: ResourceQuota
metadata:
  name: orders-service-quota
spec:
  hard:
    requests.cpu: "2"
    requests.memory: 2Gi
    limits.cpu: "4"
    limits.memory: 4Gi
Реализуем систему мониторинга очередей Kafka для отслеживания отставания в обработке сообщений:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class KafkaMonitoringService
{
    private readonly IKafkaClient _kafka;
    private readonly IMetricsRoot _metrics;
    private readonly Gauge _consumerLag;
    private readonly Counter _messageDropped;
 
    public async Task MonitorConsumerLag()
    {
        var consumer = _kafka.GetConsumer();
        var lag = await consumer.GetLagAsync();
        _consumerLag.Set(lag);
 
        if (lag > 1000)
        {
            await NotifyHighLagAsync(lag);
        }
    }
 
    private async Task NotifyHighLagAsync(long lag)
    {
        var alert = new Alert
        {
            Severity = AlertSeverity.Warning,
            Message = $"Consumer lag is high: {lag} messages",
            Component = "Kafka Consumer",
            Timestamp = DateTime.UtcNow
        };
 
        await _alertingService.SendAlertAsync(alert);
    }
}
Для обеспечения безопасного обновления приложения настраиваем стратегию развертывания:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orders-service
spec:
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  minReadySeconds: 30
  revisionHistoryLimit: 5
Для быстрого восстановления в случае сбоев реализуем автоматическое резервное копирование данных:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class BackupService
{
    private readonly IBackupProvider _backupProvider;
    private readonly ILogger<BackupService> _logger;
 
    public async Task CreateBackupAsync()
    {
        try
        {
            var backupMetadata = new BackupMetadata
            {
                Timestamp = DateTime.UtcNow,
                Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(),
                Environment = _environment.EnvironmentName
            };
 
            var backupId = await _backupProvider.CreateBackupAsync(backupMetadata);
            _logger.LogInformation("Created backup with ID: {BackupId}", backupId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create backup");
            throw;
        }
    }
}
Настраиваем систему диагностики для отладки проблем в производственной среде:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class DiagnosticsConfig
{
    public static IServiceCollection AddDiagnostics(this IServiceCollection services)
    {
        services.AddDiagnosticListener(subscriber =>
        {
            subscriber.Subscribe(listener =>
            {
                listener.OnNext = (diagnostic) =>
                {
                    if (diagnostic.Key == "Microsoft.AspNetCore.Diagnostics.HandledException")
                    {
                        var exception = diagnostic.Value as Exception;
                        DiagnosticsCollector.CaptureException(exception);
                    }
                };
            });
        });
 
        return services;
    }
}

Итоговая реализация



После настройки всех компонентов микросервиса рассмотрим полную структуру решения и итоговую реализацию основных компонентов. Для удобства использования и дальнейшего развития проекта все исходные файлы организованы в соответствии с принципами чистой архитектуры.

Базовая структура проекта выглядит следующим образом:

Код
OrdersService/
├── src/
│   ├── OrdersService.Api/
│   ├── OrdersService.Application/
│   ├── OrdersService.Domain/
│   └── OrdersService.Infrastructure/
├── tests/
│   ├── OrdersService.UnitTests/
│   └── OrdersService.IntegrationTests/
├── docker/
│   ├── Dockerfile
│   └── docker-compose.yml
├── k8s/
│   ├── deployment.yml
│   └── service.yml
└── .gitlab-ci.yml
Основной файл конфигурации appsettings.json содержит все необходимые настройки для работы микросервиса:

JSON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
  "ConnectionStrings": {
    "DefaultConnection": "Server=db;Database=OrdersDb;User=sa;Password=YourStrong!Password"
  },
  "Kafka": {
    "BootstrapServers": "kafka:9092",
    "GroupId": "orders-service",
    "Topics": {
      "Orders": "orders"
    }
  },
  "Redis": {
    "ConnectionString": "redis:6379",
    "InstanceName": "Orders:"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning"
    }
  }
}
Центральным компонентом приложения является класс OrdersController, который обрабатывает все HTTP-запросы:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IMediator _mediator;
    private readonly ILogger<OrdersController> _logger;
 
    public OrdersController(IMediator mediator, ILogger<OrdersController> logger)
    {
        _mediator = mediator;
        _logger = logger;
    }
 
    [HttpPost]
    public async Task<ActionResult<OrderResponse>> CreateOrder(
        [FromBody] CreateOrderCommand command)
    {
        var result = await _mediator.Send(command);
        return CreatedAtAction(nameof(GetOrder), new { id = result.Id }, result);
    }
 
    [HttpGet("{id}")]
    public async Task<ActionResult<OrderResponse>> GetOrder(Guid id)
    {
        var result = await _mediator.Send(new GetOrderQuery { Id = id });
        return result != null ? Ok(result) : NotFound();
    }
}
Вся бизнес-логика обработки заказов инкапсулирована в соответствующих обработчиках команд:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, OrderResponse>
{
    private readonly IOrderRepository _repository;
    private readonly IOrderProcessor _processor;
    private readonly ICacheService _cache;
    private readonly IPublisher _publisher;
 
    public async Task<OrderResponse> Handle(CreateOrderCommand command, CancellationToken cancellationToken)
    {
        var order = new Order(command.CustomerId, command.Items);
        await _repository.AddAsync(order, cancellationToken);
        await _cache.SetAsync($"order:{order.Id}", order);
        await _publisher.PublishAsync(new OrderCreatedEvent(order));
        return new OrderResponse(order);
    }
}
Dockerfile для сборки контейнера микросервиса оптимизирован для многоэтапной сборки:

Windows Batch file
1
2
3
4
5
6
7
8
9
10
11
12
13
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["OrdersService.sln", "./"]
COPY ["src/*/*.csproj", "./"]
RUN for file in $(ls *.csproj); do mkdir -p src/${file%.*}/ && mv $file src/${file%.*}/; done
RUN dotnet restore
COPY . .
RUN dotnet publish -c Release -o /app
 
FROM mcr.microsoft.com/dotnet/aspnet:6.0
WORKDIR /app
COPY --from=build /app ./
ENTRYPOINT ["dotnet", "OrdersService.Api.dll"]
Все компоненты микросервиса связаны между собой через систему внедрения зависимостей, которая настраивается при запуске приложения:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static class DependencyInjection
{
    public static IServiceCollection AddApplicationServices(
        this IServiceCollection services)
    {
        services.AddMediatR(cfg => 
            cfg.RegisterServicesFromAssembly(Assembly.GetExecutingAssembly()));
        services.AddAutoMapper(Assembly.GetExecutingAssembly());
        services.AddValidatorsFromAssembly(Assembly.GetExecutingAssembly());
        services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
        
        return services;
    }
}
При правильной настройке всех компонентов микросервис предоставляет надежную и масштабируемую систему обработки заказов с поддержкой распределенного кэширования, асинхронной обработки сообщений и автоматизированного развертывания.

Место Apache Kafka в архитектуре
Всем привет! Я не разбираюсь в архитектуре, но у меня появилась необходимость использовать Kafka в проекте. Проект имеет такую структуру: ...

Docker, Redis Cluster, не получается подключиться к кластеру
Здравствуйте. В Docker создал кластер. Есть код: import ( &quot;context&quot; &quot;fmt&quot; &quot;github.com/redis/go-redis/v9&quot; ) var ctx =...

Spring Kafka. Ошибка Connection refused при подключении к брокеру Kafka
Пишу Kafka Broker и Consumer, чтобы ловить сообщения от приложения. При попытке достать сообщения из Consumer вылетает ошибка ...

Gitlab: После gitlab-ctl restart бд улетает в бан
4. После gitlab-ctl restart бд улетает в бан, помогает mysqladmin flush-hosts до следующего рестарта. БД создавалась так: CREATE DATABASE IF...

Как соединиться с контейнером Kafka из контейнера Ubuntu?
Добрый день, уважаемое сообщество, а так же отдельные интеллектуальный уникумы. Коим я, к сожалению, не являюсь. У меня в практике devops...

Микросервис Геолокации
B общем, собственную идею, тему про которую удалили, я реализовал за одну ночь… Where Are You? Описание При открытии страницы...

Как использовать gitLab
Добрый день. Подскажите как использовать контроль версий gitLab? Уже всё настроено, и меня интересует как скачивать код с сервера и заливать его...

Как настроить gitlab ci/cd?
Мне нужно настроить деплой проекта на сервер через ssh. Сгенерировал ssh key и написал следующий скрипт .gitlab-ci.yml image: java:8 ...

Как почистить кэш в Redis? Symony 4
Доброго времени суток! Есть задача закешировать запросы при помощи Redis + автоматичесое обновление каждые 4 часа. Я все это успешно сделал: ...

Микросервисная архитектура: передать объект подключения к БД в микросервис
Ситуация такая: пилим небольшую ERP-систему, интегрированную с интернет-магазином и клиентским сервисом типа личного кабинета. Архитектуру решили...

Разработать микросервис (графическое определение координат и площади)
Ищу программиста для разработки микросервиса, выполняющего графическое определение координат точек и площадей на основе входных данных. Детальное...

Размещено в Без категории
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Ошибка "Cleartext HTTP traffic not permitted" в Android
hw_wired 13.02.2025
При разработке Android-приложений можно столнуться с неприятной ошибкой "Cleartext HTTP traffic not permitted", которая может серьезно затруднить отладку и тестирование. Эта проблема особенно. . .
Изменение версии по умолчанию в NVM
hw_wired 13.02.2025
Node Version Manager, или коротко NVM - незаменимый инструмент для разработчиков, использующих Node. js. Многие сталкивались с ситуацией, когда разные проекты требуют различных версий Node. js,. . .
Переименование коммита в Git (локального и удаленного)
hw_wired 13.02.2025
Git как система контроля версий предоставляет разработчикам множество средств для управления этой историей, и одним из таких важных средств является возможность изменения сообщений коммитов. Но зачем. . .
Отличия Promise и Observable в Angular
hw_wired 13.02.2025
В веб-разработки асинхронные операции стали неотъемлимой частью почти каждого приложения. Ведь согласитесь, было бы странно, если бы при каждом запросе к серверу или при обработке больших объемов. . .
Сравнение NPM, Gulp, Webpack, Bower, Grunt и Browserify
hw_wired 13.02.2025
В современной веб-разработке существует множество средств сборки и управления зависимостями проектов, каждое из которых решает определенные задачи и имеет свои особенности. Когда я начинаю новый. . .
Отличия AddTransient, AddScoped и AddSingleton в ASP.Net Core DI
hw_wired 13.02.2025
В современной разработке веб-приложений на платформе ASP. NET Core правильное управление зависимостями играет ключевую роль в создании надежного и производительного кода. Фреймворк предоставляет три. . .
Отличия между venv, pyenv, pyvenv, virtualenv, pipenv, conda, virtualenvwrapp­­er, poetry и другими в Python
hw_wired 13.02.2025
В Python существует множество средств для управления зависимостями и виртуальными окружениями, что порой вызывает замешательство даже у опытных разработчиков. Каждый инструмент создавался для решения. . .
Навигация с помощью React Router
hw_wired 13.02.2025
React Router - это наиболее распространенное средство для создания навигации в React-приложениях, без которого сложно представить современную веб-разработку. Когда мы разрабатываем сложное. . .
Ошибка "error:0308010C­­:dig­ital envelope routines::unsup­­ported"
hw_wired 13.02.2025
Если вы сталкиваетесь с ошибкой "error:0308010C:digital envelope routines::unsupported" при разработке Node. js приложений, то наверняка уже успели поломать голову над её решением. Эта коварная ошибка. . .
Подключение к контейнеру Docker и работа с его содержимым
hw_wired 13.02.2025
В мире современной разработки контейнеры Docker изменили подход к созданию, развертыванию и масштабированию приложений. Эта технология позволяет упаковать приложение со всеми его зависимостями в. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru