Data
[Spark] Kafka startingTimestamp 옵션 활용
코딩콩
2025. 4. 17. 21:29
카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록
Use Case
- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용
- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용
- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용(동일 이벤트 재발행까지)
초기 체크포인트 저장 용도로 활용
import time
import pytz
from datetime import datetime
dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_servers)
.option("subscribe", topics)
.option("startingTimestamp", starting_timestamp)
.option("startingOffsetsByTimestampStrategy", "latest")
.load()
)
(
df.writeStream.foreachBatch(process_method)
.option("checkpointLocation", checkpoint_location)
.trigger(availableNow=True)
.start()
)
- readStream으로 읽고
- writeStream의 trigger option을 availableNow=True로 설정하여 한 사이클만 돌리고 일단 끄기
=> 체크포인트 저장 완
특정 이벤트 필터링해서 재발행
def parse_df(stream_df):
event_schema = StructType([...])
return (
stream_df.withColumn("raw_data", F.col("value").cast(StringType()))
.withColumn("parsed_value", F.from_json(F.col("value").cast("string"), event_schema))
)
import time
import pytz
from datetime import datetime
dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", broker_servers)
.option("subscribe", topics)
.option("startingTimestamp", starting_timestamp)
.option("startingOffsetsByTimestampStrategy", "latest")
.load()
)
# 특정 이벤트 필터링
result = (
parse_df(df)
.filter(F.col("column_name") == "value")
.withColumnRenamed("raw_data", "value")
.select("value")
)
# 저장
(
result.write.format("kafka")
.option("kafka.bootstrap.servers", broker_servers)
.option("topic", topic)
.mode("append")
.save()
)
- read로 가져와서 필터링하고
- 기존 이벤트를 고대로 write