Data (5) 썸네일형 리스트형 [Spark] Delta Merge ConcurrentAppendException 탈출기 데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용 문제 상황세 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다. 1차 대응에러 발생 시 sleep 후 재시도하는 방식을 적용했다.try: # merge logicexcept ConcurrentAppendException: time.sleep(retry_delay) retry()일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다. 2차 대응동시 Write 자체를 방지하기 위해 스트리밍.. [Spark] Kafka startingTimestamp 옵션 활용 카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록 Use Case- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용import timeimport pytzfrom datetime import datetimedt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.UTC)starting_timestamp = int(time... [Spark] Delta Lack Change Data Feed Use Case세 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때 Delta 테이블의 Change Data Feed 기능을 활용했다.기존 운영 중인 파이프라인을 참고했을 때 delta의 created_at, updated_at 컬럼으로 insert, update된 레코드를 추출하는 로직이었는데 CDF를 활용한 로직을 통해 델타 테이블 전체 조회 로직을 제거할 수 있었다! Change Data Feed(CDF)란Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를.. [Spark] DataFrame API 테스트 하기 ** Databricks 환경에서 개발하는 거라서 디버깅 하기 정말 좋은 환경이지만,,,~ DataFrame API 디버깅, 테스트하면서 불편했던 점- 여러 로직 중에 딱 한 부분만 디버깅 하고 싶은데 그 결과를 보려고 다른 곳들 주석 처리하면서 해당 결과를 컬럼 추가했다가 drop 했다가 반복 Mock Dataframe 만들기gpt한테 코드 보여주고 mock dataframe 달라고 하면 준다.from pyspark.sql.types import StructType, StructField, StringTypefrom pyspark.sql import Rowfrom pyspark.sql import functions as F# 스키마 정의schema = StructType([ StructField(.. [Spark] 배치 조회 구현하기 Spring Boot로 개발된 파이프라인을 이관하게 되었다.로직은 복잡하지만 이미 검증된 로직이기 때문에 고대로 옮기면 될 줄 알았는데 고대로 옮기는 게 쉽지 않았다. 특히 어려웠던 것..spring boot는 단건 조회이고, 객체 뽑아서 재활용이 가능하고, 이렇게 저렇게 중복 필드명을 가져도 된다.spark는 스트리밍 방식으로 배치 조회이고, 각 레코드에 맞는 객체를 조회(join)해서 붙여놔야 한다, df에 중복 필드명도 안 된다. 아래는 객체 조회 방식을 spark로 재구현한 코드이다.Kotlin 코드fun getObject(): Object? { val object = if (조건1) { repository.findAllByIdAndName() .. 이전 1 다음