Data
[Spark] DataFrame API 테스트 하기
코딩콩
2025. 4. 5. 00:21
** Databricks 환경에서 개발하는 거라서 디버깅 하기 정말 좋은 환경이지만,,,~
DataFrame API 디버깅, 테스트하면서 불편했던 점
- 여러 로직 중에 딱 한 부분만 디버깅 하고 싶은데 그 결과를 보려고 다른 곳들 주석 처리하면서 해당 결과를 컬럼 추가했다가 drop 했다가 반복
Mock Dataframe 만들기
gpt한테 코드 보여주고 mock dataframe 달라고 하면 준다.
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql import functions as F
# 스키마 정의
schema = StructType([
StructField("timestamp", StringType(), True),
])
# 한 줄의 데이터 추가
data = [Row(timestamp='2025-03-19 14:08:31')]
# DataFrame 생성
df = spark.createDataFrame(data, schema)
df.select(F.to_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss")).display()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 테스트 데이터 생성
data = [
("B001", "2024-04-01", "M123", "A456", 100.0, "I789", "2024-04-01 12:00:00", "APPROVED"),
("B001", "2024-04-01", "M123", "A456", 100.0, "I789", "2024-04-01 12:00:00", "APPROVAL_CANCELED")
]
columns = ["pk_id", "transacted_date", "mc_id", "auth_id", "amt", "name", "timestamp", "status"]
df = spark.createDataFrame(data, columns)
# 필수 컬럼 필터링
df = (df
.filter(F.col("pk_id").isNotNull())
.filter(F.col("transacted_date").isNotNull())
.filter(F.col("mc_id").isNotNull())
.filter(F.col("auth_id").isNotNull())
.filter(F.col("amt").isNotNull())
.filter(F.col("name").isNotNull())
)
# 최신 트랜잭션 조회 (ranking)
merge_keys = ["pk_id", "transacted_date", "mc_id", "auth_id", "amt", "name"]
recent_dedup_window = Window.partitionBy(merge_keys).orderBy(F.col("timestamp").desc(), F.col("status").asc())
df = (
df.withColumn("rank", F.row_number().over(recent_dedup_window)).filter(F.col("rank") == 1)
)
# 결과 확인
df.display()