Форум программистов, компьютерный форум, киберфорум
Java: Spring, Spring Boot
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  
 
0 / 0 / 0
Регистрация: 15.09.2018
Сообщений: 2

Spring Boot + Kafka, запись данных после обработки

22.03.2020, 23:06. Показов 1045. Ответов 0

Студворк — интернет-сервис помощи студентам
Добрый вечер, много времени уже мучаюсь над одной проблемой, я извиняюсь, может мало ли вдруг такая тема есть, но значит я плохо искал, в общем я получаю информацию из Topic и обрабатываю её в kTable, ниже я прикреплю свой код, далее после обработки я должен всю эту обработанную информацию отправить обратно в Topic, только другой, но никак не могу написать корректно, так как каждый раз ругается на Serde, данные классы с сериализацией и десериализацией у меня написаны, может кто подсказать, в каком направлении идти или как правильно дописать строчку, чтобы отправлялась в Kafka Topic? спасибо


Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConsumerConfig {
 
    private static final String TOPIC = "All_Users1234449313123q4r345";
 
    private static final String strAdmin = "topicAdmin";
 
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "tak-takisqrt");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, UsersSerde.class);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        //config.put("default.deserialization.exception.handler", LogAndFailExceptionHandler.class);
        return new KafkaStreamsConfiguration(config);
    }
 
    @Bean
    public KStream<String, Users> kStream(StreamsBuilder kStreamBuilder) {
        Serde<String> stringSerde = new Serdes.StringSerde();
        Serde<Users> usersSerde = new UsersSerde();
        Serde<UsersAggregation> usersAggregationSerde = new UsersAggregationSerde();
 
        KStream<String, Users> kStream = kStreamBuilder
                .stream(TOPIC, Consumed.with(stringSerde,
                        usersSerde));//.to("opapapa", Produced.with(Serdes.String().getClass(), UsersSerde.class));
 
        KTable<String, UsersAggregation>  kTable = kStream
                .filter((k,v) -> v.getRole().compareTo("Admin") == 0)
                .selectKey((a, b) -> b.getDate() + b.getName() + b.getRole())
                .groupByKey()
                .aggregate(UsersAggregation::new,
                        (k, v, agg) -> {
                            agg.setName(v.getName());
                            agg.setDate(v.getDate());
                            agg.setRole(v.getRole());
                            agg.setCount(agg.getCount() + 1);
                            agg.setActions(v.getAction());
                            return agg;
                            //});
                        }, Materialized.with(stringSerde, usersAggregationSerde))
                .mapValues(v -> {
                    v.sortActions();
                    System.out.println(v.toString());
                    return v;
                });//тут нужно отправить данные, которые обработал в kTable
 
        return kStream;
    }
}
Добавлено через 2 часа 16 минут
Решение нашел, после своих обработок надо было на следующей строчке написать так вот, а то я писал был именно с kTable, где и обрабатывал. Спасибо всем тем, кто смотрел и подумывал над решением, я благодарен вам!

Java
1
kTable.toStream().to("tuktuktuk", Produced.with(Serdes.String(), usersAggregationSerde));
0
cpp_developer
Эксперт
20123 / 5690 / 1417
Регистрация: 09.04.2010
Сообщений: 22,546
Блог
22.03.2020, 23:06
Ответы с готовыми решениями:

Spring Kafka: Запись в базу данных и чтение из неё
Гайз, нужен хэлп. Киньте инфу или подскажите как записывать данные из Kafka в базу данных, а потом читать из базы и писать в топики...

Spring Kafka. Ошибка Connection refused при подключении к брокеру Kafka
Пишу Kafka Broker и Consumer, чтобы ловить сообщения от приложения. При попытке достать сообщения из Consumer вылетает ошибка ...

Spring boot обработка данных после ввода в форму, вывод результата
Начал изучение Springboot, застрял на этапе когда данные из формы нужно передать в другой класс, провести расчет и вывести результат.После...

0
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
raxper
Эксперт
30234 / 6612 / 1498
Регистрация: 28.12.2010
Сообщений: 21,154
Блог
22.03.2020, 23:06
Помогаю со студенческими работами здесь

Добавление объекта в Redis после старта Spring Boot
Я создал объект: @Data @AllArgsConstructor public class RedisStatistic { private long id; private long...

Что такое Spring, Spring Boot?
Здравствуйте. Никогда не использовал Spring, Spring Boot. Возник такой вопрос можно ли его использовать в IDE для java Se. Или для...

Spring в Spring Boot context
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( &quot;applicationContext.xml&quot; ); ...

Ошибка после переустановки ОС: Reboot and Select proper Boot device Or Insert boot Media in selected Boot device and press a key
У меня такая проблема , я решил переустановить винду, зашёл в Биос , поменял там приоритеты, комп перезагрузился , после этого ...

Spring Boot
Всем привет, подскажите пожалуйста, создаю проект через Spring Initializer! Создаю класс SpringBootWebApplication ...


Искать еще темы с ответами

Или воспользуйтесь поиском по форуму:
1
Ответ Создать тему
Новые блоги и статьи
Access
VikBal 11.12.2025
Помогите пожалуйста !! Как объединить 2 одинаковые БД Access с разными данными.
Новый ноутбук
volvo 07.12.2025
Всем привет. По скидке в "черную пятницу" взял себе новый ноутбук Lenovo ThinkBook 16 G7 на Амазоне: Ryzen 5 7533HS 64 Gb DDR5 1Tb NVMe 16" Full HD Display Win11 Pro
Музыка, написанная Искусственным Интеллектом
volvo 04.12.2025
Всем привет. Некоторое время назад меня заинтересовало, что уже умеет ИИ в плане написания музыки для песен, и, собственно, исполнения этих самых песен. Стихов у нас много, уже вышли 4 книги, еще 3. . .
От async/await к виртуальным потокам в Python
IndentationError 23.11.2025
Армин Ронахер поставил под сомнение async/ await. Создатель Flask заявляет: цветные функции - провал, виртуальные потоки - решение. Не threading-динозавры, а новое поколение лёгких потоков. Откат?. . .
Поиск "дружественных имён" СОМ портов
Argus19 22.11.2025
Поиск "дружественных имён" СОМ портов На странице: https:/ / norseev. ru/ 2018/ 01/ 04/ comportlist_windows/ нашёл схожую тему. Там приведён код на С++, который показывает только имена СОМ портов, типа,. . .
Сколько Государство потратило денег на меня, обеспечивая инсулином.
Programma_Boinc 20.11.2025
Сколько Государство потратило денег на меня, обеспечивая инсулином. Вот решила сделать интересный приблизительный подсчет, сколько государство потратило на меня денег на покупку инсулинов. . . .
Ломающие изменения в C#.NStar Alpha
Etyuhibosecyu 20.11.2025
Уже можно не только тестировать, но и пользоваться C#. NStar - писать оконные приложения, содержащие надписи, кнопки, текстовые поля и даже изображения, например, моя игра "Три в ряд" написана на этом. . .
Мысли в слух
kumehtar 18.11.2025
Кстати, совсем недавно имел разговор на тему медитаций с людьми. И обнаружил, что они вообще не понимают что такое медитация и зачем она нужна. Самые базовые вещи. Для них это - когда просто люди. . .
Создание Single Page Application на фреймах
krapotkin 16.11.2025
Статья исключительно для начинающих. Подходы оригинальностью не блещут. В век Веб все очень привыкли к дизайну Single-Page-Application . Быстренько разберем подход "на фреймах". Мы делаем одну. . .
Фото: Daniel Greenwood
kumehtar 13.11.2025
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru