ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] Postgre의 데이터 JDBC 커넥터 활용하여 병렬 처리 해보기(feat, partitionColumn,lowerBound, upperBound, numPartitions)
    데이터 엔지니어링/Spark 2023. 2. 12. 14:10
    반응형

    Spark에서 직접 PostgreSql 연결하는 법

    Spark는 java기반 오픈 소스 툴이기 때문에 JDBC(Java Database Connectivity)를 이용하여 데이터베이스에 접근 할 수 있습니다. 아래 코드가 JDBC를 이용하여 select를 할 수 있는 상태를 만들어 놓은 것이다. jdbcDF라는 객체를 아래와 같이 만들고 쿼리를 날리면 데이터를 조회 할 수 있습니다.

    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbname") \
        .option("dbtable", "public.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    Spark에서 JDBC 커넥터를 사용하면 문제점이 무엇일까?

    하지만, 위와 같이 작성 하였을 때 문제점이 있습니다. 바로 spark의 장점인 분산처리를 할 수 없다는 점인데요. 위 jdbcDF로 쿼리를 날린다면 무조건 Select용 spark Task는 1개로 구성이 됩니다. 데이터 양이 많다면 조회가 되지않고, timeout되거나 java GC가 full나서 spark session은 멈춰버립니다. 그래서 많은 수의 데이터를 조회 할려면 JDBC 커넥터를 병렬처리하여야 합니다.

    Spark에서 JDBC 커넥터 병렬처리하기

    Spark에서 병렬처리 하기 위해서는 데이터를 partitioning해야하는데요. JDBC커넥터에서는 병렬처리용 option이 여러개 존재합니다. spark jdbc option에 대해서는 이 페이지를 참조해주세요. 오늘 알아볼 옵션은 바로 partitionColumn, lowerBound, upperBound입니다. 이 3개는 하나의 쌍으로 보고 3개중의 하나의 옵션을 사용할려면 3개다 작성해 주어야합니다. 그리고 numPartitions도 같이 알아 보겠습니다.

    옵션 살펴보기

    • partitionColumn - partition 나눌 key가 되는 값 - 숫자, 날짜, 타임스탬프여야함
    • lowerBound - partition의 최소값
    • upperBound - partition의 최대값
    • numPartitions - 파티셔닝하는 partition 최대 갯수

    옵션을 사용한 코드 예시

    df = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql:dbname") \
            .option("dbtable", "public.tablename") \
            .option("user", "username") \
            .option("password", "password") \
            .option("partitionColumn", "keyCol") \
            .option("lowerBound", 1) \
            .option("upperBound", 50000) \
            .load()

    자, 위의 코드를 해석해 보면 lowerBound - key에서 select할 최소값은 1이고, upperBound - key에서 select할 최대값은 50000입니다. 즉, keyCol의 값을 최소 1 ~ 최대 50000까지의 값을 가져온다고 설정 한 것입니다. 여기선 upperBound의 값이 꼭 있어야 하는건 아니고 작아도 상관없습니다. 이 상태에서 데이터를 조회 한다면 아래와 같은 쿼리가 실행 된다고 보면 됩니다.

    select * from tablename where keyCol between 1 and 50000
    

    이렇게 설정 하였는데도, 병렬 처리가 되었을 까요? 하지만, 병렬처리가 제대로 돼지 않습니다. 왜냐하면 full쿼리로 날렸기 때문에 Task는 여전히 하나로 잡힙니다. 그럼 어떻게 해야 할까요?? 병렬 처리 할 수 있도록 partition을 나눠줘야 합니다. 이 속성은 numPartitions로 설정이 가능합니다.

    df = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql:dbname") \
            .option("dbtable", "public.tablename") \
            .option("user", "username") \
            .option("password", "password") \
            .option("partitionColumn", "keyCol") \
            .option("numPartitions", 5) \
            .option("lowerBound", 1) \
            .option("upperBound", 50000) \
            .load()

    numpartitions 5 로 설정 하였습니다. 그럼 아래와 같은 쿼리가 실행 된다고 보면 됩니다. 이렇게 5개의 쿼리로 나눠져서 쿼리가 실행 됩니다. 실제로 spark ui에서 executor를 확인하면 task는 5개 실행 되고 있는 것을 확인 할 수 있을 것 입니다.

    select * from tablename where keyCol between 1 and 10000;
    select * from tablename where keyCol between 10001 and 20000;
    select * from tablename where keyCol between 20001 and 30000;
    select * from tablename where keyCol between 30001 and 40000;
    select * from tablename where keyCol between 40001 and 50000;
    

    병렬 처리 주의 점

    지금 까지 spark에서 JDBC 쿼리를 병렬처리하는 방법에 대해 알아 보았습니다. 하지만, 주의점도 간략하게 알아 봐야 할 것 같습니다.

    주의 점

    • key값으로 병렬 쿼리가 실행되어 속도가 개선 되지만, 병렬 쿼리가 실행 되는 만큼 DB에 무리가 가지 않는 선에서 쿼리가 실행 되어야한다.
    • numPartitions 수 만큼 spark task(cpu)를 잡아 먹는 것이기 때문에 spark cluster에 잘 맞게 리소스를 설정하여 한다.
    • numpartitions의 설정 값을 설정하였지만, 한 기간에 너무 많은 데이터(위에 between 20001 and 30000 모든 데이터가 모여 있다면 이역시 full쿼리가 된다.)가 조회 되어 오히려 속도가 느려 질 수 있다. 즉, 데이터의 이해가 필요하다.
    반응형

    댓글

Designed by Tistory.