Back/Bigdata

Spark -1 : 이론

TimeSave 2023. 5. 13. 23:19

ref) https://wikidocs.net/26513

 


목차


    1. 아파치 스파크(apache spark)

    • 2011년 버클리 대학의 AMPLab에서 개발, 2014년 5월 정식 출시
    • 아파치 재단의 오픈소스
    • 인메모리 기반의 대용량 데이터 고속 처리 엔진
      범용 분산 클러스터 컴퓨팅 프레임워크

     

    2. 특징

    • Speed : 인메모리(In-Memory) 기반의 빠른 처리
      • 맵리듀스 작업처리에 비해 디스크는 10배, 메모리 작업은 100배 빠른 속도
      • 맵리듀스는 작업의 중간 결과를 디스크에 쓰기 때문에 IO로 인하여 작업 속도에 제약이 생깁니다.
      • 스파크는 메모리에 중간 결과를 메모리에 저장하여 반복 작업의 처리 효율이 높습니다.
    • Ease of Use : 다양한 언어 지원(Java, Scala, Python, R, SQL)을 통한 사용의 편이성
      • 성능을 위해서는 Scala 로 개발을 진행하는 것이 좋다
    • Generality : SQL, Streaming, 머신러닝, 그래프 연산 등 다양한 컴포넌트 제공
      • 스파크 스트리밍을 이용한 스트림 처리, 스파크 SQL을 이용한 SQL 처리, MLib 를 이용한 머신러닝 처리, GraphX를 이용한 그래프 프로세싱 지원
    • Run Everywhere
      • YARN, Mesos, Kubernetes 등 다양한 클러스터에서 동작 가능
        • YARN, Mesos, Kubernetes, standalone 등 다양한 포맷
        • HDFS, 카산드라, HBase, S3 등의 다양한 데이터 포맷을 지원
      • HDFS, Casandra, HBase 등 다양한 파일 포맷 지원
        • TXT, Json, ORC, Parquet 등의 파일 포맷을 지원.
        • S3, HDFS 등의 파일 시스템과 연동도 가능하고, HBase, Hive 와도 간단하게 연동

     

    3. 구성

     스파크 코어, 클러스터 매니저, 스파크 라이브러리

    3.1 스파크 코어

    • 메인 컴포넌트
    • 기본기능 제공 : 작업 스케줄링, 메모리 관리, 장애 복구 등 
    • 스파크 연산 처리 : RDD, Dateset, DataFrame 이용

    3.2 스파크 라이브러리

    빅데이터 처리를 위한 작업용 라이브러리 .

    • Spark SQL
      • SQL을 이용하여 RDD, DataSet, DataFrame 작업을 생성하고 처리
      • 하이브 메타스토어와 연결하여 하이브의 메타 정보를 이용하여 SQL 작업을 처리
      • 샤크(Shark)는 하이브에서 스파크 작업 처리를 위한 외부 프로젝트 였는데 현재 스파크 SQL로 통합
    •  Spark Streaming
      • 실시간 데이터 스트림을 처리하는 컴포넌트. 스트림 데이터를 작은 사이즈로 쪼개어 RDD 처럼 처리.
    •  MLib
      • MLib는 스파크 기반의 머신러닝 기능을 제공하는 컴포넌트입니다. 분류(classification), 회귀(regression), 클러스터링(clustering), 협업 필터링(collaborative filtering) 등의 머신러닝 알고리즘과 모델 평가 및 외부 데이터 불러오기 같은 기능도 지원합니다.
    •  GraphX
      • GraphX는 분산형 그래프 프로세싱이 가능하게 해주는 컴포넌트입니다. 각 간선이나 점에 임의의 속성을 추가한 지향성 그래프를 만들 수 있습니다.

    3.3 클러스터 매니저

    스파크 작업을 운영하는 클러스터 관리자

    • 다양한 클러스터 매니저 지원
      • 스파크에서 제공하는 스탠드얼론(Standalone) 관리자
      • 메조스(Mesos), 얀(YARN), 큐버네티스(Kubernetes) 등의 관리자

    4. 스파크 구조

    • 크게 두가지 : 작업 + 실행
      • 작업 관리 : 드라이버
      • 노드 관리(작업 실행되는) : 클러스터 매니저

    스파크 드라이버 - 클러스터 매니저

    4.1 스파크 애플리케이션(=스파크 실행 프로그램) 구성

    • 드라이버와 익스큐터 프로세스로 실행되는 프로그램
    • 클러스터 매니저가 스파크 애플리케이션의 리소스를 효율적으로 배분
    • 마스터-슬레이브 구조로 실행

    4.1.1 드라이버(Driver) : 작업을 관장

    •  스파크 애플리케이션을 실행하는 프로세스
      •  main 함수를 실행
      • 스파크 컨텍스트(SparkContext) 객체를 생성
        • 클러스터 매니저와 통신
        • 클러스터의 자원 관리를 지원
        • 애플리케이션의 라이프 사이클 관리
      • 사용자로 부터 입력을 받아서 애플리케이션에 전달
      • 작업 처리 결과를 사용자에게 알려줌.
      • 디플로이 모드 : 실행시점에 설정
        • 클라이언트 모드 : 클러스터 외부에서 드라이버 실행
        • 클러스터 모드 : 클러스터 내에서 드라이버 실행

    4.1.2 익스큐터(Executor) : 작업을 동작시킴

    • 태스크 실행을 담당하는 에이전트
    • 작업을 진행하는 프로세스입니다.=  YARN의 컨테이너
    • 태스크 단위로 작업을 실행하고 결과를 드라이버에 알려줌
    • 동작 중 오류가 발생하면 다시 재작업을 진행
    • 태스크(Task)
      • 익스큐터에서 실행되는 작업. 익스큐터의 캐쉬를 공유하여 작업의 속도를 높일 수 있음.
      • 하나의 Core. 익스큐터가 여러개 태스크 동시실행
      • 익스큐터, 코어 수, 메모리 용량을 설정이 중요
        • 태스크가 너무 많으면 :  컨텍스트 스위칭, HDFS I/O로 인한 성능저하
        • 태스크가 너무 작으면 : JVM을 공유하는 장점이 사라짐

     

    4.2 스파크 잡 구성

    • 잡(Job), 스테이지(Stage), 태스크(Task)로 구성

    4.2.1 잡(Job)

    • 스파크 애플리케이션으로 제출된 작업(work)

    4.2.2 스테이지(Stage)

    • Job을 작업의 단위에 따라 구분한 것

    4.2.3 태스크(Task)

    • 익스큐터에서 실행되는 작업. 데이터를 읽거나, 필터링 하는 실제 실행내용

     

    4.3 클러스터 매니저

    스파크는 여러 가지 클러스터 매니저를 지원

    • YARN : 하둡 클러스터 매니저
      • 리소스 매니저, 노드 매니저로 구성 됨
    • Mesos : 아파치 클러스터 매니저
      • 동적 리소스 공유 및 격리를 사용하여 여러 소스의 워크로드를 처리
      • 마스터와 슬레이브로 구성됨
    • StandAlone : 스파크 자체 제공 클러스터 매니저
      • 각 노드에서 하나의 익스큐터만 실행 가능

     

    5.스파크 작업 모니터링 방법(Web UI/ REST API)

    5.1 스파크 컨텍스트 웹UI

    • 스파크 컨텍스트가 실행되면 4040포트로 웹UI를 실행
    • 하나의 노드에 여러개의 컨텍스트가 실행되면 포트번호가 1씩 증가하면서 생성(4041, 4042, ...).
    • 스파크 컨텍스트가 실행되고 있는 동안에만 사용 가능
    • 스파크 히스토리 서버
      • 실행중인 작업과 실행이 끝난 작업의 히스토리를 확인하기 위한 서버
      • start-history-server.sh로 실행하고 기본 접속 포트는 18080
      • 영구적인 저장소에 스파크 작업 내역을 저장 하고 사용자의 요청에 정보를 반환
    • 확인 가능 정보
      • 스테이지와 태스크 목록
      • RDD 크기와 메모리 사용량
      • 환경변수 정보
      • 익스큐터의 정보

    5.2 REST API(at 스파크 히스토리 서버)

    • 작업 애플리케이션에 대해 REST API를 이용해 json 형태의 정보를 제공.
    curl -s http://$(hostname -f):18080/api/v1/applications
    curl -s http://$(hostname -f):18080/api/v1/[app-id]/jobs

     

    6. 스파크 애플리케이션

    • 스칼라, 자바, 파이썬, R 로 구현가능
    • 각 언어로 스파크 SQL을 실행가능
    • jar 파일로 묶어서 실행하거나, REPL 환경에서 실행

    6.1 실행 방법

    6.1.1 방법 1 - 각 언어별 실행 스크립트

    스파크를 다운로드 받아서 bin 디렉토리에 들어가면 각 언어를 실행할 수 있는 스크립트가 있음.

    # bin 폴더 실행 파일 
    -rwxr-xr-x 1 root root 1099 Nov 16  2016 beeline
    -rw-r--r-- 1 root root 2143 Nov 16  2016 load-spark-env.sh
    -rwxr-xr-x 1 root root 3265 Nov 16  2016 pyspark       // 파이썬 
    -rwxr-xr-x 1 root root 1040 Nov 16  2016 run-example
    -rwxr-xr-x 1 root root 3126 Nov 16  2016 spark-class
    -rwxr-xr-x 1 root root 1049 Nov 16  2016 sparkR        // R
    -rwxr-xr-x 1 root root 3026 Nov 16  2016 spark-shell   // scala repl
    -rwxr-xr-x 1 root root 1075 Nov 16  2016 spark-sql     // spark on sql
    -rwxr-xr-x 1 root root 1050 Nov 16  2016 spark-submit  // scala jar
    

    6.1.2 방법 2 - 스파크 애플리케이션 제출(spark-submit)

    • 스칼라나, 자바로 작성한 스파크 애플리케이션을 jar 파일로 실행
    • 파이썬, R 파일 실행
    # spark-submit 실행 옵션 
    $ ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      --deploy-mode <deploy-mode> \
      --conf <key>=<value> \
      ... # other options
      <app jar | python file | R file> \
      [application-arguments]
    
    # 클러스터 매니저가 yarn인 경우 실행 방법 
    # JAR 파일 실행 
    $ spark-submit --master yarn \
      --queue spark_queue \
      --class sdk.spark.SparkWordCount \
      --conf spark.shuffle.service.enabled=true \
      ./Spark-Example.jar
    
    # 파이썬 파일 실행 
    $ spark-submit --master yarn \
      --queue spark_queue \
      ./py01.py
    

    6.1.2.A 설정값

    spark-submit을 이용시 설정값.

    --master 클러스터 매니저 설정
    --deploy-mode 드라이버의 디플로이 모드 설정
    --class main 함수가 들어 있는 클래스 지정
    --name 애플리케이션의 이름 지정. 스파크 웹 UI에 표시
    --jars 애플리케이션 실행에 필요한 라이브러리 목록. 콤마로 구분
    --files 애플리케이션 실행에 필요한 파일 목록
    --queue 얀의 실행 큐이름 설정
    --executor-memory 익스큐터가 사용할 메모리 바이트 용량. 512m. 1g 등도 사용 가능
    --driver-memory 드라이버 프로세스가 사용할 메모리 바이트 용량. 512m. 1g 등도 사용 가능
    --num-executors 익스큐터의 개수 설정
    --executor-cores 익스큐터의 코어 개수 설정

    6.1.2.B 디플로이 모드

    디플로이 모드에 대한 설정값

    client 프로그램을 실행하는 노드에서 드라이버 실행
    cluster 클러스터 내부의 노드에서 드라이버 실행

    6.1.2.C 클러스터 매니저 설정

    클러스터 매니저 설정

    spark://ip:port 스파크 스탠드얼론 클러스터 사용
    mesos://ip:port 아파치 메조스 사용
    yarn 하둡 얀 클러스터. HADOOP_CONF_DIR 설정 참조하여 처리
    local 로컬모드에서 싱글코어로 실행
    local[N] 로컬모드에서 N개 코어로 실행

     

    6.1.3 스파크 애플리케이션 REPL 처리

    • REPL 환경을 이용한 작업 처리도 지원합니다.
    • spark-shell, pyspark를 실행하면 REPL 환경에서 인터랙티브하게 작업을 처리
    • spark-shell은 스칼라, pyspark는 파이썬을 지원
    • 각 쉘을 실행할 때 옵션으로 클러스터 매니저 지정 가능

    6.1.3.A  spark-shell 환경(Scalar)

    $ spark-shell --master yarn --queue queue_name
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel).
    19/01/03 08:40:29 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    19/01/03 08:40:36 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
    Spark context Web UI available at http://127.0.0.1:4040
    Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.0.2
          /_/
    
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> 
    

    설정값 변경 : 설정값은 spark-submit과 같음

    # jar 파일 추가 
    $ spark-shell --master yarn \
                     --queue queue_name \
                     --jars a.jar,b.jar,c.jar \
                     --conf spark.shuffle.service.enabled=true
    

    익스큐터 개수 설정 : 스파크 쉘을 실행할 때 실행할 익스큐터의 개수와 메모리 설정 가능

    $ spark-shell \
      --master yarn \
      --deploy-mode client \
      --driver-memory 2g \
      --executor-memory 2g \
      --executor-cores 1 \
      --num-executors 5
    

    6.1.3.B pyspark 환경(python)

    파이썬을 이용한 처리는 pyspark를 이용

    $ pyspark --master yarn --queue queue_name
    Python 2.7.12 (default, Sep  1 2016, 22:14:00) 
    [GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel).
    19/01/03 08:46:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.0.2
          /_/
    
    Using Python version 2.7.12 (default, Sep  1 2016 22:14:00)
    SparkSession available as 'spark'.
    >>> 
    

     

    6.1.4 log4j 로그출력 : https://wikidocs.net/31530

    6.1.5 관련 여러 설정 : https://wikidocs.net/107317

     

    6.2 구현 방법

    > SQL과 데이터셋, 데이터프레임을 이용한 처리는 동일한 엔진을 이용하기 때문에 사용자에게 편리한 API를 이용

    • 스파크 애플리케이션 구현 방법
      • RDD를 이용 : 스파크 v1에서 발표
        • 스파크 컨텍스트(SparkContext)를 이용
      • 데이타셋(Dataset)과 데이터프레임(DataFrame)을 이용: 스파크 v2에서 RDD의 단점으로 개선하여 발표
        • 스파크 세션(SparkSession) 객체를 이용
        •  스파크 세션에서는 SQL을 이용하여 데이터 처리 가능.

    6.2.1 RDD(2014년, 스파크 v1)

    • 기존 기술에 비해 처리 속도 상승 : 인메모리 데이터 처리
    • 최적화 어려움 : 테이블 조인 효율화 같은 처리를 사용자가 직접 해야 함
    // RDD 예제 
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    distData.map(x => if(x >= 3) x else 0).reduce((x, y) => x + y)
    • 데이터 처리 : 파티션 단위로 분리하여 작업
      • 외부 데이터를 읽어서
      • 자체적인 컬렉션 데이터를 생성 

    6.2.2 데이타셋, 데이터 프레임, SQL(스파크 1.3 ~1.6~ 2.0)

    6.2.2.A DataFrame(스파크 1.3 ~ 스파크 2.0)

    • 프로젝트 텅스텐의 일부로 소개 : 처리 속도 증가 프로젝트
    • 데이터 추상화  : 스키마 형태로..
    • 쿼리 최적화 : 카탈리스트 옵티마이저가..
    // 데이터프레임 예제 
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.select($"name", $"age").filter($"age" > 20).show()
    df.groupBy("age").count().show()
    

    6.2.2.B Dataset(스파크 1.6~, 스파크 2.0)

    • RDD와 유사
    • 데이터의 타입체크
    • 데이터 직렬화를 위한 인코더
      • 자바의 기본 시리얼라이제이션이나 kyro를 사용하지 않음
      • 스파크의 인코더(Encoder)를 이용하여 RDD 보다 빠른속도.
    • 카탈리스트 옵티마이저 지원
    • 스파크 2.0에서 데이터프레임과 데이터셋을 통합
    • 스칼라 API에서 Dataset[Row]는 DataFrame을 의미 함
    // 데이터셋 예제 
    val path = "examples/src/main/resources/people.json"
    case class Person(name:String, age:Long)
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()

    6.2.3 RDD 구현

     

    6.2.3.A 초기화 : 스파크 컨텍스트, RDD 

    • RDD는 스파크 컨텍스트(SparkContext) 객체를 이용하여 생성
    • 스파크컨텍스트 초기화
      •  SparkConf 객체를 이용해서 설정값을 생성하고, 이를 이용해서 초기화
      • 스파크 쉘(spark-shell)을 이용할 경우 REPL쉘이 스파크 컨텍스트 객체를 생성
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    // SparkContext 객체 초기화 
    // 클러스터 매니저의 타입 지정
    val conf = new SparkConf().setAppName("sample").setMaster("yarn")
    val sc = new SparkContext(conf)
    
    $ spark-shell --master yarn --queue queue_name
    Spark context Web UI available at http://127.0.0.1:4040
    Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
    Spark session available as 'spark'.
    scala> sc
    res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3c910acd
    
    • RDD 초기화
      • 내부 데이터 이용(Parallelized Collections)
        • 스파크 컨텍스트의 parallelize() 메소드를 이용하여 처리
        • 사용자가 직업 데이터를 입력하여 생성.
        • 생성한 객체는 RDD 타입
        • 해당 객체를 map(), reduce(), filter() 등의 RDD 연산이용하여 처리
      • 외부 데이터 이용
        • 스파크 컨텍스트의 textFile() 메소드를 이용하여 처리 : HDFS, S3, hbase 등 파일시스템 지원
        • 생성한 객체는 RDD 타입이므로 RDD 연산을 이용하여 처리
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    val distData = sc.parallelize(data, 5)  // 파티션 개수 지정 
    
    // 리듀스 처리 - 모든 수 더하기 
    scala> distData.reduce((a, b) => a + b)
    res26: Int = 15
    
    // 맵으로 3이상일때만 모든수 더하기 
    scala> distData.map(x => if(x >= 3) x else 0).reduce((x, y) => x + y)
    res27: Int = 12
    
    // 필터를 이용하여 4이상일때만 더하기 
    scala> distData.filter(_ >= 4).reduce(_ + _)
    res29: Int = 9    
    
    // 로컬파일을 지정하면 워커노드도 동일한 위치에 파일이 있어야 함 
    val distFile = sc.textFile("data.txt")
    // s3의 파일도 지정가능 
    val distFile = sc.textFile("s3://your-bucket/data.txt")
    // hdfs의 파일도 지정가능 
    val distFile = sc.textFile("hdfs:///user/data.txt")
    // 디렉토리를 지정하면 하위의 모든 파일을 처리 
    val distFile = sc.textFile("hdfs:///user/")
    // 와일드 카드 사용 가능 
    val distFile = sc.textFile("hdfs:///user/*.txt")
    // 압축파일도 지정 가능 
    val distFile = sc.textFile("hdfs:///user/*.gz")
    
    // RDD 객체 생성 확인 
    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
    

     

     

    6.2.3.B RDD 연산

    • RDD 연산
      • 연산 API : https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.rdd.RDD
      • 트랜스포메이션(transformation), 액션(action) 두가지 타입 
        • 트랜스포메이션(RDD 생성, 데이터 변환 = 작업 구성)
          • 필터링 같은 작업 : RDD에서 새로운 RDD 반환
        • 액션(RDD로 작업을 처리하여 결과를 반환. = 실제계산)
          • 액션이 실행될 때마다 새로운 연산을 처리
          • 작업의 처리 결과를 재사용하고 싶으면 persist() 메소드를 사용하여 결과를 메모리에 유지
          • 함수 전달 : 매번 작업을 구현하지 않고, 함수로 구현가능
            • 순수 함수를 이용하는 것이 좋음(외부의 변수를 이용하지 않는)
          • 캐쉬이용
            • https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
            • 처리 결과를 메모리나 디스크에 저장하고 다음 계산에 이용 가능
              • 반복작업 :  캐싱 사용시 처리 속도 상승
              • 단일작업 : 데이터 복사를 위한 오버헤드가 발생
                • 처리시간이 더 느려질 수 있음
            • 메소드 : persist(), cache()
              • 캐싱 데이터에 문제 발생시 자동 복구
              • 저장 방법 설정 가능 : 메모리나 디스크에 저장가능
            • 키, 밸류를 이용한 처리 : flatMap, reduceByKey, groupByKey, mapValues, sortByKey
            • 어큐뮬레이터(Accmulator) : 맵리듀스의 카운터와 유사한 역할
              • 클러스터 모드 : 클러스터에서 처리하므로, 클로져 이용 시 다른 결과 가능
              • 로컬 모드 
            • 브로드캐스트(broadcast) : 모든 노드에서 공유되는 읽기 전용 값
              • broadcast()
            • 셔플 : 스파크의 조인, 정렬 작업은 셔플(Shuffle) 작업
              • 임시 파일의 복사, 이동이 있기 대문에 많은 비용
      • 스파크의 지연 처리(lazy evalution) : 트랜스포메이션을 호출할 때는 작업을 처리하지 않고, 액션을 호출하는 시점에 작업을 처리함 ->  작업의 효율성 제공
    트랜스포메이션 함수
    map(func) _func_로 처리된 새로운 데이터셋 반환
    filter(func) _func_에서 true를 반환한 값으로 필터링
    flatMap(func) _func_는 배열(혹은 Seq)을 반환하고, 이 배열들을 하나의 배열로 반환
    distinct([numPartitions]) 데이터셋의 중복을 제거
    groupByKey([numPartitions]) 키를 기준으로 그룹핑 처리. (K, V) 쌍을 처리하여 (K, Iterable)로 반환
    reduceByKey(func, [numPartitions]) 키를 기준으로 주어진 _func_로 처리된 작업 결과를 (K, V)로 반환
    sortByKey([ascending], [numPartitions]) 키를 기준으로 정렬

     

    액션 함수
    reduce(func) _func_를 이용하여 데이터를 집계(두 개의 인수를 받아서 하나를 반환). 병렬처리가 가능해야 함
    collect() 처리 결과를 배열로 반환. 필터링 등 작은 데이터 집합을 반환하는데 유용
    count() 데이터셋의 개수 반환
    first() 데이터셋의 첫번째 아이템 반환(take(1)과 유사)
    take(n) 데이터셋의 첫번째 부터 _n_개의 배열을 반환
    saveAsTextFile(path) 데이터셋을 텍스트 파일로 지정한 위치에 저장
    countByKey() 키를 기준으로 카운트 반환
    foreach(func) 데이터셋의 각 엘리먼트를 _func_로 처리. 보통 Accmulator와 함께 사용
    캐싱
    MEMORY_ONLY RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부만 저장하고 필요할 때마다 계산. 기본값
    MEMORY_AND_DISK RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부는 메모리, 일부는 디스크에 저장
    DISK_ONLY RDD를 디스크에 저장

     

    ex1) 작업 처리 타이밍

    // RDD 객체 생성 
    scala> val lines = sc.textFile("/user/cctv_utf8.csv")
    lines: org.apache.spark.rdd.RDD[String] = /user/shs/cctv_utf8.csv MapPartitionsRDD[7] at textFile at <console>:24
    
    // map() 액션 호출시에는 반응 없음 
    scala> val lineLengths = lines.map(s => s.length)
    lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:26
    
    // reduce 호출시 작업 처리 
    scala> val totalLength = lineLengths.reduce((a, b) => a + b)
    [Stage 1:> (0 + 0) / 2]
    totalLength: Int = 18531244  
    

    ex2) cctvRDD를 이용한 transformation. RDD 반환. take 액션 실행시 작업 진행

    // RDD 생성 
    scala> val cctvRDD = sc.textFile("/user/cctv_utf8.csv")
    cctvRDD: org.apache.spark.rdd.RDD[String] = /user/cctv_utf8.csv MapPartitionsRDD[1] at textFile at <console>:24
    
    // 라인을 탭단위로 분리하여 첫번째 아이템 반환 
    scala> val magRDD = cctvRDD.map(line => line.split("\t")(0))
    magRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:26
    
    // 중복 제거 
    scala> val distRDD = magRDD.distinct()
    distRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at distinct at <console>:28
    
    // 중복 제거한 데이터를 10개만 출력 
    scala> distRDD.take(10).foreach(println)
    성남둔치공영주차장                                                              
    울산 동구청
    

    ex3) cctvRDD를 이용하여 처리한 액션은 결과를 드라이버(스파크쉘)에 반환하거나, 파일로 저장

    // RDD 생성
    scala> val cctvRDD = sc.textFile("/user/cctv_utf8.csv")
    cctvRDD: org.apache.spark.rdd.RDD[String] = /user/cctv_utf8.csv MapPartitionsRDD[1] at textFile at <console>:24
    
    // 첫번째 라인 반환 
    scala> cctvRDD.first()
    res0: String = 관리기관명    소재지도로명주소    소재지지번주소 설치목적구분  카메라대수   카메라화소수  촬영방면정보  보관일수    설치년월    관리기관전화번호    위도  경도  데이터기준일자 제공기관코드  제공기관명
    
    // 10개의 라인을 출력
    scala> cctvRDD.take(10).foreach(println)
    관리기관명   소재지도로명주소    소재지지번주소 설치목적구분  카메라대수   카메라화소수  촬영방면정보  보관일수    설치년월    관리기관전화번호    위도  경도  데이터기준일자 제공기관코드  제공기관명
    제주특별자치도 제주특별자치도 제주시 동문로9길 3 제주특별자치도 제주시 건입동 1120    생활방범    1       청은환타지아 북측 4가    30      064-710-8855    33.5132891  126.5300275 2018-04-30  6500000 제주특별자치도
    
    // 텍스트 파일로 지정한 위치에 저장 
    scala> cctvRDD.saveAsTextFile("/user/cctvRDD")
    [Stage 7:>                                                          (0 + 0) / 2]
    
    // 저장한 파일을 확인 
    $ hadoop fs -ls /user/cctvRDD/
    Found 3 items
    -rw-r--r--   2 hadoop hadoop          0 2019-01-22 04:05 /user/cctvRDD/_SUCCESS
    -rw-r--r--   2 hadoop hadoop   15333006 2019-01-22 04:05 /user/cctvRDD/part-00000
    -rw-r--r--   2 hadoop hadoop   15332503 2019-01-22 04:05 /user/cctvRDD/part-00001
    

    ex4) 함수 구현

    // RDD에 map, reduce 함수를 람다함수로 전달
    scala> cctvRDD.map(line => line.length).reduce((a, b) => a + b)
    res12: Int = 18531244
    
    // 함수 구현체 
    object Func {
      // line의 길이를 반환하는 함수 
      def mapFunc(line: String): Int = { line.length }
      // a, b의 합을 반환하는 함수 
      def reduceFunc(a:Int, b:Int): Int = { a + b }
    }
    
    // RDD에 mapFunc, reduceFunc를 전달
    scala> cctvRDD.map(Func.mapFunc).reduce(Func.reduceFunc)
    res11: Int = 18531244                                                           
    

    ex5) 캐싱

    val txts = sc.textFile("/user/sample.txt")
    val pairs = txts.flatMap(line => line.split(" ")).map(word => (word, 1))
    
    // 각 단계의 결과를 캐슁 
    scala> pairs.persist()
    res39: pairs.type = MapPartitionsRDD[36] at map at <console>:26
    
    val counts = pairs.reduceByKey(_ + _) 
    scala> counts.persist()
    res38: counts.type = ShuffledRDD[37] at reduceByKey at <console>:28
    

     

    6.2.4 DataFrame/Dataset 구현

    • 데이터셋, 데이터 프레임은 스파크 세션을 이용하여 처리

    A. 스파크세션 초기화

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    • 스파크 쉘(spark-shell) 이용 시,
      • REPL쉘이 스파크 세션 객체를 초기화.
      • 쉘 실행시 스파크 컨텍스트와 스파크 세션을 생성했다는 메세지를 확인 가능
    $ spark-shell --master yarn --queue queue_name
    Spark context Web UI available at http://127.0.0.1:4040
    Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
    Spark session available as 'spark'.
    scala> spark
    res40: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@21840920
    
    • 하이브 메타스토어 연결
      •  세션 생성시에 hive.metastore.uris 값을 설정하여 메타스토어와 연결
      • 세션은 단독 or 하이브 메타스토어와 연결 가능
    // hive.metastore.uris 옵션에 하이브 메타스토어 접속 주소를 입력한다. 
    val spark = SparkSession.builder().appName("sample").config("hive.metastore.uris", "thrift://hive_metastore_ip:hive_metastore_port").enableHiveSupport().getOrCreate()
    
    // 데이터베이스 조회 
    scala> spark.sql("show databases").show()
    +-------------+
    | databaseName|
    +-------------+
    |      test_db1|
    |      test_db2|
    

     

    B. 데이터프레임 초기화

    • 예시 : https://wikidocs.net/28555 
    • B.1 스파크 세션의 read 메소드 이용
      • json, parquet, orc, text 등 읽기 가능
    • B.2 RDD 이용 : 스키마구조를 지정할 수도 있고, 지정하지 않으면 스파크에서 임시 칼럼명을 지정
      • 배열 RDD : 단일 데이터
      • 복합구조 RDD : 칼럼이 여러개인 데이터
      • 스키마 이용 : 데이터를 검증하면서 초기화
      • 외부데이터 read
        • json : 구조가 있으므로, 자동 스키마 생성
        • txt : 스키마 생성하여 초기화

    C. 데이터프레임 연산

    • 데이터 프레임은 스키마 구조를 가지므로, 쿼리를 날리는 것처럼 작업가능.
      • 명령어 체인 이용
        • 스키마 확인 : printSchema()
        • 조회
          • select() : 칼럼 선택시 $ 사용
          • show() 함수 설정 : 데이터 출력설정. 라인 개수, 칼럼 사이즈
        • 필터링 : filter()
        • 그룹핑 : groupBy()
        • 칼럼추가 : withColumn()
      •  SQL 이용 : 뷰생성 -> SQL 실행
        • 뷰 생성 : createOrReplaceTempView("view Name")
        • SQL 사용 : sql(). DB사용시와 동일

    D. 데이터셋 초기화

    • 내부데이터 이용
      • 케이스 클래스 이용 : case class People(name: String, salary: Int)
      • RDD를 데이터셋으로 초기화 : RDD -> 데이터 프레임 변경 ->  데이터셋 변환
        • RDD.map -> dataframe.as
      • 데이터프레임을 데이터셋으로 초기화
        • 데이터프레임에 정적데이터로 변경 필요 : as 이용해서 클래스를 지정
        • 스키마와 클래스의 이름이 동일하면 자동 바인딩
    • 외부데이터 이용

    E. 데이터셋 연산 : 데이터프레임과 동일

    • 명령어 체인 이용
      • 스키마 확인 : printSchema()
      • 조회
        • select() : 칼럼 선택시 $ 사용
        • show() 함수 설정 : 데이터 출력설정. 라인 개수, 칼럼 사이즈
      • 필터링 : filter()
      • 그룹핑 : groupBy()
      • 맵연산 : RDD와 동일
    •  SQL 이용 : 뷰생성 -> SQL 실행
      • 뷰 생성 : createOrReplaceTempView("view Name")
      • SQL 사용 : sql(). DB사용시와 동일

    F. 저장/불러오기 : 데이타프레임, 데이타셋 동일

    • 저장
      • 데이터 저장 : save()
        • 저장 포멧 지정 : format()
        • 압축 포멧 지정 : option(). gzip, snappy..
        • 저장모드
      • 테이블저장 : saveAsTable()
        • 버켓팅, 정렬, 파티셔닝 : bucketBy(), sortBy(),  partitionBy(), 
          • 임시테이블에서는 불가
    • 데이터 불러오기 : load() 혹은 read의 파일로더 함수(read.json() 등...)
      • 불러오기 포멧, 옵션 : format(), option()
    저장모드
    SaveMode.ErrorIfExists 파일이 있으면 에러 처리
    SaveMode.Append 다른 이름으로 파일 추가
    SaveMode.Overwrite 기존 파일을 삭제하고 추가
    SaveMode.Ignore 파일이 있으면 저장하지 않고, 에러 처리도 하지 않음

    G. DB 저장 : MySQL 등의 RDB에 직접 저장가능

    • 데이터 저장 : DB 저장시 format("jdbc)로 설정

    7.스파크 SQL과 하이브

    스파크 SQL을 하이브의 메타스토어 정보를 이용하여 처리하기

    • 메타스토어의 데이터베이스, 테이블 정보를 이용하여 작업을 진행
    • RDD, 데이터프레임, 스파크 SQL을 이용하여 작업
    • 스파크 SQL
      • 하이브의 쿼리를 스파크에서 그대로 이용할 수 있게 함.
      • 스파크 SQL을 이용하여 조회한 정보는 데이터프레임으로 반환 -> 데이터셋으로 변환
      • 스칼라를 이용하여 UDF를 생성 or  자바로 작성한 UDF 사용
      • 스파크 SQL, 데이터프레임은 내부적으로 동일한 최적화 과정
      • 하이브 메타스토어 연결
        • 스파크 세션 초기화 시 hive.metastore.uris 정보를 제공하여 하이브 메타스토어와 연결
      • 스파크 SQL 쿼리
        • 조회 : select sql문 이용
        • 조인
          • SQL 이용 : 표준 SQL의 조인 명령
          • 데이터 프레임 이용 : 명령어 체인. select()
        • 저장 : write()
          • 데이터 포멧, 저장모드, 저장위치 전달 필요
          • coalesce로 파티션 갯수(최종 파일의 갯수) 지정 가능
        • UDF(User Defined Function)
          • 스파크 세션에 함수 등록하여 사용 : register()
            • 메소드 체인에서 사용하려면
              • UDF 함수로 변환필요
              • udf()로 함수를 감싸주기
          • 하이브 UDF(하이브 쿼리) 그대로 사용가능
            • 버전에 따라 ADD JAR 명령에서 오류가 발생가능
              • 스파크 실행시에 --jars 옵션을 이용하여 jar 파일을 추가

    8.스파크 스트리밍

    • 스파크 스트리밍은 실시간 데이터 분석을 위한 스파크 컴포넌트
    • 실시간 데이터 분석은
      • 페이지 뷰에 대한 통계를 추적하거나,
      • 머신러닝 모델을 학습시킬 수 있습니다.
      • 로그 분석을 통해서 실시간으로 에러를 감지할 수도 있고,
      • 현재 사용자의 구매 패턴을 분석하여 물품 추천을 할 수도 있습니다.

    8.1 디스트림

    8.2 스트리밍 컨텍스트

    8.3 스트리밍 워드 카운트

    8.4 윈도우 트랜스포메이션

    8.5 디스트림 연산

    9. PySpark

     

     

     

     

    'Back > Bigdata' 카테고리의 다른 글

    Docker - 2: 세팅 연습  (1) 2023.05.21
    Spark -2 : 세팅 연습  (0) 2023.05.21
    Scalar  (0) 2023.05.15
    Apache Hive & HiveQL  (0) 2023.05.13