의존성 추가 (build.gradle)
dependencies {
...
// Spring for Apache Kafka (spring-kafka), pinned to version 2.8.0.
implementation 'org.springframework.kafka:spring-kafka:2.8.0'
}
Kafka Consumer 설정 파일
- ConsumerFactory - Topic 접속에 필요한 정보
- ConcurrentKafkaListenerContainerFactory - Topic의 변경사항을 리스닝하는 리스너
package com.gugbab.gugbabservices.messagequeue;
import ...
/**
 * Kafka consumer configuration.
 *
 * <p>Declares the {@link ConsumerFactory} (connection settings) and the
 * {@link ConcurrentKafkaListenerContainerFactory} that backs
 * {@code @KafkaListener} methods.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Property key that holds the Kafka bootstrap server address. */
    private static final String BOOTSTRAP_SERVERS_PROPERTY = "gugbab.public.url.local.kafka_server";

    private final Environment env;

    /**
     * @param env environment used to resolve the Kafka bootstrap server address
     */
    @Autowired
    public KafkaConsumerConfig(Environment env) {
        this.env = env;
    }

    /**
     * Builds the consumer factory with the settings needed to connect to the topic:
     * bootstrap servers, consumer group id, and String key/value deserializers.
     *
     * @return a consumer factory for String keys and String values
     * @throws IllegalStateException if the bootstrap server property is not defined
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        // Fail fast with a descriptive error instead of passing a null server address along.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getRequiredProperty(BOOTSTRAP_SERVERS_PROPERTY));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "serviesConsumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    /**
     * Builds the listener container factory that uses {@link #consumerFactory()}.
     *
     * <p>The bean is registered as {@code kafkaListenerContainerFactory}, the name
     * {@code @KafkaListener} resolves when no {@code containerFactory} attribute is
     * given. The original (misspelled) method-derived name is kept as an alias so
     * existing references by that name keep working.
     *
     * @return the topic listener container factory
     */
    @Bean(name = {"kafkaListenerContainerFactory", "KafkaListenerContatinerFactory"})
    public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContatinerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Kafka Consumer
- @KafkaListener - 토픽 리스너
- topics : 구독할 topic 명
package com.gugbab.gugbabservices.messagequeue;
import ...
/**
 * Kafka consumer service that listens on the user star-count topic.
 */
@Service
@Slf4j
public class KafkaConsumer {

    private final UserRepository userRepository;

    /**
     * @param userRepository repository injected for use by the listener
     */
    @Autowired
    public KafkaConsumer(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Receives a message from {@code gugbab-services-kafka-users-star-cnt-topic}
     * as part of consumer group {@code serviesConsumerGroupId} and logs it before
     * further processing.
     *
     * @param kafkaMessage raw message payload as a String
     */
    @KafkaListener(topics = "gugbab-services-kafka-users-star-cnt-topic", groupId = "serviesConsumerGroupId")
    public void updateUserStarCnt(String kafkaMessage) {
        // Parameterized logging: the message is only formatted when INFO is enabled.
        log.info("kafka Message : {}", kafkaMessage);
        ...
    }
}
Kafka Producer 설정파일
- ProducerFactory - Topic 접속에 필요한 정보
- KafkaTemplate - 데이터 전송에 필요한 인스턴스
package com.gugbab.gugbabservices.messagequeue;
import ...
/**
 * Kafka producer configuration.
 *
 * <p>Declares the {@link ProducerFactory} (connection settings) and the
 * {@link KafkaTemplate} used to send messages.
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    /** Property key that holds the Kafka bootstrap server address. */
    private static final String BOOTSTRAP_SERVERS_PROPERTY = "gugbab.public.url.local.kafka_server";

    private final Environment env;

    /**
     * @param env environment used to resolve the Kafka bootstrap server address
     */
    @Autowired
    public KafkaProducerConfig(Environment env) {
        this.env = env;
    }

    /**
     * Builds the producer factory with the settings needed to connect to the topic:
     * bootstrap servers and String key/value serializers.
     *
     * @return a producer factory for String keys and String values
     * @throws IllegalStateException if the bootstrap server property is not defined
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        // Fail fast with a descriptive error instead of passing a null server address along.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getRequiredProperty(BOOTSTRAP_SERVERS_PROPERTY));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    /**
     * Builds the template instance used to send data, backed by {@link #producerFactory()}.
     *
     * @return a Kafka template for String keys and String values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Kafka Producer
package com.gugbab.gugbabvocaserver.massagequeue;
import ...
/**
 * Kafka producer service that publishes star-count messages through a
 * {@link KafkaTemplate}.
 */
@Service
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send String messages
     */
    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the serialized form of {@code starDto} to the given topic.
     *
     * <p>The send is asynchronous; the outcome is logged from a callback so a
     * failed delivery is not silently dropped. The method returns without
     * waiting for the broker acknowledgement.
     *
     * @param topic   name of the topic to publish to
     * @param starDto payload to publish
     * @return the same {@code starDto} instance that was passed in
     */
    public StarDto sendUserStarCnt(String topic, StarDto starDto) {
        ...
        // spring-kafka 2.8.x: send() returns a ListenableFuture; attach callbacks
        // instead of discarding it so delivery errors become visible.
        kafkaTemplate.send(topic, josnInString).addCallback(
                result -> log.debug("Kafka message sent to topic {}", topic),
                ex -> log.error("Failed to send Kafka message to topic {}", topic, ex));
        return starDto;
    }
}
Controller
- controller에서 KafkaProducer를 통해 topic으로 메시지를 보낸다.
import ...
/**
 * REST controller mapped under {@code /star}.
 * Publishes star data to Kafka through the injected {@link KafkaProducer}.
 */
@RestController
@RequestMapping("/star")
@Slf4j
public class StarController {
...
// Producer used to publish messages to Kafka.
private KafkaProducer kafkaProducer;
/**
 * Constructor injection of the Kafka producer (other collaborators are elided here).
 */
@Autowired
public StarController(KafkaProducer kafkaProducer, ...) {
this.kafkaProducer = kafkaProducer;
...
}
/**
 * Handles {@code POST /star/add}.
 * NOTE(review): the method is named createUser although the endpoint and the
 * request/response types refer to adding a star — confirm the intended name.
 *
 * @param starAddRequest request body
 * @return the response entity built by the elided part of this handler
 */
@PostMapping("/add")
public ResponseEntity<StarAddResponse> createUser(@RequestBody StarAddRequest starAddRequest) {
...
// Publish starDto to the star-count topic; the topic name matches the one
// the KafkaConsumer listener subscribes to.
kafkaProducer.sendUserStarCnt("gugbab-services-kafka-users-star-cnt-topic", starDto);
...
}
}
'MSA' 카테고리의 다른 글
[MSA] Spring Boot - Zipkin 이란? Spring Cloud Sleuth Example (0) | 2023.05.14 |
---|---|
[MSA] Spring Boot - CircuitBreaker란? Resilience4J example (0) | 2023.05.13 |
[MSA]Kafka Connect - Connect Sink Example (0) | 2023.05.09 |
[MSA]Kafka Connect - Connect Source Example (0) | 2023.05.08 |
[MSA]Kafka Connect Start Jdbc Connect Example (0) | 2023.05.06 |