본문 바로가기

책/카프카 핵심 가이드

Chapter 3. Kafka Producer [카프카에 메시지 쓰기]

반응형

짹짹

 

 

카프카를 큐로 사용하든 메시지 버스나 데이터 저장 플랫폼(data storage platform)으로 사용하든 간에 카프카를 사용할 때는 카프카에 데이터를 쓸 때 사용하는 프로듀서나 읽어 올때 사용하는 컨슈머, 혹은 두 가지 기능 모두를 수행하는 application을 생성해야한다 

 

 

카프카는 개발자들이 카프카와 상호작용하는 application을 개발할 때 사용할 수 있는 client API와 함께 배포된다. 

 

프로듀서의 디자인과 주요 요소의 전체적인 모습을 살펴본 뒤 KafkaProducer 와 ProducerRecord 객체를 어떻게 생성하는지, 

어떻게 카프카에 레코드를 전송하는지, 그리고 리턴할 수 있는 에러를 어떻게 처리 대응하는지 알아보자 

 

끝으로 파티션 할당 방식을 정의하는 파티셔너과 객체의 직렬화 방식을 정의하는 serializer에는 어떠한 것들이 있는지, 

이들을 작성하기 위해서는 어떻게 해야하는지 깊이 있게 살펴보자 

 

 

3.1 프로듀서 개요 

카프카에 메시지를 써야하는 상황에는 여러 가지가 있을 수 있음 여러가지 목적에 따라 다양한 만큼 요구 조건 또한 다양한다 

  1. 메시지 유실이 용납되지 않는지
  2. 유실이 허용되는지 
  3. 중복이 허용되도 상관없는지 
  4. 반드시 지켜야할 지연이나 처리율(throughput)이 있는지

 

웹사이트에서 생성되는 클릭 정보를 저장하는 경우가 있을 수 있다 이 경우 메시지가 조금 유실되거나 중복되는 것은 문제가 되지 않는다 

사용자 경험에 영향을 주지 않는 한, 지연 역시 높아도 상관없음 

 

이처럼 서로 다른 요구 조건은 카프카에 메시지를 쓰기 위해 프로듀서 API를 사용하는 방식과 설정에 영향이 미침 

 

 

 

카프카에 메시지를 쓰는 작업을 ProducerRecord 객체를 생성함으로써 시작함 여기서 레코드가 저장될 토픽과 밸류 지정은 필수사항 

키와 파티션 지정은 선택사항이다 일단 ProducerRecord를 전송하는 API를 호출했을 때

 

Producer가 가장 먼저 하는 일은 키와 값 객체가 네트워크 상에서 전송될 수 있도록 직렬화해서 바이트 배열로 변환하는 과정 


그 다음, 만약 파티션을 명시적으로 지정하지 않았다면 해당 데이터를 파티셔너에게로 보낸다 파티셔너는 파티션을 결정하는 역할을 하는데, 그 기준은 보통 ProducerRecord 객체의 키의 값이다 

 

메시지가 성공적으로 저장되었을 경우 브로커는 토픽, 파티션, 그리고 해당 파티션 안에서의 레코드의 오프셋을 담은 RecordMetadata 객체를 리턴함 메시지가 저장에 실패했을 경우에는 에러가 리턴함 프로듀서가 에러를 수신했을 경우, 메시지 쓰기를 포기하고 사용자에게 에러를리턴하기 전까지 몇 번 더 재전송을 시도할 수 있음 

 

 

 

3.2 카프카 프로듀서 생성하기 

카프카에 미시지를 쓰려면 우선 원하는 속성을 지정해서 프로듀서 객체를 생성해야함 필수 값은 다음과 같음 

  1. bootstrap.servers
    1. 브로커의 host: port 목록임 이 값은 모든 브로커를 포함할 필요는 없는데, 프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 때문임 다만 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개는 해야함
  2. key.serializer
    1. 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름임 카프카 브로커는 메시지의 키값, 밸류값으로 바이트 배열을 받음, 하지만, 프로듀서 인터페이스는 임의의 자바 객체를 키 혹은 밸류로 전송할 수 있도록 매개변수화된 타입을 사용할 수 있도록함 덕분에 가독성 높은 코드를작성할 수 있지만, 프로듀서 입장에서는 이 객체를 어떻게 바이트 배열로 바꿔야 하는지 알아야 한다는의미기도 함 
      1. key.serializer에는 org.apache.kafka.common.serialization.Serializer Interface를 구현하는 클래스의이름이 지정되어야함 키값 없이 밸류값만 보낼 때도 key.serializer 설정은 해줘야하는데 VoidSerializer를 사용해서 키타입을 Void 타입을 설정할 수 있음 
  3. value.seralizer
    1. 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름 

 

다음 예제 처럼 필수 속성만 지정하고 나머지는 전부 기본 설정값으로 사용하는 방식으로 새로운 프로듀서를 생성하는 방법을 보여줌 

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


public class KafkaProducerExample {
    public static void main(String[] args){
        // kafka topic settings
        String bootstrapServers = null;
        String topic = "Hello";

        // producer setting Kafka setting 하는 법 
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", bootstrapServers);

        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps)) {
            // 메시지 생성 및 토픽으로 전송
            String message = "Hello, Kafka!";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record);

            System.out.println("메시지 전송 성공: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

1. 우선 Properties 객체를 생성함 

2. 메시지의 키값과 밸류값으로 문자열(String type)을 사용할 것이므로, 카프카에서 기본 제공되는 StringSerializer를 사용함 

3. 적잘한 키와 밸류 타입을 설정 하고 Properties 객체를 넘겨줌으로 서 새로운 프로듀서를 생성함 

 

 

프로듀서 객체를 생성했으니 이제 메시지를 전송할 수 있다. 메시지 전송 방법에는 크게 3가지 방법이 있음 

 

파이어 인 포겟 ( Fire and forget )

메시지를 서버에 전송만 하고 성공 혹은 실패 여부에는 신경 쓰지 않는다. 카프카가 가용성을 높고 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도하기 때문에 대부분의 경우 메시지는 성공적으로 전달된다  

다만, 재시도를할 수 있는 에러가 발생하거나 타임아웃이 발생했을 경우 메시지는 유실되며
application은 여기에 대해 아무런 예외를 전달받지 않게 됨 [메시지 유실 대참사 난다는거임]

동기적 전송 ( Synchronous send )

기술적으로 얘기하면, 카프카 프로듀서는 언제나 비동기적으로 작동함 즉, 메시지를 보내면 send() 메서드는  Future 객체를 리턴함 

하지만 당므 메시지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야함 (안하면 유실될 확률이 높을 수 있음)

 

 

비동기적 전송 ( Asynchronous send )

콜백 함수와 함께 send() method를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출됨 

 

 

프로듀서 객체는 메시지를 전송하려는 다수의 스레드가 동시에 사용할 수 있음 

반응형