프로듀서-컨슈머 패턴은 '해야 할 일' 목록을 가운데에 두고 작업을 만들어 내는 주체와 작업을 처리하는 주체를 분리하는 설계 방법이다. 프로듀서-컨슈머 패턴을 사용하는 작업을 만들어 내는 부분과 작업을 처리하는 부분을 완전히 분리할 수 있기 때문에 개발 과정을 좀 더 명확하게 단순화시킬 수 있고, 작업을 생성하는 부분과 처리하는 부분이 각각 감당할 수 있는 부하를 조절할 수 있다는 장점이 있다.
프로듀서-컨슈머 패턴을 적용해 프로그램을 구현할 때 블로킹 큐를 사용하는 경우가 많다. 예를 들어 프로듀서는 작업을 새로 만들어 큐에 쌓아두고, 컨슈머는 큐에 쌓여 있는 작업을 가져다 처리하는 구조다. 프로듀서는 어떤 컨슈머가 몇 개나 동작하고 있는지 전혀 신경 쓰지 않을 수 있다. 단지 새로운 작업 내용을 만들어 큐에 쌓아두기만 하면 된다. 반대로 컨슈머 역시 프로듀서에 대해서 뭔가를 알고 있어야 할 필요가 없다. 프로듀서가 몇 개이건, 얼마나 많은 작업을 만들어 내고 있건 상관이 없다. 단지 큐에 쌓여 있는 작업을 가져다 처리하기만 하면 된다. 블로킹 큐를 사용하면 여러 개의 프로듀서와 여러 개의 컨슈머가 작동하는 프로듀서 -컨슈머 패턴을 쉽게 구현할 수 있다. 큐와 함께 스레드 풀을 사용하는 경우가 바로 프로듀서-컨슈머 패턴을 활용하는 가장 흔한 경우다.
블로킹 큐를 사용하면 값이 들어올 때까지 take
메소드가 알아서 멈추고 대기하기 때문에 컨슈머 코드를 작성하기 편리하다. 프로듀서가 컨슈머가 감당하지 못할 만큼 일을 많이 만들어 내지 않는 한, 컨슈머는 작업을 끝내고 다음 작업이 들어올 때까지 기다리게 된다. 서버 애플리케이션을 놓고 보면 클라이언트의 수가 적거나 요청량이 많지 않아 이렇게 컨슈머가 '놀고 있는' 상황이 정상적일 수도 있다. 하지만 또 다른 경우에는 프로듀서와 컨슈머의 비율이 적절하지 않다고, 즉 하드웨어의 자원을 효율적으로 사용하지 못하는 것으로 판단할 수도 있다.
프로듀서-컨슈머 패턴을 사용하면 각각의 프로그램 코드는 서로 연결하는 큐를 기준으로 서로 분리되지만, 움직이는 동작 자체는 큐를 사이에 두고 서로 간접적으로 연결되어 있다. 생각하기에는 컨슈머가 항상 밀리지 않고 작업을 마쳐준다고 가정하고, 따라서 작업 큐에 제한을 둘 필요가 없을 것이라고 마음 편하게 넘어갈 수도 있다. 하지만 이런 가정을 하는 순간 나중에 프로그램 구조를 뒤집어엎어야 하는 원인을 하나 남겨두는 것뿐이니 주의해야 한다.
ThreadPoolTaskExecutor
프로듀서-컨슈머 패턴을 스프링에서 발견할 수 있는 대표적인 클래스는 ThreadPoolTaskExecutor
이다.
@Bean
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.initialize();
return executor;
}
위와 같이 ThreadPoolTaskExecutor
의 구성을 설정하고 초기화하는 과정에서 블로킹 큐와 컨슈머를 생성한다.
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
// 블로킹 큐
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
// 컨슈머
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = command;
if (taskDecorator != null) {
decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
}
super.execute(decorated);
}
@Override
protected void beforeExecute(Thread thread, Runnable task) {
ThreadPoolTaskExecutor.this.beforeExecute(thread, task);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
ThreadPoolTaskExecutor.this.afterExecute(task, ex);
}
};
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
if (this.prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
this.threadPoolExecutor = executor;
return executor;
}
ThreadPoolTaskExecutor
를 구성하며 설정한 값을 기반으로 블로킹 큐를 생성하고 컨슈머인 ThreadPoolExecutor
를 생성한다.
블로킹 큐는 workQueue
라는 이름으로 ThreadPoolExecutor
에 넘겨진다.
@Override
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(executor, task, ex);
}
}
ThreadPoolTaskExecutor
에 task
를 전달하면 컨슈머인 ThreadPoolExecutor
에게 전달된다.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
ThreadPoolExecutor
의 execute
에서는 workQueue
의 상태에 따라 offer
혹은 addWorker
를 통해 큐를 추가하여 프로듀서의 요청을 처리한다.
'개발' 카테고리의 다른 글
네임드 락과 커밋 (0) | 2025.03.20 |
---|---|
동적인 락 순서에 의한 데드락 (0) | 2025.03.07 |
SQS 리스너 구현기 (0) | 2025.01.16 |
도메인 이벤트 모듈 구성 (0) | 2025.01.16 |
이벤트 모듈 설계 문서 (0) | 2025.01.14 |