ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka consumer
    Kafka 2018. 11. 28. 22:51

    Version compatibility

    Spring Integration이란? 
    자바 플랫폼을 위한 오픈소스 애플리케이션 프레임워크인 Spring과 EAI(Enterprise Application Integration)을 이용한
    애플리케이션 내부-외부 사이의 메시징을 가능하게 하는 프레임워크이다.

    Spring apache kafka는 `kafka-clients` jar를 기반한다.

    Spring for Apache Kafka Version

    Spring Integration for Apache Kafka Version

    kafka-clients

    2.2.x

    3.1.x

    2.0.0

    2.1.x

    3.0.x

    1.0.x, 1.1.x, 2.0.0

    2.0.x

    3.0.x

    0.11.0.x, 1.0.x

    1.3.x

    2.3.x

    0.11.0.x, 1.0.x

    1.2.x

    2.2.x

    0.10.2.x

    1.1.x

    2.1.x

    0.10.0.x, 0.10.1.x

    1.0.x

    2.0.x

    0.9.x.x

    N/A*

    1.3.x

    0.8.2.2


    broker 기준 버전 정보

    Broker
    kafka-streams
    kafka-clients
    kafka connect
    0.10.0

    0.10.0

    (Versions >= 0.10.2 not supported)

    clients <= 0.10.0 or >= 0.10.2versions <= 0.10.0 or >= 0.10.2
    0.10.1any versionany versionversions <= 0.10.1
    0.10.2any versionany versionany version
    0.11.0any versionany versionany version
    1.0.0any versionany versionany version


    Consumer

    Configuration options (버전마다 Configuration option이 다르기 때문에 반드시 버전 정보를 확인 할것)

    option
    설명
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
    bootstrap.server
    ConsumerConfig.GROUP_ID_CONFIG

    Consumer가 속한 consumer group의 ID. Zookeeper에서는 각 consumer group의 메시지 offset을 관리하는데, 이 때 이 ID가 키로써 사용된다. 따라서 consumer group ID가 같으면 모두 같은 consumer group에 속한 것으로 간주되며 메시지 offset 값 또한 공유된다.

    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    Consuming이 실패 됬을때 사용되는 offset으로 true을 지정하면 자동으로 offset 위치를 갱신한다.
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG

    초기 offset이 없거나 offset 범위를 벗어난 경우 수행되며 옵션마다 수행되는 내용이다르다.

    • smallest: 자동으로 가장 작은 offset을 사용한다.
    • largest: 자동으로 가장 큰 offset을 사용한다.
    • disable: 정의 되지 않은 offset이 있으면 Exception을 던진다.
    • anything else: throw exception to the consumer.
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG

    key을 어떻게 deserialize할지를 지정한다.

    StringDeserializer.class

    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

    value을 어떻게 deserialize할지를 지정한다.

    StringDeserializer.class
    JsonDeserializer.class

    ConsumerConfig.MAX_POLL_RECORDS_CONFIG
    기본적으로 일괄 처리하는 레코드의 수는 자동으로 계산되지만 MAX_POLL_RECORDS_CONFIG을 설정하면 레코드 상한선의 수를 지정할 수 있다.


    Batch모드의 Factory주요 설정



    factory.setConsumerFactory(consumerFactory);

    consumer factory 주입

    factory.setConcurrency(numberOfConsumers);

    Consumer의 수, 즉 몇개의 Consumer을 띄울것인가

    factory.getContainerProperties().setPollTimeout(pollingTimeOut);

    메세지 폴링의 timeout

    factory.getContainerProperties()

    .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

    AckMode란 Consumer configuration에서 ENABLE_AUTO_COMMIT_CONFIG을 false로 설정했다면, 즉 commit을 자동으로 하지 않았을때 어떻게 commit을 할지를 지정한다.

    optiosn

    BATCH : Lisner에게 전달된후 Consumer.commitAsync()을 호출한다.
    COUNT : ackCount가 초과 했을경우 Consumer.commitAsync()을 호출한다.
    COUNT_TIME : ackCount가 기준치를 넘거나, ackTime이 경과했을경우 Consumer.commitAsync()을 호출한다.
    MANUAL : pending중인경우를 제외하고는 COUNT_TIME과 같다.
    MANUAL_IMMEDIATE : 즉각적으로 ack을 날린다.

    factory.setBatchListener(true);

    배치모드 활성화



    dependencies

    compile('org.apache.kafka:kafka-clients:1.1.0')
    compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
    testCompile('org.springframework.kafka:spring-kafka-test:2.1.7.RELEASE')
     


    Consumer sample code

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
        @Value("${kafka.bootstrapServers}")
        private String bootstrapServers;
    
        @Value("${kafka.enableAutoCommit}")
        private boolean enableAutoCommit;
    
        @Value("${kafka.autoOffsetReset}")
        private String autoOffSetReset;
    
        @Value("${kafka.maxPollRecords}")
        private int maxPollRecords;
    
        @Value("${kafka.numberOfConsumers}")
        private int numberOfConsumers;
    
        @Value("${kafka.pollingTimeOut}")
        private int pollingTimeOut;
    
        @Value("${kafka.some.groupId}")
        private String groupId;
    
        @Value("${kafka.some.topic}")
        private String topic;
    
    
        @Value("${serviceName}")
        private String serviceName;
    
    
        @Bean
        public Map consumerConfigs() {
            Map props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffSetReset);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    
            return props;
        }
    
        @Bean
        public ConsumerFactory consumerFactory(Map consumerConfigs) {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs);
        }
    
        @Bean
        public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(numberOfConsumers);
            factory.getContainerProperties().setPollTimeout(pollingTimeOut);
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            factory.setBatchListener(true);
            factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
    
    
            LOGGER.info("serviceName:{}, kafka config:{}, numberOfConsumers:{}, pollingTimeOut:{}", serviceName, consumerFactory, numberOfConsumers, pollingTimeOut);
            LOGGER.info("[ Config ] kafkaListenerContainerFactory is loaded.");
            return factory;
        }
    
    
        @Bean
        public Listener Listener() {
            switch (serviceName) {
    
                case "something1":
                    System.out.println(String.format("[ Config ] Listener class is %s", SomeListener.class.toString()));
    return new SomeListener();
    .... } LOGGER.error("{} is not defined. please check KafkaConsumerConfig listener", serviceName); return null; } }




    Listener sameple code

    public class SomeListener extends Listener {
     
        @KafkaListener(id = "${kafka.some.groupId}", topics = "${kafka.some.topic}", containerFactory = "kafkaListenerContainerFactory")
        public void receive(List records, Acknowledgment acknowledgment) {
     
            // do something with records
     
     
            // commit
            acknowledgment.acknowledge();
             
        }
    }
    


    댓글

Designed by Tistory.