서비스가 서비스를 호출하는 서버-클라이언트간의 네트워크 통신은 결합도가 높은 방식으로 시스템간의 의존성이 높아져 코드가 유연하지 못합니다. 그렇기 때문에 서비스간의 결합도가 낮아야 하는 MSA에서는 데이터 송수신을 비동기로 처리하는 Message Queue를 사용하는게 효율적이고, 대표적인 메시지 브로커로 Apache에서 만든 Kafka가 있습니다.
Kafka
Apache에서 만든 메시지 큐 시스템으로, 대용량의 데이터를 실시간으로 처리하는데 특화되어 있습니다. 메시지를 파일 시스템으로 저장하여 메시지가 유실될 우려가 적으며, Consumer가 Broker에 Pulling 방식으로 메시지를 가져오기 때문에 Consumer가 처리할 수 있는 메시지만 가져올 수 있습니다.
Kafka에 대한 기본적인 개념과 사용법은 아래 글에서 확인 가능합니다.
Kafka 사용해서 주문 이벤트 발행하기
기존에 Display 서비스에서 Product 서비스로 제품 목록을 조회하는 코드만 있었는데, 임의로 주문을 할 수 있는 코드를 작성하겠습니다.
별도로 주문 서비스를 안만들고 기존의 Display와 Product에 주문 API를 추가하도록 하겠습니다.
Product 도메인은 재고 수량을 가지고 있고 주문을 하면 주문 수량이 감소합니다.
public class Product {
private String id;
private String name;
private int price;
private int stock;
// Getter
public void order(int count) {
if (stock - count < 0) {
throw new IllegalStateException("재고가 부족합니다.");
}
stock -= count;
}
}
Kafka를 사용해 Display서비스가 Product서비스로 메시지를 발행하기 위해 의존성을 추가해주도록 합니다.
spring-kafka는 apache-kafka를 wrapping한 라이브러리 입니다.
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
앞서 구축한 Kafka의 정보를 application.yml에 작성해줍니다. 저는 임의로 order라는 토픽을 만들어주었습니다.
spring:
kafka:
consumer:
group-id: my-group
bootstrap-servers: localhost:9092
kafka:
topics:
test: order
그 후 카프카의 Producer/Consumer 설정을 해주도록 합니다. 저는 Display에서 이벤트를 발행하는 Producer, Product에서 이벤트를 수신하는 Consumer로 설정을 해주었습니다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
이제 메시지를 발행/구독 할 수 있는 Producer/Consumer 컴포넌트를 생성해주도록 하겠습니다. 처음에는 별도의 로직없이 KafkaTemplate을 이용해 메시지를 발행하고, Consumer는 발행된 메시지를 @KafkaListener를 통해 핸들링 하게 됩니다.
@Component
public class Producer {
private final KafkaTemplate<String, String> kafkaTemplate;
public Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
@Component
public class Consumer {
private final Logger log = LoggerFactory.getLogger(Consumer.class);
private final ProductService productService;
public Consumer(ProductService productService) {
this.productService = productService;
}
@Value("${kafka.topics.test}")
private String topic;
@KafkaListener(topics = "${kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
void listen(String message) {
productService.orderProdcut(message);
}
}
Display 서비스에서 order를 호출하면 해당 제품에 대한 id를 kafka를 통해 메시지를 발행하고, Product 서비스에서는 이 메시지를 수신해 해당하는 제품의 수량을 감소해줍니다.
@RestController
public class DisplayController {
...
@GetMapping("/order/{productId}")
public String orderProduct(@PathVariable String productId) {
return displayService.orderProduct(productId);
}
}
@Service
public class DisplayService {
private final Producer producer;
...
public String orderProduct(String productId) {
producer.sendMessage(topic, productId);
return productId;
}
}
@Service
public class ProductService {
private final ProductRepository productRepository;
...
public Product orderProdcut(String productId) {
var product = productRepository.findById(productId)
.orElseThrow(IllegalAccessError::new);
product.order(1);
return product;
}
}
해당 로직 처리 후 제품 목록을 다시 조회하면 재고 수량이 줄어들어 있는 것을 확인할 수 있습니다.
이걸로 끝일까?
지금까지는 임의로 MSA의 구성요소들을 구현하기 위해 제품 데이터 조회라는 로직만을 작업했었습니다. 하지만 주문 이라는 서비스가 추가되면 어떤 문제가 발생할까요? 보통 주문로직은 아래와 같습니다.
상품 선택 -> 주문 요청 -> 결제 요청 -> 결제 -> 결제 완료 -> 주문 완료
만약 주문을 요청했는데 결제가 실패한다면? PG사의 오류로 결제 완료가 되지 않는다면?
모놀로식 아키텍처에서는 기존의 방법으로 한 트랜잭션으로 처리되기 때문에 문제가 없어보입니다. 그러나 우리는 마이크로서비스아키텍처로 서로 다른 데이터베이스를 가지고 있기 때문에 하나의 트랜잭션으로 묶을 수 없습니다. 그렇기 때문에 분산환경에 맞는 트랜잭션이 필요합니다.
다음 글에서는 Eventual Consistency 모델을 통해 분산 시스템이 최종적으로 일관성을 유지할 수 있도록 해보겠습니다.
'Spring' 카테고리의 다른 글
넌 못지나간다! Test가 실패했으면 배포도 되면 안되지! (0) | 2021.05.06 |
---|---|
뭐? MSA? 그렇다면 서버를 찾아보자! Service Discovery. - 05 (0) | 2021.04.21 |
내 코인이 떡락이라고??! Slack으로 알림 경보를 받아보자! (0) | 2021.04.20 |
뭐? MSA? 그렇다면 고가용성을 보장해보자! Load Balancer. - 04 (0) | 2021.04.19 |
뭐? MSA? 그렇다면 장애 전파를 막아보자! Circuit Breaker. - 03 (0) | 2021.04.16 |