[λμμ±] Kafka Concurrency Control(feat. Redis)
π― Kafka Concurrency Control
Race Condition
κ²½μμνλ λ κ° μ΄μμ μ€λ λκ° κ³΅μ λ°μ΄ν°μ μ‘μΈμ€ ν μ μκ³ , λμμ λ³κ²½μ νλ €κ³ ν λ λ°μνλ λ¬Έμ
- Race Condition μΌλ‘ λμμ λ€μ΄μ€λ μμ²λ€μ΄ κ°±μ μ κ°μ μ½κ³ , μμ νλ©΄μ
- μ€μ κ°±μ μ΄ λλ½λλ νμμ΄ λ°μ
Series
πΉ Kafka
λΆμ° μ€νΈλ¦¬λ° νλ«νΌμΌλ‘, μ£Όλ‘ λκ·λͺ¨ μ€μκ° λ°μ΄ν° μ²λ¦¬μ λ©μμ§μ μ¬μ©
리λ μ μΆ(Leader Election) λλ νμ‘°μ μλΉμ ν¨ν΄(Cooperative Consumer Pattern) μ νμ©νμ¬ νΉμ 리μμ€μ λν μ κ·Όμ μ‘°μ¨
.
λμ λ°©μ.
- 리λ μ μΆμ ν΅ν λ½
- Kafkaμ νν°μ
κ³Ό μ€νμ
μ μ΄μ©νμ¬, νΉμ μμ
μ μνν 리λ μ μΆ
- νλμ Kafka νν°μ μμ λ©μμ§λ₯Ό μλΉνλ μ¬λ¬ 컨μλ¨Έ μ€ νλκ° λ¦¬λκ° λμ΄ νΉμ 리μμ€μ λν μμ μ μν β μ΄ λ¦¬λ μν μ΄ μΌμ’ μ λ½ μν
- λ§μ½ 리λκ° μ€ν¨νκ±°λ νΉμ μκ° λμ μλ΅νμ§ μμΌλ©΄, λ€λ₯Έ 컨μλ¨Έκ° μλ‘μ΄ λ¦¬λλ‘ μ μΆλμ΄ μμ μ μ΄μ΄λ°μ
- Kafkaμ νν°μ
κ³Ό μ€νμ
μ μ΄μ©νμ¬, νΉμ μμ
μ μνν 리λ μ μΆ
- νμ‘°μ μλΉμ ν¨ν΄:
- μ¬λ¬ 컨μλ¨Έκ° λμΌν Kafka ν ν½μμ λ©μμ§λ₯Ό μλΉν λ, μλ‘ νλ ₯νμ¬ νΉμ 리μμ€μ λν μ κ·Όμ μ‘°μ¨
- νΉμ 리μμ€λ₯Ό λμμ μ κ·Όνμ§ μλλ‘ νμνλ κ³Όμ μ΄ νμ β λ½κ³Ό μ μ¬ν κΈ°λ₯μ ꡬν κ°λ₯
- λΆμ° νΈλμμ
κ΄λ¦¬:
- Kafka νΈλμμ κΈ°λ₯μ νμ©νμ¬, λ©μμ§λ₯Ό μ²λ¦¬νλ©΄μ λ½μ μ€μ νκ³ ν΄μ
- λ©μμ§λ₯Ό μ²λ¦¬νλ λμ νΈλμμ μ μ μ§νκ³ , μμ μ΄ λλλ©΄ νΈλμμ μ 컀λ°νλ©΄μ λ½μ ν΄μ νλ λ°©μ
μ₯μ .
λμ νμ₯μ±
- λκ·λͺ¨μ λΆμ° μμ€ν μμ λμ νμ₯μ±μ μ 곡νλλ‘ μ€κ³
- μ¬λ¬ 컨μλ¨Έκ° λμΌν Kafka ν΄λ¬μ€ν°μμ λ½μ κ΄λ¦¬
λ΄κ²°ν¨μ±
- λ΄κ²°ν¨μ±(fault-tolerance)μ μ§μ
- 리λ μ μΆ λ°©μμ΄λ νμ‘°μ μλΉμ ν¨ν΄μ μ¬μ©νλ©΄ νΉμ λ Έλκ° μ€ν¨νλλΌλ λ€λ₯Έ λ Έλκ° μμ μ μ΄μ΄λ°μ λ½μ μμ μ±μ 보μ₯
μ€μ μ§μ€νλ κ΄λ¦¬
- Kafka ν ν½μ μ΄μ©νμ¬ λ½ μνλ μμ μνλ₯Ό μ€μμμ κ΄λ¦¬
- μμ€ν μ λ°μ μνλ₯Ό μ½κ² λͺ¨λν°λ§νκ³ κ΄λ¦¬
볡μ‘ν νΈλμμ μ²λ¦¬ κ°λ₯
- Kafka νΈλμμ κΈ°λ₯μ νμ©νλ©΄ λ©μμ§ μ²λ¦¬μ ν¨κ» 볡μ‘ν νΈλμμ μ κ΄λ¦¬
- λ½μ μ μ§νλ©΄μλ 볡μ‘ν μμ μ μμ νκ² μν
λ¨μ .
볡μ‘μ± μ¦κ°
- νΉν, 리λ μ μΆμ΄λ νμ‘°μ μλΉμ ν¨ν΄μ ꡬννλ €λ©΄ λ λ§μ μ½λ©κ³Ό μ€μ μ΄ νμ
μ§μ° μκ°
- Kafka λΆμ° μν€ν μ²μ λ°μ΄ν° μ λ¬ μ§μ°μΌλ‘ μΈν΄ λ½ ν΄μ λ 리λ μ μΆ κ³Όμ μμ μ§μ° λ°μ
- μ€μκ° μ²λ¦¬κ° μ€μν νκ²½μμλ λ¨μ
μ€λ²ν€λ
- λ½ κ΄λ¦¬λ₯Ό μν΄ λ©μμ§μ μμ°, μλΉ, 리λ μ μΆ κ³Όμ μμ μΆκ°μ μΈ μ€λ²ν€λκ° λ°μ
- νΉν, λ½μ μμ£Ό μ€μ νκ³ ν΄μ νλ μμ μ΄ λ§λ€λ©΄ μ±λ₯μλ μν₯
μΌκ΄μ± λ¬Έμ
- λ½μ κ΄λ¦¬νλ λ° μμ΄μ μΌκ΄μ± λ¬Έμ κ° λ°μ
- 리λκ° μ μΆλκΈ° μ κΉμ§ μ¬λ¬ 컨μλ¨Έκ° λμΌν 리μμ€μ μ κ·Όνλ €λ κ²½ν©μ΄ λ°μ
- μΌκ΄μ± λ¬Έμ ν΄κ²°μ μν΄ μΆκ°μ μΈ μ μ΄ λ‘μ§μ΄ νμ
- λ½μ κ΄λ¦¬νλ λ° μμ΄μ μΌκ΄μ± λ¬Έμ κ° λ°μ
νΈλμμ κ΄λ¦¬μ 볡μ‘μ±
- Kafka νΈλμμ κΈ°λ₯μ νμ©νμ¬ λ½μ κ΄λ¦¬νλ €λ©΄, 볡μ‘ν νΈλμμ μ μ΄κ° νμ
- νΈλμμ μ΄ κΈΈμ΄μ§κ±°λ λ§μ 리μμ€λ₯Ό μ κ·Έλ κ²½μ°, μ±λ₯ μ νλ λ°λλ½ κ°μ λ¬Έμ λ₯Ό μΌκΈ°
μ¬λ‘.
- λΆμ° μμ€ν
μμμ 리λ μ μΆ
- μ¬λ¬ λ Έλκ° λμΌν μμ μ μννλ €κ³ ν λ, νλμ λ Έλλ₯Ό 리λλ‘ μ μΆνμ¬ μμ μ μ‘°μ νλ μν©
- ex) μ¬λ¬ λ§μ΄ν¬λ‘μλΉμ€κ° λμΌν μμ μ μννμ§ μλλ‘ λ¦¬λλ₯Ό μ μΆνλ λ° μ¬μ©
- νμ‘°μ μμ
μν
- μ¬λ¬ 컨μλ¨Έκ° λμΌν Kafka ν ν½μμ λ°μ΄ν°λ₯Ό μ²λ¦¬νλ©΄μ, νΉμ 리μμ€μ λν μ κ·Όμ μ‘°μ¨ν΄μΌ νλ κ²½μ°
- ex) μ¬λ¬ 컨μλ¨Έκ° λμΌν λ°μ΄ν°λ² μ΄μ€ ν μ΄λΈμ μ κ·Όνμ¬ λ°μ΄ν°λ₯Ό μ²λ¦¬ν λ, Kafka λ½μ μ¬μ©ν΄ λ°μ΄ν° μΌκ΄μ±μ μ μ§
- 볡μ‘ν λΆμ° νΈλμμ
- λΆμ° νΈλμμ μ κ΄λ¦¬νλ μν©μμ Kafkaμ νΈλμμ κΈ°λ₯κ³Ό λ½μ κ²°ν©νμ¬, μμ νκ² μμ μ μν
- ex) λ©μμ§ μ²λ¦¬μ λ°μ΄ν°λ² μ΄μ€ μ λ°μ΄νΈκ° λμμ μ΄λ£¨μ΄μ§λ μν©μμ Kafka λ½μ ν΅ν΄ μΌκ΄μ±μ 보μ₯
- λκ·λͺ¨ λ°μ΄ν° μ²λ¦¬ νμ΄νλΌμΈ
- λκ·λͺ¨ λ°μ΄ν° μ²λ¦¬ νμ΄νλΌμΈμμ μ¬λ¬ μμ μ μ‘°μ¨νκ³ κ΄λ¦¬ν΄μΌ νλ κ²½μ° μμ κ°μ μμμ μΌκ΄μ±μ μ μ§
...
Start Kafka
- docker-compose.yml
version: '3' # Docker Compose νμΌ λ²μ μ§μ
services: # μ¬λ¬κ°μ Docker 컨ν
μ΄λ μλΉμ€ μ μ
zookeeper: # Zookeeper μλΉμ€ μ μ
image: wurstmeister/zookeeper:3.4.6
container_name: zookeeper
ports:
- "2181:2181" # νΈμ€νΈμ 2181 ν¬νΈλ₯Ό 컨ν
μ΄λμ 2181 ν¬νΈμ λ°μΈλ©
kafka: # kafka μλΉμ€ μ μ
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092" # νΈμ€νΈμ 9092 ν¬νΈλ₯Ό 컨ν
μ΄λμ 9092 ν¬νΈμ λ°μΈλ©
environment: # kafka 컨ν
μ΄λμ νκ²½ λ³μ μ€μ
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 # λ΄/μΈλΆμμ μ κ·Όν μ μλ 리μ€λ μ£Όμ μ€μ
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT # 리μ€λμ 보μ νλ‘ν μ½ λ§€ν
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 # 컨ν
μ΄λ λ΄λΆμμ μ¬μ©ν 리μ€λ μ£Όμ μ€μ
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE # λΈλ‘컀 κ° ν΅μ μ μ¬μ©ν 리μ€λ μ΄λ¦
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafkaκ° Zookeeperμ μ°κ²°νκΈ° μν μ£Όμ
volumes:
- /var/run/docker.sock:/var/run/docker.sock # Docker μμΌμ 컨ν
μ΄λμ 곡μ νμ¬ Docker μ΄λ²€νΈλ₯Ό κ΄λ¦¬ν μ μλλ‘ μ€μ
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8989:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
- run kafka
# μΉ΄νμΉ΄ μ€ν
$ docker-compose up -d
or
$ docker-compose -f docker-compose.yml up
# ν ν½μμ±
$ docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic stock_decrease
Created topic testTopic.
# νλ‘λμ μ€ν
$ docker exec -it kafka kafka-console-producer.sh --topic stock_decrease --broker-list 0.0.0.0:9092
>Hello
# 컨μλ¨Έ μ€ν
$ docker exec -it kafka kafka-console-consumer.sh --topic stock_decrease --bootstrap-server localhost:9092
Hello
# μΉ΄νμΉ΄ μ’
λ£
$ docker-compose down
.
commit log.
...
π§ kafka + redis μ μ© λ°©μ
- Producerλ λ©μμ§ λ°ν μ λ λμ€λ‘ μκ³κ° μ μ΄
- μκ³κ°μ΄ λμ§ μμμ κ²½μ°: μ΄ν λΉμ¦λμ€ λ‘μ§ μ²λ¦¬λ₯Ό μν΄ Consumerμκ² λ©μμ§ μ μ‘
- μκ³κ°μ΄ λμμ κ²½μ°: λ©μμ§ λ―Έλ°ν λ° μ¬μ©μμκ² μ²λ¦¬ λΆκ° μλ¦Ό
- Consumerλ λΉμ¦λμ€ λ‘μ§λ§ μν
μ΄λ κ² ν κ²½μ° μκ³κ°μ λ λμ€λ₯Ό ν΅ν΄ λΉ λ₯΄κ² νμΈνκ³ , μ€μ μ²λ¦¬λ 컨μλ¨Έκ° λΉλκΈ°λ‘ μ²λ¦¬ν μ μλ€.
public void decrease(final Long userId, final Long stockId) {
final KafkaStock stock = stockRepository.findById(stockId).orElseThrow();
// λ λμ€λ₯Ό ν΅ν΄ μκ³κ°μ νμΈ
final Long count = redisIncrRepository.increment(stockId);
stock.validQuantity(count);
// μ€μ μ²λ¦¬λ 컨μλ¨Έκ° λΉλκΈ°λ‘ μ²λ¦¬
stockDecreaseProducer.create(userId, stockId);
}
.
π οΈ μ±λ₯ ν μ€νΈ.
νμ μλ: 50,000
User: 296
Processes: 8
Threads: 37
Duration: 3 min (01:52)
κ²°κ³Όλ₯Ό 보면 μ ννκ² 50,000 κ±΄λ§ μ±κ³΅μΌλ‘ μ²λ¦¬λ κ²μ λ³Ό μ μλ€.
00:46 μ΄νμλ λͺ¨λ μλμ΄ 0μΌλ‘ μ‘°νλμ΄, μ¬κ³ λΆμ‘± μ€λ₯κ° λ°μνκ² λλ€.
Kafka λ©μμ§λ μ νν 50,000 κ±΄λ§ μ μ‘μ΄ λμλ€.
λ§μΌ 컨μλ¨Έμμ μλ¬κ° λ°μνλ©΄ λ³λλ‘ μ²λ¦¬λ₯Ό ν μ μλλ‘ λ‘κ·Έλ₯Ό λ¨κ²¨λ μλ μλ€.