개요
카프카를 처음 접하면서 가장 먼저 마주친 개념 중 하나가 바로 프로듀서 였다. 카프카는 단순히 데이터를 저장하고 전달하는 분산 메시징 시스템일 뿐이다. 결국 카프카에 데이터를 써주는 누군가가 필요한데, 이 역할 바로 프로듀서가 한다.
이번 글에서는 카프카 프로듀서가 어떤 식으로 메시지를 전송하는지 동작 원리부터 시작해서 순수 Java를 사용한 기본 구현 방식들을, 그리고 성능이나 안정성과 관련된 여러 설정들에 대해 정리해 보았다.
1. Kafka 프로듀서란 무엇인가?
Kafka 프로듀서는 카프카에게 메시지를 보내는 역할을 하는 애플리케이션의 구성 요소이다. 쉽게 말해 카프카는 단순한 메시지 저장소에 가깝다. 저장소에 데이터가 쌓이려면 누군가는 데이터를 보내줘야할텐데, 이 때 데이터를 보내는 역할을 하는게 바로 프로듀서(Producer)이다. 프로듀서가 메시지를 보내면 나중에 컨슈머(Consumer)가 이 메시지를 읽을 수 있다.
내부 동작 흐름

- 메시지를 보내기 위해 ProducerRecord 객체를 생성한다. 레코드가 저장될 토픽과 밸류 지정은 필수이지만, [키]와 [파티션] 지정은 선택사항이다.
- 프로듀서가 send() 메서드를 호출하면, 키와 밸류가 네트워크 상에서 전송될 수 있도록 바이트 배열로 변환된다. 파티션이 지정되지 않았다면, 카프카는 파티셔너를 이용해 파티션을 자동 결정한다. 일반적으로 키 값을 기준으로 해시를 적용해 파티션을 결정한다.
- 파티션이 결정되어 메시지가 전송될 토픽과 파티션이 확정되면 프로듀서는 이 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치(record batch)에 추가한다.
- 별도의 스레드가 이 레코드 배치를 적절한 카프카 브로커에게 전송한다.
브로커가 메시지를 받으면 응답을 돌려주게 되어있다. 메시지가 성공적으로 저장되었을 경우 브로커는 토픽, 파티션, 그리고 해당 파티션 안에서의 레코드의 오프셋을 담은 RecordMetadata 객체를 리턴한다. 메시지가 저장에 실패했을 경우에는 에러가 리턴된다. 메시지 전송 실패시에는 프로듀서가 자동으로 재시도를 할 수 있다.
2. 카프카 프로듀서 생성하기
카프카에 메시지를 보내려면, 먼저 몇가지 설정 값을 이용해 프로듀서 객체를 생성해야 한다.
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serializtion.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serializtion.StringSerializer");
KafkaProducer<String, String> = new KafkaProducer<String, String>(kafkaProps);
1. 우선 Properties 객체를 생성한다.
2. 키와 밸류를 문자열로 전송할 예정이므로, 카프카 기본 제공 직렬화 클래스인 StringSerializer 를 사용한다.
3. 설정값을 넣은 Properties 객체를 전달해 프로듀서 객체를 생성한다.
3개의 필수 속성 이외에 성능, 안정성 등에 영향을 미치는 다른 속성들도 살펴보자.
- 안정성 : acks, retries, max.in.flight.request.per.connection -> 내 메시지가 진짜 잘 갔는가?
- 성능: batch.size, linger.ms, compression.type -> 빠르게 많이 보낼 수 잇는가?
1. 안정성과 관련된 옵션들
- acks (기본값: 1)
: 메시지가 얼마나 많은 브로커에게 도달해야 전송 성공으로 간주할지 설정한다.- acks = 0 : 확인 없이 바로 성공 처리 한다. 속도는 매우 빠르지만 신뢰도가 낮다.
- acks = 1 : 리더 브로커만 응답하면 성공 처리 한다.
- acks = all : 리더 + 팔로워 모두 응답해야 성공 처리한다.
- retries (기본값: 0)
: 메시지 전송 실패 시 재시도 횟수
- 네트워크 오류나 일시적 장애에 유용하다.
- 순서가 바뀔 수도 있으므로, max.in.flight.requests.per.connection 과 같이 조절이 필요하다.
- max.in.flight.request.per.connection
: 동시에 몇개의 메시지를 보낼 수 있을지 설정- 이 값이 클수록 성능은 올라가지만 순서가 바뀔 수 있다.
- retries 가 0보다 크면, 1~5 사이로 조절하는게 안전하다.
실무에서는 보통 acks = all + retries 설정을 같이 써서 안정성을 보장한다.
2. 성능과 관련된 옵션들
- batch.size (기본값: 16KB)
: 한번에 묶어서 보내는 메시지 묶음의 최대 크기 (byte)- batch로 보내면 성능이 향상된다. (네트워크 I/O가 줄어들어서)
- 너무 크기가 크면 메모리 소비가 증가한다.
- linger.ms (기본값: 0)
: batch가 다 안찼어도 일정 시간 기다렸다가 전송한다.- linger.ms = 5 라면, batch가 16KB인데 다 안찼다면 최대 5ms까지 기다렸다가 전송한다.
- 약간의 지연을 허용해서 성능을 최적화 한다.
- compression.type (기본값: none)
: 메시지를 압축해서 전송- none, gzip, snappy, lz4, zstd 중 선택
- 전송량 줄이기에 효과적
- cpu 사용량은 조금 올라갈 수 있다.
3. 카프카로 메시지 전달하기
카프카의 메시지 전송방법에는 크게 3가지가 있다.
- 파이어 앤 포겟 (Fire and Forget) : 메시지를 서버에 전송만 하고 성공 혹은 실패 여부를 신경 쓰지 않는다. 카프카는 가용성이 높고 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도 하기 떄무넹 대부분의 경우 메시지는 성공적으로 전달된다. 하지만, 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생했을 경우 메시지는 유실되며 애플리케이션은 여기에 대해 아무런 정보나 예외를 전달받지 않게 된다.
- 동기적 전송(Synchronous send) : 카프카 프로듀서는 언제나 비동기적으로 작동한다. 메시지를 보내면 send() 메서드는 Future 객체를 리넡ㄴ한다. 하지만 다음 메시지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 떄 까지 기다렷다가 실제 성공 여부를 확인해야한다.
- 비동기적 전송(Asynchronous send): 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출된다.
- 파이어 앤 포겟
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "Hello Kafka"); // 토픽이름, 키, 밸류
try {
producer.send(record); // 메시지 전송
} catch (Exeption e) {
e.printStackTrace();
}
- ProducerRecord 객체를 생성하고 생성자로 토픽 이름, 키, 밸류 값을 사용하였다. 키와 밸류 타입은 Properties에 지정한 key.selializer, value.selializer와 맞아야한다.
- send() 메서드를 사용하여 전송한다. 메시지는 버퍼에 저장되었다가 별도 스레드에 의해 브로커로 보내진다.
- 카프카 브로커에 메시지를 전송할 때 발생하는 에러 혹은 브로커 자체에서 발생한 에러를 무시하더라도 프로듀서가 카프카로 메시지를 보내기 전 에러가 발생할 경우 예외가 발생할 수 있다.
- 동기적 전송
동기적으로 메시지를 전송할 경우 요청하는 스레드는 이 시간 동안 아무것도 안하면서 기다려야 한다. 다른 메시지도 전송할 수 없다. 결과적으로 성능이 크게 낮아지기 때문에 동기적 전송은 실제로 사용되는 애플리케이션에서는 잘 사용되지 않는다.
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "Hello Sync");
try {
producer.send(record).get(); // 응답이 올 떄까지 기다림
} catch (Exception e) {
e.printStackTrace();
}
응답이 올 때까지 대기하기 위해 Future.get() 메서드를 사용하고 있다.
- 비동기적 전송
애플리케이션과 카프카 클러스터 사이의 네트워크 왕복 시간이 10ms 라고 가정했을 때, 메시지를 보낼 떄 마다 응답을 기다리면 100개의 메시지를 전송하는데 약 1초가 걸린다. 반면 메시지를 전부 전송하고 응답을 기다리지 않는다면 거의 시간이 걸리지 않을 것이다. 실제로 대부분의 경우 굳이 응답이 필요 없다.
카프카는 레코드를 쓴 뒤 해당 레코드의 토픽, 파티션, 오프셋을 리턴하는데 대부분의 애플리케이션에는 이런 메타데이터가 필요 없다.
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exceoption e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "Hello Async");
producer.send(record, new DemoProducerCallback());
- 콜백을 사용하려면 org.apache.kafka.clients.producer.Callack 인터페이스를 구현하는 클래스가 필요하다. 이 인터페이스에는 onCompletion() 단 하나의 메서드만 정의되어 있다.
- 만약 카프카가 에러를 리턴한다면 onCompletion() 메서드가 null이 아닌 Exception 객체를 받게 된다. 여기서는 그냥 내용을 화면에 출력해주는 정도로 처리되어있지만, 실제 애플리케이션에서는 더 확실한 에러 처리 함수가 필요하다.
- 레코드를 전송할 떄 Callback 객체를 함께 매개변수로 전달한다.
결론
카프카 프로듀서를 직접 구현해보면서 느낀건 단순해 보이는 producer.send() 한 줄에도 보이지 않는 수많은 과정들이 있다는 점이엿다. 네트워크 전송을 위해 메시지를 바이트 배열로 바꾸고, 어디로 보낼지 결정하고, 여러 개를 한꺼번에 묶어서 보내고, 중간에 실패하면 다시 재시도 까지 하는 등.. 예전에는 카프카를 그냥 메시지를 보내는 시스템 정도로만 생각했는데, acks, retries, batch.size, liger.ms 같은 설정값을 통해 안정성과 성능을 조절할 수 있는 시스템이라는걸 알게 되었다.
이렇게 하나씩 직접 구현해보면서 카프카의 흐름을 몸에 익히고 까먹지 않기 위해 블로그에 차근차근 기록해나가야겠다.
'Book > 카프카 핵심 가이드' 카테고리의 다른 글
| Kafka 컨슈머 만들기: 동작 흐름과 기본 설정 이해하기 (1) | 2025.04.16 |
|---|---|
| Kafka 프로듀서 더 깊이 보기: 직렬화부터 스로틀링까지 (0) | 2025.04.10 |
| Kafka 설치부터 첫 메시지 전송까지: Producer와 Consumer (0) | 2025.04.03 |
| Kafka 처음 배우기: 카프카란 무엇인가? (0) | 2025.04.02 |