본문 바로가기

Data

[Spark] Delta Merge ConcurrentAppendException 탈출기

데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용

 

문제 상황

세 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.

모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다.

 

1차 대응

에러 발생 시 sleep 후 재시도하는 방식을 적용했다.

try:
    # merge logic
except ConcurrentAppendException:
    time.sleep(retry_delay)
    retry()

일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다.

 

2차 대응

동시 Write 자체를 방지하기 위해 스트리밍 방식을 제거하고, Batch Job을 순차적으로 실행하도록 변경했다.

이 과정에서 클러스터 기동 시간을 줄이기 위해 databricks instance pool을 활용하고, job concurrency = 2로 설정했다.
하지만,, job 간 실행 타이밍이 예상보다 자주 겹치며 다시 ConcurrentAppendException이 발생했다.

얼럿 노이즈가 심한 것 같아서 에러 발생 시 로그만 출력하도록 처리하였다.

except ConcurrentAppendException as e:
    print("ConcurrentAppendException 발생:", e)
    pass

다른 통계 파이프라인 개발 중 데이터 누락을 발견했다...

원인은 위 로직이었는데, 그래도 에러가 발생한 거니까 체크포인트가 넘어가지 않을 거라 예상했지만 체크포인트가 저장되고 있었다.(지금 생각해보니까 당연하다...)

 

최최종 대응

- 모든 Exceptiondms raise 하도록 try catch는 제거했다.

- 클러스터 기동 시간을 절약하겠다는 마음을 내려놓고 job concurrency는 1로 변경하고, job이 끝날 때 다음 job을 trigger 하도록 구성하여 처리는 조금 지연되겠지만 절대로 concurrency exception이 발생하지 않도록 했다.

 

** 다행히 아직 서비스에 사용 중인 데이터가 아니라서 별도 backfill 작업은 진행하지 않았다..

'Data' 카테고리의 다른 글

[Spark] Kafka startingTimestamp 옵션 활용  (0) 2025.04.17
[Spark] Delta Lack Change Data Feed  (0) 2025.04.06
[Spark] DataFrame API 테스트 하기  (0) 2025.04.05
[Spark] 배치 조회 구현하기  (0) 2025.03.25