ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark]RDD 이해하기
    데이터 엔지니어링/Spark 2022. 6. 19. 23:23
    반응형

    데이터 연산 방식 이해하기

    RDD를 들어가기 전에 먼저 데이터 연산 방식을 이해 해야한다. 이를 이해하면 RDD가 왜 나왔는지 Spark에서는 RDD가 왜 사용돼는지 간접적으로 이해 할 수 있다. 데이터 연산 방식은 병렬처리 방식 분산 처리 방식이 있다.

    병렬(Data-Parallel) 처리 방식

    • 데이터를 여러개로 쪼갠다.
    • 여러 쓰레드에 쪼갠 데이터를 넣어 task를 적용한다
    • 각자 만든 결과값을 반환 받아 합친다.
    • 위와 같은 일렬의 과정으로 결과값을 얻는다.

    분산처리 방식(Distributed Data-Parallel)

    • 데이터를 여러개로 쪼개서 여러 노드로 보낸다.
    • 여러 노드에서 각자 독립적인 task를 적용한다.
    • 각자 만든 결과값을 합치는 과정을 한다.
    • 위와 같은 일렬의 과정을 통해 결과값을 얻어 반환한다.

     

     

    RDD가 나온 이유 간접적으로 이해하기

    • 데이터 연산 방식에서 병렬처리 보단 분산처리 방식이 빅데이터 처리에서는 빠르다.
    • 분산처리가 속도가 빠른 이유는 노드간 작업을 나누기 때문인데, Spark는 이를 쉽게 구현할 수 있도록 도와주는 툴이다. Spark를 이용한 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜서 분산 처리 환경이 가능하기 때문이다.
    • Spark에서는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문에 속도가 빠르다고 했는데 그것이 바로 Resilient Distributed Datasets(RDD)때문이다.
    • 알아서 추상화해 노드로 작업을 분배하고 합친다. 하지만 Spark에서 다 해준다고 해서 그냥 코드를 짜면 되는 것이 아니라 어느정도 빽단에서 어떻게 실행 되는지 이해를 하고 사용하여야 한다. 그래서 RDD에서 깊게 알아보는 작업이 필요하다

     

     

    분산처리에서 신경써야 하는 부분

    위에서 병렬처리와 분산처리의 차이점을 볼 수 있는데 이것이 바로 작업 환경의 차이점이다. 쓰레드와 노드간 연산차이인데 이 차이점 때문에 분산처리가 처리속도가 더 빠르게 되었다(빅데이터의 경우에서). 빅데이터 연산처리 방식이 병렬처리에서 분산처리로 넘어오면서 신경써야 하는 부분이 생기게 되었다. 바로 부분실패와 속도이다.

    부분 실패

    • 부분 실패란 분산처리에서 노드간 작업량을 분배해서 처리 후 받는 형식인데, 이런 노드간 task를 진행 할 때 노드가 값자기 없어지는 경우가 있다. (데이터 센터가 없어지거나 노드를 수행하는 컴퓨터가 바이러스나 디스크 문제 같은 경우로 다운되는 경우)
    • 이런 부분이 바로 부분 실패 노드가 각자 작업량을 처리해야하는데 갑자기 노드가 없어지면 중간이 뻥비어버리는 연산 처리가 안돼는 경우가 발생한다. 빅데이터 연산처리에서 이런 문제점을 극복해야 했다.
    • 하지만 RDD는 불변성을 유지하기 때문에 복원을 하여 다시 진행하면 된다. 이 부분은 RDD에서 잘 처리 해주기 때문에 개발자가 그렇게 많이 신경 쓸필요는 없어 졌다.

    속도

    • RDD에서는 노드간 분산처리 하고 작업 결과값을 받는 순서를 개발자가 코드를 작성하여 처리 하게 되는데 아래 코드로 예를 들면 첫번째 두번째 중 속도가 빠른것은 첫번째 이다.
    • 왜냐하면 reduceByKey는 노드간 작업을 분배한 것을 받아와서 Key를 기분으로 나누는 것인데, 두번째 경우 노드간 filter를 진행하기 않고 받아오면 작업량이 늘어날 뿐만아니라, 노드간 작업해야하는 것을 한 노드에서 진행하기 때문에 처리 속도 또한 높아지기 때문이다.
    • 이렇게 RDD를 활용하여 속도를 개선하기 위해서는 꼭 개발자가 빽단의 처리 구조를 이해하고 넘어가야 한다.
    RDD.map(A).filter(B).reduceByKey(C).take(100)
    RDD.map(A).reduceByKey(C).filter(B).take(100)
    

    RDD를 사용하는 이유는?

    이처럼 Spark에서는 RDD(Resilient Distributed Datasets)를 이용하여 분산처리에서 가장 중요한 부분인 노드간 분산처리 연산을 진행하고 결과값을 받는 과정을 자동으로 script 하나로 이용하여 처리 해줌으로써 개발 편의와 속도 그리고 부분실패 까지 다 잡아주는 역할을 하기 위해 만들어 졌다.

     

     

    RDD란?

    • RDD는 Resilient Distributed Dataset의 줄임말로 스파크의 기본 데이터 구조이다.
    • 산전적으로 해석해 보면Resilient(변하지 않는) Distributed(분산된) Data(데이터): 변하지 않는 데이터 라는 뜻으로 RDD로 생성된 내용은 변화가 있을 때 마다 기록으로 남기며 기록으로 남긴 내용은 변경되지 않는다. github에 snapshot방식으로 변경 내용을 기록하는 것과 비슷하다고 이해하면 편할 것 같다.
    • RDD는 스파크 1.0에서 부터 도입이 되었으며, RDD를 이용하면 자동으로 노드간 작업량을 분배하여 분산처리를 해주고 또한 DISK I/O를 이용하는 것이 아니라 인메모리 연산으로 속도 또한 Hadoop의 Map Reduce보다 좋다.
    • 특정 동작에 의해 생성되는 RDD는 DAG(Directed Acyclic Graph)의 형태를 가져 비순환 구조를 가진다.

    RDD 동작 원리

    • RDD의 동작 원리는 Lazy Evaluation(느린 연산)이라고 한다.
    • Lazy Evalutaion이란 해당 연산이 직접적으로 변경 될때 까지 기다렸다가 직접적으로 변경되면 연산을 한다는 의미이다.
    • 위에서 말했듯 github snapshot 방식 같이 일일히 저장을 해서 불변성 가진다고 했는데 그럼 어떤 연산 작업을 진행 할 때 RDD가 새로 생성되고 이전인지 알면 좋을 것 같다. 즉 Lazy Evaluation이 언제 일어나는지 파악해 보자.
    • Spark에서 transformation 이라는 연산 작업이 진행 될 때 RDD는 이전 RDD라고 파악을 하며 새로운 RDD를 만든다. 그리고 이전 RDD는 따로 분리하여 저장을 한다.
    • Action이라는 형태의 Data추출은 데이터를 보는 형태의 작업이지 RDD를 변경 하지는 않는다.
    • 즉, 데이터 구조가 변경 될때 RDD를 새로 생성하여 기억하며 action(데이터 추출하여 보는 형식)으로는 RDD는 새로 생성 되지 않는다.

    출처 - https://m.blog.naver.com/tajogood/220783546981

     

     

    Transformation 과 Action 이해하기

    Transformation

    • 어떤 연산을 통해 RDD에서 새로운 RDD를 만드는 동작이다.
    • 해당 작업으로 받는 리턴값은 RDD이다.
    • Transformation 공식 문서
    Tramsformation  설명
    map(func) 주어진 함수에 맞는 원하는 값을 반환
    filter(func) 주어진 함수에 맞게 RDD를 필터링해서 반환
    flatMap(func) 단일 로우를 여러 로우로 변환하여 반환
    distinct([numPartitions]) 중복 제거
    groupByKey([numPartitions]) Key-Value데이터셋이 있을 때, key기분으로 group화
    reduceByKey(func, [numPartitions]) 데이터 셔플 되기 전에 같은 머신에 같은 key끼리 결합하여 Key별로 count하여 반환
    sorByKey([asending], [numPartitions]) Key-Value 데이터셋이 있을 때 이 데이터는 Key를 기준으로 정렬 하여
    join(otherDataset, [numPartitions]) 데이터베이스에서 쓰이는 조인과 같은 개념 공통된 부분으로 나와서 새로운 RDD추출
    coalesce(numPartitions) output partition 수 조절할 때 사용, shuffle을 하지 않음
    union() elements를 합쳐서 새로운 RDD 생성

     

    Action

    • 기록된 RDD에서 RDD가 가지고있는 값을 추출하여 보는 행위이다.
    • 리턴값을 실행 범위나 형태에 맞게 값을 추출하여 데이터를 리턴한다.
    • Action 공식 문서

     

    Action 설명
    count() row수를 나타내고 RDD가 아닌 int으로 바뀜
    collect() drver program에 RDD전체 데이터를 리턴하고 RDD가 아닌 List로 바뀜
    take(n) 지정한 n만큼 값 반환 List로 반뀜
    top(n) 지정한 N만큰 값 반환 큰 순서대로 반환하며 List로 바뀜
    first 요소 중 첫 번째 값을 반환 RDD가 아닌 다른 형태의 타입으로 변경됨
    max(), min() RDD중 가장 큰 가장 작은 값 반환 RDD가 아닌 다른 형태의 타입으로 변경됨
    aggregate() input type으로 다른 데이터 타입으로 반환 받을 수 잇음
    reduce() RDD의 모든 값을 하나의 값으로 만들 때 사용
    countByValue() key-value 유형의 RDD에서만 사용 할 수 있음, 각 키의 개수와 함께 key int 쌍의 해시맵으로 반환된다.
    saveAsTextFile(path) RDD를 HDFS 또는 Hadoop 지원 파일 시스템의 지정된 경로에 textfile을 생성한다. 각 RDD row를 tostring을 호출하여 파일의 텍스트 줄로 변환한다.

     

    반응형

    댓글

Designed by Tistory.