전체 글 (70) 썸네일형 리스트형 [AWS DMS] CDC 최신 데이터 판별 AWS DMS 기반 CDC 구축 도중 발견한 데이터 정합성 오류를 DMS의 메타데이터 컬럼인 AR_H_CHANGE_SEQ 활용하여 해결한 경험을 기록 사전지식AWS DMS Target Endpoint의 설정이 아래와 같다고 가정했을 때- CdcMaxBatchInverval: 300최대 300초(5분) 동안 대기하다가 누적된 변경사항을 전송- TimestampColumnName: dms_timestamp변경사항이 target endpoint로 전송 되는 시간은 dms_timestamp 컬럼에 저장한다.=> 동일한 배치 내에 있는 레코드들은 dms_timestamp가 같다는 의미이다. 예시idview_countupdated_atdms_timestamp11012025-05-17 13:03:002025-05-1.. [AWS CLI] ECS Task Scale out 빠르게 하기 https://juyeonee826.tistory.com/199 [AWS CLI] ECS 서비스 우선순위 기반 Task 분배 자동화스크립트 작성 배경AWS ECS에는 Auto Scaling 기능이 있다. 하지만 때로는 다수의 서비스가 n개의 task 수를 적절히 나눠가져야 할 때가 생긴다..,그러나 기본 Auto Scaling으로는 안 된다. 그래서 Jenkins에juyeonee826.tistory.com이 글은 위 포스팅과 이어집니다. 위 포스팅에서 Task의 Max Capacity를 동적으로 조정하는 스크립트를 짰다.이 글은 max capacity를 조정하고 빠르게 max까지 띄우기 위한 scale out 최적화 스크립트를 짜는 내용으로 구성된다. 1. sleep 코드 추가하기sleep 코드 없이 바.. [Lambda] Lambda로 Kafka 컨슈머 운영할 때 주의할 점 카프카(AWS MSK)를 트리거로 돌리는 람다가 있다. 작년 8월부터 9개월을 운영했다.이벤트 처리 누락으로 CS가 들어왔던 건 한 두건 있었던 것 같다.(다른 팀에서 운영하는 람다에서도 종종 이벤트 누락이 발생한다고 하고,, 대응할 수 있는 방안이 있어서 람다 버그구나 생각하고 넘어갔다,,,) 내용 요약- 람다 이벤트 처리 누락 발생 -> records 첫번째 것만 처리함 -> records for문 돌려 해결- 그랬더니 중복 컨슘 발생 -> lambda timeout 발생 -> timeout 설정 변경하여 해결 이벤트 처리 누락 발견다른 팀에서 며칠 전부터 위 카프카로 배치성으로 이벤트를 보내기 시작했다고 하는데 90프로 처리가 누락되었다.kafka partition, offset 로그를 찍어뒀는데 .. [Spark] Delta Merge ConcurrentAppendException 탈출기 데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용 문제 상황네 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다. 1차 대응에러 발생 시 sleep 후 재시도하는 방식을 적용했다.try: # merge logicexcept ConcurrentAppendException: time.sleep(retry_delay) retry()일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다. 2차 대응동시 Write 자체를 방지하기 위해 스트리밍.. [Spark] Kafka startingTimestamp 옵션 활용 카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록 Use Case- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용(동일 이벤트 재발행까지) 초기 체크포인트 저장 용도로 활용import timeimport pytzfrom datetime import datetimedt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.U.. [Spark] Delta Lack Change Data Feed Use Case세 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때 Delta 테이블의 Change Data Feed 기능을 활용했다.기존 운영 중인 파이프라인을 참고했을 때 delta의 created_at, updated_at 컬럼으로 insert, update된 레코드를 추출하는 로직이었는데 CDF를 활용한 로직을 통해 델타 테이블 전체 조회 로직을 제거할 수 있었다! Change Data Feed(CDF)란Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를.. [Spark] DataFrame API 테스트 하기 ** Databricks 환경에서 개발하는 거라서 디버깅 하기 정말 좋은 환경이지만,,,~ DataFrame API 디버깅, 테스트하면서 불편했던 점- 여러 로직 중에 딱 한 부분만 디버깅 하고 싶은데 그 결과를 보려고 다른 곳들 주석 처리하면서 해당 결과를 컬럼 추가했다가 drop 했다가 반복 Mock Dataframe 만들기gpt한테 코드 보여주고 mock dataframe 달라고 하면 준다.from pyspark.sql.types import StructType, StructField, StringTypefrom pyspark.sql import Rowfrom pyspark.sql import functions as F# 스키마 정의schema = StructType([ StructField(.. [AWS CLI] ECS 서비스 우선순위 기반 Task 분배 자동화 스크립트 작성 배경AWS ECS에는 Auto Scaling 기능이 있다. 하지만 때로는 다수의 서비스가 n개의 task 수를 적절히 나눠가져야 할 때가 생긴다..,그러나 기본 Auto Scaling으로는 안 된다. 그래서 Jenkins에서 AWS CLI로 오토스케일링을 구현했다. 스크립트우리의 목표!! svc-high-priority SQS에 있는 메시지를 우선처리 해야한다!!- total은 31개 유지하기- svc-high-priority가 svc-low-priority 보다 우선순위가 높다.- svc-high-priority sqs 잔여 메시지가 1000개 이상일 때 max capacity를 30으로 조정하고, desired count를 30으로 조정- svc-high-priority sqs 잔여 메.. 이전 1 2 3 4 ··· 9 다음 목록 더보기