이벤트 자동 발행
EventPublishingRepositoryProxyPostProcessor
spring-data-commons
라이브러리의 EventPublishingRepositoryProxyPostProcessor
는 CrudRepository.save(Object)
와 CrudRepository.delet(Object)
메서드를 인터셉트하여 @DomainEvent
를 발행하고 @AfterDomainEventPublication
어노테이션이 붙은 메서드를 실행하는 MethodInterceptor
를 등록합니다.@DomainEvent
와 @AfterDomainEventPublication
를 직접 선언하거나 AbstractAggregateRoot
를 상속하는 방식으로 이벤트를 다루는 객체는 자유롭게 만들 수 있지만 이벤트 발행 시점이 CrudRepository.save(Object)
와 CrudRepository.delet(Object)
로 고정되어 있어 엔티티 객체가 아니면 EventPublishingRepositoryProxyPostProcessor
로 등록되는 MethodInterceptor
를 활용하여 이벤트를 발행할 수 없다는 한계가 존재합니다.
이에 저는 EventPublishingRepositoryProxyPostProcessor
를 참고하여 엔티티가 아닌 POJO 도메인 객체에서도 유사한 방식으로 이벤트를 발행할 수 있도록 도메인 이벤트 모듈을 구성해 보았습니다.
의존성
dependencies {
implementation("org.springframework.data:spring-data-commons")
implementation("org.jmolecules.integrations:jmolecules-starter-ddd:${DependencyVersion.JMOLECULES}")
implementation("org.springframework.modulith:spring-modulith-starter-core")
}
org.springframework.data:spring-data-commons
@DomainEvent
,@AfterDomainEventPublication
,AbstractAggregateRoot
클래스가 포함되어 있습니다.
org.jmolecules.integrations:jmolecules-starter-ddd
@AggregateRoot
클래스가 포함되어 있습니다.
org.springframework.modulith:spring-modulith-starter-core
org.springframework.data:spring-data-commons
의 이벤트를 다루는 클래스와@AggregateRoot
를 활용하여 이벤트 발행과 수신을 문서화합니다.
구성 클래스
DomainAbstractAggregateRoot
abstract class DomainAbstractAggregateRoot<A : DomainAbstractAggregateRoot<A>> : AbstractAggregateRoot <A>()
AbstractAggregateRoot
를 상속하여 사용합니다.
문서화
org.springframework.modulith:spring-modulith-starter-core
는 org.springframework.modulith.core.ArchitecturallyEvidentType
를 사용해 어그리게이트 루트 객체를 판별하고 이를 바탕으로 이벤트와 관련된 문서를 구성합니다.ArchitecturallyEvidentType
의 구현체 중 하나인 JMoleculesArchitecturallyEvidentType
는 AggregateRoot
와 @AggregateRoot
를 활용하여 루트 객체를 판별하는 메서드를 가지고 있고 도메인 이벤트 모듈에서는 이를 활용하여 문서화를 진행하려 합니다.
// JMoleculesArchitecturallyEvidentType
@Override
public boolean isAggregateRoot() {
JavaClass type = getType();
return isAnnotatedWith(org.jmolecules.ddd.annotation.AggregateRoot.class).test(type) //
|| type.isAssignableTo(org.jmolecules.ddd.types.AggregateRoot.class);
}
@PublishEvents
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class PublishEvents
EventPublishingRepositoryProxyPostProcessor
에서는 CrudRepository.save(Object)
와 CrudRepository.delet(Object)
메서드를 실행한 이후에만 이벤트를 발행할 수 있어 이벤트 발행 시점에 대한 자유가 없다는 아쉬움이 있었습니다.
이에 이벤트 발행 시점을 직접 지정할 수 있도록 하는 마커 인터페이스 @PublishEvents
를 추가하였습니다.
DomainEventPublishingMethodInterceptor
class DomainEventPublishingMethodInterceptor(
private val method: DomainEventPublishingMethod,
private val publisher: ApplicationEventPublisher,
) : MethodInterceptor {
override fun invoke(invocation: MethodInvocation): Any? {
val result = invocation.proceed()
if (result is DomainAbstractAggregateRoot<*>) {
method.publishEventsFrom(listOf(result), publisher, publicationRepository)
}
return result
}
}
이벤트 발행을 위한 MethodInterceptor
입니다.@PublishEvents
를 통해 선택된 메서드에 적용되고 해당 메서드의 결과가 DomainAbstractAggregateRoot
를 상속한 클래스여야 합니다.
DomainEventPublishingMethod
class DomainEventPublishingMethod(
private val type: Class<*>,
private val publishingMethod: Method?,
private val clearingMethod: Method? = null,
) {
companion object {
val NONE = DomainEventPublishingMethod(Any::class.java, null, null)
fun of(type: Class<*>): DomainEventPublishingMethod {
if (!type.superclass.isAssignableFrom(DomainAbstractAggregateRoot::class.java)) {
throw IllegalArgumentException("Type must extend DomainAbstractAggregateRoot: $type")
}
if (!type.isAnnotationPresent(AggregateRoot::class.java)) {
throw IllegalArgumentException("Type must be annotated with @AggregateRoot: $type")
}
val result =
from(
type,
getDetector(type, DomainEvents::class.java),
Supplier {
getDetector(
type,
AfterDomainEventPublication::class.java,
)
},
)
return result
}
}
fun publishEventsFrom(
aggregates: Iterable<*>,
publisher: ApplicationEventPublisher,
) {
for (aggregateRoot in aggregates) {
if (!type.isInstance(aggregateRoot)) {
continue
}
for (event in asCollection(ReflectionUtils.invokeMethod(publishingMethod!!, aggregateRoot), null)) {
publisher.publishEvent(event)
}
if (clearingMethod != null) {
ReflectionUtils.invokeMethod(clearingMethod, aggregateRoot)
}
}
}
}
private fun from(
type: Class<*>,
publishing: AnnotationDetectionMethodCallback<*>,
clearing: Supplier<AnnotationDetectionMethodCallback<*>>,
): DomainEventPublishingMethod {
if (!publishing.hasFoundAnnotation()) {
return NONE
}
val eventMethod = publishing.getMethod()!!
ReflectionUtils.makeAccessible(eventMethod)
return DomainEventPublishingMethod(
type,
eventMethod,
getClearingMethod(clearing.get()),
)
}
@Nullable
private fun getClearingMethod(clearing: AnnotationDetectionMethodCallback<*>): Method? {
if (!clearing.hasFoundAnnotation()) {
return null
}
val method = clearing.getRequiredMethod()
ReflectionUtils.makeAccessible(method)
return method
}
private fun <T : Annotation> getDetector(
type: Class<*>,
annotation: Class<T>,
): AnnotationDetectionMethodCallback<T> {
val callback = AnnotationDetectionMethodCallback(annotation)
ReflectionUtils.doWithMethods(type, callback)
return callback
}
of 생성자
해당 클래스가 DomainAbstractAggregateRoot
를 상속하고 @AggregateRoot
붙어 있는 클래스인지 확인하고 저장합니다.
이후 @DomainEvents
가 붙어 있는 메서드와 @AfterDomainEventPublication
가 붙어 있는 메서드를 찾아 저장합니다.
// AbstractAggregateRoot
@DomainEvents
protected Collection<Object> domainEvents() {
return Collections.unmodifiableList(domainEvents);
}
@AfterDomainEventPublication
protected void clearDomainEvents() {
this.domainEvents.clear();
}
AbstractAggregateRoot
를 상속하고 있는 DomainAbstractAggregateRoot
는 위의 두 메서드가 해당합니다.
DomainAbstractAggregateRoot
는 AbstractAggregateRoot
를 상속하였기에 AbstractAggregateRoot
의 domainEvents
와 clearDomainEvents
메서드가 등록되게 됩니다.
publishEventsFrom
우선 생성자로 저장한 타입을 바탕으로 파라미터로 전달받은 aggregate
가 동일한 타입인지 확인합니다.
리플렉션을 활용하여 publishingMethod
에서 이벤트 객체를 획득합니다.
그리고 획득한 객체를 발행합니다.
이후 clearingMethod
가 존재한다면 해당 메서드를 리플랙션을 활용하여 실행시킵니다.
DomainEventPublishingProxyPostProcessor
@Component
class DomainEventPublishingProxyPostProcessor(
private val publisher: ApplicationEventPublisher,
) : BeanPostProcessor {
override fun postProcessAfterInitialization(
bean: Any,
beanName: String,
): Any? {
val methods = bean.javaClass.methods
val domainEventPublishingMethods = methods.filter { it.getAnnotation(PublishEvents::class.java) != null }
if (domainEventPublishingMethods.isNotEmpty()) {
val proxyFactory = ProxyFactory(bean)
domainEventPublishingMethods.forEach { method ->
val returnType = method.returnType
val domainEventPublishingMethod = DomainEventPublishingMethod.of(returnType)
proxyFactory.addAdvice(
DomainEventPublishingMethodInterceptor(domainEventPublishingMethod, publisher),
)
}
return proxyFactory.proxy
}
return bean
}
}
fun asCollection(
@Nullable source: Any?,
@Nullable method: Method?,
): Iterable<Any> {
if (source == null) {
return emptyList()
}
if (method != null && method.name.startsWith("saveAll")) {
return source as Iterable<Any>
}
if (MutableCollection::class.java.isInstance(source)) {
return source as Collection<Any>
}
return listOf(source)
}
BeanPostProcessor
를 상속하여 postProcessAfterInitialization
에 @PublishEvents
가 메서드에 추가된 빈을 찾아 DomainEventPublishingMethodInterceptor
를 생성하여 프록시 객체로 다시 등록하는 ProxyPostProcessor
입니다.
이벤트 문서화
org.springframework.modulith:spring-modulith-starter-core
에서 제공하는 ApplicationModules
와 Documenter
를 활용하여 문서화를 진행할 수 있습니다.
class EventDocument {
@Test
fun `create event document`() {
val modules =
ApplicationModules.of(
"패키지 주소"
)
Documenter(modules)
.writeModuleCanvases(
Documenter.CanvasOptions
.defaults()
.revealInternals()
.revealEmptyLines(),
)
}
}
위의 테스트를 실행한 이후 build/spring-modulith-docs
에서 adoc
형식의 이벤트의 발행과 수신 정보를 확인할 수 있는 문서를 확인할 수 있습니다.
이벤트 발행 자동 저장
Spring Modulith에서는 JPA
,JDBC
, MongoDB
, Neo4j
등 spring-modulith-start-*
라이브러리를 제공하여 이벤트 발행 기록을 저장할 수 있도록 지원하고 있습니다.
하지만 해당 라이브러리에서 제공하는 EventPublicationRegistry
는 트랜잭션 커밋이 발생한 이후 전달되는 @TransactionalEventListener
에 대해 자동으로 이벤트 발행 기록 저장을 지원해주고 있어 트랜잭션 커밋이 발생하지 않는 상황에서 발행되는 이벤트에 대해서도 동일한 형식으로 이벤트를 저장, 관리할 수 있도록 도메인 이벤트 모듈에 추가 구현을 진행하였습니다.
Spring Modulith의 이벤트 발행 저장 과정
// PersistentApplicationEventMulticaster
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
var type = eventType == null ? ResolvableType.forInstance(event) : eventType;
var listeners = getApplicationListeners(event, type);
if (listeners.isEmpty()) {
return;
}
new TransactionalEventListeners(listeners)
.ifPresent(it -> storePublications(it, getEventToPersist(event)));
for (ApplicationListener listener : listeners) {
listener.onApplicationEvent(event);
}
}
AbstractApplicationContext
의 multicastEvent
를 통해 발행된 이벤트를 멀티캐스팅 하며 PersistentApplicationEventMulticaster
의 multicastEvent
가 실행됩니다.
우선 모든 listeners
를 조회한 후 listeners
중 TransactionalEventListeners
에 해당하는 listeners
의 이벤트를 저장하고 이벤트를 발행합니다.
public TransactionalEventListeners(Collection<ApplicationListener<?>> listeners) {
Assert.notNull(listeners, "ApplicationListeners must not be null!");
this.listeners = (List) listeners.stream()
.filter(TransactionalApplicationListener.class::isInstance)
.map(TransactionalApplicationListener.class::cast)
.filter(it -> it.getTransactionPhase().equals(TransactionPhase.AFTER_COMMIT))
.sorted(AnnotationAwareOrderComparator.INSTANCE)
.toList();
}
TransactionalEventListeners
에 해당하는 listener
는 TransactionalApplicationListener
이고 TransactionPhase
가 TransactionPhase.AFTER_COMMIT
인 listener
입니다.
의존성
dependencies {
implementation("org.springframework.modulith:spring-modulith-events-core")
implementation("org.springframework.modulith:spring-modulith-events-api")
implementation("org.springframework.modulith:spring-modulith-starter-jdbc")
}
이벤트 발행 저장 시퀀스
구성 클래스
EventPublication
class EventPublication(
private val event: Event,
private val targetIdentifier: PublicationTargetIdentifier,
private val publicationDate: Instant,
private var completionDate: Optional<Instant> = Optional.empty(),
) : TargetEventPublication {
override fun markCompleted(instant: Instant) {
completionDate = Optional.of(instant)
}
override fun getIdentifier(): UUID = UUID.fromString(event.eventId)
override fun getEvent(): Any = event
override fun getPublicationDate(): Instant = publicationDate
override fun getCompletionDate(): Optional<Instant> = completionDate
override fun getTargetIdentifier(): PublicationTargetIdentifier = targetIdentifier
}
발행 이벤트를 저장하는 EventPublicationRepository
의 create
메서드는 TargetEventPublication
를 파라미터로 받기에 TargetEventPublication
를 상속하여 구현하였습니다.getIdentifier
의 경우 Event
클래스에서도 UUID
를 사용하여 eventId
를 생성하고 있어 동일하게 설정하였습니다.getTargetIdentifier
의 PublicationTargetIdentifier
는 이벤트가 발행되어 처리되는 이벤트 리스너의 주소를 말합니다.
이때 Spring Modulith에서 제공하는 구현의 경우 이벤트 리스너의 주소가 getTargetIdentifier
의 값에 해당하지만 도메인 이벤트의 경우 getTargetIdentifier
의 값은 이벤트 재발행시 발행될 리스너가 됩니다.
이는 이벤트 재발행시에 TransactionalEventListeners
에서의 리스너와 비교하여 동일한 값에 해당하는 리스너에 이벤트를 재발행하기 때문에 트랜잭션 커밋이 없는 이벤트 리스너의 경우 별도의 이벤트 재발행을 지원하기 위해서는 @TransactionalEventListner
의 추가 리스너가 필요하고 해당 리스너가 이벤트 발행 기록에도 기록되어야 하기 때문입니다.
@Component
class DomainEventListener() {
@Async
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
fun onEvent(event: DomainEvent) {
// ... 이벤트 처리
}
@TransactionalEventListener
fun onInCompleteEvent(event: DomainEvent) {
// ... 리플레이 이벤트 처리
}
}
TraceAbleEvent
abstract class TraceAbleEvent(
val targetIdentifier: PublicationTargetIdentifier,
eventId: String = EventUtils.generateEventId(),
eventTime: LocalDateTime = EventUtils.generateEventPublishedTime(),
) : Event(
eventId,
eventTime,
)
targetIdentifier
이벤트가 처리될 리스너를 의미합니다.
DomainEventPublishingMethod
// DomainEventPublishingMethod
fun publishEventsFrom(
aggregates: Iterable<*>,
publisher: ApplicationEventPublisher,
publicationRepository: EventPublicationRepository,
) {
for (aggregateRoot in aggregates) {
if (!type.isInstance(aggregateRoot)) {
continue
}
for (event in asCollection(ReflectionUtils.invokeMethod(publishingMethod!!, aggregateRoot), null)) {
publisher.publishEvent(event)
if (event is TraceAbleEvent) {
publicationRepository.create(
EventPublication(
event = event,
targetIdentifier = event.targetIdentifier,
publicationDate = Instant.now(),
),
)
}
}
if (clearingMethod != null) {
ReflectionUtils.invokeMethod(clearingMethod, aggregateRoot)
}
}
}
기존 DomainEventPublishingMethod
의 publishEventsFrom
에서 이벤트를 발행한 이후 publicationRepository
를 사용하여 이벤트 발행 정보를 저장할 수 있도록 수정하였습니다.
이벤트 완료 처리
@Component
class DomainEventListener() {
@Async
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
fun onEvent(event: DomainEvent) {
// ... 이벤트 처리
eventPublicationRepository.markCompleted(UUID.fromString(event.eventId), Instant.now())
}
@TransactionalEventListener
fun onInCompleteEvent(event: DomainEvent) {
// ... 리플레이 이벤트 처리
eventPublicationRepository.markCompleted(UUID.fromString(event.eventId), Instant.now())
}
}
이벤트 발행의 경우 DomainEventPublishingMethod
에서 공통으로 처리할 수 있지만 완료 처리의 경우 이벤트를 구독하는 리스너 각각에서 처리를 완료한 이후에 진행하여야 합니다.EventPublication
에서 getIdentifier
를 이벤트의 아이디와 동일하게 설정하였기 때문에 EventPublicationRepository
의 markCompleted
를 사용할 때에도 이벤트의 아이디를 사용하여 완료 처리를 진행할 수 있습니다.
'개발' 카테고리의 다른 글
SQS 리스너 구현기 (0) | 2025.01.16 |
---|---|
이벤트 모듈 설계 문서 (0) | 2025.01.14 |
레디스를 메시지 브로커로 사용하기 위한 정리 (1) | 2024.12.03 |
테스트 객체 용어 정리 (0) | 2024.12.01 |
MC/DC 커버리지 (0) | 2024.12.01 |