Kafka Producer란?
프로듀서는 새로운 메시지를 생성해서 카프카 브로커로 보내는 클라이언트 API이다.
프로듀서 기본 설정
- bootstrap.servers: 프로듀서가 사용할 브로커의 host:port 목록
- key.serializer: 레코드의 키 값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름
- value.serialize: 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름
public class Main {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
producer.send(new ProducerRecord<String, String>("topic1", "hello world"));
}
}
프로듀서에서 메시지를 카프카로 보내는 과정
프로듀서 -> 시리얼라이저 -> 파티셔너 -> 카프카 브로커
1. ProducerRecord를 통해 메시지를 만든다. 토픽과 value는 필수값이다. 위에서 선언한 시리얼라이저와 타입이 맞아야 한다.
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
...
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
}
2. producer에서 record를 send 한다. 리턴값은 Future이므로 결과를 받은뒤 무언가를 하고 싶다면 콜백함수를 두번째 인자에 작성해줘야 한다.
try {
producer.send(new ProducerRecord<String, String>("my-topic", "hello!"), (metadata, exception) -> {
if (exception != null) {
System.out.println("metadata = " + metadata);
}
});
} catch (Exception e) {
e.printStackTrace();
}
3. producer의 doSend 코드를 자세히 보자. 트랜잭션이나 예외 처리 부분은 일부 생략하였다.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
AppendCallbacks<K, V> appendCallbacks = new AppendCallbacks<K, V>(callback, this.interceptors, record);
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
...
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
...
}
int partition = partition(record, serializedKey, serializedValue, cluster);
...
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
...
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
if (result.abortForNewBatch) {
int prevPartition = partition;
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartit
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
}
...
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
this.sender.wakeup();
}
return result.future;
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null) {
TopicPartition tp = appendCallbacks.topicPartition();
RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
callback.onCompletion(nullMetadata, e);
}
this.errors.record();
this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
if (transactionManager != null) {
transactionManager.maybeTransitionToErrorState(e);
}
return new FutureFailure(e);
} ...
코드를 보면 아래와 같은 플로우로 진행된다.
- 브로커에서 메타데이터를 받아온다.
- key, value를 직렬화 한다.
- 파티션을 지정해준다.(디폴트는 빌트인 파티셔너)
- 직렬화된 레코드를 레코드 배치에 추가한다.
linger.ms
나batch.size
조건을 만족하면 서버로 레코드를 전송한다.
linger.ms, batch.size
프로듀서는 설정값에 따라 레코드를 하나씩 전송하는것이 아니라 배치로 모아서 전송할 수 있다. 그에 관련한 설정으론 linger.ms와 batch.size가 있다.
linger.ms
: 설정된 시간이 지나면 전송한다. 그동안 레코드는 레코드 배치에 쌓인다.batch.size
: 설정된 크기 만큼 레코드가 쌓이면 전송한다.
브로커에서 성공적으로 레코드를 저장한 경우
브로커가 메시지를 받아 성공적으로 저장했을 경우 브로커는 토픽, 파티션, 해당 파티션 안에서의 레코드 오프셋을 담은 RecordMetadata 객체를 리턴한다. 메시지가 저장에 실패했을 경우에는 에러가 리턴된다.
전송에 실패했을 경우
프로듀서가 서버로부터 에러 메시지를 받았을 때, 일시적인 에러일 수도 있으므로 retries, retry.backoff.ms를 설정해 재시도를 할 수 있다. 기본값으로 retries는 최신 버전의 경우 Integer.MAX
, retry.backoff.ms
는 100ms이다.
참고로, 여기서 retries를 0으로 설정한 경우, at-most-once가 된다(최대 한 번 메시지 전달 보장)
카프카는 acks=all
설정으로 모든 레플리카에서 응답을 받은걸 확인하면서 at-least-once(최소 한 번 메시지 전달)을 보장하는데, 이에 대한건 다음 포스트에서 다룰 예정이다.
참고
https://m.yes24.com/Goods/Detail/118397432
카프카 핵심 가이드 - 예스24
카프카를 개발한 컨플루언트와 링크드인의 엔지니어들이 직접 저술한카프카 환경 구축과 운영에 대한 핵심 실무서애플리케이션 아키텍트, 개발자에서부터 카프카 스트리밍 플랫폼이 처음인
m.yes24.com
'기타' 카테고리의 다른 글
GraphQL DataLoader로 N + 1문제 해결하기 (0) | 2024.07.28 |
---|---|
뮤텍스와 세마포어 (0) | 2024.06.24 |
Redis maxmemory와 eviction정책 (1) | 2024.06.19 |
Lambda@Edge + Typscript로 이미지 리사이징 적용하기 (2) | 2024.06.05 |
계층형 모델 테이블 설계(인접 모델, MPTT) (0) | 2024.04.27 |