Web/Spring

[λ™μ‹œμ„±] Kafka Concurrency Control(feat. Redis)

Aaron 2024. 8. 20. 22:16
λ°˜μ‘ν˜•

🎯 Kafka Concurrency Control

Race Condition

κ²½μŸμƒνƒœλŠ” 두 개 μ΄μƒμ˜ μŠ€λ ˆλ“œκ°€ 곡유 데이터에 μ•‘μ„ΈμŠ€ ν•  수 있고, λ™μ‹œμ— 변경을 ν•˜λ €κ³  ν•  λ•Œ λ°œμƒν•˜λŠ” 문제

  • Race Condition 으둜 λ™μ‹œμ— λ“€μ–΄μ˜€λŠ” μš”μ²­λ“€μ΄ κ°±μ‹  μ „ 값을 읽고, μˆ˜μ •ν•˜λ©΄μ„œ
  • μ‹€μ œ 갱신이 λˆ„λ½λ˜λŠ” ν˜„μƒμ΄ λ°œμƒ

Series

Java Concurrency Control

Database Concurrency Control

Redis Concurrency Control

Kafka Concurrency Control

Compare Concurrency Control

🏹 Kafka

λΆ„μ‚° 슀트리밍 ν”Œλž«νΌμœΌλ‘œ, 주둜 λŒ€κ·œλͺ¨ μ‹€μ‹œκ°„ 데이터 μ²˜λ¦¬μ™€ λ©”μ‹œμ§•μ— μ‚¬μš©

리더 μ„ μΆœ(Leader Election) λ˜λŠ” ν˜‘μ‘°μ  μ†ŒλΉ„μž νŒ¨ν„΄(Cooperative Consumer Pattern) 을 ν™œμš©ν•˜μ—¬ νŠΉμ • λ¦¬μ†ŒμŠ€μ— λŒ€ν•œ 접근을 쑰율


.

λ™μž‘ 방식.

  • 리더 μ„ μΆœμ„ ν†΅ν•œ 락
    • Kafka의 νŒŒν‹°μ…˜κ³Ό μ˜€ν”„μ…‹μ„ μ΄μš©ν•˜μ—¬, νŠΉμ • μž‘μ—…μ„ μˆ˜ν–‰ν•  리더 μ„ μΆœ
      • ν•˜λ‚˜μ˜ Kafka νŒŒν‹°μ…˜μ—μ„œ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λŠ” μ—¬λŸ¬ 컨슈머 쀑 ν•˜λ‚˜κ°€ 리더가 λ˜μ–΄ νŠΉμ • λ¦¬μ†ŒμŠ€μ— λŒ€ν•œ μž‘μ—…μ„ μˆ˜ν–‰ β†’ 이 리더 역할이 μΌμ’…μ˜ 락 μ—­ν• 
      • λ§Œμ•½ 리더가 μ‹€νŒ¨ν•˜κ±°λ‚˜ νŠΉμ • μ‹œκ°„ λ™μ•ˆ μ‘λ‹΅ν•˜μ§€ μ•ŠμœΌλ©΄, λ‹€λ₯Έ μ»¨μŠˆλ¨Έκ°€ μƒˆλ‘œμš΄ λ¦¬λ”λ‘œ μ„ μΆœλ˜μ–΄ μž‘μ—…μ„ μ΄μ–΄λ°›μŒ
  • ν˜‘μ‘°μ  μ†ŒλΉ„μž νŒ¨ν„΄:
    • μ—¬λŸ¬ μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ Kafka ν† ν”½μ—μ„œ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•  λ•Œ, μ„œλ‘œ ν˜‘λ ₯ν•˜μ—¬ νŠΉμ • λ¦¬μ†ŒμŠ€μ— λŒ€ν•œ 접근을 쑰율
    • νŠΉμ • λ¦¬μ†ŒμŠ€λ₯Ό λ™μ‹œμ— μ ‘κ·Όν•˜μ§€ μ•Šλ„λ‘ ν˜‘μ˜ν•˜λŠ” 과정이 ν•„μš” β†’ 락과 μœ μ‚¬ν•œ κΈ°λŠ₯을 κ΅¬ν˜„ κ°€λŠ₯
  • λΆ„μ‚° νŠΈλžœμž­μ…˜ 관리:
    • Kafka νŠΈλžœμž­μ…˜ κΈ°λŠ₯을 ν™œμš©ν•˜μ—¬, λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•˜λ©΄μ„œ 락을 μ„€μ •ν•˜κ³  ν•΄μ œ
    • λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•˜λŠ” λ™μ•ˆ νŠΈλžœμž­μ…˜μ„ μœ μ§€ν•˜κ³ , μž‘μ—…μ΄ λλ‚˜λ©΄ νŠΈλžœμž­μ…˜μ„ μ»€λ°‹ν•˜λ©΄μ„œ 락을 ν•΄μ œν•˜λŠ” 방식

μž₯점.

  • 높은 ν™•μž₯μ„±
    • λŒ€κ·œλͺ¨μ˜ λΆ„μ‚° μ‹œμŠ€ν…œμ—μ„œ 높은 ν™•μž₯성을 μ œκ³΅ν•˜λ„λ‘ 섀계
    • μ—¬λŸ¬ μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ Kafka ν΄λŸ¬μŠ€ν„°μ—μ„œ 락을 관리
  • 내결함성
    • 내결함성(fault-tolerance)을 지원
    • 리더 μ„ μΆœ λ°©μ‹μ΄λ‚˜ ν˜‘μ‘°μ  μ†ŒλΉ„μž νŒ¨ν„΄μ„ μ‚¬μš©ν•˜λ©΄ νŠΉμ • λ…Έλ“œκ°€ μ‹€νŒ¨ν•˜λ”λΌλ„ λ‹€λ₯Έ λ…Έλ“œκ°€ μž‘μ—…μ„ 이어받아 락의 μ•ˆμ •μ„±μ„ 보μž₯
  • 쀑앙 μ§‘μ€‘ν™”λœ 관리
    • Kafka 토픽을 μ΄μš©ν•˜μ—¬ 락 μƒνƒœλ‚˜ μž‘μ—… μƒνƒœλ₯Ό μ€‘μ•™μ—μ„œ 관리
    • μ‹œμŠ€ν…œ μ „λ°˜μ˜ μƒνƒœλ₯Ό μ‰½κ²Œ λͺ¨λ‹ˆν„°λ§ν•˜κ³  관리
  • λ³΅μž‘ν•œ νŠΈλžœμž­μ…˜ 처리 κ°€λŠ₯
    • Kafka νŠΈλžœμž­μ…˜ κΈ°λŠ₯을 ν™œμš©ν•˜λ©΄ λ©”μ‹œμ§€ μ²˜λ¦¬μ™€ ν•¨κ»˜ λ³΅μž‘ν•œ νŠΈλžœμž­μ…˜μ„ 관리
    • 락을 μœ μ§€ν•˜λ©΄μ„œλ„ λ³΅μž‘ν•œ μž‘μ—…μ„ μ•ˆμ „ν•˜κ²Œ μˆ˜ν–‰

단점.

  • λ³΅μž‘μ„± 증가
    • 특히, 리더 μ„ μΆœμ΄λ‚˜ ν˜‘μ‘°μ  μ†ŒλΉ„μž νŒ¨ν„΄μ„ κ΅¬ν˜„ν•˜λ €λ©΄ 더 λ§Žμ€ μ½”λ”©κ³Ό 섀정이 ν•„μš”
  • μ§€μ—° μ‹œκ°„
    • Kafka λΆ„μ‚° μ•„ν‚€ν…μ²˜μ™€ 데이터 전달 μ§€μ—°μœΌλ‘œ 인해 락 ν•΄μ œλ‚˜ 리더 μ„ μΆœ κ³Όμ •μ—μ„œ μ§€μ—° λ°œμƒ
    • μ‹€μ‹œκ°„ μ²˜λ¦¬κ°€ μ€‘μš”ν•œ ν™˜κ²½μ—μ„œλŠ” 단점
  • μ˜€λ²„ν—€λ“œ
    • 락 관리λ₯Ό μœ„ν•΄ λ©”μ‹œμ§€μ˜ 생산, μ†ŒλΉ„, 리더 μ„ μΆœ κ³Όμ •μ—μ„œ 좔가적인 μ˜€λ²„ν—€λ“œκ°€ λ°œμƒ
    • 특히, 락을 자주 μ„€μ •ν•˜κ³  ν•΄μ œν•˜λŠ” μž‘μ—…μ΄ λ§Žλ‹€λ©΄ μ„±λŠ₯에도 영ν–₯
  • 일관성 문제
    • 락을 κ΄€λ¦¬ν•˜λŠ” 데 μžˆμ–΄μ„œ 일관성 λ¬Έμ œκ°€ λ°œμƒ
      • 리더가 μ„ μΆœλ˜κΈ° μ „κΉŒμ§€ μ—¬λŸ¬ μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ λ¦¬μ†ŒμŠ€μ— μ ‘κ·Όν•˜λ €λŠ” 경합이 λ°œμƒ
      • 일관성 문제 해결을 μœ„ν•΄ 좔가적인 μ œμ–΄ 둜직이 ν•„μš”
  • νŠΈλžœμž­μ…˜ κ΄€λ¦¬μ˜ λ³΅μž‘μ„±
    • Kafka νŠΈλžœμž­μ…˜ κΈ°λŠ₯을 ν™œμš©ν•˜μ—¬ 락을 κ΄€λ¦¬ν•˜λ €λ©΄, λ³΅μž‘ν•œ νŠΈλžœμž­μ…˜ μ œμ–΄κ°€ ν•„μš”
    • νŠΈλžœμž­μ…˜μ΄ κΈΈμ–΄μ§€κ±°λ‚˜ λ§Žμ€ λ¦¬μ†ŒμŠ€λ₯Ό μž κ·ΈλŠ” 경우, μ„±λŠ₯ μ €ν•˜λ‚˜ λ°λ“œλ½ 같은 문제λ₯Ό μ•ΌκΈ°

사둀.

  • λΆ„μ‚° μ‹œμŠ€ν…œμ—μ„œμ˜ 리더 μ„ μΆœ
    • μ—¬λŸ¬ λ…Έλ“œκ°€ λ™μΌν•œ μž‘μ—…μ„ μˆ˜ν–‰ν•˜λ €κ³  ν•  λ•Œ, ν•˜λ‚˜μ˜ λ…Έλ“œλ₯Ό λ¦¬λ”λ‘œ μ„ μΆœν•˜μ—¬ μž‘μ—…μ„ μ‘°μ •ν•˜λŠ” 상황
    • ex) μ—¬λŸ¬ λ§ˆμ΄ν¬λ‘œμ„œλΉ„μŠ€κ°€ λ™μΌν•œ μž‘μ—…μ„ μˆ˜ν–‰ν•˜μ§€ μ•Šλ„λ‘ 리더λ₯Ό μ„ μΆœν•˜λŠ” 데 μ‚¬μš©
  • ν˜‘μ‘°μ  μž‘μ—… μˆ˜ν–‰
    • μ—¬λŸ¬ μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ Kafka ν† ν”½μ—μ„œ 데이터λ₯Ό μ²˜λ¦¬ν•˜λ©΄μ„œ, νŠΉμ • λ¦¬μ†ŒμŠ€μ— λŒ€ν•œ 접근을 μ‘°μœ¨ν•΄μ•Ό ν•˜λŠ” 경우
    • ex) μ—¬λŸ¬ μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ λ°μ΄ν„°λ² μ΄μŠ€ ν…Œμ΄λΈ”μ— μ ‘κ·Όν•˜μ—¬ 데이터λ₯Ό μ²˜λ¦¬ν•  λ•Œ, Kafka 락을 μ‚¬μš©ν•΄ 데이터 일관성을 μœ μ§€
  • λ³΅μž‘ν•œ λΆ„μ‚° νŠΈλžœμž­μ…˜
    • λΆ„μ‚° νŠΈλžœμž­μ…˜μ„ κ΄€λ¦¬ν•˜λŠ” μƒν™©μ—μ„œ Kafka의 νŠΈλžœμž­μ…˜ κΈ°λŠ₯κ³Ό 락을 κ²°ν•©ν•˜μ—¬, μ•ˆμ „ν•˜κ²Œ μž‘μ—…μ„ μˆ˜ν–‰
    • ex) λ©”μ‹œμ§€ μ²˜λ¦¬μ™€ λ°μ΄ν„°λ² μ΄μŠ€ μ—…λ°μ΄νŠΈκ°€ λ™μ‹œμ— μ΄λ£¨μ–΄μ§€λŠ” μƒν™©μ—μ„œ Kafka 락을 톡해 일관성을 보μž₯
  • λŒ€κ·œλͺ¨ 데이터 처리 νŒŒμ΄ν”„λΌμΈ
    • λŒ€κ·œλͺ¨ 데이터 처리 νŒŒμ΄ν”„λΌμΈμ—μ„œ μ—¬λŸ¬ μž‘μ—…μ„ μ‘°μœ¨ν•˜κ³  관리해야 ν•˜λŠ” 경우 μž‘μ—… κ°„μ˜ μˆœμ„œμ™€ 일관성을 μœ μ§€


...

Start Kafka

  • docker-compose.yml
version: '3' # Docker Compose 파일 버전 μ§€μ •
services: # μ—¬λŸ¬κ°œμ˜ Docker μ»¨ν…Œμ΄λ„ˆ μ„œλΉ„μŠ€ μ •μ˜
  zookeeper: # Zookeeper μ„œλΉ„μŠ€ μ •μ˜
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181" # 호슀트의 2181 포트λ₯Ό μ»¨ν…Œμ΄λ„ˆμ˜ 2181 ν¬νŠΈμ™€ 바인딩
  kafka: # kafka μ„œλΉ„μŠ€ μ •μ˜
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092" # 호슀트의 9092 포트λ₯Ό μ»¨ν…Œμ΄λ„ˆμ˜ 9092 ν¬νŠΈμ™€ 바인딩
    environment: # kafka μ»¨ν…Œμ΄λ„ˆμ˜ ν™˜κ²½ λ³€μˆ˜ μ„€μ •
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 # λ‚΄/μ™ΈλΆ€μ—μ„œ μ ‘κ·Όν•  수 μžˆλŠ” λ¦¬μŠ€λ„ˆ μ£Όμ†Œ μ„€μ •
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT # λ¦¬μŠ€λ„ˆμ˜ λ³΄μ•ˆ ν”„λ‘œν† μ½œ λ§€ν•‘
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 # μ»¨ν…Œμ΄λ„ˆ λ‚΄λΆ€μ—μ„œ μ‚¬μš©ν•  λ¦¬μŠ€λ„ˆ μ£Όμ†Œ μ„€μ •
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # 브둜컀 κ°„ 톡신에 μ‚¬μš©ν•  λ¦¬μŠ€λ„ˆ 이름
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafkaκ°€ Zookeeper에 μ—°κ²°ν•˜κΈ° μœ„ν•œ μ£Όμ†Œ
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock # Docker μ†ŒμΌ“μ„ μ»¨ν…Œμ΄λ„ˆμ™€ κ³΅μœ ν•˜μ—¬ Docker 이벀트λ₯Ό 관리할 수 μžˆλ„λ‘ μ„€μ •
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
  • run kafka
# μΉ΄ν”„μΉ΄ μ‹€ν–‰
$ docker-compose up -d
or
$ docker-compose -f docker-compose.yml up

# 토픽생성
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic stock_decrease
Created topic testTopic.

# ν”„λ‘œλ“€μ„œ μ‹€ν–‰
$ docker exec -it kafka kafka-console-producer.sh --topic stock_decrease --broker-list 0.0.0.0:9092
>Hello

# 컨슈머 μ‹€ν–‰
$ docker exec -it kafka kafka-console-consumer.sh --topic stock_decrease --bootstrap-server localhost:9092
Hello

# μΉ΄ν”„μΉ΄ μ’…λ£Œ
$ docker-compose down


.

commit log.


...

🧐 kafka + redis 적용 방식

  1. ProducerλŠ” λ©”μ‹œμ§€ λ°œν–‰ μ „ λ ˆλ””μŠ€λ‘œ μž„κ³„κ°’ μ œμ–΄
    • μž„κ³„κ°’μ΄ λ„˜μ§€ μ•Šμ•˜μ„ 경우: 이후 λΉ„μ¦ˆλ‹ˆμŠ€ 둜직 처리λ₯Ό μœ„ν•΄ Consumerμ—κ²Œ λ©”μ‹œμ§€ 전솑
    • μž„κ³„κ°’μ΄ λ„˜μ—ˆμ„ 경우: λ©”μ‹œμ§€ λ―Έλ°œν–‰ 및 μ‚¬μš©μžμ—κ²Œ 처리 λΆˆκ°€ μ•Œλ¦Ό
  2. ConsumerλŠ” λΉ„μ¦ˆλ‹ˆμŠ€ 둜직만 μˆ˜ν–‰

μ΄λ ‡κ²Œ ν•  경우 μž„κ³„κ°’μ€ λ ˆλ””μŠ€λ₯Ό 톡해 λΉ λ₯΄κ²Œ ν™•μΈν•˜κ³ , μ‹€μ œ μ²˜λ¦¬λŠ” μ»¨μŠˆλ¨Έκ°€ λΉ„λ™κΈ°λ‘œ μ²˜λ¦¬ν•  수 μžˆλ‹€.

 public void decrease(final Long userId, final Long stockId) {
      final KafkaStock stock = stockRepository.findById(stockId).orElseThrow();
      // λ ˆλ””μŠ€λ₯Ό 톡해 μž„κ³„κ°’μ„ 확인
      final Long count = redisIncrRepository.increment(stockId);
      stock.validQuantity(count);

      // μ‹€μ œ μ²˜λ¦¬λŠ” μ»¨μŠˆλ¨Έκ°€ λΉ„λ™κΈ°λ‘œ 처리
      stockDecreaseProducer.create(userId, stockId);
  }


.

πŸ› οΈ μ„±λŠ₯ ν…ŒμŠ€νŠΈ.

  • ν•œμ • μˆ˜λŸ‰: 50,000

  • User: 296

  • Processes: 8

  • Threads: 37

  • Duration: 3 min (01:52)

    κ²°κ³Όλ₯Ό 보면 μ •ν™•ν•˜κ²Œ 50,000 건만 μ„±κ³΅μœΌλ‘œ 처리된 것을 λ³Ό 수 μžˆλ‹€.

    00:46 μ΄ν›„μ—λŠ” λͺ¨λ“  μˆ˜λŸ‰μ΄ 0으둜 μ‘°νšŒλ˜μ–΄, 재고 λΆ€μ‘± 였λ₯˜κ°€ λ°œμƒν•˜κ²Œ λœλ‹€.

    Result


Kafka λ©”μ‹œμ§€λ„ μ •ν™•νžˆ 50,000 건만 전솑이 λ˜μ—ˆλ‹€.

Result


만일 μ»¨μŠˆλ¨Έμ—μ„œ μ—λŸ¬κ°€ λ°œμƒν•˜λ©΄ λ³„λ„λ‘œ 처리λ₯Ό ν•  수 μžˆλ„λ‘ 둜그λ₯Ό λ‚¨κ²¨λ‘˜ μˆ˜λ„ μžˆλ‹€.

Result

λ°˜μ‘ν˜•