카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록
Use Case
- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용
- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용
- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용
import time
import pytz
from datetime import datetime
dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)
df = (spark
.read # or readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_servers)
.option("subscribe", topics)
.option("startingTimestamp", starting_timestamp)
.option("startingOffsetsByTimestampStrategy", "latest")
.load()
)
'Data' 카테고리의 다른 글
[Spark] Delta Merge ConcurrentAppendException 탈출기 (0) | 2025.04.26 |
---|---|
[Spark] Delta Lack Change Data Feed (0) | 2025.04.06 |
[Spark] DataFrame API 테스트 하기 (0) | 2025.04.05 |
[Spark] 배치 조회 구현하기 (0) | 2025.03.25 |