Data

[Spark] Kafka startingTimestamp 옵션 활용

코딩콩 2025. 4. 17. 21:29

카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록

 

Use Case

- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용

- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용

- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용(동일 이벤트 재발행까지)

 

초기 체크포인트 저장 용도로 활용

import time
import pytz
from datetime import datetime

dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)

df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_servers)
    .option("subscribe", topics)
    .option("startingTimestamp", starting_timestamp)
    .option("startingOffsetsByTimestampStrategy", "latest")
    .load()
)

(
    df.writeStream.foreachBatch(process_method)
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .start()
)

- readStream으로 읽고

- writeStream의 trigger option을 availableNow=True로 설정하여 한 사이클만 돌리고 일단 끄기

=> 체크포인트 저장 완

 

특정 이벤트 필터링해서 재발행

def parse_df(stream_df):
    event_schema = StructType([...])
    return (
        stream_df.withColumn("raw_data", F.col("value").cast(StringType()))
        .withColumn("parsed_value", F.from_json(F.col("value").cast("string"), event_schema))
    )

import time
import pytz
from datetime import datetime

dt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)
starting_timestamp = int(time.mktime(dt.timetuple()) * 1000)

df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_servers)
    .option("subscribe", topics)
    .option("startingTimestamp", starting_timestamp)
    .option("startingOffsetsByTimestampStrategy", "latest")
    .load()
)

# 특정 이벤트 필터링
result = (
    parse_df(df)
    .filter(F.col("column_name") == "value")
    .withColumnRenamed("raw_data", "value")
    .select("value")
)

# 저장
(
result.write.format("kafka")
    .option("kafka.bootstrap.servers", broker_servers)
    .option("topic", topic)
    .mode("append")
    .save()
)

- read로 가져와서 필터링하고

- 기존 이벤트를 고대로 write