Data

[Spark] 배치 조회 구현하기

코딩콩 2025. 3. 25. 22:38

Spring Boot로 개발된 파이프라인을 이관하게 되었다.

로직은 복잡하지만 이미 검증된 로직이기 때문에 고대로 옮기면 될 줄 알았는데 고대로 옮기는 게 쉽지 않았다.

 

특히 어려웠던 것..

spring boot는 단건 조회이고, 객체 뽑아서 재활용이 가능하고, 이렇게 저렇게 중복 필드명을 가져도 된다.

spark는 스트리밍 방식으로 배치 조회이고, 각 레코드에 맞는 객체를 조회(join)해서 붙여놔야 한다, df에 중복 필드명도 안 된다.

('coulmn_name' ambiguous,,, PTSD)

 

아래는 객체 조회 방식을 spark로 재구현한 코드이다.

Kotlin 코드

fun getObject(): Object? {
    val object =
        if (조건1) {
            repository.findAllByIdAndName()
                .sortedWith( compareBy<Object> { it.current }.thenBy { it.a }.thenByDescending { it.b }.thenByDescending { it.c })
                .firstOrNull()
        } else if (조건2) {
            repository.findByIdAndNumber()
        } else {
            val object = repository.findAllByIdAndName()
            if (object.isNotEmpty()) {
                object.sortedWith( compareBy<Object> { it.current }.thenBy { it.a }.thenByDescending { it.b }.thenByDescending { it.c })
                    .firstOrNull()
            } else {
                repository.findByIdAndNumber()
            }
        }
    return object
}

조건에 따라서 같은 객체를 뽑는데 조회 쿼리가 다르다. 정렬도 한다 😇

 

Spark 코드

def get_join_type():
    if (조건1):
        return "join_by_id_and_name"
    elif (조건2):
        return "join_by_id_and_number"
    else:
        return "join_by_name_and_number"
    
join_type_mapping_udf = udf(lambda x, y, z: get_join_type(x, y, z), StringType())

위와 같이 조회 조건을 맵핑할 udf를 구현하고

df = df.withColumn("join_type",join_type_mapping_udf(x,y,z))
object_1 = join_by_condition_1(df.filter(F.col("join_type") == "join_by_id_and_name"))
object_2 = join_by_condition_2(df.filter(F.col("join_type") == "join_by_id_and_number"))
object_1_and_2 = join_by_condition_1_and_2(df.filter(F.col("join_type") == "join_by_name_and_number"))
return (
    object_1.unionByName(object_2).unionByName(object_1_and_2)
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("pk_id", "source_key").orderBy(
                F.col("a").asc(),
                F.col("b").desc(),
                F.col("c").desc(),
            )
        ),
    )
    .filter(F.col("rank") == 1)
    .drop("rank")
)

join_type으로 레코드를 필터링 해서 객체 join 한 후에 다시 union 하기

그리고 마지막으로 정렬 로직을 구현하기 위한 window 함수를 사용으로 재탄생한 코드,,,

 

사실 위 코드가 맞나 모르겠지만 배치 조회 방식에서 소스 데이터 조건에 따라 조인 방식을 달리하는 방법은 저 방법밖에 떠오르지 않았다.

다음에 좀 더 배우면 좀 더 깔꼼한 코드가 나오겠지...