Форум программистов, компьютерный форум, киберфорум
mobDevWorks
Войти
Регистрация
Восстановить пароль

Трассировка корутин Kotlin с OpenTelemetry

Запись от mobDevWorks размещена 14.07.2025 в 22:05
Показов 8417 Комментарии 0

Нажмите на изображение для увеличения
Название: Трассировка корутин Kotlin с OpenTelemetry.jpg
Просмотров: 275
Размер:	282.1 Кб
ID:	10981
Асинхронное программирование меняет правила игры, особенно когда речь заходит о трассировке операций. В Kotlin с его корутинами эта проблема приобретает особый оттенок, который я хотел бы детально разобрать.

Контекст теряется на повороте



Представьте себе классическую ситуацию: у вас есть сервис, обрабатывающий запросы пользователей. В синхронном мире всё просто — поток получает запрос, обрабатывает его и возвращает ответ. Трассировка таких операций — дело техники, ведь весь жизненный цикл запроса происходит в одном потоке. Но в мире корутин Kotlin всё иначе. Корутина может начать выполнение на одном потоке, приостановиться в точке suspend и продолжить на совсем другом. И тут начинаются проблемы с трассировкой, потому что стандартные инструменты хранят контекст в ThreadLocal.

Kotlin
1
2
3
4
5
6
suspend fun processOrder(orderId: String) {
    val order = fetchOrder(orderId) // может выполняться на потоке A
    val payment = processPayment(order) // может выполняться на потоке B
    updateInventory(order) // может выполняться на потоке C
    sendNotification(order) // может выполняться на потоке D
}
В этом примере наша корутина может перепрыгивать между потоками, и если мы используем обычный подход к трассировке через ThreadLocal, то контекст трассировки будет утерян при каждом переключении. Это приводит к тому, что вместо единого связного трейса мы получаем набор разрозненных операций, не имеющих видимой связи между собой.

Не получается обновить список из корутин
Подскажите пожалуйста почему на строке myAdapter.notifyDataSetChanged(); программа вылетает,что я...

Как обрабатывать ошибки, когда трассировка стека не имеет пакета приложения?
Например вот: java.lang.NullPointerException at...

Стоит ли сейчас изучать Kotlin?
Здравствуйте. Есть ли сейчас смысл изучать Kotlin для разработки под android? Или все же лучше...

Язык программирования Kotlin
Достаточно интересный новый (2011г.) язык Kotlin, предлагающийся компанией JetBrains (как? вы не...


Разрывы в цепочке причинности



Другая серьёзная проблема связана с самой природой корутин — они могут порождать друг друга в разных контекстах. Без правильной передачи контекста трассировки это приводит к разрывам в причинно-следственных связях:

Kotlin
1
2
3
4
5
6
7
8
launch {
    val userId = fetchUserId() // спан A
    
    launch {
        val userDetails = fetchUserDetails(userId) // спан B, но он не будет 
                                                  // связан со спаном A
    }
}
В приведенном примере операция получения деталей пользователя логически является дочерней по отношению к получению идентификатора пользователя. Однако, если не позаботиться о передаче контекста, эта связь будет утеряна в системе трассировки.

Suspend-функции и головная боль отладчика



Suspend-функции в Kotlin — это особый вид функций, способных приостанавливать своё выполнение без блокировки потока. Это достигается за счёт сложных преобразований кода на уровне компилятора. Для обычного стека вызовов такие преобразования создают совершенно неожиданную картину:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
suspend fun operationA() {
    operationB()
}
 
suspend fun operationB() {
    delay(100) // точка приостановки
    operationC()
}
 
suspend fun operationC() {
    // какая-то работа
}
Стек вызовов в точке выполнения operationC() может вообще не содержать упоминаний об operationA() и operationB(), потому что корутина могла быть приостановлена и возобновлена. Это создаёт дополнительные сложности при попытке проследить путь выполнения кода.

Исключения летят не туда



Еще одна интересная проблема связана с обработкой исключений. В синхронном коде исключение имеет чёткий путь распространения вверх по стеку вызовов. В корутинах же, особенно при использовании различных scope и контекстов, исключение может быть перехвачено и обработано совсем не там, где мы ожидаем:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val scope = CoroutineScope(Dispatchers.IO)
 
scope.launch {
    try {
        riskyOperation() // может выбросить исключение
    } catch (e: Exception) {
        // обработка исключения
    }
}
 
suspend fun riskyOperation() {
    withContext(Dispatchers.Default) {
        throw RuntimeException("Boom!") // исключение будет поймано, 
                                        // но трейс может быть неполным
    }
}
При трассировке такого кода можно пропустить важные детали обработки исключений, потому что контекст выполнения меняется.

Разные диспетчеры — разные проблемы



Различные диспетчеры корутин (Dispatchers.IO, Dispatchers.Default, Dispatchers.Main) добавляют еще один уровень сложности. Каждый из них работает по-своему, и без специальных механизмов передачи контекста трассировки между ними возникают дополнительные разрывы:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
withContext(Dispatchers.IO) {
    val data = loadDataFromDatabase() // спан A
    
    withContext(Dispatchers.Default) {
        processData(data) // спан B может потерять связь со спаном A
        
        withContext(Dispatchers.Main) {
            updateUI(data) // спан C может потерять связь со спанами A и B
        }
    }
}

Параллельная декомпозиция теряет связность



Корутины позволяют легко запускать несколько операций параллельно, что усложняет трассировку. Рассмотрим пример:

Kotlin
1
2
3
4
5
6
val results = listOf("task1", "task2", "task3").map { taskId ->
    async {
        executeTask(taskId) // каждый вызов создаёт отдельный спан
    }
}
results.awaitAll() // здесь мы ждём завершения всех задач
В идеальном мире мы хотели бы видеть все эти задачи как дочерние операции одного родительского спана. Но без специальной поддержки они могут быть представлены как несвязанные операции, что значительно усложняет анализ производительности и поиск проблем.

Разные скоупы — разные проблемы



Отдельно стоит упомянуть о различных областях видимости корутин (scopes) и их влиянии на целостность трейсов. Особенно коварным является GlobalScope, который может создавать корутины, никак не связанные с родительским контекстом:

Kotlin
1
2
3
4
5
6
7
8
fun someFunction() {
    // Родительский контекст
    GlobalScope.launch {
        // Эта корутина не будет иметь доступа к контексту трассировки
        // из родительского скоупа
        doSomething()
    }
}
При использовании GlobalScope создается корутина, не привязанная ни к какому другому scope. Это значит, что любая информация о трассировке из родительского контекста будет потеряна. Представьте, что вы пытаетесь отследить сложный бизнес-процесс, часть которого была запущена через GlobalScope — вы просто не увидите эту часть в общем трейсе!

А теперь ситуация с Android и viewModelScope:

Kotlin
1
2
3
4
5
6
7
8
9
class MyViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch {
            // Эта корутина привязана к жизненному циклу ViewModel
            // но не обязательно к контексту трассировки
            repository.fetchData()
        }
    }
}
Хотя viewModelScope обеспечивает правильный жизненный цикл корутин по отношению к ViewModel, он не гарантирует сохранение контекста трассировки между различными вызовами.

Корреляция как головоломка



Не менее серьезная проблема — корреляция между родительскими и дочерними корутинами. Рассмотрим такой пример:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun fetchUserData(userId: String) {
    coroutineScope {
        val profile = async { userRepository.getProfile(userId) }
        val settings = async { userRepository.getSettings(userId) }
        val friends = async { userRepository.getFriends(userId) }
        
        val userData = UserData(
            profile = profile.await(),
            settings = settings.await(),
            friends = friends.await()
        )
        
        processUserData(userData)
    }
}
В этом коде запускаются три параллельные операции, и их результаты затем объединяются. С точки зрения производительности это отлично, но с точки зрения трассировки возникает вопрос: как мы узнаем, какая из операций занимает больше всего времени? Как мы свяжем эти три асинхронные операции с родительским запросом?

Классические подходы и их ограничения



До появления специализированных инструментов для трассировки корутин разработчики пытались решать проблему различными способами:

1. Ручная передача контекста:

Kotlin
1
2
3
4
5
suspend fun operation(tracingContext: TracingContext) {
    // Вручную передаем контекст трассировки в каждый метод
    val result = subOperation(tracingContext)
    // ...
}
Это работает, но загромождает сигнатуры методов и требует дисциплины от всех разработчиков.

2. Использование MDC (Mapped Diagnostic Context):

Kotlin
1
2
3
4
5
6
7
8
9
suspend fun operation() {
    MDC.put("requestId", generateRequestId())
    try {
        // Выполняем операции, надеясь что MDC сохранится
        subOperation()
    } finally {
        MDC.remove("requestId")
    }
}
Проблема в том, что MDC также основан на ThreadLocal и теряет значения при переключении потоков.

3. Обертывание каждой корутины:

Kotlin
1
2
3
4
5
6
7
8
val originalJob = launch {
    val span = tracer.startSpan("operation")
    try {
        // выполняем работу
    } finally {
        span.end()
    }
}
Это создает много шаблонного кода и легко забыть завершить спан, особенно при обработке исключений.

Все эти подходы требуют значительных ручных усилий и легко допустить ошибку. Нам нужно что-то более интегрированное с самой моделью корутин Kotlin. И тут на помощь приходит OpenTelemetry, но об этом мы поговорим дальше.

OpenTelemetry и корутины: теория интеграции



После того как мы разобрались с проблемами трассировки в асинхронном мире корутин, пора понять, как же OpenTelemetry справляется с этими вызовами. Скажу сразу — тут есть немало интересных технических решений, которые стоит изучить любому разработчику, работающему с корутинами.

Механизмы передачи контекста



В основе всех проблем трассировки корутин лежит одна фундаментальная сложность: как передать контекст трассировки между разными потоками выполнения? OpenTelemetry решает эту проблему с помощью специального механизма контекста.
Базовая структура OpenTelemetry включает класс Context, который служит хранилищем данных трассировки:

Kotlin
1
2
3
4
// Упрощенная модель работы с контекстом OpenTelemetry
val current = Context.current() // Получаем текущий контекст из ThreadLocal
val span = tracer.spanBuilder("operation").startSpan()
val withSpan = current.with(span) // Создаем новый контекст с нашим спаном
По умолчанию Context хранится в ThreadLocal, и именно здесь начинается самое интересное. Когда корутина приостанавливается и возобновляется в другом потоке, контекст из ThreadLocal теряется. Для решения этой проблемы OpenTelemetry предоставляет расширение для Kotlin — `opentelemetry-extension-kotlin`.

Особенности работы с CoroutineContext



Для тех, кто не в курсе, CoroutineContext в Kotlin — это набор элементов, определяющих поведение корутины. Это не просто какой-то флаг или переключатель, а целая коллекция различных контекстных элементов, влияющих на выполнение корутины.

Kotlin
1
2
// Стандартный контекст корутины включает в себя несколько элементов
val context = Job() + Dispatchers.IO + CoroutineName("MyCoroutine")
OpenTelemetry расширяет эту концепцию, добавляя собственный элемент контекста для хранения данных трассировки. Это позволяет "прикрепить" информацию о трассировке к самой корутине, а не к потоку, на котором она выполняется.

Роль Context Element в распространении метаданных



Основная магия происходит в классе ContextExtensionsKt, который предоставляет два ключевых метода:

Kotlin
1
2
3
4
5
// Получаем OpenTelemetry контекст из корутинного контекста
fun getOpenTelemetryContext(coroutineContext: CoroutineContext): Context
 
// Создаем элемент корутинного контекста с OpenTelemetry контекстом
fun Context.asContextElement(): CoroutineContext.Element
Теперь рассмотрим, как это работает на практике:

Kotlin
1
2
3
4
5
6
7
// Добавляем OpenTelemetry контекст в корутинный контекст
launch(Context.current().asContextElement()) {
    // Внутри этой корутины будет доступен текущий OpenTelemetry контекст
    val span = tracer.spanBuilder("operation").startSpan()
    // ... выполняем работу ...
    span.end()
}
Когда корутина возобновляется на новом потоке, CoroutineContext восстанавливается, и вместе с ним восстанавливается OpenTelemetry контекст. Это устраняет проблему потери контекста при переключении потоков.

Интеграция с диспетчерами корутин



Разные диспетчеры корутин (Dispatchers.IO, Dispatchers.Default, Dispatchers.Main) управляют тем, на каких потоках будут выполняться корутины. OpenTelemetry должен корректно работать со всеми типами диспетчеров. Важно понимать, что интеграция с диспетчерами происходит автоматически благодаря тому, что CoroutineContext передается при каждом переключении корутины между потоками. Это значит, что вам не нужно писать специальный код для разных диспетчеров — OpenTelemetry расширение позаботится об этом за вас.

Kotlin
1
2
3
4
5
6
7
8
9
withContext(Dispatchers.IO + Context.current().asContextElement()) {
    // Код здесь будет выполняться на IO диспетчере с сохраненным контекстом
    val data = loadDataFromDatabase()
    
    withContext(Dispatchers.Default) {
        // А здесь - на Default диспетчере, но контекст все еще сохранен!
        processData(data)
    }
}

Как это работает внутри



Если копнуть глубже, то в основе лежит достаточно элегантный механизм. Java Agent от OpenTelemetry предоставляет классы инструментации, специфичные для корутин:

Kotlin
1
2
3
4
5
6
7
8
9
// KotlinCoroutinesInstrumentationHelper (псевдокод)
public static CoroutineContext addOpenTelemetryContext(CoroutineContext coroutineContext) {
  Context current = Context.current(); // (1) Получаем текущий контекст из ThreadLocal
  Context inCoroutine = ContextExtensionsKt.getOpenTelemetryContext(coroutineContext);
  if (current == inCoroutine || inCoroutine != Context.root()) {
    return coroutineContext;
  }
  return coroutineContext.plus(ContextExtensionsKt.asContextElement(current)); // (2) Добавляем в корутинный контекст
}
Этот код выполняет две важные операции:
1. Извлекает текущий OpenTelemetry контекст из ThreadLocal
2. Добавляет этот контекст в корутинный контекст

Важно отметить, что Java Agent содержит специальную инструментацию для корутин, которая вызывает этот код при входе в корутину. Таким образом, вам не нужно явно вызывать эти методы — они будут вызваны автоматически при использовании Java Agent.

Автоматическое и ручное управление спанами



OpenTelemetry предоставляет два основных способа создания спанов в корутинах:

1. Автоматическое управление через аннотацию @WithSpan:

Kotlin
1
2
3
4
5
@WithSpan("fetchUserData")
suspend fun fetchUserData(userId: String): UserData {
    // Этот метод автоматически будет обернут в спан
    return userRepository.fetchUserById(userId)
}
2. Ручное управление через API:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
suspend fun fetchUserData(userId: String): UserData {
    val span = tracer.spanBuilder("fetchUserData").startSpan()
    try {
        span.setAttribute("userId", userId)
        return withContext(Context.current().with(span).asContextElement()) {
            userRepository.fetchUserById(userId)
        }
    } finally {
        span.end()
    }
}
Оба подхода имеют свои преимущества. Аннотация @WithSpan более лаконична и меньше засоряет код, но ручное управление дает больше контроля над жизненным циклом спана и позволяет добавлять произвольные атрибуты.

Важно заметить, что @WithSpan работает корректно с корутинами благодаря специальной инструментации, которая знает, как обрабатывать suspend-функции. Когда вы используете эту аннотацию, OpenTelemetry автоматически создает спан и корректно передает его контекст через границы приостановки корутины.

Стратегии семплирования в высоконагруженных корутинах



В высоконагруженных системах трассировка каждой операции может создать значительную нагрузку. Поэтому OpenTelemetry предоставляет различные стратегии семплирования, которые позволяют трассировать только часть операций. Семплирование особенно важно для корутин, потому что они часто используются для обработки большого количества параллельных запросов. Существует несколько стратегий семплирования:

1. Вероятностное семплирование — трассируется определенный процент операций:

Kotlin
1
2
3
4
5
6
val sampler = Sampler.parentBased(
    Sampler.traceIdRatioBased(0.1) // Трассировать 10% запросов
)
val tracerProvider = SdkTracerProvider.builder()
    .setSampler(sampler)
    .build()
2. Семплирование на основе атрибутов — можно трассировать только определенные типы операций:

Kotlin
1
2
3
4
5
val sampler = Sampler.parentBased(
    AttributesFilter.create(
        Attributes.of(AttributeKey.stringKey("http.method"), "POST")
    )
)
3. Адаптивное семплирование — динамически изменяет частоту семплирования в зависимости от нагрузки:

Kotlin
1
2
3
4
5
val sampler = AdaptiveSampler.builder()
    .setInitialSampleRate(0.1)
    .setMinSampleRate(0.01)
    .setMaxSampleRate(1.0)
    .build()
Выбор стратегии семплирования зависит от конкретных требований вашего приложения. Для большинства случаев достаточно вероятностного семплирования с частотой 10-20%.

В итоге, благодаря специальной поддержке корутин в OpenTelemetry, мы можем эффективно трассировать асинхронные операции, не теряя контекст при переключении потоков. Это позволяет создавать связные и информативные трейсы даже для сложных асинхронных процессов.

Конвертация между контекстами — ключ к успеху



Чтобы глубже понять, как именно происходит сохранение контекста между потоками в корутинах, давайте рассмотрим классовую структуру, задействованную в этом процессе. OpenTelemetry Kotlin extension содержит несколько ключевых классов:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Элемент корутинного контекста, хранящий OpenTelemetry контекст
class OpenTelemetryContextElement(val context: Context) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<OpenTelemetryContextElement>
    
    override val key: CoroutineContext.Key<*> = Key
}
 
// Расширения для работы с контекстом
fun Context.asContextElement(): CoroutineContext.Element {
    return OpenTelemetryContextElement(this)
}
 
fun getOpenTelemetryContext(coroutineContext: CoroutineContext): Context {
    return coroutineContext[OpenTelemetryContextElement.Key]?.context ?: Context.root()
}
Вот что происходит под капотом: когда корутина приостанавливается, её контекст сохраняется. При возобновлении корутины на другом потоке этот контекст восстанавливается, включая наш OpenTelemetryContextElement. Затем, когда OpenTelemetry пытается получить текущий контекст, он сначала проверяет наличие OpenTelemetryContextElement в контексте корутины и только потом обращается к ThreadLocal.

Обработка исключений и propagation контекста



Обработка исключений в корутинах с интеграцией OpenTelemetry тоже заслуживает внимания. Когда в корутине возникает исключение, важно, чтобы оно было корректно записано в спан:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
@WithSpan
suspend fun riskyOperation() {
    try {
        throw RuntimeException("Something went wrong")
    } catch (e: Exception) {
        // Исключение будет автоматически добавлено в спан
        // благодаря WithSpan аннотации
        handleException(e)
        throw e // При необходимости можно пробросить дальше
    }
}
При использовании @WithSpan исключения автоматически записываются в спан. А как насчет ручного управления?

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun manualRiskyOperation() {
    val span = tracer.spanBuilder("riskyOperation").startSpan()
    try {
        withContext(Context.current().with(span).asContextElement()) {
            throw RuntimeException("Something went wrong")
        }
    } catch (e: Exception) {
        // Необходимо явно записать исключение в спан
        span.recordException(e)
        handleException(e)
        throw e
    } finally {
        span.end() // Всегда завершаем спан
    }
}

Баггаж (Baggage) в распределенных системах



Помимо спанов, OpenTelemetry предоставляет механизм баггажа (baggage) для передачи произвольных метаданных между сервисами. Это особенно полезно в микросервисной архитектуре:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Устанавливаем баггаж
val baggage = Baggage.builder()
    .put("userId", "12345")
    .put("region", "eu-west")
    .build()
 
// Создаем новый контекст с баггажом
val contextWithBaggage = Context.current().with(baggage)
 
// Используем контекст в корутине
launch(contextWithBaggage.asContextElement()) {
    // Внутри этой корутины баггаж будет доступен
    val userId = Baggage.current().getEntryValue("userId")
    processRequest(userId)
}
Баггаж передается вместе с контекстом трассировки между сервисами, что позволяет сохранять важные метаданные на протяжении всего запроса.

Трассировка сложных корутинных конструкций



Корутины в Kotlin предоставляют множество мощных конструкций — Flow, Channel, Actor и т.д. Трассировка этих конструкций требует особого подхода. Например, для трассировки Flow можно использовать следующий паттерн:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun getDataFlow(): Flow<Data> = flow {
    val span = tracer.spanBuilder("getDataFlow").startSpan()
    try {
        withContext(Context.current().with(span).asContextElement()) {
            // Генерируем данные
            for (i in 1..10) {
                val itemSpan = tracer.spanBuilder("processItem").startSpan()
                try {
                    withContext(Context.current().with(itemSpan).asContextElement()) {
                        val data = processItem(i)
                        emit(data) // Эмитим данные с контекстом трассировки
                    }
                } finally {
                    itemSpan.end()
                }
            }
        }
    } finally {
        span.end()
    }
}
Для Channel это выглядит так:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
val channel = Channel<Data>()
 
// Отправитель
launch {
    val span = tracer.spanBuilder("sender").startSpan()
    try {
        withContext(Context.current().with(span).asContextElement()) {
            for (i in 1..10) {
                val data = processItem(i)
                channel.send(data)
            }
        }
    } finally {
        span.end()
        channel.close()
    }
}
 
// Получатель
launch {
    val span = tracer.spanBuilder("receiver").startSpan()
    try {
        withContext(Context.current().with(span).asContextElement()) {
            for (data in channel) {
                processData(data)
            }
        }
    } finally {
        span.end()
    }
}
В обоих случаях ключевая идея — обеспечить правильную передачу контекста трассировки через все асинхронные границы.

Оптимизация производительности трассировки



Трассировка может создавать дополнительную нагрузку на приложение, особенно при интенсивном использовании корутин. Вот несколько советов по оптимизации:

1. Используйте семплирование — не нужно трассировать каждую операцию.
2. Применяйте фильтрацию — трассируйте только важные части бизнес-логики.
3. Контролируйте количество атрибутов — слишком много атрибутов может создать значительный оверхед.
4. Избегайте вложенных спанов в микрооперациях — для очень мелких и частых операций лучше использовать счетчики или метрики.

Правильно настроенная трассировка должна давать ценную информацию без существенного влияния на производительность системы.

Практическая реализация трассировки



После того как мы разобрались с теоретической частью, пора переходить к практике. Я уверен, что большинству из вас интересно именно это - как же на самом деле заставить всю эту магию работать в реальном проекте. Поделюсь своими наработками и опытом, который я накопил за несколько лет работы с корутинами и системами трассировки.

Настройка OpenTelemetry для Kotlin



Первый шаг - подключение необходимых зависимостей. Для проекта на Kotlin с корутинами нам понадобятся следующие библиотеки:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Gradle KTS
dependencies {
    // Базовое API OpenTelemetry
    implementation("io.opentelemetry:opentelemetry-api:1.31.0")
    
    // SDK для инструментации
    implementation("io.opentelemetry:opentelemetry-sdk:1.31.0")
    
    // Расширение для Kotlin (ключевая библиотека!)
    implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.31.0")
    
    // Экспортер для отправки данных (например, в Jaeger)
    implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.31.0")
    
    // Автоматическая инструментация с аннотациями
    implementation("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:1.31.0")
}
Теперь нужно настроить OpenTelemetry SDK. Вот простой пример конфигурации:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun initOpenTelemetry(): OpenTelemetry {
    val resource = Resource.getDefault()
        .merge(Resource.create(Attributes.of(
            ResourceAttributes.SERVICE_NAME, "my-kotlin-service"
        )))
    
    val spanExporter = OtlpGrpcSpanExporter.builder()
        .setEndpoint("http://localhost:4317")
        .build()
    
    val spanProcessor = BatchSpanProcessor.builder(spanExporter).build()
    
    val sdkTracerProvider = SdkTracerProvider.builder()
        .addSpanProcessor(spanProcessor)
        .setResource(resource)
        .build()
    
    return OpenTelemetrySdk.builder()
        .setTracerProvider(sdkTracerProvider)
        .buildAndRegisterGlobal()
}
Этот код создает глобальную инстанцию OpenTelemetry, которая будет отправлять данные трассировки в коллектор по адресу localhost:4317. Вызовите этот метод при старте вашего приложения, и базовая настройка готова.

Создание кастомных интерсепторов



Иногда бывает нужно перехватывать и модифицировать спаны до того, как они будут отправлены. Например, для добавления бизнес-метрик или фильтрации чувствительных данных. Для этого можно создать собственный SpanProcessor:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class CustomSpanProcessor : SpanProcessor {
    override fun onStart(parentContext: Context, span: ReadWriteSpan) {
        // Добавляем метку времени старта
        span.setAttribute("custom.start_time", System.currentTimeMillis())
        
        // Можем добавить информацию о текущей корутине
        val coroutineName = CoroutineName.Key.get(coroutineContext)?.name
        if (coroutineName != null) {
            span.setAttribute("coroutine.name", coroutineName)
        }
    }
    
    override fun onEnd(span: ReadableSpan) {
        // Действия при завершении спана
        // Например, вычисление длительности и запись в метрики
    }
    
    override fun isStartRequired(): Boolean = true
    
    override fun isEndRequired(): Boolean = true
    
    override fun shutdown(): CompletableResultCode {
        return CompletableResultCode.ofSuccess()
    }
}
Этот процессор можно добавить в конфигурацию OpenTelemetry:

Kotlin
1
2
3
4
5
val sdkTracerProvider = SdkTracerProvider.builder()
    .addSpanProcessor(new CustomSpanProcessor())
    .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
    .setResource(resource)
    .build()

Создание собственных span-атрибутов для бизнес-логики



Добавление бизнес-атрибутов к спанам делает трассировку гораздо информативнее. Вот как можно добавлять кастомные атрибуты:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@WithSpan
suspend fun processOrder(orderId: String, userId: String) {
    val span = Span.current()
    
    // Добавляем бизнес-атрибуты
    span.setAttribute("order.id", orderId)
    span.setAttribute("user.id", userId)
    
    // Добавляем события
    span.addEvent("order_validation_started")
    
    val validationResult = validateOrder(orderId)
    
    // Записываем результат валидации
    span.addEvent("order_validation_completed", Attributes.of(
        AttributeKey.booleanKey("validation_success"), validationResult.isSuccess,
        AttributeKey.stringKey("validation_message"), validationResult.message
    ))
    
    // Продолжаем обработку...
}
Я часто использую следующий подход для организации бизнес-атрибутов - создаю статические классы с константами:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object OrderAttributes {
    val ORDER_ID = AttributeKey.stringKey("order.id")
    val USER_ID = AttributeKey.stringKey("user.id")
    val PAYMENT_METHOD = AttributeKey.stringKey("payment.method")
    val ORDER_TOTAL = AttributeKey.doubleKey("order.total")
    
    // Метод-хелпер для быстрого добавления всех атрибутов заказа
    fun addOrderAttributes(span: Span, order: Order) {
        span.setAttributes(
            ORDER_ID, order.id,
            USER_ID, order.userId,
            PAYMENT_METHOD, order.paymentMethod,
            ORDER_TOTAL, order.total
        )
    }
}

Интеграция с популярными фреймворками



Ktor (сервер)



Для интеграции OpenTelemetry с Ktor-сервером можно создать кастомный feature:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class OpenTelemetryFeature(configuration: Configuration) {
    private val tracer = configuration.tracer
    
    class Configuration {
        var tracer: Tracer = GlobalOpenTelemetry.getTracer("ktor-server")
    }
    
    companion object Feature : ApplicationFeature<Application, Configuration, OpenTelemetryFeature> {
        override val key = AttributeKey<OpenTelemetryFeature>("OpenTelemetry")
        
        override fun install(pipeline: Application, configure: Configuration.() -> Unit): OpenTelemetryFeature {
            val configuration = Configuration().apply(configure)
            val feature = OpenTelemetryFeature(configuration)
            
            pipeline.intercept(ApplicationCallPipeline.Monitoring) {
                val requestSpan = configuration.tracer.spanBuilder("http_request")
                    .setAttribute(SemanticAttributes.HTTP_METHOD, call.request.httpMethod.value)
                    .setAttribute(SemanticAttributes.HTTP_URL, call.request.uri)
                    .startSpan()
                
                try {
                    withContext(Context.current().with(requestSpan).asContextElement()) {
                        proceed()
                    }
                } catch (e: Exception) {
                    requestSpan.recordException(e)
                    throw e
                } finally {
                    requestSpan.end()
                }
            }
            
            return feature
        }
    }
}
Использование:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun Application.module() {
    install(OpenTelemetryFeature) {
        tracer = GlobalOpenTelemetry.getTracer("my-ktor-app")
    }
    
    routing {
        get("/api/users/{id}") {
            val userId = call.parameters["id"]
            
            // Создаем дочерний спан
            val span = GlobalOpenTelemetry.getTracer("api")
                .spanBuilder("get_user_details")
                .setAttribute("user.id", userId ?: "unknown")
                .startSpan()
            
            try {
                withContext(Context.current().with(span).asContextElement()) {
                    val user = userRepository.getUser(userId)
                    call.respond(user)
                }
            } finally {
                span.end()
            }
        }
    }
}

Spring Boot



Для Spring Boot существует специальный стартер, но если вы используете корутины, стоит добавить дополнительную конфигурацию:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
class OpenTelemetryConfig {
    
    @Bean
    fun coroutineContextInterceptor(): WebMvcInterceptor {
        return object : HandlerInterceptor {
            override fun preHandle(request: HttpServletRequest, response: HttpServletResponse, handler: Any): Boolean {
                // Получаем текущий контекст OpenTelemetry
                val currentContext = Context.current()
                
                // Сохраняем его как атрибут запроса
                request.setAttribute("otel-context", currentContext)
                
                return true
            }
        }
    }
    
    @Bean
    fun coroutineContextElementProvider(): CoroutineContextElementProvider {
        return object : CoroutineContextElementProvider {
            override fun context(request: HttpServletRequest): CoroutineContext {
                val otelContext = request.getAttribute("otel-context") as? Context
                    ?: Context.current()
                
                // Создаем элемент корутинного контекста
                return otelContext.asContextElement()
            }
        }
    }
}

Трассировка Flow операций



Для трассировки Flow операций есть несколько подходов. Я предпочитаю создавать специальные операторы расширения:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun <T> Flow<T>.traced(
    name: String,
    tracer: Tracer = GlobalOpenTelemetry.getTracer("flow-tracer")
): Flow<T> = flow {
    val span = tracer.spanBuilder(name).startSpan()
    
    try {
        val context = Context.current().with(span)
        collect { value ->
            withContext(context.asContextElement()) {
                emit(value)
            }
        }
    } catch (e: Exception) {
        span.recordException(e)
        throw e
    } finally {
        span.end()
    }
}
Использование:

Kotlin
1
2
3
4
5
6
7
8
userRepository.getUserFlow(userId)
    .traced("get_user_flow")
    .map { user -> 
        // Доступ к текущему спану
        Span.current().setAttribute("user.name", user.name)
        transformUser(user)
    }
    .collect { ... }
Для более детальной трассировки можно добавить операторы для каждого элемента Flow:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun <T, R> Flow<T>.tracedMap(
    name: String,
    tracer: Tracer = GlobalOpenTelemetry.getTracer("flow-tracer"),
    transform: suspend (T) -> R
): Flow<R> = flow {
    collect { value ->
        val span = tracer.spanBuilder("$name-item").startSpan()
        
        try {
            val result = withContext(Context.current().with(span).asContextElement()) {
                transform(value)
            }
            emit(result)
        } catch (e: Exception) {
            span.recordException(e)
            throw e
        } finally {
            span.end()
        }
    }
}
Это позволяет отслеживать обработку каждого элемента в потоке отдельно.

Создание middleware для автоматической инструментации



Если у вас много suspend-функций, которые нужно трассировать, ручное добавление аннотаций может быть утомительным. Можно создать middleware, который автоматически добавляет трассировку:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class TraceAllSuspendFunctions
 
class TracingAspect {
    private val tracer = GlobalOpenTelemetry.getTracer("auto-trace")
    
    @Around("@within(TraceAllSuspendFunctions) && execution(suspend * *(..))")
    suspend fun traceMethod(joinPoint: ProceedingJoinPoint): Any? {
        val method = (joinPoint.signature as MethodSignature).method
        val spanName = "${method.declaringClass.simpleName}.${method.name}"
        
        val span = tracer.spanBuilder(spanName).startSpan()
        
        try {
            // Добавляем аргументы метода как атрибуты
            val paramNames = method.parameters.map { it.name }
            val paramValues = joinPoint.args
            
            for (i in paramNames.indices) {
                if (i < paramValues.size) {
                    span.setAttribute("param.${paramNames[i]}", paramValues[i].toString())
                }
            }
            
            return withContext(Context.current().with(span).asContextElement()) {
                joinPoint.proceed()
            }
        } catch (e: Exception) {
            span.recordException(e)
            throw e
        } finally {
            span.end()
        }
    }
}
Использование:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
@TraceAllSuspendFunctions
class UserService(private val repository: UserRepository) {
    suspend fun getUser(id: String): User {
        // Эта функция будет автоматически трассироваться
        return repository.findById(id)
    }
    
    suspend fun updateUser(user: User): User {
        // И эта тоже
        return repository.save(user)
    }
}
Обратите внимание, что для работы аспектов нужно подключить соответствующую библиотеку (например, AspectJ или Spring AOP).

Теперь у вас есть все необходимые инструменты для практической реализации трассировки в ваших Kotlin-приложениях с корутинами. В следующей части мы рассмотрим продвинутые техники и подводные камни, с которыми вы можете столкнуться.

Работа с багажом в распределенных системах



На практике трассировка в микросервисной архитектуре часто требует передачи дополнительного контекста между сервисами. Для этого OpenTelemetry предлагает механизм "багажа" (Baggage). В отличие от спанов, багаж является частью контекста, которая сохраняется при любых передачах запроса между сервисами:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Сервис A
suspend fun handleRequest(request: Request): Response {
    // Создаем багаж с информацией о пользователе
    val baggage = Baggage.builder()
        .put("tenant.id", request.tenantId)
        .put("user.role", request.userRole)
        .build()
    
    // Добавляем в контекст
    val updatedContext = Context.current().with(baggage)
    
    return withContext(updatedContext.asContextElement()) {
        // Вызов другого сервиса сохранит багаж
        serviceB.processData(request.data)
    }
}
 
// Сервис B
suspend fun processData(data: Data): Result {
    // Получаем данные из багажа
    val baggage = Baggage.current()
    val tenantId = baggage.getEntryValue("tenant.id") ?: "default"
    val userRole = baggage.getEntryValue("user.role") ?: "user"
    
    // Используем полученные данные в бизнес-логике
    return when (userRole) {
        "admin" -> processWithAdminPrivileges(data, tenantId)
        else -> processWithUserPrivileges(data, tenantId)
    }
}
Этот подход позволяет избежать явной передачи контекстной информации через параметры функций.

Трассировка Channel операций



Channel — еще один важный примитив корутин, требующий трассировки. Но в отличие от Flow, Channel представляет собой "горячий" поток, что создает дополнительные сложности:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
fun CoroutineScope.createProcessingPipeline(): Channel<ProcessedData> {
    val inputChannel = Channel<RawData>()
    val outputChannel = Channel<ProcessedData>()
    
    // Продюсер
    launch {
        val span = tracer.spanBuilder("data_producer").startSpan()
        try {
            withContext(Context.current().with(span).asContextElement()) {
                for (i in 1..100) {
                    val data = fetchRawData(i)
                    span.addEvent("sending_data", Attributes.of(
                        AttributeKey.longKey("item_id"), i.toLong()
                    ))
                    inputChannel.send(data)
                }
            }
        } finally {
            span.end()
            inputChannel.close()
        }
    }
    
    // Обработчик
    launch {
        val processorSpan = tracer.spanBuilder("data_processor").startSpan()
        try {
            withContext(Context.current().with(processorSpan).asContextElement()) {
                for (rawData in inputChannel) {
                    // Для каждого элемента создаем отдельный спан
                    val itemSpan = tracer.spanBuilder("process_item").startSpan()
                    try {
                        val ctx = Context.current().with(itemSpan)
                        withContext(ctx.asContextElement()) {
                            val processed = processRawData(rawData)
                            outputChannel.send(processed)
                        }
                    } finally {
                        itemSpan.end()
                    }
                }
            }
        } catch (e: Exception) {
            processorSpan.recordException(e)
            throw e
        } finally {
            processorSpan.end()
            outputChannel.close()
        }
    }
    
    return outputChannel
}
Обратите внимание на создание отдельных спанов для каждого этапа обработки, что позволяет точно отследить, на каком шаге возникла проблема.

Адаптация существующего кода



Часто приходится добавлять трассировку в существующий код, который не был изначально спроектирован с учетом OpenTelemetry. Я разработал несколько паттернов для такого рефакторинга:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Было
class LegacyService {
    suspend fun processData(data: Data): Result {
        val step1 = doStep1(data)
        val step2 = doStep2(step1)
        return finalizeProcessing(step2)
    }
    
    private suspend fun doStep1(data: Data): IntermediateResult1 = ...
    private suspend fun doStep2(data: IntermediateResult1): IntermediateResult2 = ...
    private suspend fun finalizeProcessing(data: IntermediateResult2): Result = ...
}
 
// Стало
class TracedLegacyService(private val delegate: LegacyService) {
    private val tracer = GlobalOpenTelemetry.getTracer("legacy-service")
    
    suspend fun processData(data: Data): Result {
        return trace("process_data") {
            delegate.processData(data)
        }
    }
    
    private suspend fun <T> trace(spanName: String, block: suspend () -> T): T {
        val span = tracer.spanBuilder(spanName).startSpan()
        try {
            return withContext(Context.current().with(span).asContextElement()) {
                block()
            }
        } catch (e: Exception) {
            span.recordException(e)
            throw e
        } finally {
            span.end()
        }
    }
}
Этот подход использует паттерн "Декоратор", добавляя трассировку без изменения исходного кода. Для более детальной трассировки можно использовать прокси или наследование, если класс спроектирован для расширения.

Обработка ошибок в трассируемом коде



Часто упускаемый аспект - корректная трассировка ошибок в корутинах:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
suspend fun fetchDataWithRetry(id: String, maxRetries: Int = 3): Data {
    val span = tracer.spanBuilder("fetch_with_retry").startSpan()
    span.setAttribute("data.id", id)
    span.setAttribute("max_retries", maxRetries)
    
    try {
        withContext(Context.current().with(span).asContextElement()) {
            var lastError: Exception? = null
            for (attempt in 1..maxRetries) {
                try {
                    span.addEvent("retry_attempt", Attributes.of(
                        AttributeKey.longKey("attempt"), attempt.toLong()
                    ))
                    return dataService.fetchData(id)
                } catch (e: Exception) {
                    lastError = e
                    span.addEvent("retry_failed", Attributes.of(
                        AttributeKey.stringKey("error"), e.message ?: "Unknown error"
                    ))
                    delay(attempt * 1000L) // Экспоненциальная задержка
                }
            }
            throw lastError ?: RuntimeException("Failed after $maxRetries attempts")
        }
    } finally {
        span.end()
    }
}
Такой подход дает полную картину всех попыток и причин неудач, что бесценно при отладке проблем в продакшене.

В целом, правильная реализация трассировки корутин требует внимания к деталям, особенно к моментам передачи контекста. Но результат стоит затраченных усилий - вы получаете полную наблюдаемость асинхронных операций, что кардинально упрощает отладку и мониторинг распределенных систем.

Продвинутые техники и подводные камни



Structured concurrency и трассировка



Структурированная конкурентность (structured concurrency) - один из главных козырей Kotlin корутин. Но как её правильно сочетать с трассировкой? Тут есть несколько интересных моментов, о которых почти никто не пишет. Начнем с базового примера:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
suspend fun loadUserProfile(userId: String): UserProfile {
    return coroutineScope {
        val tracer = GlobalOpenTelemetry.getTracer("profile-loader")
        val span = tracer.spanBuilder("load_user_profile").startSpan()
        
        try {
            withContext(Context.current().with(span).asContextElement()) {
                val userDetails = async { userRepository.getUserDetails(userId) }
                val userPreferences = async { preferenceRepository.getUserPreferences(userId) }
                val userActivity = async { activityRepository.getRecentActivity(userId) }
                
                UserProfile(
                    details = userDetails.await(),
                    preferences = userPreferences.await(),
                    recentActivity = userActivity.await()
                )
            }
        } finally {
            span.end()
        }
    }
}
На первый взгляд, всё хорошо. Но тут есть тонкость - если одна из async корутин завершится с ошибкой, мы увидим исключение, но не узнаем, какая именно операция пошла не так. Давайте улучшим наш код:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
suspend fun loadUserProfile(userId: String): UserProfile {
    return coroutineScope {
        val tracer = GlobalOpenTelemetry.getTracer("profile-loader")
        val parentSpan = tracer.spanBuilder("load_user_profile").startSpan()
        
        try {
            withContext(Context.current().with(parentSpan).asContextElement()) {
                val userDetails = async { 
                    val childSpan = tracer.spanBuilder("get_user_details").startSpan()
                    try {
                        withContext(Context.current().with(childSpan).asContextElement()) {
                            userRepository.getUserDetails(userId)
                        }
                    } catch (e: Exception) {
                        childSpan.recordException(e)
                        childSpan.setStatus(StatusCode.ERROR)
                        throw e
                    } finally {
                        childSpan.end()
                    }
                }
                
                // Аналогично для других async блоков...
                
                UserProfile(
                    details = userDetails.await(),
                    preferences = userPreferences.await(),
                    recentActivity = userActivity.await()
                )
            }
        } finally {
            parentSpan.end()
        }
    }
}
Код стал более громоздким, но теперь мы точно будем знать, какая из операций зафейлилась. Я обычно создаю для этого специальный хелпер:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
suspend fun <T> tracedAsync(
    name: String,
    tracer: Tracer = GlobalOpenTelemetry.getTracer("async-tracer"),
    block: suspend () -> T
): Deferred<T> = coroutineScope {
    async {
        val span = tracer.spanBuilder(name).startSpan()
        try {
            withContext(Context.current().with(span).asContextElement()) {
                block()
            }
        } catch (e: Exception) {
            span.recordException(e)
            span.setStatus(StatusCode.ERROR)
            throw e
        } finally {
            span.end()
        }
    }
}
И тогда наш код снова становится чистым:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
suspend fun loadUserProfile(userId: String): UserProfile {
    return coroutineScope {
        val tracer = GlobalOpenTelemetry.getTracer("profile-loader")
        val span = tracer.spanBuilder("load_user_profile").startSpan()
        
        try {
            withContext(Context.current().with(span).asContextElement()) {
                val userDetails = tracedAsync("get_user_details") { 
                    userRepository.getUserDetails(userId) 
                }
                val userPreferences = tracedAsync("get_user_preferences") { 
                    preferenceRepository.getUserPreferences(userId) 
                }
                val userActivity = tracedAsync("get_recent_activity") { 
                    activityRepository.getRecentActivity(userId) 
                }
                
                UserProfile(
                    details = userDetails.await(),
                    preferences = userPreferences.await(),
                    recentActivity = userActivity.await()
                )
            }
        } finally {
            span.end()
        }
    }
}

Производительность и оптимизация



Добавление трассировки может существенно ударить по производительности вашего приложения, если делать это бездумно. Вот несколько советов, которые я выработал на практике:

1. Избегайте создания спанов для микроопераций

Если у вас есть цикл, обрабатывающий миллионы элементов, не создавайте отдельный спан для каждой итерации. Вместо этого используйте счетчики или метрики, а спаны создавайте только для больших логических блоков:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Плохо - создаем спан на каждую итерацию
items.forEach { item ->
    val span = tracer.spanBuilder("process_item").startSpan()
    try {
        processItem(item)
    } finally {
        span.end()
    }
}
 
// Лучше - используем счетчик и события
val span = tracer.spanBuilder("process_items_batch").startSpan()
try {
    var processedCount = 0
    var errorCount = 0
    
    items.forEach { item ->
        try {
            processItem(item)
            processedCount++
            
            // Добавляем событие каждые 1000 элементов
            if (processedCount % 1000 == 0) {
                span.addEvent("progress_update", Attributes.of(
                    AttributeKey.longKey("processed_count"), processedCount.toLong()
                ))
            }
        } catch (e: Exception) {
            errorCount++
            // Записываем только первые несколько ошибок
            if (errorCount <= 5) {
                span.recordException(e)
            }
        }
    }
    
    span.setAttribute("total_processed", processedCount)
    span.setAttribute("total_errors", errorCount)
} finally {
    span.end()
}
2. Отложенное создание атрибутов

Вычисление некоторых атрибутов может быть дорогим. Используйте ленивый подход:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun processData(data: ComplexData) {
    val span = tracer.spanBuilder("process_complex_data").startSpan()
    
    try {
        // Дорогие атрибуты добавляем только при определенных условиях
        if (data.needsDetailedTracing) {
            val expensiveAttribute = computeExpensiveAttribute(data)
            span.setAttribute("complex.attribute", expensiveAttribute)
        }
        
        // Основная логика...
    } finally {
        span.end()
    }
}
3. Буферизация и пакетная отправка спанов

Настройте ваш SpanProcessor для буферизации и отправки спанов пакетами:

Kotlin
1
2
3
4
5
6
7
8
9
val spanExporter = OtlpGrpcSpanExporter.builder()
    .setEndpoint("http://otel-collector:4317")
    .build()
 
val spanProcessor = BatchSpanProcessor.builder(spanExporter)
    .setMaxQueueSize(2048) // Максимальный размер очереди
    .setScheduleDelay(1, TimeUnit.SECONDS) // Интервал отправки
    .setMaxExportBatchSize(512) // Размер пакета
    .build()

Мониторинг утечек памяти в span-контекстах



Одна из коварных проблем при использовании OpenTelemetry с корутинами - утечки памяти из-за неправильного управления контекстами. Вот типичный сценарий утечки:

Kotlin
1
2
3
4
5
6
7
8
9
10
suspend fun leakyOperation() {
    val span = tracer.spanBuilder("leaky_operation").startSpan()
    
    // ОШИБКА: мы не закрываем спан!
    withContext(Context.current().with(span).asContextElement()) {
        // Какая-то работа...
    }
    
    // Спан никогда не завершается
}
Для отслеживания таких утечек я разработал простую, но эффективную технику - активный счетчик спанов:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object SpanMonitor {
    private val activeSpans = AtomicInteger(0)
    
    fun spanStarted() {
        val count = activeSpans.incrementAndGet()
        if (count % 1000 == 0) {
            logger.info("Active spans: $count")
        }
    }
    
    fun spanEnded() {
        activeSpans.decrementAndGet()
    }
    
    fun getActiveSpansCount() = activeSpans.get()
}
 
// Используем с кастомным SpanProcessor
class MonitoringSpanProcessor : SpanProcessor {
    override fun onStart(parentContext: Context, span: ReadWriteSpan) {
        SpanMonitor.spanStarted()
    }
    
    override fun onEnd(span: ReadableSpan) {
        SpanMonitor.spanEnded()
    }
    
    // Другие методы интерфейса...
}
Затем добавляем этот процессор в конфигурацию OpenTelemetry и регулярно проверяем количество активных спанов. Если оно постоянно растет, у вас есть утечка.
Для более глубокой диагностики можно расширить этот подход, сохраняя информацию о создателях спанов:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object DetailedSpanMonitor {
    private val activeSpansByClass = ConcurrentHashMap<String, AtomicInteger>()
    
    fun spanStarted(className: String) {
        activeSpansByClass.computeIfAbsent(className) { AtomicInteger(0) }.incrementAndGet()
    }
    
    fun spanEnded(className: String) {
        activeSpansByClass[className]?.decrementAndGet()
    }
    
    fun printStats() {
        activeSpansByClass.forEach { (className, count) ->
            if (count.get() > 0) {
                logger.warn("Potential leak in $className: ${count.get()} active spans")
            }
        }
    }
}

Отладка проблем с контекстом



Проблемы с контекстом OpenTelemetry в корутинах могут быть очень коварными. Вот несколько техник, которые я использую для их отладки:

1. Логирование текущего контекста

Kotlin
1
2
3
4
5
6
7
8
9
10
suspend fun debugContext() {
    val otelContext = Context.current()
    val span = Span.fromContext(otelContext)
    logger.debug("Current context: spanId=${span.spanContext.spanId}, traceId=${span.spanContext.traceId}")
    
    // Проверяем корутинный контекст
    val coroutineContext = coroutineContext
    val otelElement = coroutineContext[OpenTelemetryContextElement.Key]
    logger.debug("CoroutineContext has OTEL element: ${otelElement != null}")
}
2. Контекстный интерцептор

Создайте интерцептор, который будет отслеживать все изменения контекста:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
class ContextTrackingInterceptor : SpanProcessor {
    override fun onStart(parentContext: Context, span: ReadWriteSpan) {
        logger.debug("Span started: ${span.name}, parent=${Span.fromContext(parentContext).spanContext.spanId}")
    }
    
    override fun onEnd(span: ReadableSpan) {
        logger.debug("Span ended: ${span.name}")
    }
    
    // Другие методы интерфейса...
}
3. Ловушки для потерянных контекстов

Иногда контекст теряется при передаче между потоками. Вот как можно выявить эту проблему:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
suspend fun operationWithContextCheck() {
    val span = tracer.spanBuilder("parent_operation").startSpan()
    
    try {
        val contextBefore = Context.current()
        logger.debug("Before withContext: ${Span.fromContext(contextBefore).spanContext.spanId}")
        
        withContext(Dispatchers.IO + Context.current().with(span).asContextElement()) {
            val contextAfter = Context.current()
            logger.debug("After withContext: ${Span.fromContext(contextAfter).spanContext.spanId}")
            
            // Если ID спана изменился или стал невалидным, контекст был потерян
            if (Span.fromContext(contextAfter).spanContext.spanId != span.spanContext.spanId) {
                logger.error("Context was lost during withContext!")
            }
        }
    } finally {
        span.end()
    }
}
Еще одна распространенная проблема - забывать добавлять элемент контекста при создании новой корутины:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
// Неправильно - контекст теряется
launch {
    // Здесь не будет доступа к родительскому спану
    doSomething()
}
 
// Правильно
launch(Context.current().asContextElement()) {
    // Здесь будет доступ к родительскому спану
    doSomething()
}
Для автоматизации этого процесса можно создать расширение на CoroutineScope:

Kotlin
1
2
3
4
5
6
7
8
9
fun CoroutineScope.tracedLaunch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // Комбинируем пользовательский контекст с элементом OpenTelemetry
    val combinedContext = context + Context.current().asContextElement()
    return this.launch(combinedContext, start, block)
}
Это расширение автоматически добавляет текущий OpenTelemetry контекст в новую корутину, что избавляет от необходимости делать это вручную.

Полноценное демо-приложение с трассировкой



Теория без практики - как ботинок без подошвы, толку мало. Давайте соберём все рассмотренные концепции в одно демо-приложение, чтобы наглядно увидеть, как работает трассировка корутин с OpenTelemetry в реальном проекте. Я создал небольшую микросервисную систему для отслеживания заказов, которая наглядно показывает все аспекты трассировки.

Архитектура приложения



Наша система состоит из трех микросервисов:
order-service - принимает заказы от пользователей,
inventory-service - проверяет наличие товаров на складе,
notification-service - отправляет уведомления.

Все сервисы написаны на Kotlin с использованием корутин и общаются друг с другом через HTTP. Структура проекта выглядит так:

Kotlin
1
2
3
4
5
demo-app/
├── order-service/
├── inventory-service/
├── notification-service/
└── docker-compose.yml
Для наглядности рассмотрим ключевые части order-service.

Конфигурация OpenTelemetry



Для начала настроим OpenTelemetry в нашем сервисе:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
class TelemetryConfig {
    @Bean
    fun openTelemetry(): OpenTelemetry {
        val resource = Resource.getDefault()
            .merge(Resource.create(Attributes.of(
                ResourceAttributes.SERVICE_NAME, "order-service"
            )))
 
        val spanExporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://otel-collector:4317")
            .build()
 
        val spanProcessor = BatchSpanProcessor.builder(spanExporter).build()
 
        return OpenTelemetrySdk.builder()
            .setTracerProvider(
                SdkTracerProvider.builder()
                    .addSpanProcessor(spanProcessor)
                    .setResource(resource)
                    .build()
            )
            .build()
    }
 
    @Bean
    fun tracer(openTelemetry: OpenTelemetry): Tracer {
        return openTelemetry.getTracer("order-service-tracer")
    }
}

Контроллер с корутинами



Создадим контроллер, который будет обрабатывать запросы на создание заказов:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
class OrderController(
    private val orderService: OrderService,
    private val tracer: Tracer
) {
    @PostMapping("/orders")
    suspend fun createOrder(@RequestBody orderRequest: OrderRequest): ResponseEntity<OrderResponse> {
        val span = tracer.spanBuilder("create_order_endpoint").startSpan()
        
        return try {
            span.setAttribute("order.items_count", orderRequest.items.size)
            
            withContext(Context.current().with(span).asContextElement()) {
                val order = orderService.createOrder(orderRequest)
                ResponseEntity.ok(OrderResponse(order.id, order.status))
            }
        } catch (e: Exception) {
            span.recordException(e)
            span.setStatus(StatusCode.ERROR, e.message ?: "Unknown error")
            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(OrderResponse(null, "ERROR"))
        } finally {
            span.end()
        }
    }
}

Сервис с бизнес-логикой



Теперь реализуем сервис, который содержит бизнес-логику:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Service
class OrderService(
    private val inventoryClient: InventoryClient,
    private val notificationClient: NotificationClient,
    private val orderRepository: OrderRepository,
    private val tracer: Tracer
) {
    @WithSpan
    suspend fun createOrder(request: OrderRequest): Order = coroutineScope {
        // Используем контекст текущего спана
        val span = Span.current()
        span.setAttribute("order.customer_id", request.customerId)
        
        // Параллельно проверяем наличие всех товаров
        val inventoryChecks = request.items.map { item ->
            async {
                val itemSpan = tracer.spanBuilder("check_inventory_item").startSpan()
                try {
                    withContext(Context.current().with(itemSpan).asContextElement()) {
                        itemSpan.setAttribute("item.id", item.productId)
                        itemSpan.setAttribute("item.quantity", item.quantity)
                        
                        inventoryClient.checkAvailability(item.productId, item.quantity)
                    }
                } finally {
                    itemSpan.end()
                }
            }
        }
        
        // Ждем результатов всех проверок
        inventoryChecks.awaitAll()
        
        // Создаем заказ в БД
        val order = orderRepository.save(
            Order(
                customerId = request.customerId,
                items = request.items,
                status = "CREATED"
            )
        )
        
        // Отправляем уведомление асинхронно, но не ждем его выполнения
        launch(Context.current().asContextElement()) {
            val notifySpan = tracer.spanBuilder("send_order_notification").startSpan()
            try {
                withContext(Context.current().with(notifySpan).asContextElement()) {
                    notificationClient.sendOrderCreatedNotification(order.id, request.customerId)
                }
            } finally {
                notifySpan.end()
            }
        }
        
        order
    }
}

Клиент для межсервисного взаимодействия



Для взаимодействия с другими сервисами я использую клиент с трассировкой:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
class InventoryClient(
    private val webClient: WebClient,
    private val tracer: Tracer
) {
    suspend fun checkAvailability(productId: String, quantity: Int): Boolean {
        val span = tracer.spanBuilder("inventory_client_check").startSpan()
        
        try {
            return withContext(Context.current().with(span).asContextElement()) {
                span.setAttribute("product.id", productId)
                span.setAttribute("product.quantity", quantity)
                
                val response = webClient.get()
                    .uri("/inventory/check?productId=$productId&quantity=$quantity")
                    .header("traceparent", extractW3CTraceContext())
                    .retrieve()
                    .awaitBody<AvailabilityResponse>()
                
                span.setAttribute("inventory.available", response.available)
                response.available
            }
        } catch (e: Exception) {
            span.recordException(e)
            span.setStatus(StatusCode.ERROR)
            throw e
        } finally {
            span.end()
        }
    }
    
    private fun extractW3CTraceContext(): String {
        val spanContext = Span.current().spanContext
        return "00-${spanContext.traceId}-${spanContext.spanId}-01"
    }
}

Запуск и наблюдение



Для запуска всей системы я использую Docker Compose:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
version: '3'
services:
  order-service:
    build: ./order-service
    ports:
      - "8080:8080"
    environment:
      - INVENTORY_SERVICE_URL=http://inventory-service:8081
      - NOTIFICATION_SERVICE_URL=http://notification-service:8082
      
  inventory-service:
    build: ./inventory-service
    ports:
      - "8081:8081"
      
  notification-service:
    build: ./notification-service
    ports:
      - "8082:8082"
      
  otel-collector:
    image: otel/opentelemetry-collector:0.53.0
    ports:
      - "4317:4317"
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    command: ["--config=/etc/otel-collector-config.yaml"]
    
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686"
Когда система запущена, мы можем отправить запрос на создание заказа:

Bash
1
2
3
4
5
6
7
8
curl -X POST http://localhost:8080/orders -H "Content-Type: application/json" -d '
{
  "customerId": "customer-123",
  "items": [
    {"productId": "product-1", "quantity": 2},
    {"productId": "product-2", "quantity": 1}
  ]
}'
После выполнения запроса можно открыть Jaeger UI (http://localhost:16686) и увидеть полный трейс запроса со всеми спанами и метаданными. В трейсе будут видны все async операции, межсервисные вызовы и даже параллельные проверки товаров. Особенно ценно то, что мы видим полную картину выполнения запроса даже при использовании асинхронных операций и разных потоков.

Интеграция с системами мониторинга и алертинга



Настроенная трассировка корутин - это только полдела. Чтобы выжать максимум из собранных данных, нужно правильно интегрировать их с системами мониторинга и настроить информативные дашборды. Я потратил немало времени на эксперименты в этой области и готов поделиться своими находками.

Интеграция с Prometheus и Grafana



OpenTelemetry прекрасно работает с экосистемой Prometheus. Важно понимать, что трассировка и метрики - это разные, но дополняющие друг друга типы данных. Для полноценного мониторинга нужны оба.

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Настраиваем экспорт метрик в Prometheus
val meterProvider = SdkMeterProvider.builder()
    .registerMetricReader(
        PeriodicMetricReader.builder(
            PrometheusHttpServer.builder().setPort(9464).build()
        ).build()
    )
    .build()
 
// Добавляем метрики для корутин
val coroutinesScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
val activeCouroutinesGauge = meterProvider
    .get("app.metrics")
    .gaugeBuilder("active_coroutines")
    .setDescription("Number of active coroutines")
    .build()
 
// Интегрируем с трассировкой
coroutinesScope.launch {
    while (true) {
        activeCouroutinesGauge.record(ThreadPoolExecutor.activeCount().toDouble())
        delay(1000)
    }
}
В Grafana я обычно создаю отдельную панель для корутин, где отображаю:
  • Количество активных корутин,
  • Время выполнения корутин по разным диспетчерам,
  • Количество отмененных корутин,
  • Соотношение успешных и неуспешных корутин.

Визуализация корутинных трейсов в Jaeger



Jaeger - один из лучших инструментов для анализа распределенных трейсов. Но его стандартный UI не всегда удобен для работы с корутинами, особенно когда у вас сложная структура с множеством асинхронных операций.
Я создал несколько кастомных представлений специально для корутин:

Kotlin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Добавляем специальные тэги для лучшей визуализации в Jaeger
@WithSpan
suspend fun taggedOperation() {
    val span = Span.current()
    
    // Тэг для группировки по типу диспетчера
    span.setAttribute("coroutine.dispatcher", coroutineContext[CoroutineDispatcher]?.toString() ?: "Unknown")
    
    // Тэг для понимания структурированной конкурентности
    span.setAttribute("coroutine.scope_id", coroutineContext[Job]?.hashCode().toString())
    
    // Тэг для времени жизни корутины
    val startTime = System.currentTimeMillis()
    try {
        // Операции...
    } finally {
        span.setAttribute("coroutine.lifetime_ms", System.currentTimeMillis() - startTime)
    }
}
В Jaeger UI я настраиваю следующие представления:
1. Временная шкала - показывает последовательность выполнения корутин.
2. Граф зависимостей - визуализирует отношения между корутинами.
3. Статистическое представление - показывает аномалии в выполнении.

Алертинг на основе трейсов



Мало собирать данные, нужно еще и реагировать на проблемы. Я настраиваю алерты на основе аномалий в трейсах:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
# Пример правила алертинга в Prometheus для длительных корутин
groups:
name: CoroutineAlerts
  rules:
  - alert: LongRunningCoroutine
    expr: histogram_quantile(0.95, rate(span_duration_milliseconds_bucket{operation=~".*coroutine.*"}[5m])) > 1000
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "Обнаружены долго выполняющиеся корутины"
      description: "95-й перцентиль времени выполнения корутин превышает 1 секунду"
Для более сложных случаев я использую Elastic APM, который позволяет настраивать Machine Learning джобы для детектирования аномалий в паттернах выполнения корутин.

Дашборды для анализа производительности



Конечная цель всей этой инфраструктуры - понимание производительности вашего приложения. Я создаю специализированные дашборды для анализа узких мест:

1. Дашборд времени отклика - показывает, как корутины влияют на общее время ответа,
2. Дашборд параллелизма - визуализирует эффективность распараллеливания операций,
3. Дашборд ресурсов - отображает потребление CPU и памяти корутинами,

Особо ценной считаю визуализацию "тепловой карты" корутин - когда на одном графике видно и количество запусков, и время выполнения каждого типа корутин. Это мгновенно показывает, где нужно оптимизировать код.

Kotlin без Java - деньги на ветер?
Вэб программист, но очень хочется попробовать писать мобильные приложения для Андроид. Знаком с...

Настройка Intellig под Kotlin
Подскажите плз. Идею можно настроить под жаву чтобы методы отделялись друг от друга линией, как...

Книга Kotlin в действии непонятные моменты
Стр 76. Про функции верхнего уровня. Там есть такая фраза. Вместо этого можно помещать функции...

Android Studio Java/Kotlin Приложение для Баз Данных
Тут такое дело, я создал Базу Данных в Access, и хочу занусуть этот файл в какой нибудь уже готовый...

Kotlin lateinit переменная
синтаксис на Kotlin. есть переменная lateinit , но когда при getApiServisce() я проверяю на null...

RadioButton: setSupportButtonTintList и setChecked на Kotlin?
помогите найти аналоги java'вских методов setSupportButtonTintList и setChecked (для...

Kotlin class
Почему android studio не дает мне доступ к элементам Game_Activity из других классов ? Из-за чего я...

Kotlin "it" как перевести на java?
есть пример кода на котлине: override fun one(id: Long): Observable&lt;Banner&gt; { return...

Socket на андроид, Kotlin (Java)
Добрый день. Проблема в том, что не происходит отправка данных на сервер, ошибок нет, права на...

Синхронизация EditText фрагментов через переменную в MainActivity (Kotlin)
Есть MainActivity, Fragment1, Fragment2 main_activity.xml показывает Fragment1 и Fragment2...

Kotlin для Android
Здравствуйте, знаю Java SE на приличном уровне, но решил, что для разработки Android приложения...

Загрузить изображение с html.fromHtml в textview [Kotlin]
Собственно,нужно загрузить html текст в textView.Использую Html.fromHtml,html текст грузится как...

Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Музыка, написанная Искусственным Интеллектом
volvo 04.12.2025
Всем привет. Некоторое время назад меня заинтересовало, что уже умеет ИИ в плане написания музыки для песен, и, собственно, исполнения этих самых песен. Стихов у нас много, уже вышли 4 книги, еще 3. . .
От async/await к виртуальным потокам в Python
IndentationError 23.11.2025
Армин Ронахер поставил под сомнение async/ await. Создатель Flask заявляет: цветные функции - провал, виртуальные потоки - решение. Не threading-динозавры, а новое поколение лёгких потоков. Откат?. . .
Поиск "дружественных имён" СОМ портов
Argus19 22.11.2025
Поиск "дружественных имён" СОМ портов На странице: https:/ / norseev. ru/ 2018/ 01/ 04/ comportlist_windows/ нашёл схожую тему. Там приведён код на С++, который показывает только имена СОМ портов, типа,. . .
Сколько Государство потратило денег на меня, обеспечивая инсулином.
Programma_Boinc 20.11.2025
Сколько Государство потратило денег на меня, обеспечивая инсулином. Вот решила сделать интересный приблизительный подсчет, сколько государство потратило на меня денег на покупку инсулинов. . . .
Ломающие изменения в C#.NStar Alpha
Etyuhibosecyu 20.11.2025
Уже можно не только тестировать, но и пользоваться C#. NStar - писать оконные приложения, содержащие надписи, кнопки, текстовые поля и даже изображения, например, моя игра "Три в ряд" написана на этом. . .
Мысли в слух
kumehtar 18.11.2025
Кстати, совсем недавно имел разговор на тему медитаций с людьми. И обнаружил, что они вообще не понимают что такое медитация и зачем она нужна. Самые базовые вещи. Для них это - когда просто люди. . .
Создание Single Page Application на фреймах
krapotkin 16.11.2025
Статья исключительно для начинающих. Подходы оригинальностью не блещут. В век Веб все очень привыкли к дизайну Single-Page-Application . Быстренько разберем подход "на фреймах". Мы делаем одну. . .
Фото: Daniel Greenwood
kumehtar 13.11.2025
Расскажи мне о Мире, бродяга
kumehtar 12.11.2025
— Расскажи мне о Мире, бродяга, Ты же видел моря и метели. Как сменялись короны и стяги, Как эпохи стрелою летели. - Этот мир — это крылья и горы, Снег и пламя, любовь и тревоги, И бескрайние. . .
PowerShell Snippets
iNNOKENTIY21 11.11.2025
Модуль PowerShell 5. 1+ : Snippets. psm1 У меня модуль расположен в пользовательской папке модулей, по умолчанию: \Documents\WindowsPowerShell\Modules\Snippets\ А в самом низу файла-профиля. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru