본문 바로가기

Data/Spark

[Spark] Delta Merge ConcurrentAppendException 탈출기

데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용

 

문제 상황

네 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.

모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다.

 

1차 대응

에러 발생 시 sleep 후 재시도하는 방식을 적용했다.

try:
# merge logic
except ConcurrentAppendException:
time.sleep(retry_delay)
retry()

일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다.

 

2차 대응

동시 Write 자체를 방지하기 위해 스트리밍 방식을 제거하고, Batch Job을 순차적으로 실행하도록 변경했다.

이 과정에서 클러스터 기동 시간을 줄이기 위해 databricks instance pool을 활용하고, job concurrency = 2로 설정했다.
하지만,, job 간 실행 타이밍이 예상보다 자주 겹치며 다시 ConcurrentAppendException이 발생했다.

 

3차 대응

ConcurrentAppendException로 인한 얼럿 노이즈가 심한 것 같아서 에러 발생 시 로그만 출력하도록 처리하였다..

당연히 에러가 발생했으니까 offset 커밋이 안 될 줄 알았다..

except ConcurrentAppendException as e:
print("ConcurrentAppendException 발생:", e)
pass

그런데 Emergency🚨 다른 통계 파이프라인 개발 중 데이터 누락을 발견했다.

원인은 위 로직이었는데, 에러가 발생했지만 offset이 커밋되어 체크포인트가 저장되고 있었다.(지금 생각해보니까 당연하다...)

 

최최종 대응

- 모든 Exception을 raise 하도록 try catch는 제거했다.

- 클러스터 기동 시간을 절약하겠다는 마음을 내려놓고 job concurrency는 1로 변경

- job이 끝날 때 다음 job을 trigger 하도록 구성하여 처리는 조금 지연되겠지만 절대로 동시 쓰기가 발생하지 않도록 했다.

'Data > Spark' 카테고리의 다른 글