Java로 Apache Kafka Consumer 구독 구현하기

Apache Kafka Consumer를 구독하여 값을 읽어오는 기능을 Java언어로 구현해봅니다. apache.org에서 제공하는 Kafka API문서를 참고하였고, Apache Kafka의 개념을 먼저 학습한 후 예제 코드를 통하여 카프카에 대해 구현해보는 걸 권장합니다.

 

 

Apache Kafka란? - 아파치 카프카에 대한 학습

대학생 시절에는 프로그래밍 언어를 위주로 공부하였고, 개발자가 되어 4년차가 된 지금, 프로그래밍 언어의 장벽은 낮아졌고 오히려 프로그래밍 아키텍쳐, 디자인 패턴, 파이프라인 구축 등

min-nine.tistory.com

 

kafka 2.6.0 API

 

kafka.apache.org


1.  Consumer 데이터 가져오는 예제 코드 및 설명

// Properties 객체를 생성하여 설정값 셋팅하기
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Kafka Consumer 객체 생성하기
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
     buffer.add(record);
 }
 if (buffer.size() >= minBatchSize) {
     insertIntoDb(buffer);
     consumer.commitSync();
     buffer.clear();
 }
}
  • Properties 해석
    • apache Kafka의 기본 포트 9092 포트를 바라보는 localhost에 설치되어있는 카프카 서버를 가리킨다. 
    • group id를 설정하여 컨슈머 그룹을 특정지을 수 있는데 test그룹으로 설정한다.
    • kafka 기능중 하나인 auto commit의 활성화를 비활성화시킨다.
    • key.deserializer 및 value.deserializer의 값은 org.apache에서 제공하는 값을 쓴다.
  • Kafka Consumer 해석
    • Properties로 설정한 값들로 KafkaConsuer객체를 생성한다.
    • "foo", "bar"토픽을 구독한다.
    • 각 구독한 토픽들을 poll 하여 records를 읽어온 후 레코드가 있다면 buffer에 담는다. 
    • buffer에 담긴 내용이 있다면 db에 값을 저장하고 consumer를 커밋하여 해당 내용은 다음부터 가져오지 않게 한다.

 

2. ConsumerConfig를 사용하여 만든 Consumer 예제 코드

 

ConsumerConfig (kafka 2.6.0 API)

 

kafka.apache.org

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "mingyu");

this.kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList("testTopic"));

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(2000));

for (ConsumerRecord<String, String> recode : records) {
    // 카프카 토픽에 저장된 값을 message로 사용 가능함.
	message = recode.value();
  	
    // 이후 처리 코드..
}