데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용
문제 상황
네 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.
모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다.
1차 대응
에러 발생 시 sleep 후 재시도하는 방식을 적용했다.
try: # merge logic except ConcurrentAppendException: time.sleep(retry_delay) retry()
일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다.
2차 대응
동시 Write 자체를 방지하기 위해 스트리밍 방식을 제거하고, Batch Job을 순차적으로 실행하도록 변경했다.
이 과정에서 클러스터 기동 시간을 줄이기 위해 databricks instance pool을 활용하고, job concurrency = 2로 설정했다.
하지만,, job 간 실행 타이밍이 예상보다 자주 겹치며 다시 ConcurrentAppendException이 발생했다.
3차 대응
ConcurrentAppendException로 인한 얼럿 노이즈가 심한 것 같아서 에러 발생 시 로그만 출력하도록 처리하였다..
당연히 에러가 발생했으니까 offset 커밋이 안 될 줄 알았다..
except ConcurrentAppendException as e: print("ConcurrentAppendException 발생:", e) pass
그런데 Emergency🚨 다른 통계 파이프라인 개발 중 데이터 누락을 발견했다.
원인은 위 로직이었는데, 에러가 발생했지만 offset이 커밋되어 체크포인트가 저장되고 있었다.(지금 생각해보니까 당연하다...)
최최종 대응
- 모든 Exception을 raise 하도록 try catch는 제거했다.
- 클러스터 기동 시간을 절약하겠다는 마음을 내려놓고 job concurrency는 1로 변경
- job이 끝날 때 다음 job을 trigger 하도록 구성하여 처리는 조금 지연되겠지만 절대로 동시 쓰기가 발생하지 않도록 했다.
'Data > Spark' 카테고리의 다른 글
[Spark] Spark Submit (0) | 2025.07.12 |
---|---|
[Spark] Spark 구성 요소 (0) | 2025.07.12 |
[Spark] Kafka startingTimestamp 옵션 활용 (0) | 2025.04.17 |
[Spark] Delta Lake Change Data Feed (0) | 2025.04.06 |
[Spark] DataFrame API 테스트 하기 (0) | 2025.04.05 |