-
Kafka consumerKafka 2018. 11. 28. 22:51
Version compatibility
Spring Integration이란?자바 플랫폼을 위한 오픈소스 애플리케이션 프레임워크인 Spring과 EAI(Enterprise Application Integration)을 이용한애플리케이션 내부-외부 사이의 메시징을 가능하게 하는 프레임워크이다.Spring apache kafka는 `kafka-clients` jar를 기반한다.Spring for Apache Kafka Version
Spring Integration for Apache Kafka Version
kafka-clients
2.2.x
3.1.x
2.0.0
2.1.x
3.0.x
1.0.x, 1.1.x, 2.0.0
2.0.x
3.0.x
0.11.0.x, 1.0.x
1.3.x
2.3.x
0.11.0.x, 1.0.x
1.2.x
2.2.x
0.10.2.x
1.1.x
2.1.x
0.10.0.x, 0.10.1.x
1.0.x
2.0.x
0.9.x.x
N/A*
1.3.x
0.8.2.2
broker 기준 버전 정보
Brokerkafka-streamskafka-clientskafka connect0.10.0 0.10.0
(Versions >= 0.10.2 not supported)
clients <= 0.10.0 or >= 0.10.2 versions <= 0.10.0 or >= 0.10.2 0.10.1 any version any version versions <= 0.10.1 0.10.2 any version any version any version 0.11.0 any version any version any version 1.0.0 any version any version any version Consumer
Configuration options (버전마다 Configuration option이 다르기 때문에 반드시 버전 정보를 확인 할것)
option설명ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
bootstrap.server ConsumerConfig.GROUP_ID_CONFIG
Consumer가 속한 consumer group의 ID. Zookeeper에서는 각 consumer group의 메시지 offset을 관리하는데, 이 때 이 ID가 키로써 사용된다. 따라서 consumer group ID가 같으면 모두 같은 consumer group에 속한 것으로 간주되며 메시지 offset 값 또한 공유된다.
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
Consuming이 실패 됬을때 사용되는 offset으로 true을 지정하면 자동으로 offset 위치를 갱신한다. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
초기 offset이 없거나 offset 범위를 벗어난 경우 수행되며 옵션마다 수행되는 내용이다르다.
- smallest: 자동으로 가장 작은 offset을 사용한다.
- largest: 자동으로 가장 큰 offset을 사용한다.
- disable: 정의 되지 않은 offset이 있으면 Exception을 던진다.
- anything else: throw exception to the consumer.
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
key을 어떻게 deserialize할지를 지정한다.
StringDeserializer.class
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
value을 어떻게 deserialize할지를 지정한다.
StringDeserializer.class
JsonDeserializer.classConsumerConfig.MAX_POLL_RECORDS_CONFIG
기본적으로 일괄 처리하는 레코드의 수는 자동으로 계산되지만 MAX_POLL_RECORDS_CONFIG을 설정하면 레코드 상한선의 수를 지정할 수 있다. Batch모드의 Factory주요 설정
factory.setConsumerFactory(consumerFactory);
consumer factory 주입 factory.setConcurrency(numberOfConsumers);
Consumer의 수, 즉 몇개의 Consumer을 띄울것인가 factory.getContainerProperties().setPollTimeout(pollingTimeOut);
메세지 폴링의 timeout factory.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
AckMode란 Consumer configuration에서 ENABLE_AUTO_COMMIT_CONFIG을 false로 설정했다면, 즉 commit을 자동으로 하지 않았을때 어떻게 commit을 할지를 지정한다.
optiosn
BATCH : Lisner에게 전달된후 Consumer.commitAsync()을 호출한다.
COUNT : ackCount가 초과 했을경우 Consumer.commitAsync()을 호출한다.
COUNT_TIME : ackCount가 기준치를 넘거나, ackTime이 경과했을경우 Consumer.commitAsync()을 호출한다.
MANUAL : pending중인경우를 제외하고는 COUNT_TIME과 같다.
MANUAL_IMMEDIATE : 즉각적으로 ack을 날린다.factory.setBatchListener(true);
배치모드 활성화 dependencies
compile('org.apache.kafka:kafka-clients:1.1.0') compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE') testCompile('org.springframework.kafka:spring-kafka-test:2.1.7.RELEASE')
Consumer sample code
@Configuration @EnableKafka public class KafkaConsumerConfig { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value("${kafka.bootstrapServers}") private String bootstrapServers; @Value("${kafka.enableAutoCommit}") private boolean enableAutoCommit; @Value("${kafka.autoOffsetReset}") private String autoOffSetReset; @Value("${kafka.maxPollRecords}") private int maxPollRecords; @Value("${kafka.numberOfConsumers}") private int numberOfConsumers; @Value("${kafka.pollingTimeOut}") private int pollingTimeOut; @Value("${kafka.some.groupId}") private String groupId; @Value("${kafka.some.topic}") private String topic; @Value("${serviceName}") private String serviceName; @Bean public Map
consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffSetReset); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); return props; } @Bean public ConsumerFactory consumerFactory(Map consumerConfigs) { return new DefaultKafkaConsumerFactory<>(consumerConfigs); } @Bean public KafkaListenerContainerFactory > kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(numberOfConsumers); factory.getContainerProperties().setPollTimeout(pollingTimeOut); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); factory.setBatchListener(true); factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler()); LOGGER.info("serviceName:{}, kafka config:{}, numberOfConsumers:{}, pollingTimeOut:{}", serviceName, consumerFactory, numberOfConsumers, pollingTimeOut); LOGGER.info("[ Config ] kafkaListenerContainerFactory is loaded."); return factory; } @Bean public Listener Listener() { switch (serviceName) { case "something1": System.out.println(String.format("[ Config ] Listener class is %s", SomeListener.class.toString()));
return new SomeListener();
.... } LOGGER.error("{} is not defined. please check KafkaConsumerConfig listener", serviceName); return null; } }Listener sameple code
public class SomeListener extends Listener { @KafkaListener(id = "${kafka.some.groupId}", topics = "${kafka.some.topic}", containerFactory = "kafkaListenerContainerFactory") public void receive(List
records, Acknowledgment acknowledgment) { // do something with records // commit acknowledgment.acknowledge(); } }