위키
컴퓨터 과학에서 스트림은 시간이 지남에 따라 잠재적으로 무제한으로 제공되는 데이터 요소의 시퀀스다. 스트림은 컨베이어 벨트 위의 항목이 대량으로 처리되는 것이 아니라 한 번에 하나씩 처리되는 것으로 생각할 수 있다. 스트림은 배치 데이터와 다르게 처리된다.
스트림에는 잠재적으로 무제한의 데이터가 있기 때문에 일반 함수는 스트림 전체에서 작동할 수 없다. 공식적으로 스트림은 데이터(유한한)가 아니라 코드데이터(잠재적으로 무제한)다.
스트림에서 작동하여 다른 스트림을 생성하는 함수를 필터라고 하며, 함수 구성과 유사한 방식으로 파이프라인에 연결할 수 있다. 필터는 한 번에 스트림의 한 항목에 대해 작동하거나 이동 평균과 같은 여러 입력 항목을 기반으로 출력 항목을 만들 수 있다.
Stream Processing 장점
- 실시간 인사이트 확보
- 스트림 처리는 데이터가 도착하자마자 즉시 분석할 수 있어, 사기 탐지, 실시간 모니터링, 즉각적인 의사결정이 중요한 애플리케이션에 필수적이다.
- 지속적인 데이터 처리
- 데이터가 도착하는 즉시 처리되므로 워크플로우가 중단되지 않으며, 적시에 대응할 수 있다.
- 민첩성 향상
- 실시간 이벤트에 빠르게 대응할 수 있어, 변화가 빠른 산업에서 경쟁 우위를 확보할 수 있다.
- 이벤트 기반 처리에 적합
- 동적 확장성
- 데이터 유입량의 변화에 맞춰 성능을 일관되게 유지하며 확장 가능
Stream Processing 단점
- 복잡한 구현 구조
- 스트림 처리를 구현하려면 고도화도니 아키텍처, 전무 도구, 그리고 전문 지식이 필요하다.
- 높은 운영 비용
- 실시간 시스템은 상당한 컴퓨팅 자원을 요구하며, 이에 따라 총 운영비용이 상승할 수 있다.
- 데이터 정확성 문제
- 데이터 일관성 유지, 순서가 뒤바뀐 이벤트 처리 등에서 복잡한 문제가 발생할 수 있다.
- 지속적인 모니터링 필요
- 스트림 처리 시스템은 실시간 이슈 대응을 위해 항상 모니터링 및 유지보수가 필요하다.
- 과거 데이터 분석에 한계
- 스트림 처리는 현재 데이터에 집중하기 대문에, 상세한 과거 분석이 필요한 애플리케이션에는 부적할 수 있다.
Batch vs Stream Processing
Batch Processing
배치 처리(Batch Processing)는 수집된 대용량 데이터를 일괄적으로 처리하는 방식이다. 이 방식은 많은 자원을 요구하는 작업, 반복적인 업무, 실시간 처리가 필요 없는 방대한 데이터셋 관리에 특히 효과적이다. 따라서 데이터 웨어하우징, ETL(추출·변환·적재), 대규모 리포팅 같은 애플리케이션에 이상적이다. 배치 처리는 다양한 비즈니스 요구사항을 충족시킬 수 있는 유연성을 갖고 있어, 데이터 처리 방식으로 널리 채택되고 있다. 보통 배치 처리 작업은 자동화되어 있으며, 한 번 설정되면 사람의 개입 없이 스케줄에 따라 처리된다. 작업은 사전에 정의되어 있으며, 시스템은 일반적으로 비업무 시간(오프피크 시간대)에 작업을 실행하여 컴퓨팅 자원을 효율적으로 활용한다.
사람의 개입은 주로 다음과 같은 경우에 한정된다:
- 초기 파라미터 설정
- 에러 발생 시 트러블슈팅
- 출력 결과 검토
이런 점에서 배치 처리는 대규모 데이터 작업을 효율적이고 손이 덜 가는(hands-off) 방식으로 처리할 수 있게 한다.
배치 처리를 위한 다양한 ETL 도구가 있으며, 그 중 하나가 Apache Airflow입니다. Airflow는 데이터 오케스트레이션 파이프라인을 쉽게 구축하고, 스케줄 기반으로 실행하며, 간단한 모니터링 기능도 제공한다.
Stream Processing
스트림 처리(Stream processing)는 스트리밍 처리 또는 실시간 데이터 처리라고도 불리며, 데이터가 시스템을 통과하는 즉시 이를 처리하고 분석하기 위한 방식이다. 이는 일정 시간마다 데이터를 모아서 처리하는 배치 처리(batch processing)와는 달리, 데이터를 실시간으로 지속적이고 점진적으로 처리한다. 데이터는 센서, 로그, 거래 기록, 소셜 미디어 피드, 기타 실시간 데이터 소스 등 다양한 곳에서 수집된다. 이렇게 수집된 데이터 스트림(stream)은 시스템에 도착하는 즉시 필터링, 변환, 집계 등의 일련의 처리를 거친다.
이를 통해 다음과 같은 실시간 애플리케이션을 구현할 수 있다:
- 실시간 분석
- 경고(trigger) 발생
- 실시간 대시보드 업데이트
- 다른 시스템에 결과 전달 및 후속 조치
이러한 즉각적인 인사이트는 실시간 의사결정에 활용될 수 있다.
스트림 처리의 대표적인 활용 사례:
- 금융 시장의 실시간 분석
- 사기(Fraud) 탐지
- 네트워크 트래픽 모니터링
- 실시간 추천 시스템 등
스트림 처리 시스템의 특징:
- 빠르게 유입되는 데이터(고속 데이터)를 안정적으로 처리하기 위해, 지속적인 모니터링 및 파이프라인 관리 기능을 갖추고 있다.
- 시스템 성능, 데이터 흐름 상태, 처리 결과 등을 실시간으로 추적하고 점검할 수 있다.
대표적인 프레임워크: AWS Kinesis + Lambda
- Amazon Kinesis는 클라우드 기반의 스트리밍 데이터 수집·처리·분석 서비스입니다.
- AWS Lambda와 함께 사용하면, 복잡한 함수 실행이나 자동화도 가능하여 실시간 데이터 흐름 기반의 유연한 처리가 가능합니다.
Batch와 Stream Processing 차이
- 데이터 지연 시간
- 스트림 처리: 지연 시간 낮음
- 데이터가 도착하자마자 바로 처리되며, 거의 실시간으로 분석 및 의사결정이 가능하다.
- 즉각적인 반응이 중요한 애플리케이션에 적합
- 배치 치러: 지연시간이 높음
- 일정 시간동안 데이터 수집 후, 한 번에 처리
- 분석 타이밍이 중요하지 않은 시나리오에 적합
- 스트림 처리: 지연 시간 낮음
- 처리 가능한 데이터 양
- 스트림 처리: 실시간 데이터
- 지속적으로 유입되는 대용량 데이터를 처리할 수 있음
- 시스템 설계 및 인프라 확장성이 핵심
- 초고속 데이터 흐름을 처리하려면 강력한 인프라 필요
- 배치 처리: 큰 덩어리 데이터
- 대량의 데이터를 모아서 한 번에 처리하는 데 적합
- 사전 집계 또는 대규모 데이터 변환 작업에 강점
- 스트림 처리: 실시간 데이터
- 구현 및 유지보수 복잡성
- 스트림처리: 복잡도 높음
- 실시간 데이터 스트림을 다루기 위한 복잡한 인프라 필요
- 상태 관리, 장애 복구 등의 고려사항 많음
- 배치 처리: 복잡도 낮음
- 사전에 정의된 일정에 따라 작업 수행
- 구현 및 운영이 상대적으로 간단
- 스트림처리: 복잡도 높음
- 사용 사례
- 스트림 처리
- 실시간 인사이트와 즉각적인 조치가 필요한 상황에 적합
- 배치 처리
- 즉시 반응이 필요 없는 간헐적인 분석에 적합
- 스트림 처리
- 인프라 및 비용
- 스트림 처리
- 인프라: 고속 데이터 파이프라인, 실시간 처리 엔진, 분산 시스템 등 복잡한 인프라 요구
- 비용:
- 고성능 컴퓨팅 자원
- 상시 모니터링
- 실시간 확장성 유지 등으로 비용이 높을 수 있음
- 배치 처리
- 인프라: 주기적 데이터 저장 및 처리를 위한 인프라
- 비용:
- 기존 컴퓨팅 및 저장소 자원 활용 가능
- 연속적인 운영이 필요 없으므로 상대적으로 비용이 낮음
- 스트림 처리
자바로 구현한 예제
static class SensorEvent {
String key;
int value;
LocalDateTime timestamp;
public SensorEvent(String key, int value, LocalDateTime timestamp) {
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
}
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger();
// 무한 루프를 통해 주기적으로 데이터를 수집하고 처리한다.
while (true) {
LocalDateTime start = LocalDateTime.now();
LocalDateTime end = start.plusSeconds(5);
// 5초 동안 생성된 센서 데이터를 수집한다.
List<SensorEvent> events = Stream.generate(() -> generateEvent(counter))
.takeWhile(takeEventBefore(end))
.toList();
// 센서별 평균 값과 개수를 계산한다.
Map<String, Pair<Double, Long>> perSensorResult = events.stream()
.collect(calcEventsPerSensor());
// 결과 출력
System.out.println("Window [" + start + " - " + end + "]");
perSensorResult.forEach(consumeResult());
}
}
private static SensorEvent generateEvent(AtomicInteger counter) {
return new SensorEvent(
"sensor-" + (counter.get() % 3),
20 + counter.incrementAndGet() % 10,
LocalDateTime.now());
}
private static Predicate<SensorEvent> takeEventBefore(LocalDateTime end) {
return e -> LocalDateTime.now().isBefore(end);
}
private static Collector<SensorEvent, ?, Map<String, Pair<Double, Long>>> calcEventsPerSensor() {
return groupingBy(
e -> e.key,
collectingAndThen(
teeing(
averagingInt(e -> e.value), // 평균 온도
counting(), // 사용된 데이터 수
Pair::of
),
p -> Pair.of(p.getFirst(), p.getSecond())
)
);
}
private static BiConsumer<String, Pair<Double, Long>> consumeResult() {
return (key, summary) ->
System.out.println(key + " 평균 온도: " + summary.getFirst() + ", 사용된 데이터 수: " + summary.getSecond());
}
- 예제 흐름
- 스트림을 통해 센서에서 데이터를 수집한다.
- 일정 단위로 데이터를 처리한다.
- 처리한 결과를 소비한다.
- 메서드 설명
takeEventBefore
메서드를 통해 5초 동안 생성된 센서 데이터를 수집한다.calcEventsPerSensor
메서드를 통해 수집한 데이터를 기반으로 원하는 작업(평균, 카운트)을 수행한다.consumeResult
메서드를 통해 생성된 결과를 소비한다.- 예제에서는 단순히 결과를 프린트한다.
- 모니터링 시스템 전송, 이상 탐지 / 알림 발송, DB 저장, 다른 시스템으로 메시지 전송, 파일 로깅, 머신러닝 모델 피드 입력 등 다른 다양한 방법으로 결과를 소비할 수 있다.
'개발' 카테고리의 다른 글
What do you mean by “Event-Driven”? 정리 (0) | 2025.05.07 |
---|---|
메시지 큐와 이벤트 스트림 (0) | 2025.05.02 |
[kotest] Add assertion error message 기여 과정 (0) | 2025.04.04 |
네임드 락과 커밋 (0) | 2025.03.20 |
동적인 락 순서에 의한 데드락 (0) | 2025.03.07 |