들어가며
동시성 문제가 일어나는 여러 가지 상황이 있겠지만 저는 2가지 상황을 떠올릴 수 있었습니다.
- 요청에 의해 처리되는 데이터가 중요한 동시성
- 카프카로 동시에 들어오는 중복된 발주를 수신하는 경우
- 검수/검품 이슈 등록 시 더블 클릭, 네트워크 이슈로 인해 중복된 요청이 동시에 들어오는 경우
- 선착순과 같이 순서가 중요한 동시성
- 선착순 쿠폰 차감
이번 글에서는 "순서가 중요한 동시성" 문제를 어플리케이션에서 어떻게 해결할 수 있을지 고민한 내용을 정리해보려 합니다.
Redisson 분산락
다중 어플리케이션 환경에서 동시성 문제 해결하기 위한 솔루션을 검색하며 Redisson을 활용한 분산락에 관한 내용을 접할 수 있었습니다.
// 특정 이름으로 락 정의
RLock lock = redissonClient.getLock(key.toString());
try {
// 락 획득을 시도한다(20초동안 시도를 할 예정이며 획득할 경우 1초안에 해제할 예정이다)
boolean available = lock.tryLock(20, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("lock 획득 실패");
return;
}
// 트랜잭션 로직(ex. orderService.createOrder(), stockService.increase())
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
주로 위의 코드와 같이 redissonClient
를 활용하여 일정 시간 동안 락을 획득하기 위해 시도하고 락을 획득하지 못하면 포기하는 예제를 확인할 수 있었습니다.
// version: redisson 3.16.0 RedissonLock 256 ~ 283
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
RedissonClient
의 getLock
구현을 확인해 보아도 while 루프 속에서 일정 시간동안 락을 획득하기 위해 시도하는 코드를 확인할 수 있습니다.
예상되는 문제
하지만 위의 구현과 같이 일정 시간 동안 락을 획득하기 위해 시도하고 락을 획득하지 못하면 포기한다면 아래 이미지와 같은 문제가 일어날 수 있다고 생각합니다.
설명 : Request2
보다 이후에 요청되었던 Request3
이 먼저 처리되는 문제
스핀락
요청의 순서와 상관없이 처리되는 데이터가 문제가 없는 것이 중요한 상황이라면 이는 문제가 되지 않을 수 있지만 순서가 중요한 상황이라면 Redisson을 활용한 분산락은 적절한 선택이 되지 못할 것이라 생각하였습니다.
이에 스핀락에 조금의 구현을 추가하여 순서가 보장되는 락을 구현해보았습니다.
public interface LockService {
boolean lock(Long id, Long userId);
void unlock(Long id, Long userId);
}
lock(id)
와 같이 id만 파라미터로 가지게 된다면 id에 대한 락을 획득할 수는 있지만 각 요청에 대한 식별을 할 수 없고 userId
파라미터와 같이 요청을 식별할 수 있는 파라미터를 추가하였습니다.
이에 해당 인터페이스를 우선 로컬 환경에서 구현한다면 아래의 LocalLockService
와 같이 구현할 수 있습니다.
locks
를 통해 락에 관한 이력 관리를 수행하고 waitingQueue
를 통해 순서를 보장하도록 구현하였습니다.
public class LocalLockService implements LockService {
/**
* key: lockId<br>
* value: Pair<userId, isLocked><br>
*/
private final Map<Long, Pair<Long, Boolean>> locks = new ConcurrentHashMap<>();
/** userId를 대기시키기 위한 Queue */
private final Queue<Long> waitingQueue = new ConcurrentLinkedQueue<>();
@Override
public boolean lock(Long lockId, Long userId) {
if (!waitingQueue.contains(userId)) {
waitingQueue.add(userId);
}
Long peek = waitingQueue.peek();
if (peek != null && peek.equals(userId)) {
locks.put(lockId, Pair.of(userId, true));
waitingQueue.poll();
return true;
} else {
return false;
}
}
@Override
public void unlock(Long id, Long userId) {
if (locks.containsKey(id) && locks.get(id).getLeft().equals(userId)) {
locks.put(id, Pair.of(userId, false));
}
}
}
이를 다중화 어플리케이션 환경에 적용하기 위해서 Redis를 활용한 구현으로 변경한다면 아래와 같이 구현할 수 있습니다.
public class RedisLockService implements LockService {
private final RedisLockRepository redisLockRepository;
private final RedisQueueRepository redisQueueRepository;
public RedisLockService(
RedisLockRepository redisLockRepository, RedisQueueRepository redisQueueRepository) {
this.redisLockRepository = redisLockRepository;
this.redisQueueRepository = redisQueueRepository;
}
@Override
public boolean lock(Long lockId, Long userId) {
if (!redisQueueRepository.isContain(userId.toString())) {
redisQueueRepository.add(userId.toString());
}
String peek = redisQueueRepository.peek();
if (peek != null && peek.equals(userId.toString())) {
redisLockRepository.lock(lockId);
redisQueueRepository.poll();
return true;
} else {
return false;
}
}
@Override
public void unlock(Long id, Long userId) {
redisLockRepository.unlock(id);
}
}
LocalLockService
에서는 ConcurrentHashMap
과 ConcurrentLinkedQueue
를 통해 락과 큐를 관리하였다면 RedisLockService
에서는 이를 레디스를 활용해 관리한다는 것이 다를 뿐 다른 부분은 동일하게 구현할 수 있었습니다.
한계
이렇게 WaitingQueue
를 도입하여 순서를 보장하며 동시성 문제를 해결할 수 있지만 기존의 스핀 락 방식에 WaitingQueue
를 위한 연산이 더해져 락 관리를 위한 레디스 서버에 부하는 여전할 것임을 예상할 수 있었습니다.
스핀락에서도 순서를 보장하지 않는다면?
스핀락에서도 순서를 보장하는 코드를 추가하지 않으면 아래 사진처럼 Request2
와 Request3
중 어떤 요청이 먼저 처리될지 보장하지 못하는 문제가 발생할 수 있을 것이라 생각합니다.
락 대신 큐의 ACK를 활용한 방법
추가로 응답을 동기적으로 처리하는 것이 아닌 비동기적으로 처리하여도 괜찮은 경우 락을 대신해 큐의 ACK를 활용하여 보았습니다.
메시지 큐
메시지 큐는 FIFO 형식의 자료구조를 가지고 있고 이를 통해 요청의 순서를 보장할 수 있다 생각하였습니다.
명시적 ACK
AMQP 0-9-1
프로토콜에서 확인할 수 있는 명시적 승인 모델(어플리케이션이 응답을 보낸다)을 활용하였습니다.
락을 획득하는 대신 큐에서 메시지를 받은 어플리케이션에서 요청에 대한 처리가 완료된 이후 명시적으로 ACK를 큐에 보낸다면 동시성 문제를 겪지 않을 수 있을 것이라 생각하였습니다.
이를 코드로 메시지를 전송하는 서비스 클래스와 메시지를 처리하는 핸들러 클래스를 통해 간단히 구현해보았습니다.
@Service
public class SymposiumService {
// ...
public String execute() {
// ...
CountDownMessage message = new CountDownMessage(targetId, userId);
// 메시지 전송
messageTemplate.send(message);
}
}
@Service
public class CountDownHandler implements MyMessageHandler<CountDownMessage> {
// ...
@Override
public void onMessage(CountDownMessage message) {
try {
// ...
} catch (Exception e) {
// ...
} finally {
// 메시지 큐에 처리 완료 응답
messageTemplate.ack();
}
}
}
이를 다중화 어플리케이션 환경에 적용하기 위해서 RabbitMQ를 메시지 큐로 활용한 구현으로 변경한다면 아래와 같이 구현할 수 있습니다.
public class SymposiumService {
private final RabbitTemplate messageTemplate;
private final CountDownMessageRecords messageRepo;
public SymposiumService(RabbitTemplate messageTemplate, CountDownMessageRecords messageRepo) {
this.messageTemplate = messageTemplate;
this.messageRepo = messageRepo;
}
public String execute() {
// ...
CountDownMessage message = new CountDownMessage(targetId, userId);
// direct.waiting 익스체인지에 메시지 전송
messageTemplate.convertAndSend("direct.waiting", "direct.waiting", message);
}
}
한계점과 개선 방안
지금의 서비스 클래스 - 메시지 큐 - 핸들러
구조에서는 핸들러가 메시지를 하나씩 처리해야 동시성 문제에서 벗어날 수 있기에 병렬적으로 메시지를 처리하지 못한다는 한계가 존재합니다.
이에 메시지 큐 이전에 대기열 큐를 추가하여 이후 핸들러에 메시지를 하나씩 넘겨줌을 보장한다면 메시지를 처리하는 핸들러에서는 병렬적으로 메시지를 처리하여도 동시성 문제에서 벗어날 수 있을 것이라 생각하였습니다.
queue-rabbitmq-waiting 구현 깃허브 바로가기
public class RabbitWaitingHandler {
// ...
@RabbitListener(queues = "queue.waiting", concurrency = "1", ackMode = "MANUAL")
public void onMessage(Message message, @Nullable Channel channel) throws IOException {
try {
byte[] body = message.getBody();
CountDownMessage countDownMessage = objectMapper.readValue(body, CountDownMessage.class);
int countDown = countDownService.getCountDown();
// 컨슈머에서 동작을 수행할 수 있는 상황인지 확인한다.
if (countDown > 0) {
// 컨슈머 큐로 메시지 전송
messageTemplate.convertAndSend("direct.waiting.consumer", "direct.waiting.consumer", message);
} else {
throw new RuntimeException(formattedMessage.getFormattedMessage());
}
} finally {
// 메시지 처리 완료 ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
public class RabbitCountDownHandler {
// ...
// concurrency를 통해 동시에 처리할 수 있는 RabbitListener의 갯수를 지정한다.
@RabbitListener(queues = "queue.waiting.consumer", ackMode = "MANUAL", concurrency = "3")
public void onMessage(Message message, @Nullable Channel channel) throws IOException {
try {
// 메시지를 처리한다.
} catch (Exception e) {
// ...
} finally {
// 메시지 처리 완료 ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
테스트
상황 : 메시지 처리 과정에서 의도적으로 100ms의 지연을 추가하고 Jmeter를 통해 1011개의 요청을 처리하는 데 걸리는 시간
결과 :
- 단일 핸들러 구조 :105s
- 대기열 큐 + 멀티 핸들러 구조 (3개) : 35s
3개의 핸들러가 메시지를 처리하는 만큼 3배 정도 빠른 결과를 확인할 수 있었고 대기열 큐로 인해 동시성 문제도 겪지 않음을 확인할 수 있었습니다.
참고
https://helloworld.kurly.com/blog/distributed-redisson-lock/
https://incheol-jung.gitbook.io/docs/q-and-a/spring/redisson-trylock
'개발' 카테고리의 다른 글
이미지가 포함된 multipart/form-data 요청 Swagger-UI 만들기 (0) | 2024.06.28 |
---|---|
동시성 문제에 대한 이해 (0) | 2024.05.29 |
위치 기반 프로젝트를 준비하며 (0) | 2024.04.23 |
데코레이터 패턴을 활용하여 행위를 정의하기 (0) | 2024.04.10 |
인터페이스가 필요한 순간 (0) | 2024.04.09 |