Reactor Kafka — это библиотека, объединяющая Apache Kafka с реактивным программированием на базе Project Reactor. Такое сочетание позволяет строить неблокирующие, асинхронные приложения с контролем обратного давления (backpressure). Для тех, кто не погружался в реактивное программирование, концепция может показаться сложной, но преимущества стоят усилий на изучение.
Представьте систему, обрабатывающую миллионы сообщений в секунду. Традиционный блокирующий подход создает проблемы: потоки ждут завершения операций ввода-вывода, ресурсы используются неэффективно, а система теряет отзывчивость при пиковых нагрузках. Реактивный подход решает эти проблемы, позволяя обрабатывать сообщения асинхронно, с автоматическим управлением потоком данных. Особую важность Reactor Kafka представляет для микросервисных архитектур, где отдельные сервисы должны эффективно взаимодействовать через очереди сообщений. Вместо создания сложной логики для обработки высоких нагрузок, разработчики могут сосредоточиться на бизнес-логике, доверив управление потоком данных реактивному фреймворку.
Что отличает Reactor Kafka от стандартного клиента? В первую очередь — интеграция с типами Flux и Mono из Project Reactor, которые представляют потоки данных как функциональные композиции, к которым можно применять операторы трансформации, фильтрации и комбинирования. Это создает API, позволяющий описывать сложные сценарии обработки данных в декларативном стиле. Кроме того, Reactor Kafka автоматически управляет обратным давлением — механизмом, который регулирует скорость получения данных в зависимости от скорости их обработки. Когда потребитель не успевает обрабатывать сообщения, система автоматически замедляет их получение, предотвращая переполнение памяти и сбои.
Основы Reactor Kafka
Прежде чем погрузиться в реализацию, важно понять ключевые компоненты и концепции Reactor Kafka. Эта библиотека строится на двух мощных фундаментах: Apache Kafka — для надежной передачи сообщений и Project Reactor — для реактивного программирования.
Ключевые компоненты
В центре Reactor Kafka находятся два основных класса:
1. ReactiveKafkaConsumerTemplate — шаблон для создания реактивных потребителей, предоставляющий методы для подписки на темы Kafka и получения сообщений как реактивных потоков.
2. ReactiveKafkaProducerTemplate — шаблон для отправки сообщений в Kafka в реактивном стиле, позволяющий работать с результатами публикации как с типами Mono.
Эти классы представляют высокоуровневые абстракции над Kafka API и предоставляют простой, но гибкий интерфейс для реактивного взаимодействия с брокером. Для настройки этих компонентов используются соответственно SenderOptions и ReceiverOptions — классы, инкапсулирующие конфигурационные параметры, такие как адреса брокеров, сериализаторы/десериализаторы, группы потребителей и другие настройки Kafka.
Сравнение с традиционными потребителями
Традиционный подход к созданию потребителей Kafka часто использует блокирующий прием сообщений или систему обратных вызовов. Хотя это работает в простых случаях, при высокой нагрузке возникают проблемы:
Java | 1
2
3
4
5
6
7
8
9
| // Традиционный подход (блокирующий)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Обработка сообщения (блокирует поток)
processRecord(record);
}
consumer.commitAsync();
} |
|
Этот код блокирует поток во время обработки каждого сообщения и не предоставляет механизмов для управления потоком данных при перегрузке.
В отличие от него, реактивный подход с Reactor Kafka выглядит так:
Java | 1
2
3
4
5
6
7
8
9
| // Реактивный подход
Flux<ReceiverRecord<String, String>> kafkaFlux = reactiveKafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(record -> {
// Неблокирующая обработка
processRecordReactively(record);
});
kafkaFlux.subscribe(); |
|
Преимущества реактивного подхода:- Неблокирующая обработка — потоки не простаивают, ожидая завершения операций ввода-вывода.
- Декларативный стиль — обработка выражается как цепочка операций над потоком данных.
- Контроль обратного давления — автоматическое регулирование скорости получения сообщений.
- Функциональная композиция — возможность комбинировать различные операторы для сложной обработки.
ReactiveKafkaConsumerTemplate и его возможности
ReactiveKafkaConsumerTemplate — это основной инструмент для создания реактивных потребителей Kafka. Он предлагает несколько методов для получения сообщений:
receive() — базовый метод, возвращающий Flux<ReceiverRecord>, где каждый элемент представляет сообщение из Kafka. Требует ручного подтверждения записей.
receiveAutoAck() — возвращает Flux<ConsumerRecord>, где записи автоматически подтверждаются после успешной обработки.
receiveAtmostOnce() — гарантирует обработку каждой записи максимум один раз, с подтверждением до обработки.
receiveExactlyOnce() обеспечивает точную однократную обработку с транзакционной семантикой.
Эти методы предоставляют различные гарантии доставки, позволяя выбрать подходящий компромисс между производительностью и надежностью.
Пример использования метода receiveAutoAck() :
Java | 1
2
3
4
5
6
7
8
| kafkaConsumerTemplate
.receiveAutoAck()
.map(record -> {
// Преобразование сообщения
return processMessage(record.value());
})
.doOnNext(result -> log.info("Обработано сообщение: {}", result))
.subscribe(); |
|
Этот код подписывается на поток сообщений из Kafka, преобразует каждое сообщение с помощью метода processMessage() , логирует результат и автоматически подтверждает сообщения.
Возможности ReactiveKafkaConsumerTemplate не ограничиваются простым получением сообщений. С помощью различных операторов реактивных потоков можно реализовать сложные сценарии:- Фильтрация нежелательных сообщений.
- Трансформация и агрегация данных.
- Ограничение скорости обработки.
- Параллельная обработка независимых сообщений.
- Повторные попытки при сбоях.
- Применение таймаутов.
Например, для ограничения скорости обработки можно использовать оператор limitRate :
Java | 1
2
3
4
5
| kafkaConsumerTemplate
.receiveAutoAck()
.limitRate(100) // Ограничение до 100 сообщений
.flatMap(record -> processAsync(record), 10) // Параллельная обработка с ограничением в 10 параллельных операций
.subscribe(); |
|
Модель подписки и потребления в Reactor Kafka
Одной из ключевых концепций Reactor Kafka является модель подписки, которая определяет, как потребитель взаимодействует с темами и разделами (партициями). В отличие от императивного подхода стандартного Kafka API, где разработчик должен явно управлять циклами опроса, Reactor Kafka абстрагирует этот процесс через реактивные потоки. Модель подписки в Reactor Kafka основана на концепции холодных издателей (cold publishers). Это означает, что сообщения начинают потребляться только когда подписчик вызывает метод subscribe() на созданном Flux. До этого момента никаких взаимодействий с Kafka не происходит:
Java | 1
2
3
4
5
6
7
| // Создание потока не запускает потребление
Flux<ReceiverRecord<String, String>> messageFlux = receiverTemplate.receive();
// Подписка запускает процесс потребления
messageFlux.subscribe(record -> {
System.out.println("Получено сообщение: " + record.value());
}); |
|
Этот механизм отложенного выполнения позволяет гибко настраивать цепочки обработки до начала фактического потребления данных.
Reactor Kafka поддерживает два принципиально разных подхода к обработке смещений (offsets):
1. Ручное управление смещениями - когда разработчик явно контролирует, когда и какие сообщения подтверждаются:
Java | 1
2
3
4
5
6
7
8
| receiverTemplate.receive()
.doOnNext(record -> {
// Обработка сообщения
processMessage(record);
// Явное подтверждение обработки
record.receiverOffset().acknowledge();
})
.subscribe(); |
|
2. Автоматическое управление смещениями - когда подтверждение происходит автоматически после успешной обработки цепочки операторов:
Java | 1
2
3
| receiverTemplate.receiveAutoAck()
.map(record -> processMessage(record))
.subscribe(); |
|
Выбор между этими подходами зависит от требований к надежности. Ручное управление дает больше контроля, но требует внимательного кодирования для избежания потери или дублирования сообщений. Что касается управления разделами (партициями), Reactor Kafka автоматически распределяет нагрузку между экземплярами потребителей в группе, следуя стандартной Kafka-модели балансировки. Когда новый потребитель присоединяется к группе или существующий выходит из нее, Kafka перебалансирует разделы между оставшимися участниками.
Интересная особенность взаимодействия Reactor и Kafka проявляется при ограничении скорости потребления. В традиционном Kafka API разработчик должен самостоятельно реализовывать такие механизмы, тогда как в Reactor Kafka можно использовать встроенные операторы для управления темпом потребления:
Java | 1
2
3
4
| receiverTemplate.receiveAutoAck()
.limitRate(50) // Ограничение до 50 элементов в буфере
.delayElements(Duration.ofMillis(100)) // Искусственная задержка
.subscribe(); |
|
Такой код естественным образом создаст обратное давление, которое будет распространяться до самого источника данных, регулируя скорость чтения из Kafka.
Важно понимать, что Reactor Kafka обеспечивает реактивный интерфейс поверх традиционного клиента Kafka. Это значит, что под капотом всё ещё работают те же механизмы консенсуса, репликации и гарантий доставки, которые предоставляет Kafka. Reactor лишь меняет модель программирования, делая её более декларативной и ориентированной на потоки данных.
Spring Kafka. Ошибка Connection refused при подключении к брокеру Kafka Пишу Kafka Broker и Consumer, чтобы ловить сообщения от приложения. При попытке достать сообщения из Consumer вылетает ошибка
... Написание Kafka Server Mock Приложение передает некоторые сообщения по TCP в Kafka Server. Нужно реализовать заглушку Kafka Server, которая будет ловить эти сообщения и... Kafka consumer returns null Есть Кафка. Создан топик. Consumer и producer, которые идут в комплекте, работают как положено.
Пишу свои consumer и producer. Код взят из доков,... Java & Apache Kafka Всем доброго времени суток!
С кафкой раньше не сталкивался.
Задача такая: генератор генерит сообщение, в котором сериализуется объект с полями...
Реализация реактивного потребителя Kafka
В этом разделе мы рассмотрим, как настроить и реализовать реактивного потребителя сообщений Kafka на Java.
Настройка проекта
Первым делом нам нужно создать проект Spring Boot и добавить необходимые зависимости. Обычно для работы с Reactor Kafka требуется несколько ключевых библиотек:
XML | 1
2
3
4
5
6
7
8
9
10
11
12
| <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency> |
|
Здесь spring-kafka предоставляет базовую интеграцию со Spring, reactor-kafka обеспечивает реактивную обертку над клиентом Kafka, а spring-boot-starter-webflux добавляет поддержку реактивного веб-стека Spring (необязательно, но полезно, если вы строите реактивное приложение целиком).
Базовая конфигурация потребителя
Конфигурация Kafka-потребителя в реактивном стиле начинается с определения свойств в файле application.yml :
YAML | 1
2
3
4
5
6
7
8
| spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: reactor-consumer-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
|
Эти настройки указывают адрес брокера Kafka (bootstrap-servers ), идентификатор группы потребителей (group-id ), стратегию позиционирования при отсутствии сохраненных смещений (auto-offset-reset ) и классы для десериализации ключей и значений сообщений.
Теперь создадим конфигурационный класс, который определит бины для работы с Reactor Kafka:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @Configuration
public class KafkaConsumerConfig {
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("reactor-topic"));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
ReceiverOptions<String, String> receiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
} |
|
В этом классе мы:
1. Создаем ReceiverOptions с необходимыми настройками Kafka.
2. Указываем тему для подписки (reactor-topic ).
3. Создаем ReactiveKafkaConsumerTemplate , который будет использовать эти настройки.
Десериализаторы и их настройка
Важная часть работы с Kafka — правильная десериализация сообщений. В простейшем случае мы используем стандартные десериализаторы строк, но в реальных приложениях часто требуется десериализовать JSON или другие форматы.
Для работы с JSON можно использовать JsonDeserializer из Spring Kafka:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| @Bean
public ReceiverOptions<String, MyEvent> jsonReceiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-reactor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MyEvent.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return ReceiverOptions.<String, MyEvent>create(props)
.subscription(Collections.singleton("events-topic"));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> jsonKafkaConsumerTemplate(
ReceiverOptions<String, MyEvent> jsonReceiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(jsonReceiverOptions);
} |
|
Здесь мы настраиваем JsonDeserializer для десериализации сообщений в объекты типа MyEvent . Параметр TRUSTED_PACKAGES указывает, из каких пакетов разрешено десериализовать классы (значение "*" разрешает любые пакеты, но в производственной среде лучше указать конкретные пакеты).
Пример кода с пояснениями
Теперь реализуем сервис, который будет потреблять сообщения с использованием Reactor Kafka:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| @Service
public class ReactiveKafkaConsumerService {
private static final Logger log = LoggerFactory.getLogger(ReactiveKafkaConsumerService.class);
private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
public ReactiveKafkaConsumerService(ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate) {
this.kafkaConsumerTemplate = kafkaConsumerTemplate;
}
@PostConstruct
public void consumeMessages() {
kafkaConsumerTemplate
.receiveAutoAck()
.map(record -> {
log.info("Получено сообщение: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
return record.value();
})
.doOnNext(value -> processMessage(value))
.doOnError(error -> log.error("Ошибка при получении сообщения", error))
.subscribe();
}
private void processMessage(String message) {
// Бизнес-логика обработки сообщения
log.info("Обработка сообщения: {}", message);
}
} |
|
В этом сервисе:
1. Мы внедряем ReactiveKafkaConsumerTemplate через конструктор
2. В методе consumeMessages (помеченном @PostConstruct ) мы:
- Вызываем receiveAutoAck() для автоматического подтверждения смещений
- Преобразуем записи в значения с помощью оператора map
- Логируем детали каждой записи
- Обрабатываем значение в методе processMessage
- Добавляем обработку ошибок с помощью doOnError
- Запускаем поток с помощью subscribe()
Обратите внимание, что этот код запускает потребление сообщений при инициализации сервиса и продолжает работать в фоновом режиме.
Реализация с ручным подтверждением
Если вам нужен больший контроль над подтверждением смещений, можно использовать метод receive() вместо receiveAutoAck() :
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| @PostConstruct
public void consumeMessagesWithManualAck() {
kafkaConsumerTemplate
.receive()
.doOnNext(record -> {
log.info("Получено сообщение: {}", record.value());
try {
// Обработка сообщения
processMessage(record.value());
// Подтверждение успешной обработки
record.receiverOffset().acknowledge();
} catch (Exception e) {
// В случае ошибки не подтверждаем сообщение
log.error("Ошибка обработки", e);
}
})
.subscribe();
} |
|
В этом варианте мы явно вызываем acknowledge() только после успешной обработки сообщения. Если обработка завершается исключением, сообщение не подтверждается и будет получено повторно при следующем цикле опроса.
Расширенный пример: обработка в параллельных потоках
Reactor позволяет легко распараллелить обработку сообщений, что может значительно повысить производительность:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| @PostConstruct
public void consumeMessagesInParallel() {
kafkaConsumerTemplate
.receiveAutoAck()
.flatMap(record -> {
// Асинхронная обработка каждого сообщения
return Mono
.fromCallable(() -> processRecordAsync(record))
.subscribeOn(Schedulers.boundedElastic());
}, 4) // Максимальное количество параллельных операций
.doOnError(e -> log.error("Ошибка параллельной обработки", e))
.subscribe();
}
private String processRecordAsync(ConsumerRecord<String, String> record) {
log.info("Асинхронная обработка: {}", record.value());
// Имитация длительной обработки
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return record.value();
} |
|
Здесь используем flatMap вместе с Schedulers.boundedElastic() для распределения обработки по нескольким потокам из пула с ограниченным размером. Параметр 4 в flatMap ограничивает максимальное количество параллельных операций, что помогает контролировать нагрузку.
Правильное использование параллельной обработки может существенно увеличить пропускную способность вашего приложения, особенно если обработка включает операции ввода-вывода или другие блокирующие вызовы. Учитывая гибкость Reactor и его богатый набор операторов, можно создавать сложные цепочки обработки, адаптированные к конкретным требованиям вашего приложения. Это одно из ключевых преимуществ использования Reactor Kafka по сравнению с традиционным клиентом Kafka.
Продвинутые техники
Мир Reactor Kafka не ограничивается базовыми сценариями потребления сообщений. Существует ряд продвинутых техник, которые помогают создавать надежные, устойчивые к сбоям и высокопроизводительные приложения.
Обработка ошибок
Ошибки в реактивных потоках требуют особого внимания, поскольку необработанное исключение может привести к завершению всего потока. Reactor Kafka предлагает несколько стратегий обработки ошибок:
Java | 1
2
3
4
5
| kafkaConsumerTemplate
.receiveAutoAck()
.doOnError(e -> log.error("Ошибка при получении сообщения", e))
.retry(3) // Простая стратегия: повторить до 3 раз
.subscribe(); |
|
Для более сложных сценариев можно использовать retryWhen с экспоненциальной задержкой:
Java | 1
2
3
4
5
6
| kafkaConsumerTemplate
.receiveAutoAck()
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
.maxBackoff(Duration.ofMinutes(1))
.jitter(0.3))
.subscribe(); |
|
Этот код применяет экспоненциальную стратегию повторов с начальной задержкой в 2 секунды, максимальной задержкой в 1 минуту и случайным отклонением (jitter) в 30%, что помогает избежать "эффекта стадности" при повторных попытках. Для более точечной обработки ошибок можно использовать оператор onErrorResume :
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| kafkaConsumerTemplate
.receiveAutoAck()
.flatMap(record -> {
try {
return Mono.just(processRecord(record));
} catch (Exception e) {
// Обработка конкретных типов ошибок
if (e instanceof DataAccessException) {
return saveToDeadLetterQueue(record)
.then(Mono.empty());
}
return Mono.error(e); // Ошибки других типов пробрасываем дальше
}
})
.subscribe(); |
|
Управление смещениями (offsets)
Корректное управление смещениями критически важно для надежной обработки сообщений. Reactor Kafka предлагает несколько моделей подтверждения, каждая с разными гарантиями:
1. Точно один раз (Exactly once) — самые строгие гарантии, но с наибольшими накладными расходами:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| kafkaConsumerTemplate
.receiveExactlyOnce()
.concatMap(consumerRecord -> {
// Обработка записи
String result = processRecord(consumerRecord.value());
// Возвращаем SenderResult для подтверждения
return reactiveKafkaProducerTemplate
.send("result-topic", result)
.map(senderResult -> consumerRecord);
})
.concatMap(consumerRecord ->
// Явное коммит транзакции
consumerRecord.commitTransaction()
.then(Mono.just(consumerRecord.offset())))
.subscribe(); |
|
2. Ручное управление с группировкой подтверждений — позволяет подтверждать группы сообщений для повышения производительности:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| kafkaConsumerTemplate
.receive()
.bufferTimeout(100, Duration.ofSeconds(1))
.flatMap(records -> {
return Flux.fromIterable(records)
.map(record -> processRecord(record))
.collectList()
.doOnSuccess(results -> {
// Подтверждаем последнее смещение в группе
records.get(records.size() - 1)
.receiverOffset()
.acknowledge();
});
})
.subscribe(); |
|
Параллельная обработка сообщений
Высокая производительность часто требует параллельной обработки. Reactor предоставляет для этого мощные инструменты:
Java | 1
2
3
4
5
6
7
8
9
10
| kafkaConsumerTemplate
.receiveAutoAck()
.groupBy(record -> record.partition()) // Группировка по партиции
.flatMap(partitionFlux -> {
// Обрабатываем каждую партицию в отдельном потоке
return partitionFlux
.publishOn(Schedulers.parallel())
.flatMap(record -> processRecordAsync(record), 10); // 10 параллельных обработчиков для каждой партиции
})
.subscribe(); |
|
Этот код группирует сообщения по партициям и обрабатывает каждую группу в отдельном потоке, сохраняя порядок сообщений внутри партиции (что важно для многих приложений).
Для наиболее ресурсоемких операций можно использовать комбинацию publishOn и flatMap :
Java | 1
2
3
4
5
6
7
8
9
10
| kafkaConsumerTemplate
.receiveAutoAck()
.publishOn(Schedulers.boundedElastic())
.flatMap(record -> {
return Mono.fromCallable(() -> {
// Ресурсоемкая операция
return expensiveOperation(record);
}).subscribeOn(Schedulers.boundedElastic());
}, 20) // Максимум 20 параллельных операций
.subscribe(); |
|
Такой подход позволяет контролировать степень параллелизма и предотвращать перегрузку системы при всплесках трафика.
При работе с параллельной обработкой и управлением смещениями следует быть осторожным: необходимо гарантировать, что смещение подтверждается только после успешной обработки всех предыдущих сообщений в той же партиции, иначе возможна потеря данных.
Оптимизация и рекомендации
При работе с Reactor Kafka часто возникают типичные проблемы, которые могут существенно снизить производительность или стабильность приложения.
Избыточное потребление памяти - одна из распространенных проблем при неправильной настройке буферизации. Если поток сообщений не ограничивать, приложение может быстро исчерпать доступную память:
Java | 1
2
3
4
5
6
7
8
| // Проблема
kafkaConsumerTemplate.receiveAutoAck().subscribe();
// Решение
kafkaConsumerTemplate
.receiveAutoAck()
.limitRate(1000) // Ограничиваем количество сообщений в буфере
.subscribe(); |
|
Нерациональная группировка сообщений может привести к задержкам в обработке. Слишком большие группы увеличивают задержку, а слишком маленькие снижают пропускную способность:
Java | 1
2
3
4
5
6
| // Оптимальная группировка
kafkaConsumerTemplate
.receive()
.bufferTimeout(100, Duration.ofMillis(200))
.flatMap(group -> processGroup(group))
.subscribe(); |
|
Этот подход сбалансирован: группы формируются либо по достижении 100 сообщений, либо по истечении 200 мс — в зависимости от того, что наступит раньше.
"Утечка" подписок происходит, когда подписки на потоки Kafka не отменяются явно при остановке приложения. Решение:
Java | 1
2
3
4
5
6
| @PreDestroy
public void cleanup() {
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
} |
|
Не могу запустить kafka на Win10 Прошу поддержки переюзал все варианты
вот конкретно эксепшен
все права на запись диска есть все есть Проблемы с java kafka и zookeeper на windows 10 Здраствуйте.
Я сейчас пытаюсь настроить zookeeper и kafka по https://habr.com/ru/post/496182/
вот что я сделал.
в файл zoo в... Spring Boot + Kafka, запись данных после обработки Добрый вечер, много времени уже мучаюсь над одной проблемой, я извиняюсь, может мало ли вдруг такая тема есть, но значит я плохо искал, в общем я... Spring Kafka: Запись в базу данных и чтение из неё Гайз, нужен хэлп.
Киньте инфу или подскажите как записывать данные из Kafka в базу данных, а потом читать из базы и писать в топики Kafka.
Нужно... [ERROR] The projects in the reactor contain a cyclic reference: Edge between 'Vertex{label=' The projects in the reactor contain a cyclic reference: Edge between 'Vertex{label='
Использую Maven
Эта ошибка появляется когда модули в... API GW, Kafka Здравствуйте!
Подскажите по арзитектуре микросервисов.
Простой пример: сервис авторизации и проверки токена и сервис ToDo листа (вообразим,... Apache Kafka Подскажите как можно посмотреть топик кафки с другой виртуальной машины.
... Kafka - Keycloak Проверить sample key cloak
{
"realm": "kafka-authz",
"accessTokenLifespan": 120,
"ssoSessionIdleTimeout": 864000,
... Kafka - брокер сообщений Доброго времени суток! Подскажите кто-то работал с Kafka? Можете пожалуйста подкинуть литературу и желательно примеры?) Публиковать данные в Kafka Доброго времени суток.
Есть потребность пушить данные в кафку из PL/SQL.
При этом хочется вот просто закидывать их на HaProxy а дальше чтобы оно... Consumer apache kafka Доброго времени суток уважаемые форумчане.
С apache kafka работаю совсем недавно и столкнулся с неприятной проблемой. Работу с kafka осуществляю... Огромная задержка логов в Kafka Добрый день! На сервере компании для мониторинга логов используется связка ELK + Kafka. Разница между считанными и входящими логами быстро растёт, и...
|