본문 바로가기

전체 글

(85)
[Spark] Delta Merge ConcurrentAppendException 탈출기 데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용 문제 상황네 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다. 1차 대응에러 발생 시 sleep 후 재시도하는 방식을 적용했다.try: # merge logicexcept ConcurrentAppendException: time.sleep(retry_delay) retry()일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다. 2차 대응동시 Write 자체를 방지하기 위해 스트리밍.. 2025. 4. 26. 14:16
[Spark] Kafka startingTimestamp 옵션 활용 카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록 Use Case- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용(동일 이벤트 재발행까지) 초기 체크포인트 저장 용도로 활용import timeimport pytzfrom datetime import datetimedt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.U.. 2025. 4. 17. 21:29
[Spark] Delta Lake Change Data Feed Use Case네 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때데이터 변경을 감지하고 집계 파이프라인을 돌릴 때Change Data Feed(CDF)란Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를 저장합니다. 여기에는 각 행의 데이터뿐만 아니라 해당 행이 삽입(insert), 삭제(delete), 또는 업데이트(update) 되었는지를 나타내는 메타데이터도 포함됩니다.CDF 활성화 방법ALTER TABLE myDeltaTable SET TBLPROPER.. 2025. 4. 6. 16:50
[Spark] DataFrame API 테스트 하기 ** Databricks 환경에서 개발하는 거라서 디버깅 하기 정말 좋은 환경이지만,,,~ DataFrame API 디버깅, 테스트하면서 불편했던 점- 여러 로직 중에 딱 한 부분만 디버깅 하고 싶은데 그 결과를 보려고 다른 곳들 주석 처리하면서 해당 결과를 컬럼 추가했다가 drop 했다가 반복 Mock Dataframe 만들기gpt한테 코드 보여주고 mock dataframe 달라고 하면 준다.from pyspark.sql.types import StructType, StructField, StringTypefrom pyspark.sql import Rowfrom pyspark.sql import functions as F# 스키마 정의schema = StructType([ StructField(.. 2025. 4. 5. 00:21
[AWS CLI] ECS 서비스 우선순위 기반 Task 분배 자동화 스크립트 작성 배경AWS ECS에는 Auto Scaling 기능이 있다. 하지만 때로는 다수의 서비스가 n개의 task 수를 적절히 나눠가져야 할 때가 생긴다..,그러나 기본 Auto Scaling으로는 안 된다. 그래서 Jenkins에서 AWS CLI로 오토스케일링을 구현했다. 스크립트우리의 목표!! svc-high-priority SQS에 있는 메시지를 우선처리 해야한다!!- total은 31개 유지하기- svc-high-priority가 svc-low-priority 보다 우선순위가 높다.- svc-high-priority sqs 잔여 메시지가 1000개 이상일 때 max capacity를 30으로 조정하고, desired count를 30으로 조정- svc-high-priority sqs 잔여 메.. 2025. 4. 4. 23:57
[Spark] Spring Boot에서 Spark로 파이프라인 이관기: 객체 조회 Spring Boot로 개발된 파이프라인을 이관하게 되었다."단순히 로직만 옮기면 되겠지"라고 생각했지만, 두 프레임워크의 근본적인 처리 방식 차이로 인해 완전히 다른 접근법이 필요했다.Spring Boot vs Spark: 근본적인 차이점Spring Boot의 특징단건 처리: 하나의 레코드에 대해 순차적으로 처리객체 재활용: 한 번 조회한 객체를 메모리에서 재사용 가능변수명으로 구분: 동일한 객체를 다른 변수명으로 사용하면 내부 필드명은 동일해도 됨Spark Streaming의 특징마이크로 배치 처리: 전체 데이터셋을 모아서 처리하는 스트리밍 방식조인 기반: 필요한 데이터는 미리 조인해서 준비해야 함컬럼명 충돌: 소스 데이터와 조인 테이블에 동일한 컬럼명이 있으면 ambiguous 에러 → 컬럼 ali.. 2025. 3. 25. 22:38
[SpringBoot] 병렬, 비동기 처리 wiki * 이해한 만큼만 정리 parallelStream 비동기 처리는 아니고 병렬 처리 쓰레드 수가 임의로 지정되기 때문에 위험 짧게 끝날 수 있는 병렬 처리에 사용해야겠다고 결심 List storeList = storeListPage.getContent(); storeList.parallelStream() .forEach( store - > { ... }); ExecutorService 스레드 풀을 생성하고 관리할 수 있음 execute() : 반환 값 없을 때 submit() : 반환 값 있을 때 스레드 반환 shutdown() 필수 private void scrapExecuteAsync(List keywords) { ExecutorService executorService = Exec.. 2023. 11. 13. 22:00
[Helm] Spring Batch Helm으로 GKE Job 배포(젠킨스에서 배포) * 배치 잡을 GKE 파드 생성해서 돌리기 위한 여정 중🏃‍♀️ TODO https://juyeonee826.tistory.com/188 [Helm] Spring Batch Helm으로 GKE Job 배포(로컬 테스트) * 배치 잡을 GKE 파드 생성해서 돌리기 위한 여정 중🏃‍♀️ TODO https://juyeonee826.tistory.com/187 [GKE/K8S] GKE 생성, Spring batch job 실행(Cloud Shell 사용) * 배치 잡을 GKE 파드 생성해서 돌리기 위한 여정 중 juyeonee826.tistory.com 이것을 젠킨스에서 해내야한다. helm 차트 배포 jenkins 다시 띄우기 helm, gcloud, google-cloud-sdk-gke-gcloud-aut.. 2023. 10. 15. 14:05