Apache Kafka Consumer를 구독하여 값을 읽어오는 기능을 Java언어로 구현해봅니다. apache.org에서 제공하는 Kafka API문서를 참고하였고, Apache Kafka의 개념을 먼저 학습한 후 예제 코드를 통하여 카프카에 대해 구현해보는 걸 권장합니다.
1. Consumer 데이터 가져오는 예제 코드 및 설명
// Properties 객체를 생성하여 설정값 셋팅하기
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Kafka Consumer 객체 생성하기
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
- Properties 해석
- apache Kafka의 기본 포트 9092 포트를 바라보는 localhost에 설치되어있는 카프카 서버를 가리킨다.
- group id를 설정하여 컨슈머 그룹을 특정지을 수 있는데 test그룹으로 설정한다.
- kafka 기능중 하나인 auto commit의 활성화를 비활성화시킨다.
- key.deserializer 및 value.deserializer의 값은 org.apache에서 제공하는 값을 쓴다.
- Kafka Consumer 해석
- Properties로 설정한 값들로 KafkaConsuer객체를 생성한다.
- "foo", "bar"토픽을 구독한다.
- 각 구독한 토픽들을 poll 하여 records를 읽어온 후 레코드가 있다면 buffer에 담는다.
- buffer에 담긴 내용이 있다면 db에 값을 저장하고 consumer를 커밋하여 해당 내용은 다음부터 가져오지 않게 한다.
2. ConsumerConfig를 사용하여 만든 Consumer 예제 코드
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "mingyu");
this.kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList("testTopic"));
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(2000));
for (ConsumerRecord<String, String> recode : records) {
// 카프카 토픽에 저장된 값을 message로 사용 가능함.
message = recode.value();
// 이후 처리 코드..
}
'Language > Java' 카테고리의 다른 글
Java로 Firebase Cloude Message (Application Push) 구현하기 (0) | 2022.07.19 |
---|---|
Java로 Data 암호화 및 복호화 하기 - AES256 (0) | 2022.06.27 |
[Gradle] Gradle로 Java Application 생성 및 실행하기 (0) | 2022.06.08 |
Log4j에 대한 학습 - Log4j 의 개념 및 Log4j2 (0) | 2022.05.30 |
그래들 ( Gradle ) 에 대한 학습 - Gradle 구조 및 Gradle Wrapper (0) | 2022.05.27 |