본문 바로가기

Data

[Spark] Kafka startingTimestamp 옵션 활용

카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록

 

Use Case

- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용

- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용

- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용

import time
import pytz
from datetime import datetime

dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)

df = (spark
    .read # or readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_servers)
    .option("subscribe", topics)
    .option("startingTimestamp", starting_timestamp)
    .option("startingOffsetsByTimestampStrategy", "latest")
    .load()
)