Use Case
세 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때 Delta 테이블의 Change Data Feed 기능을 활용했다.
기존 운영 중인 파이프라인을 참고했을 때 delta의 created_at, updated_at 컬럼으로 insert, update된 레코드를 추출하는 로직이었는데 CDF를 활용한 로직을 통해 델타 테이블 전체 조회 로직을 제거할 수 있었다!
Change Data Feed(CDF)란
Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.
Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를 저장합니다. 여기에는 각 행의 데이터뿐만 아니라 해당 행이 삽입(insert), 삭제(delete), 또는 업데이트(update) 되었는지를 나타내는 메타데이터도 포함됩니다.
CDF 활성화 방법
ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
코드
merge를 시작한 timestamp를 기준으로 insert, update 된 레코드만 찾는 코드
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", starting_timestamp)
.table(table_path)
.where(F.col("_change_type").isin(["insert", "update_postimage"]))
.drop("_change_type", "_commit_version", "_commit_timestamp")
_change_type에는 insert, update_preimage(update 되기 전 값) , update_postimage, delete가 있기 때문에
insert, update_postimage만 필터링하면 insert, update된 레코드를 추출할 수 있다.
참고
https://docs.databricks.com/gcp/en/delta/delta-change-data-feed
Use Delta Lake change data feed on Databricks | Databricks Documentation
Learn how to get row-level change information from Delta tables using the Delta Lake change data feed.
docs.databricks.com
https://velog.io/@azuresky/Change-data-feed
Change data feed
Change Data Feed (CDF) 기능은 Delta 테이블이 버전 간에 행 수준 변경을 추적할 수 있게 합니다. Delta 테이블에서 이 기능을 활성화하면 런타임은 테이블에 쓰여진 모든 데이터에 대해 "변경 이벤트"를
velog.io
'Data' 카테고리의 다른 글
[AWS DMS] CDC 최신 데이터 판별 (0) | 2025.05.20 |
---|---|
[Spark] Delta Merge ConcurrentAppendException 탈출기 (0) | 2025.04.26 |
[Spark] Kafka startingTimestamp 옵션 활용 (0) | 2025.04.17 |
[Spark] DataFrame API 테스트 하기 (0) | 2025.04.05 |
[Spark] 배치 조회 구현하기 (0) | 2025.03.25 |