개요
이전에는 Kafka 프로듀서가 메시지를 어떻게 Kafka 브로커에게 전송하는지 전반적인 흐름을 살펴 보았다. 단순해 보이는 producer.send() 한 줄에도 직렬화, 파티션 분배, 배치, 전송, 응답 처리, 재시도 등 다양한 과정이 숨어 있었다.
이번에는 이 흐름을 조금 더 깊이 파고들어, 메시지가 전송되기 전과 후에 어떤 기술적인 처리들이 일어나는지 정리해보려한다.
시리얼라이저부터 시작해, Avro 레코드, 파티셔너, 헤더, 인터셉터 그리고 쿼터 및 스로틀링 까지 카프카 프로듀서를 구성하는 핵심 개념들에 대해 학습하고 기록해보자.
1. Kafka에서의 시리얼라이저 (Serializer)
카프카에서는 메시지를 네트워크로 전송하기 전에 반드시 바이트 배열(byte[])로 바꿔야한다. 왜냐하면 네트워크는 오직 0과1로 이루어진 데이터를 전송할 수 있기 때문이다. 그래서 자바의 String, Integer 같은 객체는 카프카에 그대로 보낼 수 없다. 카프카 프로듀서는 이 객체들을 바이트 배열로 바꿔주는 시리얼라이저를 이용해서 데이터를 전송할 수 있는 형태로 바꿔준다.
카프카는 자주 쓰는 타입들에 대해서 기본 시리얼 라이저를 제공해준다.
- String 타입 : org.apache.kafka.common.serialization.StringSerializer
- Integer 타입 : org.apache.kafka.common.serialization.IntegerSerializer
- Long 타입 : org.apache.kafka.common.serialization.LongSerializer
예를 들어 String 타입의 메시지를 보내려면 이렇게 설정한다.
Properties props = new Properties();
props.put("bootstap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
하지만 이런 기본적인 시리얼라이저로 모든 데이터를 직렬화 할 수는 없다. 결국에는 더 일반적인 레코드를 직렬화할 수 있어야한다.
시리얼라이저를 직접 만들 수도 있지만, 에이브로, 스리프트, 프로토버프와 같은 범용 직렬화 라이브러리를 사용하기를 강력히 권장한다.
2. Apache Avro와 Kafka에서 Avro 레코드 사용하기
아파치 에이브로는 언어 중립적인 데이터 직렬화 형식이다. 에이브로 데이터는 언어에 독립적인 스키마의 형태로 기술된다.
장점
- JSON보다 훨씬 작기 때문에 성능과 저장공간 측면에서 유리하다.
- 언어에 독립적인 스키마 형태로 기술되는데, 이 스키마는 보통 JSON 형식으로 정의된다.
- 새로운 필드 추가, 제거 등의 구조적 변화에도 유연하게 대응이 가능하다.
- 자바 코드 -> Avro, Avro -> 자바 코드 자동 생성이 가능하다.
특히 에이브로가 카프카와 같은 메시지 전달 시스템을 사용하는데 적합한 이유는 메시지를 쓰는 애플리케이션(프로듀서)이 새로운 스키마로 전환하더라도 기존 스키마와 호환성을 유지하는 한, 데이터를 읽는 애플리케이션(컨슈머)은 일체의 변경이나 업데이트 없이 계속해서 메시지를 처리할 수 있다는 것이다.
Avro 구조
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "faxNumber", "type": "string"}
// {"name": "email", "type": "string"} -> 정책에 따라 추후에 변경됨
]
}
이걸 기반으로 에이브로는 자동으로 자바 클래스를 만들어준다. 이렇게 생성된 클래스는 카프카에서 에이브로 메시지를 전송할 때 사용할 수 있다.
회사 정책이 바뀌어서 faxNumber는 필요없고, 대신 email을 쓰기로 했다. 그래서 새 스키마를 email로 변경하였다. 어 그런데 예전 메시지는 faxNumber를 가지고 있고, 새로운 메시지는 email을 가지고 있는데, 그럼 예전 메시지를 읽던 Consumer에서 에러가 나지않을까?\
결론을 먼저 말하자면, 컨슈머가 예전 스키마(faxNumber)를 알고 있는 상태에서, 새로운 메시지(email)이 와도 문제가 없다. 왜냐하면 에이브로는 필드가 빠져도, 디폴트 값으로 처리하거나 무시할 수 있도록 설계되어있다. 즉, 프로듀서가 스키마를 바꿔도, 컨슈머가 그걸 몰라도 메시지를 문제 없이 처리할 수 있다.
카프카에 메시지를 보낼 떄, 프로듀서는 항상 "이 메시지는 어떤 스키마로 만들어졌는지" 정보를 같이 붙여서 보낸다. 그리고 그 스키마는 Schema Registy에 등록 되어 있다. 예전 메시지는 faxNumber 버전 스키마 ID와 함께 전송되고, 새로운 메시지는 email 버전 스키마ID와 함께 전송된다.
Consumer는 메시지를 받을 때, 메시지가 어떤 스키마였는지에 대해 메시지에 붙은 ID로 Schema Registry에서 해당 스키마를 가져온다. 그리고 그 스키마에 맞게 메시지를 해석(역직렬화)한다. 중요한점은 스키마를 레지스트리에 저장하고 필요할 떄 가져오는 이 모든 작업이 주어진 객체를 직렬화하는 시리얼라이저와 직렬화된 데이터를 객체로 복원하는 디시리얼라이저 내부에서 수행된다는 점이다.

Producer는 메시지를 에이브로 포맷으로 만든다. 에이브로는 현재 사용하는 스키마의 ID를 붙여서 메시지를 만든다. 그리고 메시지를 전송한다. 카프카 브로커는 메시지를 그저 저장하고 전달할 뿐 스키마에는 관심이 없다. Consumer는 메시지를 받고 스키마ID를 확인한다. 해당 ID를 이용해 스키마 레지스트리에서 스키마 정의를 가져온다. 그리고 디시리얼라이저가 해당 스키마대로 데이터를 읽는다.
즉, Kafka, Avro, 스키마 레지스트리를 쓰면 스키마가 변경되어도 데이터 호환성을 유지할 수 있다. 프로듀서가 새로운 필드를 추가하거나 제거해도 컨슈머는 스키마 ID만 보고 메시지를 정확하게 읽을 수 있다.
3. 파티션
ProducerRecord 객체는 카프카에서 메시지를 전송할 때 사용하는 객체로 "토픽, 키, 밸류" 정보를 포함한다. 카프카 메시지는 기본적으로 Key-Value 구조를 가지는데, 키 값이 null인 상태에서도 토픽과 밸류 만으로 메시지를 생성하는건 가능하다.
하지만 실무에서는 대부분 Key값을 지정해서 메시지를 전송하는 경우가 많은데 그 이유는 키가 다음 두 가지 중요한 역할을 하기 때문이다.
- 메시지와 함께 저장되는 추가적인 정보로 활용될 수 있다.
- 메시지가 저장될 파티션을 결정 짓는 기준이 된다.
카프카의 기본 파티셔너(Default Partitioner)를 사용할 때,
- 키 값이 null 이라면, 카프카는 해당 메시지를 토픽에 속한 사용 가능한 파티션 중 하나를 무작위로 선택하여 저장한다. 이때 파티션 간 메시지 수의 균형을 맞추기 위해, 내부적으로 라운드 로빈 알고리즘이 사용된다.
- 반대로 키 값이 지정된 경우, 카프카는 이 키 값을 해시(hash)한 경과를 이용해 어떤 파티션에 메시지를 저장할지 결정한다.
즉, 같은 키를 가진 메시지는 항상 같은 파티션에 저장된다.
ProducerRecord<String, STring> record1 =
new ProducerRecord<>("my-topic", "user1", "첫 번째 메시지");
ProducerRecord<String, String> record2 =
new ProducerRecord<>("my-topic", null, "키가 없는 메시지");
record1의 경우 user1이라는 키가 있으니 같은 파티션에 계속 들어간다.
record2의 경우 키가 없으니 카프카가 내부적으로 분산시켜 저장한다. (라운드 로빈으로)
Paritioner 인터페이스를 구현하여 커스텀 파티셔너(Custom Partitioner)를 구성할 수도 있다.
// VIP는 0번 파티션에, 일반 고객은 1번 파티션으로 보낸다
public class VipPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, ...) {
if (key.toString().startWith("VIP")) {
return 0;
} else {
return 1;
}
}
}
4. 헤더
카프카 메시지(record)는 키 값, 밸류 값 이외에도 헤더를 포함할 수 있다. 레코드 헤더는 카프카 레코드의 키,밸류 값을 건드리지 않고 추가 메타데이터를 심을 때 사용한다. 헤더의 주된 용도 중 하나는 메시지의 전달 내역을 기록하는 것이다. 데이터가 생성된 곳의 정보를 헤더에 저장해두면, 메시지를 파싱할 필요없이 헤더에 심어진 정보만으로 메시지를 라우팅하거나 출처를 추적할 수 있다.
카프카 헤더는 key-value 형태로 구성되고, key 값은 언제나 String 타입이여야하지만 밸류 값은 아무 직렬화된 객체면 상관없다.(byte 배열)
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", null, "hello kafka")
record.headers().add("traceId", "123456".getBytes());
record.headers().add("type", "log".getBytes());
5. 인터셉터
인터셉터는 메시지를 전송하기 전/후에 동작하는 훅(hook) 같은 개념이다. 예를 들어 회사 내에서 사용하는 모든 애플리케이션에 동일한 작동을 집어넣는다거나 아니면 원래 코드를 사용할 수 없는 상황 등에 사용된다.
Producer를 생성할 때, intercepter.classes 속성에 클래스명을 추가해야 인터셉터가 동작한다.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serializer.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serializer.StringSerializer");
// 인터셉터 등록
props.put("interceptor.classes", "com.example.kafka.interceptor.CustomProducerinterceptor");
Producer<String, String> producer = new KafkaProducer<>(props);
public clas CustomerProducerInterceptor implements ProducerInterceptor<String, String> {
// 메시지를 보내기 전에 실행된다.
@Override
public ProduceRecord<String, String> onSend()(ProducerRecord<String, String> record) {
String modifiedValue = record.value() + "[수정됨]";
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
record.key(), modifiedValue, record.header());
}
// 메시지를 보낸 후 실행된다.
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("메시지가 전송에 성공: " + metadata.topic());
} else {
System.out.println("메시지 전송에 실패: " + exception.getMessage());
}
}
// 인터셉터의 내부 상태를 정리하는 용도로 사용된다.
// 만약 파일을 열거나 원격저장소에 연결을 생성했다거나 할 경우 여기에서 닫아줘야 리소스 유실이 발생하지 않는다.
@Override
public void close() {}
// 설정정보를 받아올 수 있다.
@Override
public void configure(Map<String, ?> configs) {}
}
- onSend() : 메시지를 보내기 전에 실행된다. 실습에서는 원래 보내려던 record 메시지에 "[수정됨]" 이라는 문구를 붙여서 새로운 메시지를 만들어서 보내고 있다.
- onAcknowledgement() : 메시지를 전송 후 결과를 처리한다. 실습에서는 메시지를 보낸 후 성공,실패 여부를 출력하고 있다.
- close() : 인터셉터를 더 이상 안쓸때, 리소스를 정리할 수 있는 메서드이다.
- configure() : 외부에서 설정 값을 받아 올 수 있다.
즉, 카프카 메시지를 보내기 전(onSend)에 편집을 하거나, 보낸 뒤(onAcknowledgement)에 결과를 로깅하고 싶거나 할 때 ProducerInterceptor를 사용하면 된다.
6. 쿼터와 스로틀링
카프카는 여러 사용자, 앱, 서비스들이 동시에 메시지를 보내고 받는 시스템이다. 그런데 어떤 한 서비스에서 엄청나게 많은 메시지를 보내면 다른 서비스의 성능이 저하되거나, 더 심할 경우 브로커가 과부하 걸려서 전체적인 장애로 이어질 수도 있다. 이런 상황을 바지하기 위해 카프카는 쓰기/읽기 속도를 제한할 수 있는 장치로 쿼터(Quota)와 스로틀링(Throttling)을 사용한다.
쿼터(Quota)
카프카에는 3가지의 쿼터 타입에 대해 한도(쿼터)를 설정할 수 있다.
- 쓰기 쿼터 (Producer Quota) : 프로듀서가 초당 얼마나 많은 데이터를 보낼 수 있는지 제한
- 읽기 쿼터 (Consumer Quota) : 컨슈머가 초당 얼마나 많은 데이터를 받을 수 있는지 제한
- 요청 쿼터 (Request Quota) : 브로커가 하나의 클라이언트에게 처리할 수 있는 전체 처리량을 제한
쓰기 쿼터와 읽기 쿼터는 메시지를 얼마나 빠르게 보내거나 받을 수 있는지를 초당 바이트 수 단위로 제한한다.
요청 쿼터의 경우 브로커가 한 클라이언트에게 허용하는 전체 처리량을 의미한다.
스로틀링(Throttling)
클라이언트가 설정된 쿼터를 초과해서 데이터를 보내거나 받으려고 하면, 카프카는 해당 요청을 지연(스로틀링) 시킨다. 즉, 브로커가 일부러 응답을 늦게 보내면서 클라이언트 속도를 늦추는 방식이다. 카프카 클라이언트는 기본적으로 이 스로틀링 정보를 받아서 정해진 시간만큼 대기 후 자동으로 재요청 하게 된다.
이 두 기능을 통해 카프카는 특정 클라이언트의 과도한 사용을 제어하고 전체 시스템의 안정성과 공정한 자원 분배를 유지한다.
결론
이번 학습에서는 메시지를 전송하기 전 자바 객체를 바이트 배열로 변환하는 시리얼라이저의 역할을 이해하고, Avro 레코드를 카프카에서 사용하는 이유와 활용 방법도 살펴보았다.
또한, 메시지를 어느 파티션에 보낼지 결정하는 파티셔너, 메시지에 부가 정보를 담을 수 있는 헤더, 메시지 전송 전/후 특정 작업을 할 수 있는 인터셉터, 시스템 과부하를 방지하기 위한 쿼터와 스로틀링 까지 카프카 프로듀서의 주요 개념들을 정리해보았다.Kafka 프로듀서 핵심 기능 정리: 직렬화부터 스로틀링까지
'Book > 카프카 핵심 가이드' 카테고리의 다른 글
| Kafka 컨슈머 만들기: 동작 흐름과 기본 설정 이해하기 (1) | 2025.04.16 |
|---|---|
| Kafka 프로듀서 만들기: 개념, 설정, 전송 방식 정리 (0) | 2025.04.07 |
| Kafka 설치부터 첫 메시지 전송까지: Producer와 Consumer (0) | 2025.04.03 |
| Kafka 처음 배우기: 카프카란 무엇인가? (0) | 2025.04.02 |