본문 바로가기

책/카프카 핵심 가이드

Chapter 3. Kafka Producer [카프카로 메시지 전달하기]

반응형

짹쨱

 

 

3.3 카프카로 메시지 전달하기 

메시지를 전송하는 가장 간단한 방법은 다음과 같다 

try (Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps)) {
    // 메시지 생성 및 토픽으로 전송
    String message = "Hello, Kafka!";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    producer.send(record);

    System.out.println("메시지 전송 성공: " + message);
} catch (Exception e) {
    e.printStackTrace();
}

1. producer는 ProducerRecord 객체를 받으므로 이 객체를 생성하는 것에부터 시작함. 조금 있다가 살펴볼 것이지만 ProducerRecord 클래스에는 생성자가 여러 개 있다 여기서 우리가 사용한 것은 (언제나 문자열인) 토픽 이름과 (이 경우 문자열인) 키, 밸류값을 사용하는 것이다.

 

키와 밸류 타입은 여기서 이루어지며 타입은 무조건 맞아야하는거 팩트

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", bootstrapServers);

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 

 

2. ProducerRecord를 전송하기 위해 프로듀서 객체의 send 메시지를 사용함 저번 포슽팅에서 그림에서 보았듯이 메시지는 버퍼에 저장되었다가 별도 스레드에 브로커로 보내진다 send() 메서드는 RecordMetadata를 포함한 Future 객체를 리턴하지만 

여기서는 리턴값을 무시하기 때문에 메시지 전송의 성공 여부를 알아낼 방법은 없다 
이러한 방법은 메시지가 조용한 누락되어도 상관없는 경우 사용될 수 있지만 
그럴 경우가 있을까? 나는 못봄 ㅋ


3. 마지막은 에러가 발생했을때 예외가 발생할수 있는데 이 에러가 뭔질 알아야 대처를 할 수 있을것이다 

    3.1 예를 들어서 메시지를 직렬화 하는 데 실패할 경우 SerializationException, 버퍼가 가득 찰 경우 TimeoutException 인터럽트가            걸리면 InterruptException이 발생함 

 

 

3.3.1 동기적으로 메시지 전송하기 

동기적으로 메시지를 전송하는 방법은 단순하지만,

 여전히 카프카 브로커가 쓰기 요청(producer request)에 에러 응답을 내놓거나 재전송 횟수가 소진되었을 때 발생되는 여외를 받아서 처리할 수 있다 
 재전송 횟수가 소진되었을 때 발생되는 예외를 받아서 처리할 수 있다
여기서의 주요한 균형점은 성능이다 


카프카 클러스터에 얼마나 작업이 몰리느냐에 따라서 브로터는 쓰기 요청에 응답하기까지 최소 2ms에서 최대 몇 초까지 지연될 수 있음 

동기적으로 메시지를 전송할 경우 전송을 요청하는 스레드는 이 시간 동안 아무것도 안하면서 기다려야한는 것... (성능 낭비 지리구요)

 

다른 메시지를 전송할 수 없는 것은 물론 결과적으로 성능이 크게 낮아지기 때문에 동기적 전송은 실제 어플리케이션에서는 사용되지 않음 

지금까지 본 코드도 동기적 코드임 ㅋ 

try (Producer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
    // 메시지 생성 및 토픽으로 전송
    String message = "Hello, Kafka!";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    producer.send(record).get();

    System.out.println("메시지 전송 성공: " + message);
} catch (Exception e) {
    e.printStackTrace();
}

 

카프카로부터 응답이 올 때까지 대기하기 위해 Future.get() 메서드를 사용하고 있는데 

이 메서드는 레코드가 카프카로 성공적으로 전송되지 않았을 경우 예외를 발생시킴 에러가 발생하지 않았을 경우,

RecordMetadata 객체를 리턴하는데 여기서 메시지가 쓰여진 오프셋과 다른 메타데이터를 가져올 수 있음 

근데 동기가 필요한가....

 

 

 

KafkaProducer 의 2가지의 에러 

재시도 가능한 에러는 메시지를 다시 전송함으로써 해결될 수 있는 에러를 가리킨다

예를 들면, 연결 에러는 연결이 회복되면 해결될 수 있다. 

 

메시지를 전송받은 브로커가 '해당 파티션의 리더가 아닐경우' 발생하는 에러는 해당 파티션에 새 리더가 선출되고 클라이언트 메티데이터가 업데이트되면 해결될 수 있다 

 

이런 류의 에러가 밸생했을 때 자동으로 재시도하도록 설정할 수 있기 때문에 이 경우 재전송 횟수가 소진되고서도 에러가 해결되지 않는 경우에 한해 재시도 가능한 예외가 발생한다 어떠한 에러는 재시도를 한다고 해서 해결되지 않는데 메시지크기가 너무 클 경우가 한예

이 경우 에러는 없고 바로 예외 발생 시킴 

 

 

3.3.2 비동기적으로 메시지 전송하기 

카프카는 레코드를 쓴 뒤 해당케로드의 토픽, 파이션 그리고 오프셋을 리턴하는데, 대부분의 애플리케이션에는 이런 메타데이터가 필요 없기 때문이다. 반대로, 메시지 전송에 완전히 실패했을 경우에는 그런 내용을 알아야한다 

 

그래야 예외를 발생 시키든지 에러를 로그에 쓰든지,
아니면 사후 분석을 위해 에러 파일에 메시지를 쓰거나 할 수 있기 때문이다 

뭘 알아야 대처를 하지 모르면 대처를 어떻게 ...하지...요..?

모르는데 어케된지 어케알아..

 

 

메시지를 비동기적으로 전송하고도 여전히 에러를 처리하는 경우를 위해 프로듀서는 레코드를 전송할 떄 콜백을 지정할 수 있도록 한다 

private static class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null){
            e.printStackTrace();
        }
    }
}

1. Callback 을 사용하려면 org.apahe.kafka.clients.producer.Callback interface를 구현하는 클래스가 필요함 이 인터페이스는 onCompletion() 단 하나의 메서드만 정의되어 있음 

2. 에러를 리턴한다면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게 됨 여기서는 그냥 내용을 화면에 출력해주는 정도로 처리 했지만, 실제 어플리케이션에서는 좀 더 확실한 에러 처리 함수가 필요할것임 

 

 

콜백은 프로듀셔의 메인 스레드에서 실행됨 만약 두 개 의 메시지를 동일한 파티션에 전송한다면,
콜백 역시 우리가 보낸 순서대로 실행됨, 하지만 이는 뒤집어 생각하면, 전송되어야 할 메시지가 전송이 안되고 
프로듀서가 지연되는 상황을 막기 위해서는 콜백이 충분히 빨라야한다는건데
콜백 안에서 블로킹 작업을 수행하는 것 역시 권장되는건 아님 대신,
블로킹 작업을 동시에 수행하는다른 스레드를 사용해야함 

 

 

 

 

비동기로 보내는 코드를 조잡하게라도 만들어봣다 

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;


import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

public class AsyncKafkaProducer {
    private static final Logger logger = Logger.getLogger(AsyncKafkaProducer.class.getName());
    private static final String BOOTSTRAP_SERVER = "kafka1:19092,kafka2:29092,kafka3:39092";
    private static final int PY4J_PORT = 25333;  // Set the port for Py4J to connect

    public static void main(String[] args) {
        // Start Py4J Gateway
        // You can use this instance to send messages from Java
        AsyncKafkaProducer javaKafkaProducer = new AsyncKafkaProducer();

        // Example usage:
        String topic = "your_topic";
        byte[] messageBytes = "your_message".getBytes();

        CompletableFuture<RecordMetadata> result = javaKafkaProducer.sendAsync(topic, messageBytes);
        result.whenComplete((metadata, exception) -> {
            if (exception == null) {
                logMessageDelivery(metadata, messageBytes.length);
            } else {
                logger.warning("Error sending data message " + exception.getMessage());
            }
        });

    }

    public CompletableFuture<RecordMetadata> sendAsync(String topic, byte[] message) {
        try (Producer<String, byte[]> producer = new KafkaProducer<>(configureKafkaProperties())) {
            return sendMessage(producer, topic, message);
        }
    }

    private static Properties configureKafkaProperties(){
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
        kafkaProps.put("security.protocol", "PLAINTEXT");
        kafkaProps.put("batch.size", 16384);
        kafkaProps.put("max.request.size", 7000);
        kafkaProps.put("enable.idempotence", false);
        kafkaProps.put("acks", "all");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        return kafkaProps;
    }

    // 비동기 처리 custom
    private static CompletableFuture<RecordMetadata> sendMessage(
            Producer<String, byte[]> producer,
            String topic,
            byte[] message
    ){
        CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, message);

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                completableFuture.complete(metadata);
                logger.info(
                        "message delivered to: " + metadata.topic() +
                                "partition: " + metadata.partition() +
                                "offset: " + metadata.offset());
            } else {
                completableFuture.completeExceptionally(exception);
                logger.warning("Error sending data message " + exception.getMessage());
            }

        });
        return completableFuture;
    }
     
    private static void logMessageDelivery(RecordMetadata metadata, int size) {
        logger.info("Message delivered to: " + metadata.topic() +
                " partition: " + metadata.partition() +
                " offset: " + metadata.offset() +
                " size: " + size);
    }
}

 

python에서 만드는 데이터를 JAVA를 어떻게 보내서 토픽으로 쏴줘야하는지 고민을 좀 해봐야지...

반응형