Что же такое Consumer Seek в Kafka? По сути, это API-метод, который позволяет программно указать, с какой позиции (offset) Consumer должен начать или продолжить чтение данных из партиции. Без этого механизма потребители Kafka обычно просто последовательно обрабатывают сообщения, начиная с последнего сохраненного смещения группы потребителей. Задачи, где seek-операции незаменимы:
1. Повторная обработка сообщений. Иногда после обнаружения ошибки в логике обработки требуется "отмотать" чтение назад и повторно обработать часть сообщений.
2. Восстановление после сбоев. При крахе системы или после обслуживания нужно продолжить работу с конкретной точки.
3. Выборочная обработка данных. Когда нужно обработать только определенные сегменты истории сообщений, пропуская ненужные.
4. Отладка и тестирование. Разработчикам часто требуется воспроизвести конкретные ситуации, манипулируя позицией чтения.
5. Создание снимков состояния. Для построения материализованных представлений необходимо контролировать, какие данные когда обрабатываются.
В жизненном цикле Consumer существуют ключевые моменты, где позиционирование играет важную роль. При запуске Consumer первым делом нужно определить, откуда начинать чтение. Это может быть:- Самое раннее доступное смещение (earliest).
- Самое последнее смещение (latest).
- Последнее сохраненное смещение для группы потребителей.
- Произвольное смещение, указанное через seek.
После начала работы Consumer обычно автоматически управляет своей позицией, сохраняя прогресс потребления через коммиты. Однако в некоторых сценариях требуется ручное управление. Например, когда бизнес-логика требует условного коммита смещений или при реализации транзакционной обработки. Сложность Kafka заключается в том, что смещения существуют на уровне партиций, а не топиков целиком. Это означает, что если топик разделён на несколько партиций, seek-операция должна применяться к каждой партиции отдельно. Это усложняет код, но дает большую гибкость.
Преимущества использования seek вместо настройки auto.offset.reset:- Динамическое поведение во время выполнения.
- Точный контроль над каждой партицией.
- Возможность менять стратегию во время работы приложения.
- Гранулярное управление смещениями на основе бизнес-логики.
Операции позиционирования также тесно связаны с моделью обработки данных: at-most-once (максимум один раз), at-least-once (минимум один раз) или exactly-once (ровно один раз). Например, если вы стремитесь к exactly-once семантике, вам может потребоваться тщательно координировать операции seek с транзакциями, чтобы гарантировать, что каждое сообщение обрабатывается ровно один раз, даже при сбоях. Важно понимать, что Kafka хранит сообщения в течение ограниченного времени (настраивается через retention period). После истечения срока хранения старые сообщения удаляются, и seek к этим смещениям станет невозможным. Это накладывает практические ограничения на то, насколько далеко назад можно "перемотать" Consumer.
Механика позиционирования в Kafka
Consumer API в Kafka спроектирован таким образом, чтобы обеспечить тонкий контроль над процессом чтения, сохраняя при этом простоту использования для базовых сценариев.
Начнем с понимания того, что такое Apache Kafka на низком уровне. Каждый топик в Kafka разделен на партиции — упорядоченные, неизменяемые последовательности сообщений. Каждое сообщение в партиции имеет уникальный идентификатор, называемый смещением (offset). Смещения — это просто последовательные целые числа, которые Kafka присваивает сообщениям при их добавлении в партицию. Самое важное, что нужно запомнить — смещения уникальны только в пределах одной партиции. Для работы с позиционированием потребителя Kafka предоставляет несколько ключевых методов:
Java | 1
2
3
4
5
6
7
8
9
10
11
| // Перемещение к конкретному смещению
void seek(TopicPartition partition, long offset);
// Перемещение к начальному смещению партиции
void seekToBeginning(Collection<TopicPartition> partitions);
// Перемещение к конечному смещению партиции
void seekToEnd(Collection<TopicPartition> partitions);
// Перемещение к смещению по временной метке
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); |
|
Метод seek() является наиболее гибким и позволяет переместить потребителя к любому конкретному смещению в указанной партиции. Важный нюанс: этот метод работает с одной партицией за раз, что требует дополнительной работы, если вам нужно управлять позицией чтения в нескольких партициях одновременно.
Java | 1
2
3
| // Пример перемещения к конкретному смещению
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.seek(partition, 1000L); |
|
Методы seekToBeginning() и seekToEnd() упрощают распространенные сценарии перемотки к началу или концу партиции соответственно. Они принимают коллекцию объектов TopicPartition, что позволяет применить операцию к нескольким партициям одновременно.
Java | 1
2
3
4
5
6
7
8
| // Перемещение к началу всех назначенных партиций
consumer.seekToBeginning(consumer.assignment());
// Перемещение к концу конкретных партиций
Set<TopicPartition> partitionsToSeek = new HashSet<>();
partitionsToSeek.add(new TopicPartition("my-topic", 0));
partitionsToSeek.add(new TopicPartition("my-topic", 1));
consumer.seekToEnd(partitionsToSeek); |
|
Интересная особенность — метод offsetsForTimes() , который позволяет найти смещение по временной метке. Это невероятно полезно для систем, где важна хронология событий. Например, вы можете захотеть начать чтение с сообщений, поступивших после определенного момента времени:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // Найти смещения для всех партиций заданного топика, соответствующие времени час назад
long timestamp = System.currentTimeMillis() - 3600000; // 1 час назад
List<PartitionInfo> partitionInfos = consumer.partitionsFor("my-topic");
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (PartitionInfo info : partitionInfos) {
timestampsToSearch.put(new TopicPartition("my-topic", info.partition()), timestamp);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
// Использование полученных смещений для позиционирования
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
} |
|
Такой подход особенно ценен при разработке систем, которые нужно восстанавливать после сбоя до определенного состояния по времени. Важно понимать, что операции seek не изменяют непосредственно сохраненные смещения потребительской группы (committed offsets). Они влияют только на текущую сессию потребителя. Чтобы изменения позиции стали постоянными между перезапусками, необходимо явно зафиксировать (commit) новые позиции чтения.
Java | 1
2
3
4
| // После seek необходимо зафиксировать новую позицию
consumer.seek(partition, newOffset);
// ... обработка сообщений ...
consumer.commitSync(); // Фиксирует текущие позиции для всех назначенных партиций |
|
Альтернативно можно использовать метод commitSync с параметром, чтобы зафиксировать конкретные смещения:
Java | 1
2
3
| Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
offsetsToCommit.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(1000L));
consumer.commitSync(offsetsToCommit); |
|
Еще один важный аспект — это поведение метода poll() после выполнения seek-операции. Когда вы вызываете seek() , эффект применяется к следующему вызову poll() . Если у вас есть необработанные сообщения из предыдущего вызова poll() , они не будут затронуты операцией seek. Это может создать путаницу, если вы не учитываете это поведение в своем коде.
Java | 1
2
3
4
5
6
7
| // Типичный паттерн использования seek
consumer.seek(partition, targetOffset);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Обработка записей, вернувшихся из poll() ПОСЛЕ операции seek
// ...
} |
|
Существуют также некоторые ограничения и особенности при использовании seek-операций:
1. Вы не можете выполнить seek к смещению, которое находится за пределами доступных данных. Например, seek к смещению -1 или к смещению, превышающему текущий конец партиции, вызовет исключение.
2. Для использования seek вы должны сначала назначить (assign) потребителя конкретным партициям. Если вы используете высокоуровневые методы подписки (subscribe), вам также нужно дождаться, пока партиции будут назначены.
3. Если вы выполняете seek к смещению, для которого сообщение уже удалено из лога (из-за политики хранения), Kafka автоматически скорректирует позицию к первому доступному смещению.
Глубокое понимание этих механизмов позволяет реализовать весьма изощренные сценарии управления потоками данных. Например, вы можете создать потребителя, который обрабатывает только определенные временные окна данных, пропуская остальные, или реализовать сложную логику перебалансировки нагрузки, основанную на состоянии обработки. Ключевым моментом является то, что seek-операции предоставляют низкоуровневый контроль, но требуют от разработчика большей ответственности за управление состоянием. В следующих разделах мы рассмотрим, как эти низкоуровневые операции могут быть использованы для решения практических задач и построения надежных систем обработки данных.
Кроме основных методов позиционирования, стоит упомянуть о некоторых вспомогательных методах API, которые могут быть полезны при работе с позициями:
position(TopicPartition partition) - возвращает текущее положение потребителя в указанной партиции.
committed(TopicPartition partition) - возвращает последнее зафиксированное смещение для указанной партиции.
Эти методы часто используются в связке с seek-операциями для реализации логики, основанной на текущем состоянии потребления.
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // Пример расширенного использования position и committed
Map<TopicPartition, Long> currentPositions = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
currentPositions.put(partition, consumer.position(partition));
OffsetAndMetadata committed = consumer.committed(partition);
if (committed != null) {
committedOffsets.put(partition, committed);
}
}
// Анализ отставания
for (TopicPartition partition : currentPositions.keySet()) {
long lag = currentPositions.get(partition);
OffsetAndMetadata committed = committedOffsets.get(partition);
if (committed != null) {
lag -= committed.offset();
System.out.printf("Партиция %s: необработанных сообщений: %d%n",
partition, lag);
}
} |
|
Важным аспектом работы с позиционированием является понимание разницы между автоматическим и ручным управлением смещениями. По умолчанию Kafka использует автоматический коммит смещений, который происходит с заданным интервалом (по умолчанию 5 секунд). Это удобно для простых приложений, но может быть недостаточно точным для сложных сценариев.
При ручном управлении вы полностью контролируете, когда и какие смещения фиксируются:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // Отключение автоматического коммита
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Ручной коммит смещений
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Обработка записи
// ...
// Фиксирование обработанной записи
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);
}
} |
|
При использовании seek в сочетании с ручным коммитом вы получаете максимальный контроль над процессом потребления. Например, вы можете реализовать транзакционную обработку, которая коммитит смещения только после успешного завершения транзакции:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| try {
// Начало транзакции
db.beginTransaction();
// Обработка сообщений
for (ConsumerRecord<String, String> record : records) {
db.execute("INSERT INTO processed_messages VALUES (?)", record.value());
}
// Фиксирование транзакции
db.commitTransaction();
// Фиксирование смещений только после успешной обработки
consumer.commitSync();
} catch (Exception e) {
// Откат транзакции при ошибке
db.rollbackTransaction();
// При необходимости можно также откатить позицию чтения
for (TopicPartition partition : consumer.assignment()) {
OffsetAndMetadata committed = consumer.committed(partition);
if (committed != null) {
consumer.seek(partition, committed.offset());
}
}
} |
|
Такой подход гарантирует, что сообщения будут обработаны как минимум один раз (at-least-once semantics). Если вы стремитесь к семантике "ровно один раз" (exactly-once), вам потребуется дополнительная логика для отслеживания дубликатов или использование транзакционного API Kafka.
Для сложных приложений часто требуется хранить дополнительный контекст вместе со смещениями. Класс OffsetAndMetadata позволяет прикреплять метаданные к коммитам:
Java | 1
2
3
4
5
6
| // Хранение дополнительной информации вместе со смещением
String processingInfo = "{\"processor\":\"node-1\",\"timestamp\":\"" +
System.currentTimeMillis() + "\"}";
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, processingInfo);
offsets.put(partition, offsetAndMetadata);
consumer.commitSync(offsets); |
|
Это может быть полезно для отслеживания, какой именно процесс обработал определенный сегмент данных.
Наконец, стоит упомянуть о взаимодействии операций позиционирования с механизмом ребалансировки группы потребителей. Когда происходит ребалансировка (например, при добавлении или удалении потребителя из группы), все настройки позиционирования, выполненные через seek, теряются. Потребители получают новые назначения партиций и начинают чтение с последних коммитированных смещений. Если вам нужно сохранить информацию о позиционировании через ребалансировки, вам следует использовать слушатели ConsumerRebalanceListener:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Сохранение позиций перед ребалансировкой
for (TopicPartition partition : partitions) {
savePosition(partition, consumer.position(partition));
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Восстановление позиций после ребалансировки
for (TopicPartition partition : partitions) {
Long savedPosition = getSavedPosition(partition);
if (savedPosition != null) {
consumer.seek(partition, savedPosition);
}
}
}
}); |
|
Kafka consumer returns null Есть Кафка. Создан топик. Consumer и producer, которые идут в комплекте, работают как положено.
Пишу свои consumer и producer. Код взят из доков,... Spring Kafka. Ошибка Connection refused при подключении к брокеру Kafka Пишу Kafka Broker и Consumer, чтобы ловить сообщения от приложения. При попытке достать сообщения из Consumer вылетает ошибка
... Java & Apache Kafka Всем доброго времени суток!
С кафкой раньше не сталкивался.
Задача такая: генератор генерит сообщение, в котором сериализуется объект с полями... Не могу запустить kafka на Win10 Прошу поддержки переюзал все варианты
вот конкретно эксепшен
все права на запись диска есть все есть
Практическое применение
Начнем с примера обработки ошибок и повторного чтения сообщений. Представим, что ваш сервис обрабатывает платежные транзакции из Kafka, и в какой-то момент возникает временный сбой в базе данных:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processPayment(record);
// Сохраняем смещение текущего сообщения
currentOffset = record.offset();
} catch (DatabaseException e) {
// Сохраняем смещение проблемного сообщения и прерываем обработку
failedOffset = record.offset();
break;
}
}
// Коммитим последнее успешно обработанное сообщение
if (currentOffset >= 0) {
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(currentOffset + 1)
));
}
} catch (Exception e) {
// Если произошла ошибка, возвращаемся к проблемному смещению
if (failedOffset >= 0) {
TopicPartition partition = new TopicPartition("payments", 0);
consumer.seek(partition, failedOffset);
// Добавляем экспоненциальную задержку перед повторной попыткой
Thread.sleep(retryBackoff);
retryBackoff *= 2; // Увеличиваем задержку для следующей попытки
}
} |
|
Этот паттерн особенно полезен для обработки транзакционных операций, где необходимо гарантировать обработку сообщений даже при временных сбоях внешних систем.
Другой распространенный сценарий — миграция данных между топиками. Допустим, вы хотите перенести сообщения из одного топика в другой с возможностью трансформации:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
| // Создаем отдельных потребителя и производителя
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Назначаем потребителя на исходный топик
TopicPartition sourcePart = new TopicPartition("source-topic", 0);
consumer.assign(Collections.singletonList(sourcePart));
// По умолчанию начинаем с начала топика
consumer.seekToBeginning(Collections.singletonList(sourcePart));
// Или с определенной точки
// consumer.seek(sourcePart, startOffset);
// Процесс миграции
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
// Проверяем, достигли ли мы конечного смещения
long endOffset = consumer.endOffsets(Collections.singletonList(sourcePart))
.get(sourcePart);
long currentPosition = consumer.position(sourcePart);
if (currentPosition >= endOffset) {
break; // Миграция завершена
}
continue;
}
for (ConsumerRecord<String, String> record : records) {
// Трансформация данных при необходимости
String transformedValue = transform(record.value());
// Отправка в целевой топик
ProducerRecord<String, String> newRecord =
new ProducerRecord<>("target-topic", record.key(), transformedValue);
// Добавляем исходные метаданные в заголовки
newRecord.headers().add("source-topic", record.topic().getBytes());
newRecord.headers().add("source-partition",
String.valueOf(record.partition()).getBytes());
newRecord.headers().add("source-offset",
String.valueOf(record.offset()).getBytes());
producer.send(newRecord);
}
// Периодически сохраняем прогресс миграции
producer.flush();
consumer.commitSync();
// Для отслеживания прогресса
System.out.printf("Мигрировано %d/%d сообщений (%.2f%%)%n",
consumer.position(sourcePart),
consumer.endOffsets(Collections.singletonList(sourcePart))
.get(sourcePart),
(double) consumer.position(sourcePart) * 100 /
consumer.endOffsets(Collections.singletonList(sourcePart))
.get(sourcePart));
} |
|
Такой подход позволяет реализовать управляемую миграцию данных с контролем процесса и возможностью отслеживания прогресса.
Еще одним интересным применением является реализация отложенной обработки сообщений. Представьте, что нужно обрабатывать сообщения в определенное время суток или когда загрузка системы низкая:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| // Класс для хранения информации о запланированной обработке
class ScheduledProcessing {
private final TopicPartition partition;
private final long offset;
private final long scheduledTime;
// Конструктор, геттеры и прочее опущены для краткости
}
// Хранилище запланированных обработок
Queue<ScheduledProcessing> scheduledProcessings = new PriorityQueue<>(
Comparator.comparingLong(ScheduledProcessing::getScheduledTime));
// В основном цикле обработки
while (true) {
// Проверяем, есть ли запланированные операции, время которых настало
long currentTime = System.currentTimeMillis();
while (!scheduledProcessings.isEmpty() &&
scheduledProcessings.peek().getScheduledTime() <= currentTime) {
ScheduledProcessing sp = scheduledProcessings.poll();
consumer.seek(sp.getPartition(), sp.getOffset());
// Обрабатываем только одно сообщение из этой позиции
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.partition() == sp.getPartition().partition() &&
record.offset() == sp.getOffset()) {
processRecord(record);
// Коммитим только это конкретное сообщение
Map<TopicPartition, OffsetAndMetadata> offsetMap =
Collections.singletonMap(
sp.getPartition(),
new OffsetAndMetadata(sp.getOffset() + 1)
);
consumer.commitSync(offsetMap);
break;
}
}
}
// Обычная обработка поступающих сообщений
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (shouldProcess(record)) {
processRecord(record);
} else {
// Планируем обработку на будущее
ScheduledProcessing sp = new ScheduledProcessing(
new TopicPartition(record.topic(), record.partition()),
record.offset(),
calculateProcessingTime(record)
);
scheduledProcessings.add(sp);
// Коммитим сообщение, чтобы не получать его снова через poll
Map<TopicPartition, OffsetAndMetadata> offsetMap =
Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
consumer.commitSync(offsetMap);
}
}
} |
|
Этот пример демонстрирует гибкий механизм, где consumer может как обрабатывать новые сообщения, так и возвращаться к ранее пропущенным по расписанию.
Для простого примера перемотки к конкретному смещению, давайте рассмотрим универсальный класс-помощник:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| public class KafkaSeekUtil {
/**
* Перемещает консьюмера к заданному смещению во всех назначенных партициях
*/
public static void seekToOffset(KafkaConsumer<?, ?> consumer, long offset) {
for (TopicPartition partition : consumer.assignment()) {
consumer.seek(partition, offset);
}
}
/**
* Перематывает консьюмера на указанное количество сообщений назад
*/
public static void seekBack(KafkaConsumer<?, ?> consumer, long messagesBack) {
for (TopicPartition partition : consumer.assignment()) {
long currentPosition = consumer.position(partition);
long newPosition = Math.max(0, currentPosition - messagesBack);
consumer.seek(partition, newPosition);
}
}
/**
* Перемещает к смещению по метке времени
*/
public static void seekToTime(KafkaConsumer<?, ?> consumer, long timestamp) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
timestampsToSearch.put(partition, timestamp);
}
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
offsetsForTimes.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
} else {
// Если для временной метки нет смещения,
// перейти в конец партиции
consumer.seekToEnd(Collections.singletonList(entry.getKey()));
}
}
}
} |
|
Такой утилитный класс может значительно упростить типовые операции позиционирования в реальных приложениях.
При тестировании приложений, работающих с Kafka, seek-операции также незаменимы. Они позволяют создавать воспроизводимые тестовые сценарии:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| @Test
public void testMessageProcessing() {
// Подготовка тестовой среды
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Создаем тестовый потребитель
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Заполняем топик тестовыми данными через производителя
produceTestMessages("test-topic", 10);
// Назначаем потребителя на тестовый топик
TopicPartition partition = new TopicPartition("test-topic", 0);
consumer.assign(Collections.singletonList(partition));
// Находим конкретное сообщение для тестирования
consumer.seekToBeginning(Collections.singletonList(partition));
ConsumerRecord<String, String> targetRecord = null;
while (targetRecord == null) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.value().contains("test-condition")) {
targetRecord = record;
break;
}
}
}
// Создаем экземпляр тестируемого процессора
MessageProcessor processor = new MessageProcessor();
// Перематываем к целевому сообщению и проверяем обработку
consumer.seek(partition, targetRecord.offset());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.offset() == targetRecord.offset()) {
ProcessingResult result = processor.process(record);
assertEquals(ProcessingResult.SUCCESS, result);
break;
}
}
consumer.close();
} |
|
Такой подход к тестированию особенно ценен в CI/CD-пайплайнах, где каждый коммит может запускать тесты на реальных данных, имитируя различные сценарии обработки.
Рассмотрим еще один практический пример: допустим, мы разрабатываем систему мониторинга, которая отслеживает прогресс обработки и позволяет оператору вмешаться при необходимости:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| public class KafkaConsumerMonitor {
private final KafkaConsumer<String, String> consumer;
private final Map<TopicPartition, Long> lastProcessedOffsets = new ConcurrentHashMap<>();
private final Map<TopicPartition, Long> partitionEndOffsets = new ConcurrentHashMap<>();
public KafkaConsumerMonitor(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
// Обновить состояние монитора
public void update() {
// Получить текущие позиции
for (TopicPartition partition : consumer.assignment()) {
lastProcessedOffsets.put(partition, consumer.position(partition));
}
// Получить конечные смещения
partitionEndOffsets.putAll(consumer.endOffsets(consumer.assignment()));
}
// Получить отставание для конкретной партиции
public long getLag(TopicPartition partition) {
Long currentEnd = partitionEndOffsets.get(partition);
Long currentPosition = lastProcessedOffsets.get(partition);
if (currentEnd == null || currentPosition == null) {
return -1L; // Неизвестно
}
return currentEnd - currentPosition;
}
// Получить общее отставание
public long getTotalLag() {
return lastProcessedOffsets.keySet().stream()
.mapToLong(this::getLag)
.filter(lag -> lag >= 0)
.sum();
}
// Перемотать к началу всех партиций
public void resetToBeginning() {
consumer.seekToBeginning(consumer.assignment());
update();
}
// Перемотать вперед для уменьшения отставания
public void seekForward(long maxLagThreshold) {
for (TopicPartition partition : consumer.assignment()) {
long lag = getLag(partition);
if (lag > maxLagThreshold) {
long currentPosition = lastProcessedOffsets.get(partition);
long newPosition = partitionEndOffsets.get(partition) - maxLagThreshold;
consumer.seek(partition, Math.max(currentPosition, newPosition));
}
}
update();
}
} |
|
Такой монитор может быть интегрирован в консоль администратора или информационную панель, предоставляя оператору возможность контролировать процесс обработки и вмешиваться в случае значительного отставания.
Для высоконагруженных систем также полезны операции скачкообразного позиционирования, когда необходимо быстро перейти к актуальному состоянию, пропустив накопившиеся устаревшие данные:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| // Скачок вперед при критическом отставании
if (monitor.getTotalLag() > CRITICAL_LAG_THRESHOLD) {
// Сохраняем текущие смещения
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
currentOffsets.put(partition,
new OffsetAndMetadata(consumer.position(partition)));
}
// Логируем информацию о пропуске
log.warn("Критическое отставание: {} сообщений. Пропускаем устаревшие данные",
monitor.getTotalLag());
// Переходим к новым смещениям
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(consumer.assignment());
for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
// Оставляем небольшой буфер для обработки
long newPosition = entry.getValue() - BUFFER_SIZE;
if (newPosition > consumer.position(entry.getKey())) {
consumer.seek(entry.getKey(), newPosition);
}
}
// Записываем информацию о пропущенном диапазоне
for (TopicPartition partition : consumer.assignment()) {
long oldOffset = currentOffsets.get(partition).offset();
long newOffset = consumer.position(partition);
long skipped = newOffset - oldOffset;
if (skipped > 0) {
logSkippedRange(partition, oldOffset, newOffset, skipped);
sendSkipMetric(partition, skipped);
}
}
} |
|
Этот код демонстрирует продвинутый сценарий, когда система может динамически адаптироваться к нагрузке, жертвуя полнотой обработки исторических данных ради своевременной обработки новых.
Еще один интересный сценарий — использование seek для реализации повторных попыток с экспоненциальной задержкой для сообщений, которые не удалось обработать сразу:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
| // Класс для хранения информации о повторных попытках
class RetryInfo {
private final TopicPartition partition;
private final long offset;
private int attempts;
private long nextRetryTime;
// Обновление информации о следующей попытке
public void incrementAttempt() {
attempts++;
// Экспоненциальная задержка: 10ms, 20ms, 40ms, 80ms, ...
long delay = 10 * (1L << (attempts - 1));
nextRetryTime = System.currentTimeMillis() + delay;
}
// Геттеры и прочие методы опущены для краткости
}
// Хранилище для сообщений на повторную обработку
Map<String, RetryInfo> messagesToRetry = new HashMap<>();
// Основной цикл обработки
while (running) {
// Обработка запланированных повторных попыток
long currentTime = System.currentTimeMillis();
List<String> keysToProcess = messagesToRetry.entrySet().stream()
.filter(e -> e.getValue().getNextRetryTime() <= currentTime)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (String key : keysToProcess) {
RetryInfo info = messagesToRetry.get(key);
consumer.seek(info.getPartition(), info.getOffset());
// Обработка конкретного сообщения
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(50));
for (ConsumerRecord<String, String> record : records) {
if (record.partition() == info.getPartition().partition() &&
record.offset() == info.getOffset()) {
try {
processMessage(record);
// Успешная обработка — удаляем из списка повторных попыток
messagesToRetry.remove(key);
} catch (ProcessingException e) {
// Неудачная попытка — обновляем информацию для следующей попытки
info.incrementAttempt();
if (info.getAttempts() >= MAX_RETRY_ATTEMPTS) {
// Превышено число попыток — отправляем в DLQ
sendToDLQ(record, e);
messagesToRetry.remove(key);
}
}
break;
}
}
}
// Обычная обработка новых сообщений
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
} catch (ProcessingException e) {
// Добавляем в список повторных попыток
String key = record.topic() + "-" + record.partition() + "-" + record.offset();
RetryInfo info = new RetryInfo(
new TopicPartition(record.topic(), record.partition()),
record.offset());
info.incrementAttempt();
messagesToRetry.put(key, info);
}
}
} |
|
Такой подход к повторным попыткам с использованием seek позволяет очень гибко управлять обработкой проблемных сообщений, не блокируя при этом обработку новых.
Продвинутые техники и сценарии
Переходим к более сложным сценариям использования seek-операций в Kafka. Эти техники полезны для тонкого контроля над потоком данных.
Работа с метками времени
Одна из самых мощных возможностей Kafka — позиционирование по метке времени. Это позволяет начинать чтение с конкретного момента времени, что незаменимо при восстановлении состояния или анализе исторических данных:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| public static Map<TopicPartition, Long> seekToTimestamp(
KafkaConsumer<?, ?> consumer, String topic, long timestamp) {
// Получаем список партиций для топика
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos == null || partitionInfos.isEmpty()) {
throw new RuntimeException("Не найдены партиции для топика: " + topic);
}
// Создаем список TopicPartition из PartitionInfo
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toList());
// Назначаем потребителя на все партиции топика
consumer.assign(topicPartitions);
// Создаем карту запрашиваемых временных меток
Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> timestamp));
// Получаем смещения для указанных временных меток
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes =
consumer.offsetsForTimes(timestampsToSearch);
// Применяем полученные смещения
Map<TopicPartition, Long> appliedOffsets = new HashMap<>();
offsetsForTimes.forEach((tp, offsetAndTimestamp) -> {
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
appliedOffsets.put(tp, offsetAndTimestamp.offset());
} else {
// Если сообщений после указанной временной метки нет,
// перемещаемся в конец партиции
consumer.seekToEnd(Collections.singleton(tp));
long endOffset = consumer.position(tp);
appliedOffsets.put(tp, endOffset);
}
});
return appliedOffsets;
} |
|
Этот метод особенно полезен для сценариев, когда вы хотите воссоздать состояние системы на определенный момент времени или проанализировать события, происходившие в конкретный интервал.
Восстановление после сбоев
При разработке надежных систем крайне важно учитывать возможные сбои. Ниже приведена реализация шаблона восстановления с контрольными точками:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| public class CheckpointedKafkaConsumer<K, V> implements AutoCloseable {
private final KafkaConsumer<K, V> consumer;
private final CheckpointStore checkpointStore;
private final String consumerId;
private final String topic;
// Время между сохранениями контрольных точек (мс)
private final long checkpointIntervalMs;
private long lastCheckpointTime;
public CheckpointedKafkaConsumer(Properties props, CheckpointStore store,
String consumerId, String topic) {
this.consumer = new KafkaConsumer<>(props);
this.checkpointStore = store;
this.consumerId = consumerId;
this.topic = topic;
this.checkpointIntervalMs = 60000; // 1 минута
this.lastCheckpointTime = System.currentTimeMillis();
// Инициализация и восстановление из последней контрольной точки
initialize();
}
private void initialize() {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toList());
consumer.assign(topicPartitions);
// Восстановление из контрольных точек
Map<TopicPartition, Long> checkpoints =
checkpointStore.getCheckpoints(consumerId);
for (TopicPartition partition : topicPartitions) {
Long checkpoint = checkpoints.get(partition);
if (checkpoint != null) {
consumer.seek(partition, checkpoint);
} else {
// Для новых партиций начинаем с начала
consumer.seekToBeginning(Collections.singleton(partition));
}
}
}
public ConsumerRecords<K, V> poll(Duration timeout) {
ConsumerRecords<K, V> records = consumer.poll(timeout);
// Периодически сохраняем контрольные точки
long currentTime = System.currentTimeMillis();
if (currentTime - lastCheckpointTime > checkpointIntervalMs) {
saveCheckpoints();
lastCheckpointTime = currentTime;
}
return records;
}
public void saveCheckpoints() {
Map<TopicPartition, Long> positions = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
positions.put(partition, consumer.position(partition));
}
checkpointStore.saveCheckpoints(consumerId, positions);
}
@Override
public void close() {
try {
saveCheckpoints();
} finally {
consumer.close();
}
}
} |
|
В этом примере CheckpointStore может быть реализован через базу данных, файловую систему или даже специальный топик в Kafka. Такой подход обеспечивает сохранение прогресса обработки и быстрое восстановление после сбоев.
Интеграция с системами мониторинга
Мониторинг положения потребителей — критически важная задача в промышленных системах. Вот пример интеграции:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| public class MonitoredKafkaConsumer<K, V> implements AutoCloseable {
private final KafkaConsumer<K, V> consumer;
private final MetricsRegistry metrics;
private final String consumerGroup;
private final ScheduledExecutorService scheduler;
public MonitoredKafkaConsumer(Properties props, MetricsRegistry metrics) {
this.consumer = new KafkaConsumer<>(props);
this.metrics = metrics;
this.consumerGroup = props.getProperty("group.id", "unknown-group");
// Запускаем фоновый мониторинг
this.scheduler = Executors.newSingleThreadScheduledExecutor();
this.scheduler.scheduleAtFixedRate(
this::reportMetrics, 0, 30, TimeUnit.SECONDS);
}
private void reportMetrics() {
try {
// Получаем текущие помещения и лаги партиций
Map<TopicPartition, Long> currentPositions = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
currentPositions.put(partition, consumer.position(partition));
}
// Запрашиваем конечные смешения
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(consumer.assignment());
// Рассчитываем и отправляем метрики лага
for (TopicPartition partition : currentPositions.keySet()) {
long position = currentPositions.get(partition);
long endOffset = endOffsets.getOrDefault(partition, position);
long lag = Math.max(0, endOffset - position);
// Публикуем метрику лага
metrics.gauge(
String.format("kafka.consumer.lag.%s.%s.%d",
consumerGroup, partition.topic(),
partition.partition()),
lag
);
// Публикуем метрику текущей позиции
metrics.gauge(
String.format("kafka.consumer.position.%s.%s.%d",
consumerGroup, partition.topic(),
partition.partition()),
position
);
}
} catch (Exception e) {
// Логируем ошибку, но не прерываем работу
log.error("Ошибка при сборе метрик Kafka consumer", e);
}
}
// Делегирование основных методов
public ConsumerRecords<K, V> poll(Duration timeout) {
return consumer. poll(timeout);
}
public void seek(TopicPartition partition, long offset) {
consumer.seek(partition, offset);
// Обновляем метрики после преднамеренного изменения позиции
reportMetrics();
}
@Override
public void close() {
try {
scheduler.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Ошибка при завершении планировщика", e);
} finally {
consumer.close();
}
}
} |
|
Такая интеграция позволяет наблюдать в реальном времени за потреблением сообщений, отслеживать отставания и быстро реагировать на проблемы.
Применение seek для A/B тестирования
Оригинальное применение seek-операций — реализация A/B тестирования обработки сообщений:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| public class ABTestingConsumer<K, V> {
private final KafkaConsumer<K, V> consumer;
private final MessageProcessor processorA;
private final MessageProcessor processorB;
private final double samplingRate;
private final Random random;
public ABTestingConsumer(Properties props, MessageProcessor processorA,
MessageProcessor processorB, double samplingRate) {
this.consumer = new KafkaConsumer<>(props);
this.processorA = processorA;
this.processorB = processorB;
this.samplingRate = samplingRate;
this.random = new Random();
}
public void processMessages() {
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
// Сохраняем текущее положение для каждой партиции
Map<TopicPartition, Long> savedPositions = new HashMap<>();
for (TopicPartition partition : consumer.assignment()) {
savedPositions.put(partition, consumer.position(partition));
}
// Обрабатываем записи с процессором A
for (ConsumerRecord<K, V> record : records) {
processorA.process(record);
}
// Решаем, нужно ли запускать тестовый вариант B
if (random.nextDouble() < samplingRate) {
// Возвращаемся к исходной позиции для повторной обработки
savedPositions.forEach(consumer::seek);
// Обрабатываем те же записи с процессором B и сравниваем результаты
for (ConsumerRecord<K, V> record : records) {
ComparisonResult result = new ComparisonResult(
processorA.getLastResult(record),
processorB.process(record)
);
// Анализируем различия
analyzeAndReportComparison(record, result);
}
}
// Коммитим обработанные сообщения
consumer.commitSync();
}
}
} |
|
Этот подход позволяет сравнивать новые алгоритмы обработки с существующими на одних и тех же данных, не нарушая основной поток обработки.
Нестандартные решения
Иногда требуются совсем нетривиальные сценарии. Например, механизм "частичного коммита" для случаев с агрегацией данных:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
| public class PartialCommitConsumer<K, V> {
private final KafkaConsumer<K, V> consumer;
private final Map<String, Set<TopicPartition>> completedBatches = new HashMap<>();
public void processBatchWithPartialCommits(String batchId, int totalExpectedRecords) {
// Схема обработки сообщений с частичными фиксациями
int processedRecords = 0;
boolean batchComplete = false;
// Партиции, для которых достигнуты условия коммита
Set<TopicPartition> completedPartitions =
completedBatches.computeIfAbsent(batchId, k -> new HashSet<>());
while (!batchComplete) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (ConsumerRecord<K, V> record : records) {
TopicPartition partition =
new TopicPartition(record.topic(), record.partition());
// Пропускаем уже обработанные партиции
if (completedPartitions.contains(partition)) continue;
// Обработка сообщения
processRecord(record);
processedRecords++;
// Обновляем смещения для коммита
offsetsToCommit.put(
partition,
new OffsetAndMetadata(record.offset() + 1, batchId)
);
// Проверяем критерии завершения пакета
if (isPartitionComplete(partition, batchId)) {
completedPartitions.add(partition);
// Сохраняем прогресс обработки для конкретной партиции
consumer.commitSync(
Collections.singletonMap(
partition,
new OffsetAndMetadata(record.offset() + 1,
batchId + ":completed")
)
);
}
}
// Коммитим промежуточный прогресс для активных партиций
if (!offsetsToCommit.isEmpty()) {
consumer.commitSync(offsetsToCommit);
}
// Проверяем общее завершение пакета
batchComplete = processedRecords >= totalExpectedRecords ||
completedPartitions.containsAll(consumer.assignment());
}
// Очистка
completedBatches.remove(batchId);
}
// Перезапуск обработки с учётом данных частичного коммита
public void resumeBatch(String batchId) {
Set<TopicPartition> partitionsToProcess = new HashSet<>(consumer.assignment());
// Проверяем, какие партиции уже полностью обработаны
for (TopicPartition partition : consumer.assignment()) {
OffsetAndMetadata metadata = consumer.committed(partition);
if (metadata != null && metadata.metadata().contains(batchId + ":completed")) {
partitionsToProcess.remove(partition);
completedBatches.computeIfAbsent(batchId, k -> new HashSet<>())
.add(partition);
}
}
// Продолжаем обработку только для незавершённых партиций
// Для них позиции уже установлены на последние коммитированные
}
} |
|
Такой подход позволяет эффективно обрабатывать сценарии, где требуется обрабатывать данные из Kafka не последовательно, а с учетом сложной логики завершенности.
Потенциальные проблемы и оптимизации
При работе с seek-операциями в Kafka разработчики нередко сталкиваются с подводными камнями, которые могут серьезно повлиять на производительность и надежность приложения.
Производительность при использовании seek-операций
Частое выполнение операций seek может существенно снизить производительность потребителя. Это связано с несколькими факторами:
1. Нарушение паттерна последовательного чтения. Kafka оптимизирована для последовательного чтения данных, и произвольный доступ через seek нарушает этот принцип. Особенно заметно это влияет на производительность при работе с большими объемами данных.
2. Необходимость загрузки сегментов лога. Когда вы выполняете seek к смещению, которое находится в другом сегменте лога, Kafka должна найти и загрузить этот сегмент, что увеличивает латентность операций.
3. Дополнительные сетевые запросы. Частые перемещения по партиции могут вызывать дополнительные сетевые запросы между потребителем и брокером.
Оптимизация:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // Группировка операций seek для снижения накладных расходов
public void optimizedSeeks(KafkaConsumer<String, String> consumer,
Map<TopicPartition, Long> targetOffsets) {
// Сгруппируем операции по топику и партиции
Map<String, List<TopicPartition>> topicPartitions = new HashMap<>();
for (TopicPartition tp : targetOffsets.keySet()) {
topicPartitions.computeIfAbsent(tp.topic(), k -> new ArrayList<>())
.add(tp);
}
// Выполняем операции seek группами для каждого топика
for (Map.Entry<String, List<TopicPartition>> entry : topicPartitions.entrySet()) {
for (TopicPartition tp : entry.getValue()) {
consumer.seek(tp, targetOffsets.get(tp));
}
// Одна операция poll после группы seek для одного топика
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
// Игнорируем результат - это просто для применения операций seek
}
} |
|
Этот подход снижает количество сетевых запросов за счет группировки операций seek по топикам.
Обход подводных камней
Одна из распространенных ошибок — игнорирование эффекта выполнения seek на последующие вызовы poll(). Рассмотрим типичную проблему и её решение:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
| // Проблемный код
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Предполагается, что здесь будут записи с начала партиции, но...
// Если в буфере потребителя уже были записи из предыдущего poll(),
// они будут возвращены ПЕРЕД тем, как будет выполнен seek!
// Исправленный вариант
consumer.seekToBeginning(consumer.assignment());
// Сначала выполняем poll с нулевым таймаутом для применения seek
consumer.poll(Duration.ofMillis(0));
// Теперь получаем записи с новой позиции
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); |
|
Еще одна распространенная проблема возникает при использовании seek в сочетании с автоматическим управлением смещениями:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // Проблемный код с автоматическим коммитом
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// ...
consumer.seek(partition, specificOffset);
// Обработка...
// Проблема: автоматический коммит может произойти в любой момент,
// перезаписав смещение, которое мы установили через seek
// Решение: отключить автоматический коммит и управлять вручную
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// ...
consumer.seek(partition, specificOffset);
// Обработка...
// Явный коммит только когда это нужно
consumer.commitSync(); |
|
Также критично понимать, что happen-before смещения: seek влияет только на будущие операции poll, а не на уже выполненные или текущие.
Влияние партиционирования на эффективность seek-операций
Количество и размер партиций серьезно влияют на эффективность операций seek:
1. Много маленьких партиций увеличивают гранулярность контроля через seek, но повышают накладные расходы.
2. Крупные партиции снижают число необходимых seek-операций, но затрудняют точное позиционирование.
3. Неравномерное распределение данных между партициями может создать "горячие" партиции, где seek-операции будут работать значительно медленнее.
Анализ эффективности при разном количестве партиций:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| public class SeekPerformanceAnalyzer {
public static void analyzePartitioningImpact(String bootstrapServers, String topic,
int[] partitionCounts) throws Exception {
for (int partitionCount : partitionCounts) {
// Создаем топик с заданным количеством партиций
AdminClient admin = AdminClient.create(
Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
)
);
NewTopic newTopic = new NewTopic(
topic + "-" + partitionCount,
partitionCount,
(short) 1
);
admin.createTopics(Collections.singleton(newTopic)).all().get();
admin.close();
// Заполняем данными
populateTestData(bootstrapServers, topic + "-" + partitionCount, 100000);
// Тестируем seek-операции
long startTime = System.currentTimeMillis();
testSeekOperations(bootstrapServers, topic + "-" + partitionCount, 100);
long endTime = System.currentTimeMillis();
System.out.printf(
"Топик с %d партициями: время выполнения seek-операций: %d ms%n",
partitionCount, (endTime - startTime)
);
}
}
private static void testSeekOperations(String bootstrapServers,
String topic, int seekOperations) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-performance-test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitions.stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(topicPartitions);
Random random = new Random();
for (int i = 0; i < seekOperations; i++) {
// Выбираем случайную партицию
TopicPartition tp = topicPartitions.get(
random.nextInt(topicPartitions.size())
);
// Получаем границы смещений
Map<TopicPartition, Long> beginningOffsets =
consumer.beginningOffsets(Collections.singleton(tp));
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(Collections.singleton(tp));
long start = beginningOffsets.get(tp);
long end = endOffsets.get(tp);
// Выполняем seek к случайной позиции
if (end > start) {
long randomOffset = start + random.nextInt((int)(end - start));
consumer.seek(tp, randomOffset);
consumer.poll(Duration.ofMillis(0));
}
}
}
}
} |
|
Результаты такого анализа обычно показывают, что оптимальное количество партиций зависит от паттернов доступа и объема данных.
Распределенная координация позиций между несколькими Consumer
В сложных системах с множеством экземпляров потребителей координация позиций становится критически важной задачей. Вот два подхода к ее решению:
1. Централизованное хранилище состояния:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| public class CoordinatedSeekManager {
private final StateStore stateStore;
private final String applicationId;
public CoordinatedSeekManager(StateStore stateStore, String applicationId) {
this.stateStore = stateStore;
this.applicationId = applicationId;
}
// Сохранение позиции с возможностью атомарного обновления
public void savePosition(String topic, int partition, long offset,
String metadata) throws Exception {
String key = formatKey(topic, partition);
PositionInfo current = stateStore.getPosition(applicationId, key);
// Обновляем только если позиция новее
if (current == null || offset > current.offset) {
PositionInfo newPosition = new PositionInfo(offset, metadata,
System.currentTimeMillis());
stateStore.updatePosition(applicationId, key, newPosition);
}
}
// Загрузка координированной позиции
public PositionInfo loadPosition(String topic, int partition) throws Exception {
String key = formatKey(topic, partition);
return stateStore.getPosition(applicationId, key);
}
// Координированный seek для группы потребителей
public void coordinatedSeek(KafkaConsumer<?, ?> consumer,
Collection<TopicPartition> partitions) throws Exception {
for (TopicPartition tp : partitions) {
PositionInfo position = loadPosition(tp.topic(), tp.partition());
if (position != null) {
consumer.seek(tp, position.offset);
}
}
}
private String formatKey(String topic, int partition) {
return topic + "-" + partition;
}
// Класс для хранения информации о позиции
public static class PositionInfo {
public final long offset;
public final String metadata;
public final long timestamp;
public PositionInfo(long offset, String metadata, long timestamp) {
this.offset = offset;
this.metadata = metadata;
this.timestamp = timestamp;
}
}
// Интерфейс для хранилища состояния
public interface StateStore {
PositionInfo getPosition(String appId, String key) throws Exception;
void updatePosition(String appId, String key, PositionInfo position)
throws Exception;
}
} |
|
2. Использование самой Kafka в качестве хранилища координации:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
| public class KafkaBasedCoordinationStore implements CoordinatedSeekManager.StateStore {
private final KafkaProducer<String, String> producer;
private final KafkaConsumer<String, String> consumer;
private final String coordinationTopic;
private final ObjectMapper objectMapper;
public KafkaBasedCoordinationStore(Properties producerProps,
Properties consumerProps,
String coordinationTopic) {
this.producer = new KafkaProducer<>(producerProps);
this.consumer = new KafkaConsumer<>(consumerProps);
this.coordinationTopic = coordinationTopic;
this.objectMapper = new ObjectMapper();
// Подписываемся на топик координации
consumer.subscribe(Collections.singleton(coordinationTopic));
}
@Override
public CoordinatedSeekManager.PositionInfo getPosition(String appId, String key)
throws Exception {
// Формируем полный ключ для поиска
String fullKey = appId + ":" + key;
// Переходим к последнему смещению
consumer.seekToEnd(consumer.assignment());
// Читаем последние N сообщений (можно настроить)
long startTime = System.currentTimeMillis();
Map<String, CoordinatedSeekManager.PositionInfo> latestPositions =
new HashMap<>();
while (System.currentTimeMillis() - startTime < 5000) { // Таймаут 5 секунд
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Ищем записи с нашим ключом
if (record.key().equals(fullKey)) {
CoordinatedSeekManager.PositionInfo position =
objectMapper.readValue(record.value(),
CoordinatedSeekManager.PositionInfo.class);
// Сохраняем только самую свежую версию
if (!latestPositions.containsKey(fullKey) ||
position.timestamp > latestPositions.get(fullKey).timestamp) {
latestPositions.put(fullKey, position);
}
}
}
// Если нашли позицию, можно прервать поиск
if (latestPositions.containsKey(fullKey)) {
break;
}
}
return latestPositions.get(fullKey);
}
@Override
public void updatePosition(String appId, String key,
CoordinatedSeekManager.PositionInfo position)
throws Exception {
// Формируем полный ключ
String fullKey = appId + ":" + key;
// Сериализуем позицию в JSON
String positionJson = objectMapper.writeValueAsString(position);
// Отправляем в топик координации
ProducerRecord<String, String> record =
new ProducerRecord<>(coordinationTopic, fullKey, positionJson);
// Ждем подтверждения записи для гарантии доставки
producer.send(record).get();
}
public void close() {
producer.close();
consumer.close();
}
} |
|
Эта реализация использует отдельный топик Kafka для хранения и координации позиций между потребителями.
Балансировка нагрузки с помощью seek-операций
Интересным применением seek-операций является динамическая балансировка нагрузки между экземплярами потребителей:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
| public class LoadBalancedConsumerGroup implements Runnable {
private final List<KafkaConsumer<String, String>> consumers;
private final int totalPartitions;
private final AtomicLong[] partitionLags;
private final ReentrantLock rebalanceLock = new ReentrantLock();
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public LoadBalancedConsumerGroup(List<Properties> consumerConfigs,
String topic, int partitionCount) {
this.consumers = consumerConfigs.stream()
.map(props -> new KafkaConsumer<String, String>(props))
.collect(Collectors.toList());
this.totalPartitions = partitionCount;
this.partitionLags = new AtomicLong[partitionCount];
for (int i = 0; i < partitionCount; i++) {
partitionLags[i] = new AtomicLong(0);
}
// Запускаем периодическую ребалансировку
scheduler.scheduleAtFixedRate(
this::rebalancePartitions, 30, 30, TimeUnit.SECONDS);
}
@Override
public void run() {
// Запускаем потребителей в отдельных потоках
List<Thread> consumerThreads = new ArrayList<>();
for (int i = 0; i < consumers.size(); i++) {
final int consumerId = i;
Thread thread = new Thread(() -> consumeMessages(consumerId));
consumerThreads.add(thread);
thread.start();
}
// Ожидаем завершения всех потоков
for (Thread thread : consumerThreads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void consumeMessages(int consumerId) {
KafkaConsumer<String, String> consumer = consumers.get(consumerId);
// Начальное назначение партиций (равномерно)
assignInitialPartitions(consumer, consumerId);
while (true) {
try {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// Обработка записей
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
// Обновляем информацию об отставании
int partition = record.partition();
partitionLags[partition].decrementAndGet();
}
// Обновляем информацию о лаге для назначенных партиций
updateLagInfo(consumer);
// Проверка на блокировку ребалансировки
if (rebalanceLock.isLocked() &&
rebalanceLock.isHeldByCurrentThread()) {
// Выполняем специфичные для ребалансировки операции
handleRebalance(consumer, consumerId);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void assignInitialPartitions(KafkaConsumer<String, String> consumer,
int consumerId) {
int consumerCount = consumers.size();
List<TopicPartition> partitionsToAssign = new ArrayList<>();
// Распределяем партиции равномерно
for (int i = consumerId; i < totalPartitions; i += consumerCount) {
partitionsToAssign.add(new TopicPartition("test-topic", i));
}
consumer.assign(partitionsToAssign);
consumer.seekToEnd(partitionsToAssign);
}
private void updateLagInfo(KafkaConsumer<String, String> consumer) {
Set<TopicPartition> assignment = consumer.assignment();
// Получаем конечные смещения
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
// Обновляем информацию о лаге
for (TopicPartition tp : assignment) {
long currentPosition = consumer.position(tp);
long endOffset = endOffsets.get(tp);
long lag = Math.max(0, endOffset - currentPosition);
partitionLags[tp.partition()].set(lag);
}
}
private void rebalancePartitions() {
try {
// Блокируем доступ к ребалансировке
rebalanceLock.lock();
// Анализируем текущее распределение нагрузки
long[] consumerLoads = new long[consumers.size()];
Map<Integer, Set<Integer>> consumerToPartitions = new HashMap<>();
for (int i = 0; i < consumers.size(); i++) {
consumerToPartitions.put(i, new HashSet<>());
}
KafkaConsumer<String, String> firstConsumer = consumers.get(0);
for (int i = 0; i < totalPartitions; i++) {
TopicPartition tp = new TopicPartition("test-topic", i);
// Определяем, какому потребителю назначена эта партиция
for (int j = 0; j < consumers.size(); j++) {
KafkaConsumer<String, String> consumer = consumers.get(j);
if (consumer.assignment().contains(tp)) {
consumerToPartitions.get(j).add(i);
consumerLoads[j] += partitionLags[i].get();
break;
}
}
}
// Находим наиболее и наименее загруженных потребителей
int maxLoadConsumer = 0;
int minLoadConsumer = 0;
for (int i = 1; i < consumers.size(); i++) {
if (consumerLoads[i] > consumerLoads[maxLoadConsumer]) {
maxLoadConsumer = i;
}
if (consumerLoads[i] < consumerLoads[minLoadConsumer]) {
minLoadConsumer = i;
}
}
// Если дисбаланс достаточно большой, перераспределяем партиции
if (consumerLoads[maxLoadConsumer] > consumerLoads[minLoadConsumer] * 1.5) {
// Находим партицию с наибольшим лагом у самого загруженного потребителя
int partitionToMove = -1;
long maxLag = -1;
for (int partitionId : consumerToPartitions.get(maxLoadConsumer)) {
if (partitionLags[partitionId].get() > maxLag) {
maxLag = partitionLags[partitionId].get();
partitionToMove = partitionId;
}
}
if (partitionToMove >= 0) {
// Переносим партицию от максимально к минимально загруженному
TopicPartition tp = new TopicPartition("test-topic", partitionToMove);
// Сохраняем текущую позицию
KafkaConsumer<String, String> sourceConsumer =
consumers.get(maxLoadConsumer);
long currentPosition = sourceConsumer.position(tp);
// Удаляем из назначения источника
Set<TopicPartition> newSourceAssignment = new HashSet<>(
sourceConsumer.assignment());
newSourceAssignment.remove(tp);
sourceConsumer.assign(newSourceAssignment);
// Добавляем в назначение приемника
KafkaConsumer<String, String> targetConsumer =
consumers.get(minLoadConsumer);
Set<TopicPartition> newTargetAssignment = new HashSet<>(
targetConsumer.assignment());
newTargetAssignment.add(tp);
targetConsumer.assign(newTargetAssignment);
// Позиционируем на той же позиции
targetConsumer.seek(tp, currentPosition);
System.out.printf(
"Перенесена партиция %d от потребителя %d к потребителю %d%n",
partitionToMove, maxLoadConsumer, minLoadConsumer
);
}
}
} finally {
rebalanceLock.unlock();
}
}
private void handleRebalance(KafkaConsumer<String, String> consumer,
int consumerId) {
// Здесь может быть реализована дополнительная логика
// при ребалансировке
}
private void processRecord(ConsumerRecord<String, String> record) {
// Логика обработки сообщения
}
} |
|
Такой подход к балансировке позволяет динамически адаптировать распределение нагрузки между экземплярами потребителей в зависимости от интенсивности потоков данных.
Контроль пропускной способности с помощью seek
В некоторых сценариях требуется контролировать скорость потребления сообщений. Это можно реализовать с использованием seek-операций:
Java | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
| public class ThrottledConsumer<K, V> {
private final KafkaConsumer<K, V> consumer;
private final long maxMessagesPerSecond;
private long messagesProcessedInCurrentSecond = 0;
private long currentSecondStartTime = System.currentTimeMillis();
public ThrottledConsumer(KafkaConsumer<K, V> consumer, long maxMessagesPerSecond) {
this.consumer = consumer;
this.maxMessagesPerSecond = maxMessagesPerSecond;
}
public ConsumerRecords<K, V> throttledPoll(Duration timeout) {
updateThrottlingState();
if (messagesProcessedInCurrentSecond >= maxMessagesPerSecond) {
// Достигли лимита - возвращаем пустой результат
return ConsumerRecords.empty();
}
// Оставшееся количество сообщений, которое можно обработать в этой секунде
long remainingQuota = maxMessagesPerSecond - messagesProcessedInCurrentSecond;
ConsumerRecords<K, V> records = consumer.poll(timeout);
int recordCount = countRecords(records);
if (recordCount <= remainingQuota) {
// Можем обработать все записи
messagesProcessedInCurrentSecond += recordCount;
return records;
} else {
// Нужно ограничить количество записей
messagesProcessedInCurrentSecond += remainingQuota;
// Обрабатываем только часть записей, и перемещаемся к позициям
// для последующего потребления
Map<TopicPartition, List<ConsumerRecord<K, V>>> filteredRecords =
new HashMap<>();
Map<TopicPartition, Long> newPositions = new HashMap<>();
long processed = 0;
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<K, V>> partitionRecords =
records.records(partition);
if (processed + partitionRecords.size() <= remainingQuota) {
// Можем взять все записи из этой партиции
filteredRecords.put(partition, new ArrayList<>(partitionRecords));
processed += partitionRecords.size();
} else {
// Берем только часть записей
long canTake = remainingQuota - processed;
List<ConsumerRecord<K, V>> limitedRecords =
partitionRecords.subList(
0, (int) canTake);
filteredRecords.put(partition,
new ArrayList<>(limitedRecords));
// Запоминаем позицию для последующего чтения
newPositions.put(partition,
limitedRecords.get(
limitedRecords.size() - 1).offset() + 1);
processed += canTake;
// Прерываем цикл, если квота исчерпана
if (processed >= remainingQuota) {
break;
}
}
}
// Устанавливаем позиции для следующего чтения
for (Map.Entry<TopicPartition, Long> entry : newPositions.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());
}
return new ConsumerRecords<>(filteredRecords);
}
}
private void updateThrottlingState() {
long currentTime = System.currentTimeMillis();
long elapsedMs = currentTime - currentSecondStartTime;
if (elapsedMs >= 1000) {
// Прошла секунда - сбрасываем счетчик
currentSecondStartTime = currentTime;
messagesProcessedInCurrentSecond = 0;
}
}
private int countRecords(ConsumerRecords<K, V> records) {
int count = 0;
for (TopicPartition partition : records.partitions()) {
count += records.records(partition).size();
}
return count;
}
} |
|
Такая реализация позволяет контролировать скорость потребления сообщений, не превышая заданный лимит.
Управление позицией в Kafka — серьезный инструмент, который требует аккуратного использования. При правильном подходе к проектированию и учете потенциальных проблем можно создавать высокопроизводительные и надежные приложения, использующие все преимущества этого механизма. Основной принцип — найти баланс между гибкостью, предоставляемой seek-операциями, и накладными расходами, которые они вносят.
Написание Kafka Server Mock Приложение передает некоторые сообщения по TCP в Kafka Server. Нужно реализовать заглушку Kafka Server, которая будет ловить эти сообщения и... Проблемы с java kafka и zookeeper на windows 10 Здраствуйте.
Я сейчас пытаюсь настроить zookeeper и kafka по https://habr.com/ru/post/496182/
вот что я сделал.
в файл zoo в... Consumer<Sendable<T>> Дано на текущий момент 3 класса и интерфейс.
1. Интерфейс Sendable<T>
public interface Sendable<T> {
String getTo();
String... Spring Boot + Kafka, запись данных после обработки Добрый вечер, много времени уже мучаюсь над одной проблемой, я извиняюсь, может мало ли вдруг такая тема есть, но значит я плохо искал, в общем я... Spring Kafka: Запись в базу данных и чтение из неё Гайз, нужен хэлп.
Киньте инфу или подскажите как записывать данные из Kafka в базу данных, а потом читать из базы и писать в топики Kafka.
Нужно... JAX-RS consumer для множества объектов Привет. Возник тут вопрос, ради облегчения жизни :)
Java EE, JAX-RS Метод
@POST
@Path("path")
... Реализовать следующие варианты примера "Producer/Consumer" Может кто сталкивался,если не затруднит,помогите пожалуйста
1) без синхронизации
2) с помощью ключевого слова synchronized
3) с помощью... Consumer apache kafka Доброго времени суток уважаемые форумчане.
С apache kafka работаю совсем недавно и столкнулся с неприятной проблемой. Работу с kafka осуществляю... MutationObserver не перехватывает программные события Подскажите пожалуйста, вот ставлю MutationObserver на элемент к примеру ввода. Затем просто веду курсор мышки на элемент ввода и MutationObserver -... Не получается изменить имя родительского блока в цикле массива Есть функция, которая печатает имя пользователя и его числа.
При выводе результата в echo(я эти две строки пометил комментами)
я создаю... Найти подстановку, при которой заданное множ-во дизъюнктов~P(x)~Q(g(a),y)Q(x,f(x))∨R(y)P(x)∨Q(x,f(x))становится невыполн Найти подстановку, при которой заданное множество дизъюнктов
~P(x)
~Q(g(a),y)
Q(x,f(x))∨R(y)
P(x)∨Q(x,f(x))
становится невыполнимым. ... Блокировка интерфейса pyside (Qt) при реализации многопоточных приложений Здравствуйте. Реализовал приложение для опроса (пинговки) серверов, при помощи TCP запросов. Отправка запросов и прием ответов осуществляются в...
|