Разработка современных приложений часто требует интеграции с объектными хранилищами, и Amazon S3 стал де-факто стандартом в этой области. Однако работа с облачными сервисами в процессе разработки создает множество проблем, которые я лично испытал на себе за годы работы с распределенными системами.
Первая проблема - это стоимость. Каждый запрос к S3 стоит денег, и когда команда разработчиков активно тестирует функциональность, расходы могут быстро выйти из-под контроля. Я помню проект, где только за месяц тестирования мы потратили более 500$ на API-вызовы к S3, хотя основная функциональность приложения была совсем в другом. Вторая проблема - зависимость от сети. Интернет-соединение может быть нестабильным, что приводит к непредсказуемым падениям тестов. Особенно это критично для CI/CD пайплайнов, где нестабильность внешних зависимостей может парализовать весь процесс разработки. Третья проблема - сложность отладки. Когда что-то идет не так с S3, понять причину может быть крайне затруднительно - логи AWS не всегда информативны, а воспроизвести проблему локально невозможно.
Локальная эмуляция S3 решает эти проблемы комплексно. MinIO предоставляет полнофункциональную S3-совместимую среду, которая работает как обычный Docker-контейнер. LocalStack идет дальше и эмулирует целую экосистему AWS-сервисов, что особенно полезно для комплексных решений. При выборе между различными решениями я обычно руководствуюсь следующими критериями: MinIO отлично подходит для проектов, где нужна только S3-функциональность с максимальной производительностью. LocalStack выбираю для сложных интеграций с множеством AWS-сервисов. Moto использую для unit-тестов, где нужна легковесная мок-реализация.
Производительность этих решений значительно различается: MinIO показывает результаты, близкие к нативному S3, LocalStack работает медленнее из-за дополнительной абстракции, а Moto самый быстрый для простых операций благодаря in-memory реализации.
Интересный момент, который часто упускают из вида - это возможность тестирования сценариев отказов. С локальным хранилищем можно легко симулировать различные типы ошибок: недоступность сервиса, превышение лимитов, проблемы с правами доступа. Это позволяет проверить, насколько устойчиво ваше приложение к различным типам сбоев. Безопасность - еще один важный аспект. Работа с реальным S3 требует управления ключами доступа, которые могут случайно попасть в репозиторий или логи. Локальная эмуляция позволяет использовать тестовые креденциалы без риска компрометации реальных данных.
Скорость разработки возрастает кратно. Не нужно ждать, пока файл загрузится в облако и обработается - все происходит мгновенно на локальной машине. Это особенно заметно при разработке функций обработки изображений или видео, где размеры файлов могут быть значительными. Для команд разработки локальные хранилища дают возможность создавать изолированные среды для каждого разработчика. Каждый может работать со своим набором данных, не влияя на работу коллег. Это особенно критично при работе с микросервисной архитектурой, где изменения в одном сервисе могут повлиять на другие. Интересное наблюдение: многие разработчики недооценивают сложность работы с S3 до тех пор, пока не столкнутся с проблемами в продакшене. Локальная эмуляция позволяет выявить эти проблемы на раннем этапе, когда их исправление обходится значительно дешевле.
При выборе конкретного решения для локальной эмуляции S3 стоит учитывать потребности проекта. Если вам нужна только базовая функциональность для загрузки и скачивания файлов, MinIO станет отличным выбором - он легко настраивается и практически не отличается от реального S3 по API. Для более сложных сценариев с использованием S3 Events, Lambda-функций и других AWS-сервисов LocalStack предоставляет более полную эмуляцию экосистемы.
Важно понимать, что локальная эмуляция не заменяет полностью тестирование на реальном S3, но значительно ускоряет цикл разработки и позволяет покрыть тестами большинство сценариев использования. Финальные интеграционные тесты все равно должны проводиться на реальной инфраструктуре. Производительность локальных решений может варьироваться в зависимости от типа операций. Массовые загрузки мелких файлов обычно работают быстрее на MinIO, в то время как операции с большими файлами могут показывать схожие результаты на всех платформах. Главное преимущество - отсутствие сетевой задержки, что делает отладку значительно комфортнее.
Архитектура адаптера для работы с S3
Основой архитектуры должен стать паттерн Adapter, который позволяет унифицировать интерфейс работы с различными реализациями S3-совместимых хранилищ. Это особенно важно, когда в проекте используется несколько типов хранилищ: локальное для разработки, облачное для продакшена и mock-объекты для тестирования.
| C# | 1
2
3
4
5
6
7
8
9
| public interface IStorageAdapter
{
Task<bool> BucketExistsAsync(string bucketName, CancellationToken cancellationToken = default);
Task CreateBucketAsync(string bucketName, CancellationToken cancellationToken = default);
Task<StorageObject> GetObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default);
Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content, CancellationToken cancellationToken = default);
Task DeleteObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default);
Task<IEnumerable<StorageObjectInfo>> ListObjectsAsync(string bucketName, string prefix = null, CancellationToken cancellationToken = default);
} |
|
Принцип единственной ответственности (Single Responsibility Principle) требует разделения функциональности на отдельные компоненты. Я обычно создаю отдельные классы для управления подключениями, сериализации объектов, обработки ошибок и логирования операций.
Асинхронность в современных приложениях не роскошь, а необходимость. Все операции с S3 должны быть асинхронными, поскольку работа с сетью и файловой системой может занимать значительное время. Использование async/await позволяет избежать блокировки основного потока и обеспечивает лучшую отзывчивость приложения.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| public class S3StorageAdapter : IStorageAdapter
{
private readonly IAmazonS3 _s3Client;
private readonly ILogger<S3StorageAdapter> _logger;
private readonly IRetryPolicy _retryPolicy;
public S3StorageAdapter(IAmazonS3 s3Client, ILogger<S3StorageAdapter> logger, IRetryPolicy retryPolicy)
{
_s3Client = s3Client ?? throw new ArgumentNullException(nameof(s3Client));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));
}
public async Task<bool> BucketExistsAsync(string bucketName, CancellationToken cancellationToken = default)
{
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var response = await _s3Client.DoesS3BucketExistV2Async(bucketName);
return response;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking bucket existence: {BucketName}", bucketName);
throw;
}
}
} |
|
Управление жизненным циклом соединений критически важно для производительности. AWS SDK для .NET использует connection pooling внутри, но важно правильно настроить его параметры. Я рекомендую использовать паттерн Factory для создания экземпляров клиентов S3 с предварительно настроенными параметрами.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public class S3ClientFactory : IS3ClientFactory
{
private readonly IConfiguration _configuration;
public S3ClientFactory(IConfiguration configuration)
{
_configuration = configuration;
}
public IAmazonS3 CreateClient()
{
var config = new AmazonS3Config
{
ServiceURL = _configuration["S3:ServiceUrl"],
ForcePathStyle = _configuration.GetValue<bool>("S3:ForcePathStyle"),
UseHttp = _configuration.GetValue<bool>("S3:UseHttp"),
MaxErrorRetry = _configuration.GetValue<int>("S3:MaxRetries"),
Timeout = TimeSpan.FromSeconds(_configuration.GetValue<int>("S3:TimeoutSeconds")),
ReadWriteTimeout = TimeSpan.FromSeconds(_configuration.GetValue<int>("S3:ReadWriteTimeoutSeconds"))
};
return new AmazonS3Client(_configuration["S3:AccessKey"], _configuration["S3:SecretKey"], config);
}
} |
|
Обработка ошибок требует особого внимания. S3 может возвращать различные типы ошибок: сетевые проблемы, проблемы авторизации, превышение лимитов. Каждый тип ошибки требует своей стратегии обработки. Для временных сбоев я использую exponential backoff с джиттером, а для критических ошибок - circuit breaker паттерн. Интересный момент связан с обработкой больших файлов. Стандартная загрузка в S3 ограничена 5GB, поэтому для больших файлов необходимо использовать multipart upload. Это требует дополнительной логики для управления частями файла и их сборки. Кеширование метаданных может значительно повысить производительность, особенно для операций проверки существования объектов. Я использую распределенный кеш с настраиваемым TTL для различных типов данных.
Безопасность не должна быть второстепенной задачей. Все чувствительные данные, такие как ключи доступа, должны храниться в защищенном виде. Я рекомендую использовать Azure Key Vault или AWS Secrets Manager для хранения критических параметров подключения.
Мониторинг и наблюдаемость системы обеспечиваются через структурированное логирование и метрики. Каждая операция должна логироваться с соответствующим уровнем детализации, а критические метрики, такие как время отклика и количество ошибок, должны отправляться в систему мониторинга.
Паттерн Repository может быть полезен для создания дополнительного слоя абстракции над адаптером, особенно когда бизнес-логика требует специфических операций с объектами хранилища.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public class DocumentRepository : IDocumentRepository
{
private readonly IStorageAdapter _storageAdapter;
private readonly string _bucketName;
public DocumentRepository(IStorageAdapter storageAdapter, IConfiguration configuration)
{
_storageAdapter = storageAdapter;
_bucketName = configuration["Documents:BucketName"];
}
public async Task<Document> GetDocumentAsync(Guid documentId)
{
var objectKey = $"documents/{documentId}";
var storageObject = await _storageAdapter.GetObjectAsync(_bucketName, objectKey);
return JsonSerializer.Deserialize<Document>(storageObject.Content);
}
} |
|
Dependency Injection контейнер должен быть настроен для правильного управления зависимостями. Клиенты S3 должны регистрироваться как singleton, поскольку они thread-safe и оптимизированы для повторного использования. Тестируемость архитектуры обеспечивается через использование интерфейсов и внедрение зависимостей. Это позволяет легко подменять реальные реализации мок-объектами в unit-тестах.
Важность правильной настройки сериализации данных часто недооценивается разработчиками. При работе с S3 приходится иметь дело с различными форматами данных: JSON для метаданных, бинарные данные для файлов, XML для некоторых операций API. Я создаю отдельный компонент для управления сериализацией, который может гибко переключаться между различными форматами в зависимости от типа операции.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public class SerializationManager : ISerializationManager
{
private readonly JsonSerializerOptions _jsonOptions;
public SerializationManager()
{
_jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
}
public async Task<T> DeserializeAsync<T>(Stream stream, SerializationFormat format)
{
return format switch
{
SerializationFormat.Json => await JsonSerializer.DeserializeAsync<T>(stream, _jsonOptions),
SerializationFormat.Binary => DeserializeBinary<T>(stream),
_ => throw new NotSupportedException($"Format {format} is not supported")
};
}
} |
|
Конфигурация адаптера должна быть достаточно гибкой для поддержки различных сред развертывания. Паттерн Options позволяет инкапсулировать настройки и валидировать их при запуске приложения. Это особенно важно при работе с микросервисами, где конфигурация может быть распределена между несколькими источниками.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public class S3AdapterOptions
{
public string ServiceUrl { get; set; }
public string AccessKey { get; set; }
public string SecretKey { get; set; }
public bool ForcePathStyle { get; set; } = true;
public int TimeoutSeconds { get; set; } = 30;
public int MaxRetries { get; set; } = 3;
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
public bool UseHttps { get; set; } = false;
public void Validate()
{
if (string.IsNullOrEmpty(ServiceUrl))
throw new ArgumentException("ServiceUrl is required");
if (string.IsNullOrEmpty(AccessKey))
throw new ArgumentException("AccessKey is required");
if (string.IsNullOrEmpty(SecretKey))
throw new ArgumentException("SecretKey is required");
}
} |
|
Metadata обработка требует особого внимания при проектировании архитектуры. S3 позволяет сохранять пользовательские метаданные с каждым объектом, что открывает широкие возможности для реализации бизнес-логики. Я создаю отдельный компонент для работы с метаданными, который может автоматически добавлять служебную информацию: timestamp загрузки, версию приложения, хеш содержимого. Версионирование объектов - критически важная функция для многих бизнес-приложений. S3 поддерживает версионирование нативно, но требует дополнительной логики для управления жизненным циклом версий. Я реализую отдельный сервис для работы с версиями, который может автоматически очищать старые версии согласно заданным правилам.
Batch операции позволяют значительно повысить производительность при массовых операциях с объектами. Вместо отправки отдельных запросов для каждого файла, можно группировать операции и выполнять их параллельно с контролем степени параллелизма.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| public class BatchOperationManager
{
private readonly SemaphoreSlim _semaphore;
private readonly IStorageAdapter _adapter;
public BatchOperationManager(IStorageAdapter adapter, int maxConcurrency = 10)
{
_adapter = adapter;
_semaphore = new SemaphoreSlim(maxConcurrency);
}
public async Task<BatchResult> ProcessBatchAsync(IEnumerable<BatchOperation> operations)
{
var tasks = operations.Select(op => ProcessOperationAsync(op));
var results = await Task.WhenAll(tasks);
return new BatchResult
{
SuccessCount = results.Count(r => r.IsSuccess),
FailureCount = results.Count(r => !r.IsSuccess),
Results = results
};
}
private async Task<OperationResult> ProcessOperationAsync(BatchOperation operation)
{
await _semaphore.WaitAsync();
try
{
return await ExecuteOperationAsync(operation);
}
finally
{
_semaphore.Release();
}
}
} |
|
Транзакционность операций в S3 ограничена, но можно реализовать компенсационные действия через паттерн Saga. Это особенно важно при работе с несколькими связанными объектами, где сбой одной операции должен приводить к компенсации уже выполненных действий. Кеширование стратегий должно быть многоуровневым: L1 кеш для метаданных в памяти, L2 кеш для часто используемых объектов в Redis, L3 кеш на уровне CDN для публичных ресурсов. Каждый уровень требует своей стратегии инвалидации и синхронизации. Мониторинг производительности адаптера включает сбор метрик по времени отклика, количеству ошибок, использованию CPU и памяти. Я интегрирую Prometheus для сбора метрик и создаю дашборды в Grafana для визуализации производительности системы.
Пишем простой бот для браузера Привет!
Подскажите, каким образом лучше подойти к решению задачи. Мне нужно написать бота,... Не могу настроить docker и Nginx (впервые делаю) для общего хранилища Мне надо чтобы я мог загружать файл в общее хранилище через FastAPI и мог использовать эти... Cтруктура программы. Где пишем основную часть кода? Добрый всем денек, есть ламерский вопрос, не пинайте ногами.
Создаю проект C# в VS 2005,... Пишем Секундомер Привет. необходимо сделать такую штуку.
По нажатию клавиши старт должен включиться секундомер(...
Настройка окружения и зависимостей
Выбор библиотек для работы с S3 в .NET довольно ограничен, но качественен. Основной выбор - это официальный AWS SDK for .NET, который предоставляет полный функционал для работы с S3 и регулярно обновляется командой Amazon. Альтернативой может служить MinIO .NET SDK, который оптимизирован для работы с MinIO, но также совместим с AWS S3.
| XML | 1
2
3
4
5
6
7
8
| <PackageReference Include="AWSSDK.S3" Version="3.7.107.25" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="Polly.Extensions.Http" Version="3.0.0" /> |
|
Для локального тестирования я предпочитаю использовать MinIO из-за его простоты настройки и полной совместимости с S3 API. Docker Compose файл для MinIO выглядит достаточно минималистично, но содержит все необходимые параметры для комфортной работы:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| version: '3.8'
services:
minio:
image: minio/minio:latest
container_name: minio-dev
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
volumes:
minio_data: |
|
Структура проекта должна следовать принципам чистой архитектуры. Я создаю отдельные проекты для различных слоев: core библиотека с интерфейсами, implementation с конкретными реализациями, tests для тестов и sample application для демонстрации функциональности.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| S3Adapter/
├── src/
│ ├── S3Adapter.Core/
│ │ ├── Interfaces/
│ │ ├── Models/
│ │ └── Exceptions/
│ ├── S3Adapter.Implementation/
│ │ ├── Adapters/
│ │ ├── Configuration/
│ │ └── Services/
│ └── S3Adapter.Sample/
│ ├── Controllers/
│ └── Program.cs
├── tests/
│ ├── S3Adapter.UnitTests/
│ └── S3Adapter.IntegrationTests/
├── docker-compose.yml
└── S3Adapter.sln |
|
Конфигурация приложения должна быть гибкой и поддерживать различные среды развертывания. Я использую паттерн Options для инкапсуляции настроек и их валидации при запуске приложения:
| JSON | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| {
"S3Configuration": {
"ServiceUrl": "http://localhost:9000",
"AccessKey": "minioadmin",
"SecretKey": "minioadmin",
"ForcePathStyle": true,
"UseHttps": false,
"Region": "us-east-1",
"TimeoutSeconds": 30,
"MaxRetries": 3,
"RetryDelaySeconds": 1
},
"Logging": {
"LogLevel": {
"Default": "Information",
"S3Adapter": "Debug"
}
}
} |
|
Регистрация сервисов в DI контейнере требует особого внимания к жизненному циклу объектов. S3 клиенты thread-safe и оптимизированы для повторного использования, поэтому регистрирую их как singleton:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public static class ServiceCollectionExtensions
{
public static IServiceCollection AddS3Adapter(this IServiceCollection services, IConfiguration configuration)
{
services.Configure<S3AdapterOptions>(configuration.GetSection("S3Configuration"));
services.AddSingleton<IS3ClientFactory, S3ClientFactory>();
services.AddSingleton<IAmazonS3>(provider =>
{
var factory = provider.GetRequiredService<IS3ClientFactory>();
return factory.CreateClient();
});
services.AddScoped<IStorageAdapter, S3StorageAdapter>();
services.AddScoped<IRetryPolicy, ExponentialBackoffRetryPolicy>();
return services;
}
} |
|
Настройка логирования критически важна для отладки проблем в продакшене. Я конфигурирую структурированное логирование с различными уровнями детализации для разных компонентов системы. Особенно важно логировать все операции с S3 вместе с их параметрами и временем выполнения. Для обработки ошибок и повторных попыток использую библиотеку Polly, которая предоставляет богатый набор паттернов для обеспечения отказоустойчивости. Exponential backoff с джиттером помогает избежать thundering herd эффекта при массовых сбоях.
Переменные окружения играют важную роль в настройке различных сред развертывания. Для локальной разработки использую файл .env, который загружается при старте приложения:
| C# | 1
2
3
4
5
6
| S3_SERVICE_URL=http://localhost:9000
S3_ACCESS_KEY=minioadmin
S3_SECRET_KEY=minioadmin
S3_FORCE_PATH_STYLE=true
S3_USE_HTTPS=false
ASPNETCORE_ENVIRONMENT=Development |
|
Healthcheck endpoints необходимы для мониторинга состояния адаптера в продакшене. Я добавляю проверку доступности S3 сервиса и валидности конфигурации:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| public class S3HealthCheck : IHealthCheck
{
private readonly IAmazonS3 _s3Client;
public S3HealthCheck(IAmazonS3 s3Client)
{
_s3Client = s3Client;
}
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
await _s3Client.ListBucketsAsync(cancellationToken);
return HealthCheckResult.Healthy("S3 service is available");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("S3 service is not available", ex);
}
}
} |
|
Важность правильной настройки сериализации JSON часто недооценивается. Конфигурирую JsonSerializer с оптимальными настройками для работы с S3 метаданными и обеспечиваю обратную совместимость:
| C# | 1
2
3
4
5
6
7
| services.Configure<JsonSerializerOptions>(options =>
{
options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
options.WriteIndented = false;
options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
options.Converters.Add(new JsonStringEnumConverter());
}); |
|
Кеширование конфигурации может значительно повысить производительность приложения. Использую IMemoryCache для кеширования часто используемых параметров конфигурации с настраиваемым TTL. Безопасность конфигурации обеспечивается через использование Azure Key Vault или AWS Secrets Manager для хранения чувствительных данных. В локальной среде разработки использую User Secrets для изоляции конфиденциальной информации от репозитория. Мониторинг производительности настраивается через интеграцию с Application Insights или Prometheus. Собираю метрики по времени отклика операций, количеству ошибок и использованию ресурсов.
Автоматизация развертывания окружения через Docker Compose позволяет новым разработчикам быстро начать работу с проектом. Один командой docker-compose up поднимается полное окружение для разработки и тестирования.
Интеграционные тесты требуют отдельного набора конфигураций для тестового окружения. Создаю separate appsettings.Testing.json файл с настройками для тестовой среды:
| JSON | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| {
"S3Configuration": {
"ServiceUrl": "http://localhost:9000",
"AccessKey": "testkey",
"SecretKey": "testsecret",
"BucketPrefix": "test-",
"CleanupOnDispose": true,
"CreateBucketIfNotExists": true
},
"TestSettings": {
"MaxExecutionTimeSeconds": 60,
"ParallelTestExecution": false,
"RetryFailedTests": true
}
} |
|
Dockerfile для контейнеризации приложения должен учитывать специфику работы с S3 и оптимизироваться для production использования:
| Windows Batch file | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| FROM mcr.microsoft.com/dotnet/aspnet:7.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build
WORKDIR /src
COPY ["S3Adapter.Sample/S3Adapter.Sample.csproj", "S3Adapter.Sample/"]
COPY ["S3Adapter.Implementation/S3Adapter.Implementation.csproj", "S3Adapter.Implementation/"]
COPY ["S3Adapter.Core/S3Adapter.Core.csproj", "S3Adapter.Core/"]
RUN dotnet restore "S3Adapter.Sample/S3Adapter.Sample.csproj"
COPY . .
WORKDIR "/src/S3Adapter.Sample"
RUN dotnet build "S3Adapter.Sample.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "S3Adapter.Sample.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "S3Adapter.Sample.dll"] |
|
Настройка CI/CD pipeline в GitHub Actions автоматизирует процесс тестирования и развертывания. Конфигурация включает запуск MinIO для интеграционных тестов и проверку совместимости с различными версиями .NET:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| name: CI/CD Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
services:
minio:
image: minio/minio
ports:
- 9000:9000
env:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
options: --health-cmd "curl -f http://localhost:9000/minio/health/live" --health-interval 30s --health-timeout 20s --health-retries 3
steps:
- uses: actions/checkout@v3
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 7.0.x
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore
- name: Test
run: dotnet test --no-build --verbosity normal |
|
Профилирование производительности настраивается через dotnet-counters и PerfView для анализа использования памяти и CPU. Это особенно важно при работе с большими файлами и массовыми операциями.
Настройка распределенного кеширования через Redis требует дополнительных конфигураций для обеспечения отказоустойчивости:
| C# | 1
2
3
4
5
6
7
| services.AddStackExchangeRedisCache(options =>
{
options.Configuration = configuration.GetConnectionString("Redis");
options.InstanceName = "S3Adapter";
});
services.AddSingleton<IDistributedCacheService, DistributedCacheService>(); |
|
Трассировка запросов настраивается через OpenTelemetry для мониторинга производительности в распределенных системах:
| C# | 1
2
3
4
5
6
7
8
9
| services.AddOpenTelemetry()
.WithTracing(builder =>
{
builder.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddSource("S3Adapter")
.SetSampler(new AlwaysOnSampler())
.AddJaegerExporter();
}); |
|
Валидация конфигурации при старте приложения предотвращает runtime ошибки и обеспечивает fail-fast поведение:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public class S3ConfigurationValidator : IValidateOptions<S3AdapterOptions>
{
public ValidateOptionsResult Validate(string name, S3AdapterOptions options)
{
var failures = new List<string>();
if (string.IsNullOrEmpty(options.ServiceUrl))
failures.Add("ServiceUrl is required");
if (options.TimeoutSeconds <= 0)
failures.Add("TimeoutSeconds must be positive");
if (failures.Any())
return ValidateOptionsResult.Fail(failures);
return ValidateOptionsResult.Success;
}
} |
|
Backup и recovery процедуры для локальной разработки обеспечивают быстрое восстановление данных после сбоев. Автоматические снапшоты MinIO данных сохраняются в отдельном volume. Мониторинг ресурсов через custom metrics позволяет отслеживать специфичные для S3 операций показатели: скорость загрузки, размер объектов, количество операций по типам.
Реализация базового адаптера
Создание базового адаптера начинается с реализации основных операций работы с бакетами. Это фундаментальный функционал, который определяет всю дальнейшую архитектуру решения. За годы работы с различными системами хранения я понял, что качественная реализация базовых операций критически важна для стабильности всей системы.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| public class S3StorageAdapter : IStorageAdapter, IDisposable
{
private readonly IAmazonS3 _s3Client;
private readonly ILogger<S3StorageAdapter> _logger;
private readonly IRetryPolicy _retryPolicy;
private readonly S3AdapterOptions _options;
private readonly SemaphoreSlim _semaphore;
private bool _disposed = false;
public S3StorageAdapter(
IAmazonS3 s3Client,
ILogger<S3StorageAdapter> logger,
IRetryPolicy retryPolicy,
IOptions<S3AdapterOptions> options)
{
_s3Client = s3Client ?? throw new ArgumentNullException(nameof(s3Client));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_semaphore = new SemaphoreSlim(_options.MaxConcurrentOperations, _options.MaxConcurrentOperations);
}
public async Task<bool> BucketExistsAsync(string bucketName, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Checking bucket existence: {BucketName}", bucketName);
var response = await _s3Client.DoesS3BucketExistV2Async(bucketName);
_logger.LogDebug("Bucket {BucketName} exists: {Exists}", bucketName, response);
return response;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking bucket existence: {BucketName}", bucketName);
throw new StorageException($"Failed to check bucket existence: {bucketName}", ex);
}
finally
{
_semaphore.Release();
}
}
public async Task CreateBucketAsync(string bucketName, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
if (await BucketExistsAsync(bucketName, cancellationToken))
{
_logger.LogInformation("Bucket {BucketName} already exists", bucketName);
return;
}
await _semaphore.WaitAsync(cancellationToken);
try
{
await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogInformation("Creating bucket: {BucketName}", bucketName);
var request = new PutBucketRequest
{
BucketName = bucketName,
UseClientRegion = true
};
await _s3Client.PutBucketAsync(request, cancellationToken);
_logger.LogInformation("Bucket {BucketName} created successfully", bucketName);
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating bucket: {BucketName}", bucketName);
throw new StorageException($"Failed to create bucket: {bucketName}", ex);
}
finally
{
_semaphore.Release();
}
}
} |
|
Загрузка и скачивание файлов требует особого внимания к обработке потоков данных. Важно правильно управлять памятью и обеспечивать возможность работы с большими файлами без загрузки их полностью в память.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
| public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
if (content == null)
throw new ArgumentNullException(nameof(content));
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Uploading object: {BucketName}/{ObjectKey}", bucketName, objectKey);
var request = new PutObjectRequest
{
BucketName = bucketName,
Key = objectKey,
InputStream = content,
ContentType = metadata?.ContentType ?? "application/octet-stream",
ServerSideEncryptionMethod = ServerSideEncryptionMethod.AES256
};
if (metadata?.CustomMetadata != null)
{
foreach (var kvp in metadata.CustomMetadata)
{
request.Headers.Add($"x-amz-meta-{kvp.Key}", kvp.Value);
}
}
if (content.CanSeek)
{
request.Headers.ContentLength = content.Length;
}
var response = await _s3Client.PutObjectAsync(request, cancellationToken);
_logger.LogInformation("Object uploaded successfully: {BucketName}/{ObjectKey}, ETag: {ETag}",
bucketName, objectKey, response.ETag);
return response.ETag;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error uploading object: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to upload object: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
}
public async Task<StorageObject> GetObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Downloading object: {BucketName}/{ObjectKey}", bucketName, objectKey);
var request = new GetObjectRequest
{
BucketName = bucketName,
Key = objectKey
};
var response = await _s3Client.GetObjectAsync(request, cancellationToken);
var metadata = new ObjectMetadata
{
ContentType = response.Headers.ContentType,
ContentLength = response.Headers.ContentLength,
LastModified = response.LastModified,
ETag = response.ETag,
CustomMetadata = response.Metadata.Where(kvp => kvp.Key.StartsWith("x-amz-meta-"))
.ToDictionary(kvp => kvp.Key.Substring(11), kvp => kvp.Value)
};
_logger.LogInformation("Object downloaded successfully: {BucketName}/{ObjectKey}, Size: {Size}",
bucketName, objectKey, response.Headers.ContentLength);
return new StorageObject
{
BucketName = bucketName,
Key = objectKey,
Content = response.ResponseStream,
Metadata = metadata
};
}, cancellationToken);
}
catch (AmazonS3Exception ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogWarning("Object not found: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new ObjectNotFoundException($"Object not found: {bucketName}/{objectKey}");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error downloading object: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to download object: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Управление метаданными объектов позволяет сохранять дополнительную информацию вместе с файлами. Это особенно полезно для хранения бизнес-логики, версий файлов и других служебных данных.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| public async Task<ObjectMetadata> GetObjectMetadataAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Getting object metadata: {BucketName}/{ObjectKey}", bucketName, objectKey);
var request = new GetObjectMetadataRequest
{
BucketName = bucketName,
Key = objectKey
};
var response = await _s3Client.GetObjectMetadataAsync(request, cancellationToken);
return new ObjectMetadata
{
ContentType = response.Headers.ContentType,
ContentLength = response.Headers.ContentLength,
LastModified = response.LastModified,
ETag = response.ETag,
CustomMetadata = response.Metadata.Where(kvp => kvp.Key.StartsWith("x-amz-meta-"))
.ToDictionary(kvp => kvp.Key.Substring(11), kvp => kvp.Value)
};
}, cancellationToken);
}
catch (AmazonS3Exception ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
throw new ObjectNotFoundException($"Object not found: {bucketName}/{objectKey}");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting object metadata: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to get object metadata: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
}
private void ValidateBucketName(string bucketName)
{
if (string.IsNullOrWhiteSpace(bucketName))
throw new ArgumentException("Bucket name cannot be null or empty", nameof(bucketName));
if (bucketName.Length < 3 || bucketName.Length > 63)
throw new ArgumentException("Bucket name must be between 3 and 63 characters", nameof(bucketName));
if (!Regex.IsMatch(bucketName, @"^[a-z0-9][a-z0-9\-]*[a-z0-9]$"))
throw new ArgumentException("Bucket name contains invalid characters", nameof(bucketName));
}
private void ValidateObjectKey(string objectKey)
{
if (string.IsNullOrWhiteSpace(objectKey))
throw new ArgumentException("Object key cannot be null or empty", nameof(objectKey));
if (objectKey.Length > 1024)
throw new ArgumentException("Object key cannot exceed 1024 characters", nameof(objectKey));
} |
|
Реализация операций копирования объектов внутри хранилища без необходимости скачивания данных на клиент. Это значительно экономит трафик и время выполнения операций.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| public async Task<string> CopyObjectAsync(string sourceBucket, string sourceKey, string destinationBucket, string destinationKey, CancellationToken cancellationToken = default)
{
ValidateBucketName(sourceBucket);
ValidateObjectKey(sourceKey);
ValidateBucketName(destinationBucket);
ValidateObjectKey(destinationKey);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Copying object: {SourceBucket}/{SourceKey} to {DestinationBucket}/{DestinationKey}",
sourceBucket, sourceKey, destinationBucket, destinationKey);
var request = new CopyObjectRequest
{
SourceBucket = sourceBucket,
SourceKey = sourceKey,
DestinationBucket = destinationBucket,
DestinationKey = destinationKey,
ServerSideEncryptionMethod = ServerSideEncryptionMethod.AES256
};
var response = await _s3Client.CopyObjectAsync(request, cancellationToken);
_logger.LogInformation("Object copied successfully: {SourceBucket}/{SourceKey} to {DestinationBucket}/{DestinationKey}",
sourceBucket, sourceKey, destinationBucket, destinationKey);
return response.ETag;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error copying object: {SourceBucket}/{SourceKey} to {DestinationBucket}/{DestinationKey}",
sourceBucket, sourceKey, destinationBucket, destinationKey);
throw new StorageException($"Failed to copy object: {sourceBucket}/{sourceKey} to {destinationBucket}/{destinationKey}", ex);
}
finally
{
_semaphore.Release();
}
}
public async Task DeleteObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
await _semaphore.WaitAsync(cancellationToken);
try
{
await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Deleting object: {BucketName}/{ObjectKey}", bucketName, objectKey);
var request = new DeleteObjectRequest
{
BucketName = bucketName,
Key = objectKey
};
await _s3Client.DeleteObjectAsync(request, cancellationToken);
_logger.LogInformation("Object deleted successfully: {BucketName}/{ObjectKey}", bucketName, objectKey);
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error deleting object: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to delete object: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Batch-операции для массовой обработки объектов позволяют значительно увеличить пропускную способность системы. Вместо последовательного выполнения операций - группируем их и выполняем параллельно с контролем нагрузки.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| public async Task<BatchDeleteResult> DeleteObjectsAsync(string bucketName, IEnumerable<string> objectKeys, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
var keysList = objectKeys?.ToList() ?? throw new ArgumentNullException(nameof(objectKeys));
if (!keysList.Any())
return new BatchDeleteResult { SuccessfulDeletes = new List<string>(), Errors = new List<BatchDeleteError>() };
var successfulDeletes = new List<string>();
var errors = new List<BatchDeleteError>();
// Обрабатываем батчами по 1000 объектов (лимит S3)
var batches = keysList.Batch(1000);
foreach (var batch in batches)
{
await _semaphore.WaitAsync(cancellationToken);
try
{
await _retryPolicy.ExecuteAsync(async () =>
{
var request = new DeleteObjectsRequest
{
BucketName = bucketName,
Objects = batch.Select(key => new KeyVersion { Key = key }).ToList()
};
var response = await _s3Client.DeleteObjectsAsync(request, cancellationToken);
successfulDeletes.AddRange(response.DeletedObjects.Select(d => d.Key));
errors.AddRange(response.DeleteErrors.Select(e => new BatchDeleteError
{
Key = e.Key,
Code = e.Code,
Message = e.Message
}));
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in batch delete operation for bucket: {BucketName}", bucketName);
errors.AddRange(batch.Select(key => new BatchDeleteError
{
Key = key,
Code = "InternalError",
Message = ex.Message
}));
}
finally
{
_semaphore.Release();
}
}
return new BatchDeleteResult
{
SuccessfulDeletes = successfulDeletes,
Errors = errors
};
} |
|
Пагинация критически важна при работе с большими объемами данных. S3 API возвращает максимум 1000 объектов за один запрос, поэтому необходимо правильно реализовать постраничную загрузку.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| public async Task<PagedResult<StorageObjectInfo>> ListObjectsAsync(string bucketName, string prefix = null,
int maxKeys = 1000, string continuationToken = null, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
if (maxKeys <= 0 || maxKeys > 1000)
throw new ArgumentOutOfRangeException(nameof(maxKeys), "MaxKeys must be between 1 and 1000");
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
_logger.LogDebug("Listing objects in bucket: {BucketName}, prefix: {Prefix}, maxKeys: {MaxKeys}",
bucketName, prefix, maxKeys);
var request = new ListObjectsV2Request
{
BucketName = bucketName,
Prefix = prefix,
MaxKeys = maxKeys,
ContinuationToken = continuationToken
};
var response = await _s3Client.ListObjectsV2Async(request, cancellationToken);
var objects = response.S3Objects.Select(obj => new StorageObjectInfo
{
Key = obj.Key,
Size = obj.Size,
LastModified = obj.LastModified,
ETag = obj.ETag,
StorageClass = obj.StorageClass?.Value
}).ToList();
_logger.LogInformation("Listed {Count} objects in bucket {BucketName}", objects.Count, bucketName);
return new PagedResult<StorageObjectInfo>
{
Items = objects,
IsTruncated = response.IsTruncated,
NextContinuationToken = response.NextContinuationToken,
TotalCount = response.KeyCount
};
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error listing objects in bucket: {BucketName}", bucketName);
throw new StorageException($"Failed to list objects in bucket: {bucketName}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Версионирование объектов требует дополнительных методов для работы с историей изменений. Это критически важно для приложений, где требуется отслеживание изменений данных.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
| public async Task<IEnumerable<ObjectVersion>> ListObjectVersionsAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var request = new ListVersionsRequest
{
BucketName = bucketName,
Prefix = objectKey
};
var response = await _s3Client.ListVersionsAsync(request, cancellationToken);
return response.Versions
.Where(v => v.Key == objectKey)
.Select(v => new ObjectVersion
{
Key = v.Key,
VersionId = v.VersionId,
IsLatest = v.IsLatest,
LastModified = v.LastModified,
Size = v.Size,
ETag = v.ETag
})
.OrderByDescending(v => v.LastModified);
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error listing object versions: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to list object versions: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
}
public async Task<StorageObject> GetObjectVersionAsync(string bucketName, string objectKey, string versionId, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
if (string.IsNullOrWhiteSpace(versionId))
throw new ArgumentException("Version ID cannot be null or empty", nameof(versionId));
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var request = new GetObjectRequest
{
BucketName = bucketName,
Key = objectKey,
VersionId = versionId
};
var response = await _s3Client.GetObjectAsync(request, cancellationToken);
return new StorageObject
{
BucketName = bucketName,
Key = objectKey,
VersionId = versionId,
Content = response.ResponseStream,
Metadata = new ObjectMetadata
{
ContentType = response.Headers.ContentType,
ContentLength = response.Headers.ContentLength,
LastModified = response.LastModified,
ETag = response.ETag
}
};
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting object version: {BucketName}/{ObjectKey}, version: {VersionId}",
bucketName, objectKey, versionId);
throw new StorageException($"Failed to get object version: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Правильная реализация Dispose паттерна обеспечивает корректное освобождение ресурсов при завершении работы с адаптером.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_semaphore?.Dispose();
_s3Client?.Dispose();
}
_disposed = true;
}
} |
|
Фильтрация объектов по различным критериям расширяет возможности поиска и навигации в хранилище. Реализация гибкой системы фильтров позволяет строить сложные запросы для поиска нужных данных.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
| public async Task<PagedResult<StorageObjectInfo>> ListObjectsWithFilterAsync(string bucketName,
ObjectFilter filter, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
if (filter == null)
throw new ArgumentNullException(nameof(filter));
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var allObjects = new List<StorageObjectInfo>();
string continuationToken = null;
do
{
var request = new ListObjectsV2Request
{
BucketName = bucketName,
Prefix = filter.Prefix,
MaxKeys = 1000,
ContinuationToken = continuationToken
};
var response = await _s3Client.ListObjectsV2Async(request, cancellationToken);
var filteredObjects = response.S3Objects
.Where(obj => ApplyObjectFilter(obj, filter))
.Select(obj => new StorageObjectInfo
{
Key = obj.Key,
Size = obj.Size,
LastModified = obj.LastModified,
ETag = obj.ETag,
StorageClass = obj.StorageClass?.Value
});
allObjects.AddRange(filteredObjects);
continuationToken = response.NextContinuationToken;
} while (continuationToken != null && allObjects.Count < filter.MaxResults);
var sortedObjects = ApplySorting(allObjects, filter.SortBy, filter.SortDirection);
var pagedObjects = sortedObjects.Skip(filter.Skip).Take(filter.MaxResults).ToList();
return new PagedResult<StorageObjectInfo>
{
Items = pagedObjects,
IsTruncated = sortedObjects.Count() > filter.Skip + filter.MaxResults,
TotalCount = allObjects.Count
};
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error filtering objects in bucket: {BucketName}", bucketName);
throw new StorageException($"Failed to filter objects in bucket: {bucketName}", ex);
}
finally
{
_semaphore.Release();
}
}
private bool ApplyObjectFilter(S3Object obj, ObjectFilter filter)
{
if (filter.MinSize.HasValue && obj.Size < filter.MinSize.Value)
return false;
if (filter.MaxSize.HasValue && obj.Size > filter.MaxSize.Value)
return false;
if (filter.ModifiedAfter.HasValue && obj.LastModified < filter.ModifiedAfter.Value)
return false;
if (filter.ModifiedBefore.HasValue && obj.LastModified > filter.ModifiedBefore.Value)
return false;
if (filter.FileExtensions?.Any() == true)
{
var extension = Path.GetExtension(obj.Key)?.ToLowerInvariant();
if (!filter.FileExtensions.Contains(extension))
return false;
}
return true;
}
private IEnumerable<StorageObjectInfo> ApplySorting(IEnumerable<StorageObjectInfo> objects,
SortBy sortBy, SortDirection direction)
{
var query = sortBy switch
{
SortBy.Name => direction == SortDirection.Ascending ?
objects.OrderBy(o => o.Key) : objects.OrderByDescending(o => o.Key),
SortBy.Size => direction == SortDirection.Ascending ?
objects.OrderBy(o => o.Size) : objects.OrderByDescending(o => o.Size),
SortBy.LastModified => direction == SortDirection.Ascending ?
objects.OrderBy(o => o.LastModified) : objects.OrderByDescending(o => o.LastModified),
_ => objects.OrderBy(o => o.Key)
};
return query;
} |
|
Проверка целостности данных через вычисление хешей обеспечивает надежность хранения и передачи файлов. Реализация включает поддержку различных алгоритмов хеширования.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| public async Task<bool> VerifyObjectIntegrityAsync(string bucketName, string objectKey,
string expectedHash, HashAlgorithmType hashType = HashAlgorithmType.MD5,
CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
if (string.IsNullOrWhiteSpace(expectedHash))
throw new ArgumentException("Expected hash cannot be null or empty", nameof(expectedHash));
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
using var storageObject = await GetObjectAsync(bucketName, objectKey, cancellationToken);
using var hashAlgorithm = CreateHashAlgorithm(hashType);
var computedHash = await ComputeHashAsync(storageObject.Content, hashAlgorithm, cancellationToken);
var isValid = string.Equals(computedHash, expectedHash, StringComparison.OrdinalIgnoreCase);
_logger.LogInformation("Integrity check for {BucketName}/{ObjectKey}: {Result}",
bucketName, objectKey, isValid ? "VALID" : "INVALID");
return isValid;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error verifying object integrity: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to verify object integrity: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
}
private HashAlgorithm CreateHashAlgorithm(HashAlgorithmType hashType)
{
return hashType switch
{
HashAlgorithmType.MD5 => MD5.Create(),
HashAlgorithmType.SHA1 => SHA1.Create(),
HashAlgorithmType.SHA256 => SHA256.Create(),
HashAlgorithmType.SHA512 => SHA512.Create(),
_ => throw new NotSupportedException($"Hash algorithm {hashType} is not supported")
};
}
private async Task<string> ComputeHashAsync(Stream stream, HashAlgorithm hashAlgorithm, CancellationToken cancellationToken)
{
const int bufferSize = 8192;
var buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, bufferSize, cancellationToken)) > 0)
{
hashAlgorithm.TransformBlock(buffer, 0, bytesRead, null, 0);
}
hashAlgorithm.TransformFinalBlock(new byte[0], 0, 0);
return Convert.ToBase64String(hashAlgorithm.Hash);
} |
|
Генерация временных подписанных URL обеспечивает безопасный доступ к объектам без необходимости передачи учетных данных. Это особенно важно для публичного доступа к приватным ресурсам.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public async Task<string> GeneratePresignedUrlAsync(string bucketName, string objectKey,
TimeSpan expiration, HttpVerb httpVerb = HttpVerb.GET, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
ValidateObjectKey(objectKey);
if (expiration <= TimeSpan.Zero)
throw new ArgumentException("Expiration must be positive", nameof(expiration));
if (expiration > TimeSpan.FromDays(7))
throw new ArgumentException("Expiration cannot exceed 7 days", nameof(expiration));
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var request = new GetPreSignedUrlRequest
{
BucketName = bucketName,
Key = objectKey,
Expires = DateTime.UtcNow.Add(expiration),
Verb = httpVerb
};
var presignedUrl = await _s3Client.GetPreSignedURLAsync(request);
_logger.LogDebug("Generated presigned URL for {BucketName}/{ObjectKey}, expires in {Expiration}",
bucketName, objectKey, expiration);
return presignedUrl;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error generating presigned URL: {BucketName}/{ObjectKey}", bucketName, objectKey);
throw new StorageException($"Failed to generate presigned URL: {bucketName}/{objectKey}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Статистика использования хранилища предоставляет важную информацию о состоянии и эффективности работы системы. Метрики помогают оптимизировать производительность и планировать расходы.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| public async Task<StorageStatistics> GetStorageStatisticsAsync(string bucketName, string prefix = null,
CancellationToken cancellationToken = default)
{
ValidateBucketName(bucketName);
await _semaphore.WaitAsync(cancellationToken);
try
{
return await _retryPolicy.ExecuteAsync(async () =>
{
var statistics = new StorageStatistics
{
BucketName = bucketName,
Prefix = prefix,
GeneratedAt = DateTime.UtcNow
};
var allObjects = new List<S3Object>();
string continuationToken = null;
do
{
var request = new ListObjectsV2Request
{
BucketName = bucketName,
Prefix = prefix,
MaxKeys = 1000,
ContinuationToken = continuationToken
};
var response = await _s3Client.ListObjectsV2Async(request, cancellationToken);
allObjects.AddRange(response.S3Objects);
continuationToken = response.NextContinuationToken;
} while (continuationToken != null);
statistics.TotalObjects = allObjects.Count;
statistics.TotalSize = allObjects.Sum(obj => obj.Size);
statistics.AverageObjectSize = statistics.TotalObjects > 0 ?
statistics.TotalSize / statistics.TotalObjects : 0;
var groupedByExtension = allObjects
.GroupBy(obj => Path.GetExtension(obj.Key)?.ToLowerInvariant() ?? "no-extension")
.Select(g => new FileTypeStatistics
{
Extension = g.Key,
Count = g.Count(),
TotalSize = g.Sum(obj => obj.Size)
})
.OrderByDescending(stat => stat.TotalSize);
statistics.FileTypeStatistics = groupedByExtension.ToList();
_logger.LogInformation("Storage statistics for {BucketName}: {TotalObjects} objects, {TotalSize} bytes",
bucketName, statistics.TotalObjects, statistics.TotalSize);
return statistics;
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting storage statistics: {BucketName}", bucketName);
throw new StorageException($"Failed to get storage statistics: {bucketName}", ex);
}
finally
{
_semaphore.Release();
}
} |
|
Продвинутые возможности и оптимизация
Потоковая обработка больших файлов требует кардинально иного подхода к архитектуре адаптера. Вместо загрузки всего файла в память, необходимо работать с данными по частям, что особенно критично для видеофайлов или больших архивов. Я разработал специализированный компонент для streaming операций, который может обрабатывать файлы практически неограниченного размера.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| public class StreamingObjectProcessor : IStreamingObjectProcessor
{
private readonly IAmazonS3 _s3Client;
private readonly ILogger<StreamingObjectProcessor> _logger;
private readonly int _bufferSize;
public StreamingObjectProcessor(IAmazonS3 s3Client, ILogger<StreamingObjectProcessor> logger, int bufferSize = 1024 * 1024)
{
_s3Client = s3Client;
_logger = logger;
_bufferSize = bufferSize;
}
public async Task ProcessObjectStreamAsync(string bucketName, string objectKey,
Func<Stream, Task> processor, CancellationToken cancellationToken = default)
{
using var response = await _s3Client.GetObjectAsync(bucketName, objectKey, cancellationToken);
var bufferedStream = new BufferedStream(response.ResponseStream, _bufferSize);
await processor(bufferedStream);
}
public async Task<string> UploadStreamAsync(string bucketName, string objectKey,
Stream sourceStream, IProgress<long> progress = null, CancellationToken cancellationToken = default)
{
var request = new PutObjectRequest
{
BucketName = bucketName,
Key = objectKey,
InputStream = sourceStream,
UseChunkEncoding = true
};
if (progress != null)
{
request.StreamTransferProgress += (sender, args) =>
{
progress.Report(args.TransferredBytes);
};
}
var response = await _s3Client.PutObjectAsync(request, cancellationToken);
return response.ETag;
}
} |
|
Мультипарт загрузки становятся обязательными для файлов размером более 100MB. Это не просто техническое ограничение - правильная реализация multipart upload может увеличить скорость загрузки в несколько раз благодаря параллельной обработке частей файла.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
| public class MultipartUploadManager : IMultipartUploadManager
{
private readonly IAmazonS3 _s3Client;
private readonly ILogger<MultipartUploadManager> _logger;
private readonly int _partSize;
private readonly int _maxConcurrency;
public MultipartUploadManager(IAmazonS3 s3Client, ILogger<MultipartUploadManager> logger,
int partSize = 5 * 1024 * 1024, int maxConcurrency = 3)
{
_s3Client = s3Client;
_logger = logger;
_partSize = partSize;
_maxConcurrency = maxConcurrency;
}
public async Task<string> UploadLargeFileAsync(string bucketName, string objectKey,
Stream fileStream, IProgress<MultipartUploadProgress> progress = null,
CancellationToken cancellationToken = default)
{
var initiateRequest = new InitiateMultipartUploadRequest
{
BucketName = bucketName,
Key = objectKey
};
var initiateResponse = await _s3Client.InitiateMultipartUploadAsync(initiateRequest, cancellationToken);
var uploadId = initiateResponse.UploadId;
try
{
var uploadTasks = new List<Task<UploadPartResponse>>();
var completedParts = new List<PartETag>();
var partNumber = 1;
var totalParts = (int)Math.Ceiling((double)fileStream.Length / _partSize);
var semaphore = new SemaphoreSlim(_maxConcurrency);
while (fileStream.Position < fileStream.Length)
{
var currentPartSize = Math.Min(_partSize, (int)(fileStream.Length - fileStream.Position));
var partData = new byte[currentPartSize];
await fileStream.ReadAsync(partData, 0, currentPartSize, cancellationToken);
var currentPartNumber = partNumber++;
await semaphore.WaitAsync(cancellationToken);
var uploadTask = Task.Run(async () =>
{
try
{
using var partStream = new MemoryStream(partData);
var uploadRequest = new UploadPartRequest
{
BucketName = bucketName,
Key = objectKey,
UploadId = uploadId,
PartNumber = currentPartNumber,
InputStream = partStream,
PartSize = currentPartSize
};
var uploadResponse = await _s3Client.UploadPartAsync(uploadRequest, cancellationToken);
progress?.Report(new MultipartUploadProgress
{
PartNumber = currentPartNumber,
TotalParts = totalParts,
BytesUploaded = currentPartNumber * _partSize
});
return uploadResponse;
}
finally
{
semaphore.Release();
}
}, cancellationToken);
uploadTasks.Add(uploadTask);
}
var uploadResults = await Task.WhenAll(uploadTasks);
completedParts.AddRange(uploadResults.Select(r => new PartETag(r.PartNumber, r.ETag)));
var completeRequest = new CompleteMultipartUploadRequest
{
BucketName = bucketName,
Key = objectKey,
UploadId = uploadId,
PartETags = completedParts.OrderBy(p => p.PartNumber).ToList()
};
var completeResponse = await _s3Client.CompleteMultipartUploadAsync(completeRequest, cancellationToken);
return completeResponse.ETag;
}
catch (Exception ex)
{
_logger.LogError(ex, "Multipart upload failed, aborting upload {UploadId}", uploadId);
await _s3Client.AbortMultipartUploadAsync(bucketName, objectKey, uploadId, cancellationToken);
throw;
}
}
} |
|
Connection pooling и кеширование соединений критически влияют на производительность при интенсивных операциях. AWS SDK использует HttpClient внутри, но его настройки по умолчанию не всегда оптимальны для высоконагруженных сценариев.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| public class OptimizedS3ClientFactory : IS3ClientFactory
{
private readonly IConfiguration _configuration;
private readonly IConnectionPool _connectionPool;
public OptimizedS3ClientFactory(IConfiguration configuration, IConnectionPool connectionPool)
{
_configuration = configuration;
_connectionPool = connectionPool;
}
public IAmazonS3 CreateClient()
{
var httpClientHandler = new HttpClientHandler()
{
MaxConnectionsPerServer = _configuration.GetValue<int>("S3:MaxConnectionsPerServer", 50),
UseCookies = false,
UseProxy = false
};
var httpClient = new HttpClient(httpClientHandler)
{
Timeout = TimeSpan.FromSeconds(_configuration.GetValue<int>("S3:HttpTimeoutSeconds", 300))
};
var config = new AmazonS3Config
{
ServiceURL = _configuration["S3:ServiceUrl"],
ForcePathStyle = true,
UseHttp = true,
MaxErrorRetry = 3,
HttpClient = httpClient,
BufferSize = _configuration.GetValue<int>("S3:BufferSize", 8192),
MaxConnectionsPerServer = _configuration.GetValue<int>("S3:MaxConnectionsPerServer", 50)
};
return new AmazonS3Client(_configuration["S3:AccessKey"], _configuration["S3:SecretKey"], config);
}
} |
|
Компрессия данных перед отправкой в S3 может значительно сократить объем передаваемых данных и время операций. Особенно эффективно это работает с текстовыми файлами и JSON данными.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| public class CompressingStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly ICompressionService _compressionService;
public CompressingStorageAdapter(IStorageAdapter baseAdapter, ICompressionService compressionService)
{
_baseAdapter = baseAdapter;
_compressionService = compressionService;
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
var shouldCompress = ShouldCompressContent(metadata?.ContentType, content.Length);
if (shouldCompress)
{
using var compressedStream = await _compressionService.CompressAsync(content, cancellationToken);
if (metadata == null)
metadata = new ObjectMetadata();
metadata.CustomMetadata["compression"] = "gzip";
metadata.CustomMetadata["original-size"] = content.Length.ToString();
return await _baseAdapter.PutObjectAsync(bucketName, objectKey, compressedStream, metadata, cancellationToken);
}
return await _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, cancellationToken);
}
public async Task<StorageObject> GetObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
var storageObject = await _baseAdapter.GetObjectAsync(bucketName, objectKey, cancellationToken);
if (storageObject.Metadata?.CustomMetadata?.ContainsKey("compression") == true)
{
var decompressedStream = await _compressionService.DecompressAsync(storageObject.Content, cancellationToken);
return new StorageObject
{
BucketName = storageObject.BucketName,
Key = storageObject.Key,
Content = decompressedStream,
Metadata = storageObject.Metadata
};
}
return storageObject;
}
private bool ShouldCompressContent(string contentType, long contentLength)
{
if (contentLength < 1024) return false; // Не сжимаем маленькие файлы
var compressibleTypes = new[] { "text/", "application/json", "application/xml", "application/javascript" };
return compressibleTypes.Any(type => contentType?.StartsWith(type, StringComparison.OrdinalIgnoreCase) == true);
}
} |
|
Retry логика с exponential backoff и circuit breaker паттерном обеспечивает устойчивость к временным сбоям и защищает от каскадных отказов.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| public class ResilientRetryPolicy : IRetryPolicy
{
private readonly ILogger<ResilientRetryPolicy> _logger;
private readonly CircuitBreakerPolicy _circuitBreaker;
private readonly RetryPolicy _retryPolicy;
public ResilientRetryPolicy(ILogger<ResilientRetryPolicy> logger)
{
_logger = logger;
_retryPolicy = Policy
.Handle<AmazonS3Exception>(ex => IsRetriableException(ex))
.Or<TaskCanceledException>()
.Or<TimeoutException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) + TimeSpan.FromMilliseconds(new Random().Next(0, 1000)),
onRetry: (outcome, timespan, retryCount, context) =>
{
_logger.LogWarning("Retry attempt {RetryCount} after {Delay}ms: {Exception}",
retryCount, timespan.TotalMilliseconds, outcome.Exception?.Message);
});
_circuitBreaker = Policy
.Handle<AmazonS3Exception>()
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromSeconds(30),
onBreak: (exception, timespan) =>
{
_logger.LogError("Circuit breaker opened for {Duration}s: {Exception}",
timespan.TotalSeconds, exception.Message);
},
onReset: () =>
{
_logger.LogInformation("Circuit breaker reset");
});
}
public async Task<T> ExecuteAsync<T>(Func<Task<T>> action, CancellationToken cancellationToken = default)
{
var policy = Policy.WrapAsync(_retryPolicy, _circuitBreaker);
return await policy.ExecuteAsync(action);
}
private bool IsRetriableException(AmazonS3Exception ex)
{
return ex.StatusCode == HttpStatusCode.InternalServerError ||
ex.StatusCode == HttpStatusCode.BadGateway ||
ex.StatusCode == HttpStatusCode.ServiceUnavailable ||
ex.StatusCode == HttpStatusCode.GatewayTimeout ||
ex.ErrorCode == "RequestTimeout" ||
ex.ErrorCode == "SlowDown";
}
} |
|
Мониторинг и логирование требуют структурированного подхода с использованием корелляционных идентификаторов для трассировки запросов в распределенных системах.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| public class InstrumentedStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly IMetricsCollector _metricsCollector;
private readonly ILogger<InstrumentedStorageAdapter> _logger;
public InstrumentedStorageAdapter(IStorageAdapter baseAdapter, IMetricsCollector metricsCollector, ILogger<InstrumentedStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_metricsCollector = metricsCollector;
_logger = logger;
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
var correlationId = Guid.NewGuid().ToString();
var stopwatch = Stopwatch.StartNew();
using var activity = Activity.Current?.Source.StartActivity("S3.PutObject");
activity?.SetTag("bucket.name", bucketName);
activity?.SetTag("object.key", objectKey);
activity?.SetTag("correlation.id", correlationId);
try
{
_logger.LogInformation("Starting upload {CorrelationId}: {BucketName}/{ObjectKey}",
correlationId, bucketName, objectKey);
var result = await _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, cancellationToken);
stopwatch.Stop();
_metricsCollector.RecordOperationDuration("put_object", stopwatch.Elapsed);
_metricsCollector.IncrementCounter("operations_total", ("operation", "put_object"), ("status", "success"));
_logger.LogInformation("Upload completed {CorrelationId}: {BucketName}/{ObjectKey} in {Duration}ms",
correlationId, bucketName, objectKey, stopwatch.ElapsedMilliseconds);
return result;
}
catch (Exception ex)
{
stopwatch.Stop();
_metricsCollector.IncrementCounter("operations_total", ("operation", "put_object"), ("status", "error"));
_metricsCollector.RecordOperationDuration("put_object_error", stopwatch.Elapsed);
_logger.LogError(ex, "Upload failed {CorrelationId}: {BucketName}/{ObjectKey} after {Duration}ms",
correlationId, bucketName, objectKey, stopwatch.ElapsedMilliseconds);
throw;
}
}
} |
|
Распределенное кеширование с использованием Redis помогает избежать повторных запросов к S3 для часто используемых данных.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| public class CachingStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly IDistributedCache _cache;
private readonly ILogger<CachingStorageAdapter> _logger;
private readonly TimeSpan _defaultCacheDuration = TimeSpan.FromMinutes(15);
public CachingStorageAdapter(IStorageAdapter baseAdapter, IDistributedCache cache, ILogger<CachingStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_cache = cache;
_logger = logger;
}
public async Task<ObjectMetadata> GetObjectMetadataAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
var cacheKey = $"metadata:{bucketName}:{objectKey}";
var cachedMetadata = await _cache.GetAsync(cacheKey, cancellationToken);
if (cachedMetadata != null)
{
_logger.LogDebug("Metadata cache hit for {BucketName}/{ObjectKey}", bucketName, objectKey);
return JsonSerializer.Deserialize<ObjectMetadata>(cachedMetadata);
}
var metadata = await _baseAdapter.GetObjectMetadataAsync(bucketName, objectKey, cancellationToken);
var serializedMetadata = JsonSerializer.SerializeToUtf8Bytes(metadata);
var cacheOptions = new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = _defaultCacheDuration
};
await _cache.SetAsync(cacheKey, serializedMetadata, cacheOptions, cancellationToken);
_logger.LogDebug("Metadata cached for {BucketName}/{ObjectKey}", bucketName, objectKey);
return metadata;
}
public async Task DeleteObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
await _baseAdapter.DeleteObjectAsync(bucketName, objectKey, cancellationToken);
// Инвалидируем кеш после удаления
var cacheKey = $"metadata:{bucketName}:{objectKey}";
await _cache.RemoveAsync(cacheKey, cancellationToken);
}
} |
|
Асинхронная обработка событий S3 открывает возможности для создания реактивных систем, которые автоматически реагируют на изменения в хранилище. Хотя локальные эмуляторы не всегда полностью поддерживают события, можно создать собственную систему уведомлений.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| public class EventDrivenStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly IEventPublisher _eventPublisher;
private readonly ILogger<EventDrivenStorageAdapter> _logger;
public EventDrivenStorageAdapter(IStorageAdapter baseAdapter, IEventPublisher eventPublisher, ILogger<EventDrivenStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_eventPublisher = eventPublisher;
_logger = logger;
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
var etag = await _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, cancellationToken);
var objectEvent = new S3ObjectEvent
{
EventType = S3EventType.ObjectCreated,
BucketName = bucketName,
ObjectKey = objectKey,
ETag = etag,
Timestamp = DateTime.UtcNow,
ObjectSize = content.Length
};
await _eventPublisher.PublishAsync(objectEvent, cancellationToken);
return etag;
}
public async Task DeleteObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
await _baseAdapter.DeleteObjectAsync(bucketName, objectKey, cancellationToken);
var deleteEvent = new S3ObjectEvent
{
EventType = S3EventType.ObjectDeleted,
BucketName = bucketName,
ObjectKey = objectKey,
Timestamp = DateTime.UtcNow
};
await _eventPublisher.PublishAsync(deleteEvent, cancellationToken);
}
} |
|
Транзакционная поддержка через паттерн Unit of Work позволяет группировать несколько операций и обеспечивать их атомарность через компенсационные действия.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
| public class TransactionalStorageUnitOfWork : IStorageUnitOfWork
{
private readonly IStorageAdapter _adapter;
private readonly List<IStorageOperation> _operations;
private readonly List<ICompensationAction> _compensationActions;
private readonly ILogger<TransactionalStorageUnitOfWork> _logger;
public TransactionalStorageUnitOfWork(IStorageAdapter adapter, ILogger<TransactionalStorageUnitOfWork> logger)
{
_adapter = adapter;
_operations = new List<IStorageOperation>();
_compensationActions = new List<ICompensationAction>();
_logger = logger;
}
public void AddOperation(IStorageOperation operation)
{
_operations.Add(operation);
}
public async Task<TransactionResult> CommitAsync(CancellationToken cancellationToken = default)
{
var executedOperations = new List<IStorageOperation>();
try
{
foreach (var operation in _operations)
{
await operation.ExecuteAsync(_adapter, cancellationToken);
executedOperations.Add(operation);
var compensationAction = operation.CreateCompensationAction();
if (compensationAction != null)
{
_compensationActions.Insert(0, compensationAction); // LIFO порядок для компенсации
}
}
return new TransactionResult { IsSuccess = true, ExecutedOperations = executedOperations.Count };
}
catch (Exception ex)
{
_logger.LogError(ex, "Transaction failed, executing compensation actions");
await ExecuteCompensationAsync(cancellationToken);
return new TransactionResult
{
IsSuccess = false,
ExecutedOperations = executedOperations.Count,
Error = ex.Message
};
}
}
private async Task ExecuteCompensationAsync(CancellationToken cancellationToken)
{
foreach (var compensationAction in _compensationActions)
{
try
{
await compensationAction.ExecuteAsync(_adapter, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Compensation action failed: {ActionType}", compensationAction.GetType().Name);
}
}
}
} |
|
Оптимизация памяти критически важна при работе с множеством одновременных операций. Пул буферов помогает избежать частых выделений памяти и сборок мусора.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| public class MemoryOptimizedStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly ArrayPool<byte> _arrayPool;
private readonly ILogger<MemoryOptimizedStorageAdapter> _logger;
public MemoryOptimizedStorageAdapter(IStorageAdapter baseAdapter, ILogger<MemoryOptimizedStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_arrayPool = ArrayPool<byte>.Shared;
_logger = logger;
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
const int bufferSize = 81920; // 80KB buffer
var buffer = _arrayPool.Rent(bufferSize);
try
{
using var bufferedStream = new BufferedStream(content, bufferSize);
var memoryStream = new MemoryStream();
int bytesRead;
while ((bytesRead = await bufferedStream.ReadAsync(buffer, 0, bufferSize, cancellationToken)) > 0)
{
await memoryStream.WriteAsync(buffer, 0, bytesRead, cancellationToken);
}
memoryStream.Position = 0;
return await _baseAdapter.PutObjectAsync(bucketName, objectKey, memoryStream, metadata, cancellationToken);
}
finally
{
_arrayPool.Return(buffer);
}
}
public async Task<StorageObject> GetObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
{
var storageObject = await _baseAdapter.GetObjectAsync(bucketName, objectKey, cancellationToken);
// Оборачиваем поток в оптимизированный буферизованный поток
var optimizedStream = new PooledBufferedStream(storageObject.Content, _arrayPool);
return new StorageObject
{
BucketName = storageObject.BucketName,
Key = storageObject.Key,
Content = optimizedStream,
Metadata = storageObject.Metadata
};
}
} |
|
Система метрик и телеметрии обеспечивает глубокую видимость в работу адаптера и помогает выявлять узкие места производительности.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| public class TelemetryStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly IMeterFactory _meterFactory;
private readonly Meter _meter;
private readonly Counter<long> _operationCounter;
private readonly Histogram<double> _operationDuration;
private readonly Histogram<long> _objectSize;
public TelemetryStorageAdapter(IStorageAdapter baseAdapter, IMeterFactory meterFactory)
{
_baseAdapter = baseAdapter;
_meterFactory = meterFactory;
_meter = _meterFactory.Create("S3Adapter");
_operationCounter = _meter.CreateCounter<long>("s3_operations_total", "operations", "Total number of S3 operations");
_operationDuration = _meter.CreateHistogram<double>("s3_operation_duration", "seconds", "Duration of S3 operations");
_objectSize = _meter.CreateHistogram<long>("s3_object_size", "bytes", "Size of S3 objects");
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
var stopwatch = Stopwatch.StartNew();
var objectSize = content.Length;
try
{
var result = await _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, cancellationToken);
_operationCounter.Add(1, new KeyValuePair<string, object?>("operation", "put_object"),
new KeyValuePair<string, object?>("status", "success"));
return result;
}
catch (Exception)
{
_operationCounter.Add(1, new KeyValuePair<string, object?>("operation", "put_object"),
new KeyValuePair<string, object?>("status", "error"));
throw;
}
finally
{
stopwatch.Stop();
_operationDuration.Record(stopwatch.Elapsed.TotalSeconds,
new KeyValuePair<string, object?>("operation", "put_object"));
_objectSize.Record(objectSize, new KeyValuePair<string, object?>("operation", "put_object"));
}
}
} |
|
Адаптивная настройка таймаутов и лимитов позволяет системе автоматически подстраиваться под текущие условия сети и нагрузку.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| public class AdaptiveStorageAdapter : IStorageAdapter
{
private readonly IStorageAdapter _baseAdapter;
private readonly IPerformanceMonitor _performanceMonitor;
private readonly ILogger<AdaptiveStorageAdapter> _logger;
private TimeSpan _currentTimeout = TimeSpan.FromSeconds(30);
private int _currentRetryCount = 3;
public AdaptiveStorageAdapter(IStorageAdapter baseAdapter, IPerformanceMonitor performanceMonitor, ILogger<AdaptiveStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_performanceMonitor = performanceMonitor;
_logger = logger;
}
public async Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content,
ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
{
var metrics = _performanceMonitor.GetCurrentMetrics();
AdjustParameters(metrics);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(_currentTimeout);
try
{
var result = await ExecuteWithAdaptiveRetry(
() => _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, timeoutCts.Token),
_currentRetryCount
);
_performanceMonitor.RecordSuccess(_currentTimeout);
return result;
}
catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested)
{
_performanceMonitor.RecordTimeout(_currentTimeout);
throw new TimeoutException($"Operation timed out after {_currentTimeout.TotalSeconds} seconds");
}
}
private void AdjustParameters(PerformanceMetrics metrics)
{
if (metrics.AverageResponseTime > TimeSpan.FromSeconds(20))
{
_currentTimeout = TimeSpan.FromSeconds(Math.Min(120, _currentTimeout.TotalSeconds * 1.5));
_currentRetryCount = Math.Max(1, _currentRetryCount - 1);
}
else if (metrics.AverageResponseTime < TimeSpan.FromSeconds(5))
{
_currentTimeout = TimeSpan.FromSeconds(Math.Max(15, _currentTimeout.TotalSeconds * 0.8));
_currentRetryCount = Math.Min(5, _currentRetryCount + 1);
}
_logger.LogDebug("Adjusted parameters: timeout={Timeout}s, retries={Retries}",
_currentTimeout.TotalSeconds, _currentRetryCount);
}
private async Task<T> ExecuteWithAdaptiveRetry<T>(Func<Task<T>> operation, int maxRetries)
{
for (int attempt = 0; attempt <= maxRetries; attempt++)
{
try
{
return await operation();
}
catch (Exception ex) when (attempt < maxRetries && IsRetriableException(ex))
{
var delay = TimeSpan.FromMilliseconds(Math.Pow(2, attempt) * 1000);
_logger.LogWarning("Retry attempt {Attempt} after {Delay}ms: {Error}",
attempt + 1, delay.TotalMilliseconds, ex.Message);
await Task.Delay(delay);
}
}
throw new InvalidOperationException("Should not reach here");
}
private bool IsRetriableException(Exception ex)
{
return ex is TimeoutException ||
ex is TaskCanceledException ||
(ex is AmazonS3Exception s3Ex && s3Ex.StatusCode >= HttpStatusCode.InternalServerError);
}
} |
|
Предзагрузка и прогревание соединений помогает избежать холодных стартов и обеспечивает стабильную производительность с первого запроса.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
| public class PrewarmedStorageAdapter : IStorageAdapter, IHostedService
{
private readonly IStorageAdapter _baseAdapter;
private readonly IConfiguration _configuration;
private readonly ILogger<PrewarmedStorageAdapter> _logger;
private readonly Timer _healthCheckTimer;
public PrewarmedStorageAdapter(IStorageAdapter baseAdapter, IConfiguration configuration, ILogger<PrewarmedStorageAdapter> logger)
{
_baseAdapter = baseAdapter;
_configuration = configuration;
_logger = logger;
_healthCheckTimer = new Timer(PerformHealthCheck, null, Timeout.Infinite, Timeout.Infinite);
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await PrewarmConnections(cancellationToken);
_healthCheckTimer.Change(TimeSpan.Zero, TimeSpan.FromMinutes(5));
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_healthCheckTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
private async Task PrewarmConnections(CancellationToken cancellationToken)
{
var prewarmBucket = _configuration["S3:PrewarmBucket"];
if (string.IsNullOrEmpty(prewarmBucket))
{
_logger.LogInformation("No prewarm bucket configured, skipping connection prewarming");
return;
}
try
{
_logger.LogInformation("Prewarming S3 connections to bucket: {BucketName}", prewarmBucket);
// Проверяем существование бакета
var bucketExists = await _baseAdapter.BucketExistsAsync(prewarmBucket, cancellationToken);
if (!bucketExists)
{
await _baseAdapter.CreateBucketAsync(prewarmBucket, cancellationToken);
}
// Выполняем несколько операций для прогрева пула соединений
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(PerformWarmupOperation(prewarmBucket, i, cancellationToken));
}
await Task.WhenAll(tasks);
_logger.LogInformation("S3 connections prewarmed successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to prewarm S3 connections");
}
}
private async Task PerformWarmupOperation(string bucketName, int index, CancellationToken cancellationToken)
{
try
{
var warmupKey = $"warmup/test-{index}";
var testData = Encoding.UTF8.GetBytes($"warmup-data-{index}");
using var stream = new MemoryStream(testData);
await _baseAdapter.PutObjectAsync(bucketName, warmupKey, stream, null, cancellationToken);
await _baseAdapter.DeleteObjectAsync(bucketName, warmupKey, cancellationToken);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Warmup operation {Index} failed", index);
}
}
private async void PerformHealthCheck(object state)
{
try
{
var prewarmBucket = _configuration["S3:PrewarmBucket"];
if (!string.IsNullOrEmpty(prewarmBucket))
{
await _baseAdapter.BucketExistsAsync(prewarmBucket, CancellationToken.None);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Health check failed");
}
}
// Все остальные методы делегируются базовому адаптеру
public Task<bool> BucketExistsAsync(string bucketName, CancellationToken cancellationToken = default)
=> _baseAdapter.BucketExistsAsync(bucketName, cancellationToken);
public Task CreateBucketAsync(string bucketName, CancellationToken cancellationToken = default)
=> _baseAdapter.CreateBucketAsync(bucketName, cancellationToken);
public Task<StorageObject> GetObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
=> _baseAdapter.GetObjectAsync(bucketName, objectKey, cancellationToken);
public Task<string> PutObjectAsync(string bucketName, string objectKey, Stream content, ObjectMetadata metadata = null, CancellationToken cancellationToken = default)
=> _baseAdapter.PutObjectAsync(bucketName, objectKey, content, metadata, cancellationToken);
public Task DeleteObjectAsync(string bucketName, string objectKey, CancellationToken cancellationToken = default)
=> _baseAdapter.DeleteObjectAsync(bucketName, objectKey, cancellationToken);
} |
|
Готовое решение с полным исходным кодом
Демонстрационное приложение объединяет все компоненты адаптера в единое решение, которое можно использовать как основу для реальных проектов. Архитектура построена на принципах чистого кода и включает все необходимые слои абстракции.
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // Program.cs - точка входа приложения
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using S3Adapter.Core;
using S3Adapter.Implementation;
using S3Adapter.Sample.Services;
var builder = Host.CreateApplicationBuilder(args);
// Регистрация сервисов
builder.Services.AddS3Adapter(builder.Configuration);
builder.Services.AddMemoryCache();
builder.Services.AddLogging(configure => configure.AddConsole());
// Регистрация демонстрационных сервисов
builder.Services.AddScoped<IDocumentService, DocumentService>();
builder.Services.AddScoped<IFileProcessingService, FileProcessingService>();
builder.Services.AddHostedService<S3AdapterDemoService>();
var host = builder.Build();
// Запуск демонстрационного приложения
await host.RunAsync(); |
|
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
| // S3AdapterDemoService.cs - демонстрационный сервис
public class S3AdapterDemoService : BackgroundService
{
private readonly IStorageAdapter _storageAdapter;
private readonly IDocumentService _documentService;
private readonly ILogger<S3AdapterDemoService> _logger;
private readonly string _demoBucket = "demo-bucket";
public S3AdapterDemoService(
IStorageAdapter storageAdapter,
IDocumentService documentService,
ILogger<S3AdapterDemoService> logger)
{
_storageAdapter = storageAdapter;
_documentService = documentService;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting S3 Adapter demonstration...");
try
{
// Демонстрация базовых операций
await DemonstrateBasicOperations(stoppingToken);
// Демонстрация batch операций
await DemonstrateBatchOperations(stoppingToken);
// Демонстрация работы с документами
await DemonstrateDocumentOperations(stoppingToken);
// Демонстрация потоковой обработки
await DemonstrateStreamOperations(stoppingToken);
_logger.LogInformation("Demonstration completed successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Demonstration failed");
}
}
private async Task DemonstrateBasicOperations(CancellationToken cancellationToken)
{
_logger.LogInformation("=== Basic Operations Demo ===");
// Создание бакета
await _storageAdapter.CreateBucketAsync(_demoBucket, cancellationToken);
// Загрузка тестового файла
var testData = Encoding.UTF8.GetBytes("Hello, S3 Adapter!");
using var stream = new MemoryStream(testData);
var metadata = new ObjectMetadata
{
ContentType = "text/plain",
CustomMetadata = new Dictionary<string, string>
{
["author"] = "demo-user",
["version"] = "1.0"
}
};
var etag = await _storageAdapter.PutObjectAsync(_demoBucket, "test.txt", stream, metadata, cancellationToken);
_logger.LogInformation("Uploaded test file with ETag: {ETag}", etag);
// Проверка существования объекта
var objectMetadata = await _storageAdapter.GetObjectMetadataAsync(_demoBucket, "test.txt", cancellationToken);
_logger.LogInformation("Object metadata: Size={Size}, ContentType={ContentType}",
objectMetadata.ContentLength, objectMetadata.ContentType);
// Получение объекта
using var downloadedObject = await _storageAdapter.GetObjectAsync(_demoBucket, "test.txt", cancellationToken);
using var reader = new StreamReader(downloadedObject.Content);
var content = await reader.ReadToEndAsync();
_logger.LogInformation("Downloaded content: {Content}", content);
// Копирование объекта
await _storageAdapter.CopyObjectAsync(_demoBucket, "test.txt", _demoBucket, "test-copy.txt", cancellationToken);
_logger.LogInformation("Object copied successfully");
}
private async Task DemonstrateBatchOperations(CancellationToken cancellationToken)
{
_logger.LogInformation("=== Batch Operations Demo ===");
// Создание нескольких тестовых объектов
var uploadTasks = new List<Task>();
for (int i = 1; i <= 5; i++)
{
uploadTasks.Add(CreateTestObject(i, cancellationToken));
}
await Task.WhenAll(uploadTasks);
_logger.LogInformation("Created 5 test objects");
// Получение списка объектов
var objectList = await _storageAdapter.ListObjectsAsync(_demoBucket, "batch-test-", cancellationToken: cancellationToken);
_logger.LogInformation("Found {Count} objects in batch test", objectList.Items.Count());
// Массовое удаление
var keysToDelete = objectList.Items.Select(obj => obj.Key).ToArray();
var deleteResult = await _storageAdapter.DeleteObjectsAsync(_demoBucket, keysToDelete, cancellationToken);
_logger.LogInformation("Deleted {Count} objects in batch", deleteResult.SuccessfulDeletes.Count);
if (deleteResult.Errors.Any())
{
_logger.LogWarning("Delete errors: {Errors}", string.Join(", ", deleteResult.Errors.Select(e => e.Message)));
}
}
private async Task CreateTestObject(int index, CancellationToken cancellationToken)
{
var testData = Encoding.UTF8.GetBytes($"Test object {index} content");
using var stream = new MemoryStream(testData);
await _storageAdapter.PutObjectAsync(_demoBucket, $"batch-test-{index:D3}.txt", stream, null, cancellationToken);
}
private async Task DemonstrateDocumentOperations(CancellationToken cancellationToken)
{
_logger.LogInformation("=== Document Operations Demo ===");
var document = new Document
{
Id = Guid.NewGuid(),
Title = "Demo Document",
Content = "This is a demonstration document for S3 Adapter",
Author = "System",
CreatedAt = DateTime.UtcNow,
Tags = new[] { "demo", "test", "s3" }
};
// Сохранение документа
await _documentService.SaveDocumentAsync(document, cancellationToken);
_logger.LogInformation("Document saved: {Id}", document.Id);
// Получение документа
var retrievedDoc = await _documentService.GetDocumentAsync(document.Id, cancellationToken);
_logger.LogInformation("Retrieved document: {Title} by {Author}", retrievedDoc.Title, retrievedDoc.Author);
// Обновление документа
retrievedDoc.Content = "Updated content";
await _documentService.UpdateDocumentAsync(retrievedDoc, cancellationToken);
_logger.LogInformation("Document updated successfully");
// Поиск документов
var searchResults = await _documentService.SearchDocumentsAsync("demo", cancellationToken);
_logger.LogInformation("Found {Count} documents matching 'demo'", searchResults.Count());
}
private async Task DemonstrateStreamOperations(CancellationToken cancellationToken)
{
_logger.LogInformation("=== Stream Operations Demo ===");
// Создание большого файла для демонстрации потоковой обработки
var largeData = new byte[1024 * 1024]; // 1MB
new Random().NextBytes(largeData);
using var largeStream = new MemoryStream(largeData);
// Отслеживание прогресса загрузки
var progress = new Progress<long>(bytesUploaded =>
{
_logger.LogInformation("Uploaded {Bytes} bytes", bytesUploaded);
});
// Потоковая загрузка
var streamProcessor = new StreamingObjectProcessor(_storageAdapter, _logger);
await streamProcessor.UploadStreamAsync(_demoBucket, "large-file.bin", largeStream, progress, cancellationToken);
// Потоковая обработка
await streamProcessor.ProcessObjectStreamAsync(_demoBucket, "large-file.bin", async stream =>
{
var buffer = new byte[1024];
var totalRead = 0;
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{
totalRead += bytesRead;
// Здесь можно выполнить обработку данных
}
_logger.LogInformation("Processed {TotalBytes} bytes from stream", totalRead);
}, cancellationToken);
}
} |
|
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
| // DocumentService.cs - сервис для работы с документами
public class DocumentService : IDocumentService
{
private readonly IStorageAdapter _storageAdapter;
private readonly string _documentsBucket = "documents";
private readonly JsonSerializerOptions _jsonOptions;
public DocumentService(IStorageAdapter storageAdapter)
{
_storageAdapter = storageAdapter;
_jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = true
};
}
public async Task SaveDocumentAsync(Document document, CancellationToken cancellationToken = default)
{
await _storageAdapter.CreateBucketAsync(_documentsBucket, cancellationToken);
var json = JsonSerializer.Serialize(document, _jsonOptions);
var jsonBytes = Encoding.UTF8.GetBytes(json);
using var stream = new MemoryStream(jsonBytes);
var metadata = new ObjectMetadata
{
ContentType = "application/json",
CustomMetadata = new Dictionary<string, string>
{
["document-type"] = "json",
["author"] = document.Author,
["created-at"] = document.CreatedAt.ToString("O")
}
};
await _storageAdapter.PutObjectAsync(_documentsBucket, $"documents/{document.Id}.json", stream, metadata, cancellationToken);
}
public async Task<Document> GetDocumentAsync(Guid documentId, CancellationToken cancellationToken = default)
{
using var storageObject = await _storageAdapter.GetObjectAsync(_documentsBucket, $"documents/{documentId}.json", cancellationToken);
var json = await new StreamReader(storageObject.Content).ReadToEndAsync();
return JsonSerializer.Deserialize<Document>(json, _jsonOptions);
}
public async Task UpdateDocumentAsync(Document document, CancellationToken cancellationToken = default)
{
// Создаем новую версию документа
document.UpdatedAt = DateTime.UtcNow;
await SaveDocumentAsync(document, cancellationToken);
}
public async Task<IEnumerable<Document>> SearchDocumentsAsync(string searchTerm, CancellationToken cancellationToken = default)
{
var objectList = await _storageAdapter.ListObjectsAsync(_documentsBucket, "documents/", cancellationToken: cancellationToken);
var documents = new List<Document>();
foreach (var objectInfo in objectList.Items)
{
try
{
using var storageObject = await _storageAdapter.GetObjectAsync(_documentsBucket, objectInfo.Key, cancellationToken);
var json = await new StreamReader(storageObject.Content).ReadToEndAsync();
var document = JsonSerializer.Deserialize<Document>(json, _jsonOptions);
if (document.Title.Contains(searchTerm, StringComparison.OrdinalIgnoreCase) ||
document.Content.Contains(searchTerm, StringComparison.OrdinalIgnoreCase))
{
documents.Add(document);
}
}
catch (Exception ex)
{
// Логируем ошибку, но продолжаем поиск
Console.WriteLine($"Error processing document {objectInfo.Key}: {ex.Message}");
}
}
return documents;
}
} |
|
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
| // Document.cs - модель документа
public class Document
{
public Guid Id { get; set; }
public string Title { get; set; }
public string Content { get; set; }
public string Author { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? UpdatedAt { get; set; }
public string[] Tags { get; set; }
public Dictionary<string, object> Properties { get; set; } = new();
} |
|
Конфигурация для продакшена требует дополнительных настроек безопасности и мониторинга:
| JSON | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| {
"S3Configuration": {
"ServiceUrl": "https://s3.amazonaws.com",
"Region": "us-east-1",
"ForcePathStyle": false,
"UseHttps": true,
"TimeoutSeconds": 60,
"MaxRetries": 5,
"RetryDelaySeconds": 2,
"MaxConcurrentOperations": 10,
"BufferSize": 65536,
"EnableCompression": true,
"CacheMetadata": true,
"CacheTtlMinutes": 30
},
"Logging": {
"LogLevel": {
"Default": "Information",
"S3Adapter": "Debug",
"Microsoft": "Warning"
}
},
"HealthChecks": {
"S3": {
"Enabled": true,
"TimeoutSeconds": 30,
"IntervalSeconds": 60
}
}
} |
|
Unit-тесты демонстрируют правильное тестирование адаптера с использованием моков:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
| [TestClass]
public class S3StorageAdapterTests
{
private Mock<IAmazonS3> _mockS3Client;
private Mock<ILogger<S3StorageAdapter>> _mockLogger;
private Mock<IRetryPolicy> _mockRetryPolicy;
private S3StorageAdapter _adapter;
[TestInitialize]
public void Setup()
{
_mockS3Client = new Mock<IAmazonS3>();
_mockLogger = new Mock<ILogger<S3StorageAdapter>>();
_mockRetryPolicy = new Mock<IRetryPolicy>();
_mockRetryPolicy.Setup(r => r.ExecuteAsync(It.IsAny<Func<Task<bool>>>(), It.IsAny<CancellationToken>()))
.Returns<Func<Task<bool>>, CancellationToken>((func, ct) => func());
var options = Microsoft.Extensions.Options.Options.Create(new S3AdapterOptions
{
MaxConcurrentOperations = 10
});
_adapter = new S3StorageAdapter(_mockS3Client.Object, _mockLogger.Object, _mockRetryPolicy.Object, options);
}
[TestMethod]
public async Task PutObjectAsync_ShouldUploadSuccessfully()
{
// Arrange
var bucketName = "test-bucket";
var objectKey = "test-object";
var content = new MemoryStream(Encoding.UTF8.GetBytes("test content"));
_mockS3Client.Setup(c => c.PutObjectAsync(It.IsAny<PutObjectRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new PutObjectResponse { ETag = "test-etag" });
// Act
var result = await _adapter.PutObjectAsync(bucketName, objectKey, content);
// Assert
Assert.AreEqual("test-etag", result);
_mockS3Client.Verify(c => c.PutObjectAsync(It.IsAny<PutObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once);
}
[TestMethod]
public async Task GetObjectAsync_ShouldReturnStorageObject()
{
// Arrange
var bucketName = "test-bucket";
var objectKey = "test-object";
var responseStream = new MemoryStream(Encoding.UTF8.GetBytes("test content"));
_mockS3Client.Setup(c => c.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new GetObjectResponse
{
ResponseStream = responseStream,
Headers = { ContentLength = responseStream.Length, ContentType = "text/plain" },
ETag = "test-etag"
});
// Act
var result = await _adapter.GetObjectAsync(bucketName, objectKey);
// Assert
Assert.IsNotNull(result);
Assert.AreEqual(bucketName, result.BucketName);
Assert.AreEqual(objectKey, result.Key);
Assert.AreEqual("text/plain", result.Metadata.ContentType);
}
} |
|
Интеграционные тесты требуют реального подключения к MinIO и демонстрируют работу всех компонентов в связке:
| C# | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
| [TestClass]
public class S3AdapterIntegrationTests
{
private IStorageAdapter _adapter;
private readonly string _testBucket = "integration-test-bucket";
[TestInitialize]
public async Task Setup()
{
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.Testing.json")
.Build();
var services = new ServiceCollection();
services.AddS3Adapter(configuration);
services.AddLogging();
var provider = services.BuildServiceProvider();
_adapter = provider.GetRequiredService<IStorageAdapter>();
// Создание тестового бакета
await _adapter.CreateBucketAsync(_testBucket);
}
[TestMethod]
public async Task FullWorkflow_ShouldExecuteSuccessfully()
{
// Загрузка файла
var testData = Encoding.UTF8.GetBytes("Integration test content");
using var uploadStream = new MemoryStream(testData);
var etag = await _adapter.PutObjectAsync(_testBucket, "integration-test.txt", uploadStream);
Assert.IsNotNull(etag);
// Получение метаданных
var metadata = await _adapter.GetObjectMetadataAsync(_testBucket, "integration-test.txt");
Assert.AreEqual(testData.Length, metadata.ContentLength);
// Скачивание файла
using var downloadedObject = await _adapter.GetObjectAsync(_testBucket, "integration-test.txt");
using var reader = new StreamReader(downloadedObject.Content);
var content = await reader.ReadToEndAsync();
Assert.AreEqual("Integration test content", content);
// Удаление файла
await _adapter.DeleteObjectAsync(_testBucket, "integration-test.txt");
}
[TestCleanup]
public async Task Cleanup()
{
try
{
var objects = await _adapter.ListObjectsAsync(_testBucket);
var keys = objects.Items.Select(o => o.Key).ToArray();
if (keys.Any())
{
await _adapter.DeleteObjectsAsync(_testBucket, keys);
}
}
catch (Exception ex)
{
Console.WriteLine($"Cleanup error: {ex.Message}");
}
}
} |
|
CI/CD pipeline конфигурация для автоматического тестирования включает запуск MinIO в Docker:
| YAML | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| name: S3 Adapter CI/CD
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
services:
minio:
image: minio/minio:latest
ports:
- 9000:9000
env:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
options: >-
--health-cmd "curl -f http://localhost:9000/minio/health/live"
--health-interval 30s
--health-timeout 20s
--health-retries 3
command: server /data
steps:
- uses: actions/checkout@v3
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 7.0.x
- name: Wait for MinIO
run: |
timeout 60 sh -c 'until curl -f [url]http://localhost:9000/minio/health/live;[/url] do sleep 2; done'
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --configuration Release --no-restore
- name: Unit Tests
run: dotnet test tests/S3Adapter.UnitTests --configuration Release --no-build
- name: Integration Tests
run: dotnet test tests/S3Adapter.IntegrationTests --configuration Release --no-build
env:
S3__ServiceUrl: [url]http://localhost:9000[/url]
S3__AccessKey: minioadmin
S3__SecretKey: minioadmin |
|
Производственные рекомендации включают настройку мониторинга и обеспечение безопасности. Используйте отдельные учетные записи для различных сред, настройте ротацию ключей доступа, включите логирование всех операций и регулярно проверяйте метрики производительности. Для высоконагруженных систем рекомендуется использовать CDN для кеширования часто запрашиваемых объектов и настроить автоматическое масштабирование инфраструктуры.
Пишем англо-русский словарь с нуля Привет всем, кто сюда будет заглядывать. Предлагаю совместными усилиями разработать простенький... Пишем необычный калькулятор Всем привет! Решил написать калькулятор, который выполняет простые арифмет операции... Пишем в Excel через OLEDB Доброго времени суток товарищи!
Не очень хорошо я владею OLEDB и у меня никак не получается... Пишем свой Хамачи Доброго времени суток! Я начинающий в C#. Пришла в голову мысль написать аналог хамачи с такими... В чем разница, если мы пишем имя класса перед инициализацией экземпляра? А в чем разница в записи
FbTransaction fbt = fg.BeginTransaction();
и такой записи
fbt =... Пишем свой билдер под net 4.5 Есть вот такой код для компиляции под 4.0 , работает идеально .
Как его переделать чтоб компилил... Считываем число и пишем через сколько дней будет следующая дата Написать программу, которая читает с экрана целое положительное число и выдает дату, которая будет... Может кто подскажет, где искали практику? Не обязательно по C#, даже в целом пишем программу Х, шаги: 1... 2.... 3.... Учу C# по Троленсу, дошел до ООП (классы, интерфейсы, коллекции, делегаты, лямбда-выражения). Вроде... Сделать базу на основе хранилища Queue() Нужно сделать базу на основе хранилища System.Collections.Generic.Queue(Of T)
Как создать и... Ошибка при использовании потока чтения изолированного хранилища Здравствуйте,все работало корректно,но решил воспользоваться DataTemplate и возникла ошибка с... Entity Указанный поставщик хранилища не найден в конфигурации или недопустим Привет всем!
Помогите, пожалуйста, решить проблему: в своем WCF-сервисе я создал соединение с... Удаление сертификата X509 из хранилища Добрый день, очередной вопрос к Гуру!
Ваяю класс по обслуживанию сертификатов X509, сейчас...
|