최근 프로젝트를 진행하며 SQS를 사용하여 이벤트를 다룰 기회가 있어 해당 구현을 글로 남겨 보려 합니다.
SQS 설정
스프링에서 SQS를 사용을 지원하는 의존성은 spring-cloud-aws-messaging와 spring-cloud-aws-starter-sqs 두 가지가 존재합니다.
레퍼런스의 경우 spring-cloud-aws-messaging
조금 더 많이 존재하였지만 저는 aws-starter-sqs
를 선택하였습니다.
그 이유는 mvn
저장소에서 해당 의존성의 개발 상황을 확인해 본 결과 spring-cloud-aws-messaging
는 2021년 이후 더 이상 업데이트가 없는 반면, aws-starter-sqs
는 지금까지 꾸준히 업데이트되고 있었기 때문입니다.
의존성
dependencies {
/** aws - sqs */
implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs:${DependencyVersion.AWS_SQS}")
}
설정 클래스
@Configuration
class SqsConfig {
@Value("\${aws.credentials.access-key}")
val accessKey: String? = null
@Value("\${aws.credentials.secret-key}")
val secretKey: String? = null
@Value("\${aws.region.static}")
val region: String? = null
private fun credentialProvider(): AwsCredentials =
object : AwsCredentials {
override fun accessKeyId(): String = accessKey ?: ""
override fun secretAccessKey(): String = secretKey ?: ""
}
@Bean
fun sqsAsyncClient(): SqsAsyncClient =
SqsAsyncClient
.builder()
.credentialsProvider(this::credentialProvider)
.region(Region.of(region))
.build()
@Bean
fun defaultSqsListenerContainerFactory(): SqsMessageListenerContainerFactory<Any> =
SqsMessageListenerContainerFactory
.builder<Any>()
.configure { opt ->
opt.acknowledgementMode(AcknowledgementMode.MANUAL)
}
.sqsAsyncClient(sqsAsyncClient())
.build()
@Bean
fun sqsTemplate(): SqsTemplate = SqsTemplate.newTemplate(sqsAsyncClient())
}
AcknowledgementMode
SqsMessageListenerContainerFactory
에서 설정할 수 있는 acknowledgementMode
는 ON_SUCCESS
, ALWAYS
, MANUAL
세 가지가 모드가 존재합니다.
기본 적으로 ON_SUCCESS
가 설정되고 해당 모드의 경우 SqsMessageListener
가 메시지를 성공적으로 처리하면 자동으로 응답을 보냅니다.
제가 설정한 MANUAL
의 경우 ON_SUCCESS
와 달리 SqsMessageListener
에서 응답을 명시적으로 보내야 합니다.
저는 명시적으로 응답을 보내는 행위를 통해 메시지 차리리 단위를 조금 더 명확히 구분할 수 있다고 생각하여 MANUAL
타입의 설정을 선호합니다.
SqsListener
SqsListener
를 구현하며 해당 리스너에 어떠한 책임을 부여할 것인지에 대해 많이 고민하였습니다.
저는 SQS와 같이 외부 서비스를 활용하는 경우 그것이 언제 다른 것으로 바뀔 수 있는 가능성을 고려해야 한다 생각하고 있습니다.
이에 저는 SqsListener
에는 메시지를 외부에서 구독하고 메시지를 애플리케이션 내부에서 사용할 이벤트로 바꾸어 발행하는 책임까지 부여하였습니다.
@Service
class SqsMessageReverseRelay(
private val applicationEventPublisher: ApplicationEventPublisher,
private val objectMapper: ObjectMapper,
private val eventMessageMapper: EventMessageMapper,
) : MessageReverseRelay() {
val log = KotlinLogging.logger { }
@SqsListener(queueNames = ["sqs"])
fun onMessage(
message: String,
acknowledgement: Acknowledgement,
) {
objectMapper
// message를 처리 할 수 있는 객체로 읽는다.
.readMessage(message)
// MessagePayload를 만든다. (생략가능)
.let {
MessagePayload(
eventId = EventUtils.generateEventId(),
eventType = it.eventType(),
eventTime = EventUtils.generateEventPublishedTime(),
data = it.data(),
)
}
// Event로 변환 시킨다.
.let {
eventMessageMapper.to(it)
}
// 변환 성공하면 Event를 발행한다.
.ifPresent {
publish(it)
}
// 메시지 완료 처리를 명시적으로 진행한다.
acknowledgement.acknowledge()
}
override fun publish(event: EmailSendEvent) {
log.info { "Publishing event: $event" }
applicationEventPublisher.publishEvent(event)
}
}
이와 같은 이벤트를 통한 외부 서비스와 애플리케이션의 분리는 SQS가 아닌 다른 이벤트 브로커가 추가되거나 교체되는 경우에도 어플리케이션 코드는 영향을 미치지 않아 외부 서비스에 대한 유연한 선택을 가능하게 한다는 장점이 있습니다.
메시지 구독, 처리 분리에 따른 추가 고려 사항
MessageReverseRelay
에 메시지 구독과 Event
발행에 대한 책임을 부여하며 브로커가 메시지가 성공적으로 처리되었는지 알지 못하는 상황이 되었습니다.
이에 메시지 구독, 처리에 대한 책임을 분리하며 메시지의 성공적인 처리에 대한 책임은 MessageReverseRelay
에서 Event
를 처리하는 EventListener
와 EventHandler
로 넘어갔다고 생각할 수 있었습니다.
리스너의 graceful shutdown
리스너가 동작하는 Thread의 설정
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// ...
// 스프링 컨테이너 종료 시 남은 태스크가 완료될 때까지 대기하도록 설정
executor.setWaitForTasksToCompleteOnShutdown(true);
// (선택) 대기 타임아웃 설정: 지정 시간 안에 끝나지 않으면 강제 종료
// executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
리스너가 동작하는 Thread
의 설정을 통해 리스너가 처리하고 있는 태스크가 있다면 스프링 컨테이너 종료 시 남은 태스크가 완료될 때까지 대기하도록 설정하였습니다.
이를 통해 이벤트가 완전히 처리될 수 있게 되었습니다.
이벤트 발행 기록 저장
이벤트가 너무 많다면 스레드 풀의 큐에서 대기하지 못하는 상황이 있을 수 있습니다.
이를 위해 MessageReverseRelay
에서 이벤트를 구독하고 Event
를 발행하면 이벤트 발행 기록을 저장할 필요가 있어 도메인 이벤트 모듈에서 구현하였던 이벤트 발행 기록 저장 부분을 SqsListener
에도 적용하였습니다.
@Component
class WrappedApplicationEventPublisher(
private val applicationEventPublisher: ApplicationEventPublisher,
private val eventPublicationRepository: EventPublicationRepository,
) {
fun publishEvent(event: TraceAbleEvent) {
applicationEventPublisher.publishEvent(event)
eventPublicationRepository.create(
EventPublication(
event = event,
targetIdentifier = event.targetIdentifier,
publicationDate = Instant.now(),
),
)
}
fun publishEvent(event: Event) {
applicationEventPublisher.publishEvent(event)
}
}
SqsListener
의 경우 도메인 객체를 사용하는 경우가 아니기 때문에 위의 코드와 같이 ApplicationEventPublisher
래핑 하여 TraceAbleEvent
를 발행할 때 이벤트 발행 이후 eventPublicationRepository
를 통해 이벤트 발행을 기록할 수 있도록 하였습니다.
'개발' 카테고리의 다른 글
도메인 이벤트 모듈 구성 (0) | 2025.01.16 |
---|---|
이벤트 모듈 설계 문서 (0) | 2025.01.14 |
레디스를 메시지 브로커로 사용하기 위한 정리 (1) | 2024.12.03 |
테스트 객체 용어 정리 (0) | 2024.12.01 |
MC/DC 커버리지 (0) | 2024.12.01 |