MSA

[MSA] Spring Boot - Kafka Consumer, Producer Example

차노도리 2023. 5. 10. 00:09

의존성 추가 (build.gradle)

dependencies {
	...
	// Spring for Apache Kafka: supplies KafkaTemplate, @KafkaListener and the
	// consumer/producer factory classes; the version is pinned explicitly to 2.8.0.
	implementation 'org.springframework.kafka:spring-kafka:2.8.0'
}

 

 

Kafka Consumer 설정 파일

  • ConsumerFactory - Topic 접속에 필요한 정보
  • ConcurrentKafkaListenerContainerFactory - Topic의 변경 사항을 리스닝하는 리스너
package com.gugbab.gugbabservices.messagequeue;

import ...

/**
 * Consumer-side Kafka configuration.
 *
 * <p>Declares the {@link ConsumerFactory} holding the connection settings and the
 * listener container factory that is built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private final Environment env;

    /**
     * @param env environment used to look up the Kafka bootstrap server address
     */
    @Autowired
    public KafkaConsumerConfig(Environment env) {
        this.env = env;
    }

    /**
     * Builds the consumer factory carrying the settings needed to connect to a topic:
     * bootstrap servers, consumer group id and String key/value deserializers.
     *
     * @return a consumer factory for String keys and String values
     * @throws IllegalStateException if {@code gugbab.public.url.local.kafka_server} is not set
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        // Fail fast with a descriptive error instead of handing a null bootstrap
        // address to the Kafka client when the property is missing.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getRequiredProperty("gugbab.public.url.local.kafka_server"));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "serviesConsumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    /**
     * Builds the listener container factory used by topic listeners.
     *
     * <p>The bean is registered as {@code kafkaListenerContainerFactory}, the name
     * {@code @KafkaListener} resolves when no {@code containerFactory} is given. The
     * previous bean name (derived from this method's name) is kept as an alias so
     * existing by-name references keep working.
     *
     * @return a listener container factory backed by {@link #consumerFactory()}
     */
    @Bean(name = {"kafkaListenerContainerFactory", "KafkaListenerContatinerFactory"})
    public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContatinerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

 

Kafka Consumer

  • @KafkaListener - 토픽 리스너
    • topics : 구독할 topic 명
package com.gugbab.gugbabservices.messagequeue;

import ...

/**
 * Service that listens on the user star-count topic and processes each received message.
 */
@Service
@Slf4j
public class KafkaConsumer {

    private final UserRepository userRepository;

    /**
     * @param userRepository repository injected for use by the message handler
     */
    @Autowired
    public KafkaConsumer(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Handles one message from {@code gugbab-services-kafka-users-star-cnt-topic}
     * as a member of the {@code serviesConsumerGroupId} consumer group.
     *
     * @param kafkaMessage raw message payload as a String
     */
    @KafkaListener(topics = "gugbab-services-kafka-users-star-cnt-topic", groupId = "serviesConsumerGroupId")
    public void updateUserStarCnt(String kafkaMessage) {

        // Parameterized logging: same output, no string concatenation when INFO is disabled.
        log.info("kafka Message : {}", kafkaMessage);
        ...


    }

}

 

Kafka Producer 설정파일

  • ProducerFactory - Topic 접속에 필요한 정보
  • KafkaTemplate - 데이터 전송에 필요한 인스턴스
package com.gugbab.gugbabservices.messagequeue;

import ...

/**
 * Producer-side Kafka configuration.
 *
 * <p>Declares the {@link ProducerFactory} holding the connection settings and the
 * {@link KafkaTemplate} used to send messages.
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private final Environment env;

    /**
     * @param env environment used to look up the Kafka bootstrap server address
     */
    public KafkaProducerConfig(Environment env) {
        this.env = env;
    }

    /**
     * Builds the producer factory carrying the settings needed to connect to a topic:
     * bootstrap servers and String key/value serializers.
     *
     * @return a producer factory for String keys and String values
     * @throws IllegalStateException if {@code gugbab.public.url.local.kafka_server} is not set
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        // Fail fast with a descriptive error instead of handing a null bootstrap
        // address to the Kafka client when the property is missing.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getRequiredProperty("gugbab.public.url.local.kafka_server"));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    /**
     * Builds the template instance used to send data.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

Kafka Producer

package com.gugbab.gugbabvocaserver.massagequeue;

import ...
/**
 * Service that publishes messages to Kafka through the injected {@link KafkaTemplate}.
 */
@Service
@Slf4j
public class KafkaProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send String messages
     */
    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message to the given topic and returns the DTO that was passed in.
     *
     * @param topic   name of the Kafka topic to send to
     * @param starDto DTO handed back to the caller unchanged by the visible code
     * @return the same {@code starDto} instance received as argument
     */
    public StarDto sendUserStarCnt(String topic, StarDto starDto) {
	...

	// josnInString is declared in the elided code above; presumably the JSON
	// serialization of starDto — confirm against the full source.
	// NOTE(review): the value returned by send() is not used here, so a failed
	// send is not observed by this method.
	kafkaTemplate.send(topic, josnInString);

        return starDto;
    }
}

 

Controller

  • Controller에서 KafkaProducer를 통해 topic으로 메시지를 보낸다.
import ...
/**
 * REST controller mapped under {@code /star}; publishes to Kafka via {@link KafkaProducer}.
 */
@RestController
@RequestMapping("/star")
@Slf4j
public class StarController {

	...
    private KafkaProducer kafkaProducer;
    
    /**
     * @param kafkaProducer producer used to publish messages to Kafka
     *                      (further constructor parameters are elided in this excerpt)
     */
    @Autowired
    public StarController(KafkaProducer kafkaProducer, ...) {
        this.kafkaProducer = kafkaProducer;
        ...
    }
    
    /**
     * Handles {@code POST /star/add}.
     *
     * @param starAddRequest request body of the add call
     * @return the response entity built in the elided part of this method
     */
    @PostMapping("/add")
    public ResponseEntity<StarAddResponse> createUser(@RequestBody StarAddRequest starAddRequest) {
		
        ...
        
	// Publish starDto (created in the elided code above) to the star-count topic.
        kafkaProducer.sendUserStarCnt("gugbab-services-kafka-users-star-cnt-topic", starDto);
	...

    }
    
    



}