오늘의하루

Spring for Apache Kafka - 맛보기 본문

Spring/Kafka

Spring for Apache Kafka - 맛보기

오늘의하루_master 2024. 5. 17. 16:24
 

GitHub - dukbong/spring-apache-Kafka-practice: Kafka 연습하기

Kafka 연습하기. Contribute to dukbong/spring-apache-Kafka-practice development by creating an account on GitHub.

github.com

Kafka란?

실시간 대용량 메시지 스트리밍 플랫폼으로써 높은 처리량과 낮은 지연 시간을 특징으로 하며, 이벤트 기반 아키텍처를 구현하는데 주로 사용됩니다.

  • 이벤트 기반 아키텍처 : 시스템의 다양한 구성 요소 간의 상호 작용을 이벤트 중심으로 구성하는 패턴입니다.

Kafka 구성 요소

1. 이벤트(Event)

시스템에서 발생하는 중요한 사건을 말하며, 특정 시점에서의 상태 변화를 나타냅니다.

Kafka에서 이벤트는 메시지로 표현되며, 토픽(Topic) 단위로 관리됩니다.

2. 토픽(Topic)

Kafka에서 데이터는 토픽 단위로 구성됩니다.

토픽은 특정 이벤트 카테고리 또는 주제에 해당하는 메시지 스트림을 나타냅니다.

예시로 회원가입, 로그인, 주문 등 각기 다른 토픽을 생성할 수 있습니다.

3. 파티션(Partition)

각 토픽은 하나 이상의 파티션으로 나눠질 수 있습니다.

파티션은 데이터를 분산 저장하고 병렬처리가 가능하도록 도와줍니다.

Kafka는 파티션을 사용하여 데이터를 여러 브로커에 분산 저장하고, 각 파티션은 메시지의 순서가 보장된다는 특징을 가지고 있다.

  • 별도로 파티션 수를 지정하지 않을 경우 기본적으로 토픽당 1개의 파티션만 생성된다.

4. 생산자(Producer)

특정 토픽으로 메시지를 생성하여 Kafka 클러스터로 보내는 역할을 합니다.

각 메시지는 선택적으로 특정 키를 기반으로 특정 파티션으로 전송될 수 있습니다.

  • 파티션이 N개 일때 키를 지정하지 않으면 랜덤으로 파티션을 선택하여 메시지를 배치합니다.
  • 어떤 키값이 몇번 파티션에 선택될지는 MurmurHash2 알고리즘에 의해 결정된다.

5. 소비자(Consumer)

특정 토픽의 메시지를 읽어오는 역할을 합니다.

소비자 그룹을 형성하여 여러 소비자가 병렬로 데이터를 처리할 수 있으며, 각 소비자는 자신이 담당하는 파티션에서 메시지를 소비합니다.

6. 주키퍼(Zookeeper)

Kafka 클러스터의 메타 데이터 관리와 리더 선출, 분산 코디네이션을 담당합니다.

Kafka 설치 및 사용방법

WSL2를 사용하여 docker-compose.yaml로 Zookeeper와 Kafka를 설정 및 실행하였습니다.

version: '3'
services:
  zookeeper:
    image: zookeeper:3.6.3
    ports:
      - "2181:2181"
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • image: zookeeper:3.6.3
    • Docker 이미지 및 버전 지정
  • ports: - "2181:2181"
    • Host의 port 2181을 Container port 2181에 매핑합니다.
    • Zookeeper는 기본적으로 2181 port를 사용합니다.
  • environment: ZOOKEEPER_CLIENT_PORT: 2181
    • 환경 변수를 설정합니다.
    • ZOOKEEPER_CLIENT_PORT에 연결 포트를 지정합니다.
  • image: wurstmeister/kafka:2.13-2.7.0
    • Docker 이미지 및 버전 지정
  • ports: - "9092:9092"
    • Host port 9092와 Container port 9092를 매핑합니다.
    • Kafka는 기본적으로 9092 포트를 사용합니다.
  • KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    • Kafka 브로커가 연결할 Zookeeper 주소를 지정합니다.
  • KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:9093
    • Kafka 브로커가 클라이언트와 다른 브로커에게 광고하는 리스너 주소를 지정합니다.
    • localhost:9092는 외부 클라이언트를 위한 주소이고 kafka:9093은 내부 브로커 간 통신을 위한 주소입니다.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    • Listener 이름과 보안 프로토콜을 매핑합니다.
    • 지금은 모두 PLAINTEXT(암호화X) 프로토콜을 사용합니다.
  • KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    • 브로커 간 통신에 사용할 Listener 이름을 지정합니다.
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    • 오프셋 토픽의 복제 팩터를 지정합니다.
    • 현재는 단일 브로커 설정으로 1을 사용합니다.
  • KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
    • Kafka 브로커가 실제로 바인딩할 리스너 주소를 지정합니다.
    • 0.0.0.0은 모든 네트워크 인터페이스에서 연결을 허용하는 설정입니다.
  • volumes: - /var/run/docker.sock:/var/run/docker.sock
    • Docker 소켓을 컨테이너에 마운트하여 컨테이너가 호스트 Docker 데몬에 접근할 수 있도록 합니다.

소스코드

생산자(Producer) & 소비자(Consumer) 설정 코드 : 

// 생산자 설정
@Configuration
public class KafkaProducerConfig {
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

// 소비자 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
	@Bean
	public ConsumerFactory<String, String> consumerFactoryA() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
		configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(configProps);
	}
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryA() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryA());
		return factory;
	}
    
	@Bean
	public ConsumerFactory<String, String> consumerFactoryB() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "groupB");
		configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(configProps);
	}
    
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryB() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryA());
		return factory;
	}
}
  • 소비자 그룹을 2개 만든 이유는 추후에 A토픽에 파티션을 2개 만들어서 각 파티션에 소비자를 매칭시킵니다.

Controller & Service 레이어 : 

@RestController
@RequiredArgsConstructor
public class KafkaController {
	private final KafkaTemplate<String, String> kafkaTemplate;
    
	@PostMapping("/send")
	public void send(@RequestParam("message") String message) {
		kafkaTemplate.send("test-topic", "key1", message);
	}
}

@Service
@Slf4j
public class KafkaConsumerService {
	@KafkaListener(topics = "test-topic", groupId = "groupA")
	public void listenerA(String message) {
		log.info("message = {}", message);	
	}
}
  • 여기서 주의 할 점은 Listener의 매개변수의 타입은 소비자 설정에서 지정한 타입을 넣어줘야한다.

특정 토픽에 대한 파티션 개수 설정 : 

// docker-compose 기준
# docker-compose exec kafka bash

// 새로운 토픽 [test-topic] 추가 및 파티션 개수 설정
# kafka-topics.sh --create --topic test-topic --partitions 2 --replication-factor 1 --bootstrap-server localhost:9092

// 기존 test-topic 토픽에 파티션 2개 설정
# kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test-topic --partitions 2

// test-topic 설정 확인
# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-topic

// topic 삭제 [my-topic-1]
# kafka-topics.sh --delete --topic my-topic-1 --bootstrap-server localhost:9092
  • 파티션 수는 늘리는건 바로 가능하지만 줄이는건 허용되지 않습니다.
  • 해결 방법은 Topic을 삭제 후 다시 만드는 방법이 있습니다.

Controller & Service 레이어 수정 : 

파티션을 2개로 설정했으며 소비자 설정도 2개로 만들었으니 이제 각 파티션과 소비자를 매칭시켜줄 수 있다.

@RestController
@RequiredArgsConstructor
public void KafkaController {
	private final KafkaTemplate<String, String> kafkaTemplate;

	@PostMapping("/send/a")
	public void sendMessageA(@RequestParam("message") String message) {
		KafkaTemplate.send("test-topic", "key1", message);
	}
    
	@PostMappping("/send/b")
	public void sendMessageB(@RequestParam("message") String message) {
		KafkaTemplate.send("test-topic", "key2", message);
	}
}

@Service
@Slf4j
public class KafkaConsumerService {
	@KafkaListener(topicPartitions = @TopicPartition(topic = "test-topic", partitions = {"0"}), groupId = "groupA")
	public void listenerA(String message) {
		log.info("partition 0 => message = {}", message);
	}
    
	@KafkaListener(topicPartitions = @TopicPartition(topic = "test-topic", partitions = {"1"}), groupId = "groupB")
	public void listenerB(String message) {
		log.info("partition 1 => message = {}", message);
	}
}
  • 어느 파티션에 들어갈지는 Key가 결정한다.

재시도 로직 구현 코드 : 

@Configuration
public class RetryConfig {
	@Primary
	@Bean
	public RetryTemplate retryTemplate() {
		RetryTemplate retryTemplate = new RetryTemplate();
        
		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
		backOffPolicy.setInitialInterval(2000); // 초반 시간 간격은 2초
		backOffPolicy.setMultiplier(2.0); // 간격 증가 배수는 2배
		retryTemplate.setBackOffPolicy(backOffPolicy);
        
		SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
		retryPolicy.setMaxAttempts(3); // 재시도 3회
		retryTemplate.setRetryPolicy(retryPolicy);
        
		return retryTemplate;
	}
}

// 적용하기
@RestController
@RequiredArgsConstructor
public class KafkaController {
	private final KafkaTemplate<String, String> kafkaTemplate;
	private final RetryTemplate retryTemplate;
    
	@PostMapping("/send")
	public void sendMessage(@RequestParam("message") String message) {
		retryTemplate.excute(context -> {
			try{
				kafkaTemplate.send("test-topic", "key1", message);
				return null;
			} catch (Exception e) {
				log.error("재시도");
				throw new RuntimeException("kafka 메시징 처리 중 오류 발생");
			}
		}, context -> {
			log.error("재시도 종료");
			return null;
		});
	}
}
Comments