본문 바로가기

Data/Spark

(8)
[Spark] Catalyst Optimizer Hive의 Compiler 동작 흐름을 보다가 문득 Spark Catalyst Optimizer와 동작 흐름이 비슷한 것 같아서 글을 작성해보려고 한다.서론DataFrame API나 Spark SQL로 코드를 작성할 때, 가끔 이런 생각이 든다."내가 작성한 이 쿼리가 정말 효율적으로 실행될까?" "join 순서를 바꿔야 할까? filter를 어디에 넣어야 할까?"하지만 실제로는 개발자가 크게 신경 쓰지 않아도 된다. Spark의 Catalyst Optimizer가 우리 대신 쿼리를 분석하고 최적화해주기 때문이다.Catalyst Optimizer란?Catalyst는 Spark의 규칙 기반 쿼리 최적화 엔진이다. SQL이나 DataFrame API로 작성한 코드를 받아서, 가장 효율적인 실행 계획으로 변환..
[Spark] Spark Submit Spark Submitspark-submit은 Spark 애플리케이션을 클러스터에 제출하고 실행하는 통합 스크립트입니다. 모든 클러스터 매니저(Standalone, YARN, Kubernetes, Mesos)에서 일관된 인터페이스를 제공합니다.$SPARK_HOME/bin/spark-submit 기본 문법과 구조./bin/spark-submit \ --master \ --deploy-mode \ --conf = \ --driver-memory \ --executor-memory \ --executor-cores \ --jars \ --class \ \ [application-arguments] 주요 옵션들필수 옵션옵션설명예시--master클러스터 마스터 URLyarn, sp..
[Spark] Spark 구성 요소 여태 스파크로 여러 개의 파이프라인을 개발했지만 사실 코드를 작성할 줄만 할지 어떻게 동작하는지, 클러스터가 어떻게 구성되어 있는지 아무것도 모른다.이제야 스파크를 이해해보기 위해 정리한다.클러스터 구성 요소 Driver ProgramCluster Manager Worker NodeExecutorDriver ProgramSpark 애플리케이션의 메인 프로그램으로 작업을 계획하고 전체 실행을 조정Cluster Manager클러스터 리소스 관리자로 CPU, 메모리 등을 애플리케이션에 할당 (YARN, Kubernetes, Standalone, Mesos)Worker Node실제 작업이 실행되는 물리적 머신으로 Executor 프로세스를 실행하고 관리Executor워커 노드에서 실제 데이터 처리를 담당하는 프..
[Spark] Delta Merge ConcurrentAppendException 탈출기 데이터를 통합하는 파이프라인을 운영하면서 겪은 ConcurrentAppendException 에러 탈출 기록용 문제 상황네 개의 스트리밍 파이프라인이 하나의 Delta 테이블로 Merge하도록 구성했다.모든 파이프라인의 목적지가 동일했고, 동시에 Write 작업이 발생하면서 ConcurrentAppendException이 발생했다. 1차 대응에러 발생 시 sleep 후 재시도하는 방식을 적용했다.try: # merge logicexcept ConcurrentAppendException: time.sleep(retry_delay) retry()일시적으로는 정상 처리되는 듯 보였지만, 시간이 지날수록 처리 지연과 실패 발생률이 증가했다. 2차 대응동시 Write 자체를 방지하기 위해 스트리밍..
[Spark] Kafka startingTimestamp 옵션 활용 카프카 스트리밍 파이프라인을 개발하면서 startingTimestamp 옵션을 활용한 경험 기록 Use Case- 스트리밍 파이프라인을 처음 실행할 때, 과거 전체 데이터를 모두 읽지 않고 지정한 시점 이후의 이벤트만 읽어와 처리 범위를 제어하기 위한 초기 checkpoint 저장 용도로 활용- Kafka에 적재된 실제 데이터 대상으로 파이프라인 로직을 구현 및 검증할 때 활용- 이상 데이터 디버깅을 위해 해당 데이터 적재 시점을 기준으로 raw 데이터를 조회할 때 활용(동일 이벤트 재발행까지) 초기 체크포인트 저장 용도로 활용import timeimport pytzfrom datetime import datetimedt = datetime(2025, 4, 17, 0, 0, 0, tzinfo=pytz.U..
[Spark] Delta Lake Change Data Feed Use Case네 개의 소스 데이터를 하나의 Delta Lake에 merge 한 후에 rds에 sink하기 위한 대상을 추출할 때데이터 변경을 감지하고 집계 파이프라인을 돌릴 때Change Data Feed(CDF)란Change Data Feed(CDF)는 Delta 테이블의 버전 간 행 단위 변경 사항을 추적할 수 있도록 해주는 기능입니다.Delta 테이블에서 CDF 기능이 활성화되면, 런타임은 테이블에 기록되는 모든 데이터에 대한 변경 이벤트를 저장합니다. 여기에는 각 행의 데이터뿐만 아니라 해당 행이 삽입(insert), 삭제(delete), 또는 업데이트(update) 되었는지를 나타내는 메타데이터도 포함됩니다.CDF 활성화 방법ALTER TABLE myDeltaTable SET TBLPROPER..
[Spark] DataFrame API 테스트 하기 ** Databricks 환경에서 개발하는 거라서 디버깅 하기 정말 좋은 환경이지만,,,~ DataFrame API 디버깅, 테스트하면서 불편했던 점- 여러 로직 중에 딱 한 부분만 디버깅 하고 싶은데 그 결과를 보려고 다른 곳들 주석 처리하면서 해당 결과를 컬럼 추가했다가 drop 했다가 반복 Mock Dataframe 만들기gpt한테 코드 보여주고 mock dataframe 달라고 하면 준다.from pyspark.sql.types import StructType, StructField, StringTypefrom pyspark.sql import Rowfrom pyspark.sql import functions as F# 스키마 정의schema = StructType([ StructField(..
[Spark] Spring Boot에서 Spark로 파이프라인 이관기: 객체 조회 Spring Boot로 개발된 파이프라인을 이관하게 되었다."단순히 로직만 옮기면 되겠지"라고 생각했지만, 두 프레임워크의 근본적인 처리 방식 차이로 인해 완전히 다른 접근법이 필요했다.Spring Boot vs Spark: 근본적인 차이점Spring Boot의 특징단건 처리: 하나의 레코드에 대해 순차적으로 처리객체 재활용: 한 번 조회한 객체를 메모리에서 재사용 가능변수명으로 구분: 동일한 객체를 다른 변수명으로 사용하면 내부 필드명은 동일해도 됨Spark Streaming의 특징마이크로 배치 처리: 전체 데이터셋을 모아서 처리하는 스트리밍 방식조인 기반: 필요한 데이터는 미리 조인해서 준비해야 함컬럼명 충돌: 소스 데이터와 조인 테이블에 동일한 컬럼명이 있으면 ambiguous 에러 → 컬럼 ali..