본문 바로가기

Data

[Spark] Delta Lack Change Data Feed

Use Case

세 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때 Delta 테이블의 Change Data Feed 기능을 활용했다.

기존 운영 중인 파이프라인을 참고했을 때 delta의 created_at, updated_at 컬럼으로 insert, update된 레코드를 추출하는 로직이었는데 CDF를 활용한 로직을 통해 델타 테이블 전체 조회 로직을 제거할 수 있었다!

 

Change Data Feed(CDF)란

Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.
Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를 저장합니다. 여기에는 각 행의 데이터뿐만 아니라 해당 행이 삽입(insert), 삭제(delete), 또는 업데이트(update) 되었는지를 나타내는 메타데이터도 포함됩니다.

 

CDF 활성화 방법

ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

 

코드

merge를 시작한 timestamp를 기준으로 insert, update 된 레코드만 찾는 코드

spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", starting_timestamp)
    .table(table_path)
    .where(F.col("_change_type").isin(["insert", "update_postimage"]))
    .drop("_change_type", "_commit_version", "_commit_timestamp")

_change_type에는 insertupdate_preimage(update 되기 전 값) , update_postimagedelete가 있기 때문에

insert, update_postimage만 필터링하면 insert, update된 레코드를 추출할 수 있다.

 

참고

https://docs.databricks.com/gcp/en/delta/delta-change-data-feed

 

Use Delta Lake change data feed on Databricks | Databricks Documentation

Learn how to get row-level change information from Delta tables using the Delta Lake change data feed.

docs.databricks.com

https://velog.io/@azuresky/Change-data-feed

 

Change data feed

Change Data Feed (CDF) 기능은 Delta 테이블이 버전 간에 행 수준 변경을 추적할 수 있게 합니다. Delta 테이블에서 이 기능을 활성화하면 런타임은 테이블에 쓰여진 모든 데이터에 대해 "변경 이벤트"를

velog.io