Data
[Spark] 배치 조회 구현하기
코딩콩
2025. 3. 25. 22:38
Spring Boot로 개발된 파이프라인을 이관하게 되었다.
로직은 복잡하지만 이미 검증된 로직이기 때문에 고대로 옮기면 될 줄 알았는데 고대로 옮기는 게 쉽지 않았다.
특히 어려웠던 것..
spring boot는 단건 조회이고, 객체 뽑아서 재활용이 가능하고, 이렇게 저렇게 중복 필드명을 가져도 된다.
spark는 스트리밍 방식으로 배치 조회이고, 각 레코드에 맞는 객체를 조회(join)해서 붙여놔야 한다, df에 중복 필드명도 안 된다.
('coulmn_name' ambiguous,,, PTSD)
아래는 객체 조회 방식을 spark로 재구현한 코드이다.
Kotlin 코드
fun getObject(): Object? {
val object =
if (조건1) {
repository.findAllByIdAndName()
.sortedWith( compareBy<Object> { it.current }.thenBy { it.a }.thenByDescending { it.b }.thenByDescending { it.c })
.firstOrNull()
} else if (조건2) {
repository.findByIdAndNumber()
} else {
val object = repository.findAllByIdAndName()
if (object.isNotEmpty()) {
object.sortedWith( compareBy<Object> { it.current }.thenBy { it.a }.thenByDescending { it.b }.thenByDescending { it.c })
.firstOrNull()
} else {
repository.findByIdAndNumber()
}
}
return object
}
조건에 따라서 같은 객체를 뽑는데 조회 쿼리가 다르다. 정렬도 한다 😇
Spark 코드
def get_join_type():
if (조건1):
return "join_by_id_and_name"
elif (조건2):
return "join_by_id_and_number"
else:
return "join_by_name_and_number"
join_type_mapping_udf = udf(lambda x, y, z: get_join_type(x, y, z), StringType())
위와 같이 조회 조건을 맵핑할 udf를 구현하고
df = df.withColumn("join_type",join_type_mapping_udf(x,y,z))
object_1 = join_by_condition_1(df.filter(F.col("join_type") == "join_by_id_and_name"))
object_2 = join_by_condition_2(df.filter(F.col("join_type") == "join_by_id_and_number"))
object_1_and_2 = join_by_condition_1_and_2(df.filter(F.col("join_type") == "join_by_name_and_number"))
return (
object_1.unionByName(object_2).unionByName(object_1_and_2)
.withColumn(
"rank",
F.row_number().over(
Window.partitionBy("pk_id", "source_key").orderBy(
F.col("a").asc(),
F.col("b").desc(),
F.col("c").desc(),
)
),
)
.filter(F.col("rank") == 1)
.drop("rank")
)
join_type으로 레코드를 필터링 해서 객체 join 한 후에 다시 union 하기
그리고 마지막으로 정렬 로직을 구현하기 위한 window 함수를 사용으로 재탄생한 코드,,,
사실 위 코드가 맞나 모르겠지만 배치 조회 방식에서 소스 데이터 조건에 따라 조인 방식을 달리하는 방법은 저 방법밖에 떠오르지 않았다.
다음에 좀 더 배우면 좀 더 깔꼼한 코드가 나오겠지...