ref) https://wikidocs.net/26513
목차
1. 아파치 스파크(apache spark)
- 2011년 버클리 대학의 AMPLab에서 개발, 2014년 5월 정식 출시
- 아파치 재단의 오픈소스
- 인메모리 기반의 대용량 데이터 고속 처리 엔진
범용 분산 클러스터 컴퓨팅 프레임워크
2. 특징
- Speed : 인메모리(In-Memory) 기반의 빠른 처리
- 맵리듀스 작업처리에 비해 디스크는 10배, 메모리 작업은 100배 빠른 속도
- 맵리듀스는 작업의 중간 결과를 디스크에 쓰기 때문에 IO로 인하여 작업 속도에 제약이 생깁니다.
- 스파크는 메모리에 중간 결과를 메모리에 저장하여 반복 작업의 처리 효율이 높습니다.
- Ease of Use : 다양한 언어 지원(Java, Scala, Python, R, SQL)을 통한 사용의 편이성
- 성능을 위해서는 Scala 로 개발을 진행하는 것이 좋다
- Generality : SQL, Streaming, 머신러닝, 그래프 연산 등 다양한 컴포넌트 제공
- 스파크 스트리밍을 이용한 스트림 처리, 스파크 SQL을 이용한 SQL 처리, MLib 를 이용한 머신러닝 처리, GraphX를 이용한 그래프 프로세싱 지원
- Run Everywhere
- YARN, Mesos, Kubernetes 등 다양한 클러스터에서 동작 가능
- YARN, Mesos, Kubernetes, standalone 등 다양한 포맷
- HDFS, 카산드라, HBase, S3 등의 다양한 데이터 포맷을 지원
- HDFS, Casandra, HBase 등 다양한 파일 포맷 지원
- TXT, Json, ORC, Parquet 등의 파일 포맷을 지원.
- S3, HDFS 등의 파일 시스템과 연동도 가능하고, HBase, Hive 와도 간단하게 연동
- YARN, Mesos, Kubernetes 등 다양한 클러스터에서 동작 가능
3. 구성
스파크 코어, 클러스터 매니저, 스파크 라이브러리
3.1 스파크 코어
- 메인 컴포넌트
- 기본기능 제공 : 작업 스케줄링, 메모리 관리, 장애 복구 등
- 스파크 연산 처리 : RDD, Dateset, DataFrame 이용
3.2 스파크 라이브러리
빅데이터 처리를 위한 작업용 라이브러리 .
- Spark SQL
- SQL을 이용하여 RDD, DataSet, DataFrame 작업을 생성하고 처리
- 하이브 메타스토어와 연결하여 하이브의 메타 정보를 이용하여 SQL 작업을 처리
- 샤크(Shark)는 하이브에서 스파크 작업 처리를 위한 외부 프로젝트 였는데 현재 스파크 SQL로 통합
- Spark Streaming
- 실시간 데이터 스트림을 처리하는 컴포넌트. 스트림 데이터를 작은 사이즈로 쪼개어 RDD 처럼 처리.
- MLib
- MLib는 스파크 기반의 머신러닝 기능을 제공하는 컴포넌트입니다. 분류(classification), 회귀(regression), 클러스터링(clustering), 협업 필터링(collaborative filtering) 등의 머신러닝 알고리즘과 모델 평가 및 외부 데이터 불러오기 같은 기능도 지원합니다.
- GraphX
- GraphX는 분산형 그래프 프로세싱이 가능하게 해주는 컴포넌트입니다. 각 간선이나 점에 임의의 속성을 추가한 지향성 그래프를 만들 수 있습니다.
3.3 클러스터 매니저
스파크 작업을 운영하는 클러스터 관리자
- 다양한 클러스터 매니저 지원
- 스파크에서 제공하는 스탠드얼론(Standalone) 관리자
- 메조스(Mesos), 얀(YARN), 큐버네티스(Kubernetes) 등의 관리자
4. 스파크 구조
- 크게 두가지 : 작업 + 실행
- 작업 관리 : 드라이버
- 노드 관리(작업 실행되는) : 클러스터 매니저
4.1 스파크 애플리케이션(=스파크 실행 프로그램) 구성
- 드라이버와 익스큐터 프로세스로 실행되는 프로그램
- 클러스터 매니저가 스파크 애플리케이션의 리소스를 효율적으로 배분
- 마스터-슬레이브 구조로 실행
4.1.1 드라이버(Driver) : 작업을 관장
- 스파크 애플리케이션을 실행하는 프로세스
- main 함수를 실행
- 스파크 컨텍스트(SparkContext) 객체를 생성
- 클러스터 매니저와 통신
- 클러스터의 자원 관리를 지원
- 애플리케이션의 라이프 사이클 관리
- 사용자로 부터 입력을 받아서 애플리케이션에 전달
- 작업 처리 결과를 사용자에게 알려줌.
- 디플로이 모드 : 실행시점에 설정
- 클라이언트 모드 : 클러스터 외부에서 드라이버 실행
- 클러스터 모드 : 클러스터 내에서 드라이버 실행
4.1.2 익스큐터(Executor) : 작업을 동작시킴
- 태스크 실행을 담당하는 에이전트
- 작업을 진행하는 프로세스입니다.= YARN의 컨테이너
- 태스크 단위로 작업을 실행하고 결과를 드라이버에 알려줌
- 동작 중 오류가 발생하면 다시 재작업을 진행
- 태스크(Task)
- 익스큐터에서 실행되는 작업. 익스큐터의 캐쉬를 공유하여 작업의 속도를 높일 수 있음.
- 하나의 Core. 익스큐터가 여러개 태스크 동시실행
- 익스큐터, 코어 수, 메모리 용량을 설정이 중요
- 태스크가 너무 많으면 : 컨텍스트 스위칭, HDFS I/O로 인한 성능저하
- 태스크가 너무 작으면 : JVM을 공유하는 장점이 사라짐
4.2 스파크 잡 구성
- 잡(Job), 스테이지(Stage), 태스크(Task)로 구성
4.2.1 잡(Job)
- 스파크 애플리케이션으로 제출된 작업(work)
4.2.2 스테이지(Stage)
- Job을 작업의 단위에 따라 구분한 것
4.2.3 태스크(Task)
- 익스큐터에서 실행되는 작업. 데이터를 읽거나, 필터링 하는 실제 실행내용
4.3 클러스터 매니저
스파크는 여러 가지 클러스터 매니저를 지원
- YARN : 하둡 클러스터 매니저
- 리소스 매니저, 노드 매니저로 구성 됨
- Mesos : 아파치 클러스터 매니저
- 동적 리소스 공유 및 격리를 사용하여 여러 소스의 워크로드를 처리
- 마스터와 슬레이브로 구성됨
- StandAlone : 스파크 자체 제공 클러스터 매니저
- 각 노드에서 하나의 익스큐터만 실행 가능
5.스파크 작업 모니터링 방법(Web UI/ REST API)
5.1 스파크 컨텍스트 웹UI
- 스파크 컨텍스트가 실행되면 4040포트로 웹UI를 실행
- 하나의 노드에 여러개의 컨텍스트가 실행되면 포트번호가 1씩 증가하면서 생성(4041, 4042, ...).
- 스파크 컨텍스트가 실행되고 있는 동안에만 사용 가능
- 스파크 히스토리 서버
- 실행중인 작업과 실행이 끝난 작업의 히스토리를 확인하기 위한 서버
- start-history-server.sh로 실행하고 기본 접속 포트는 18080
- 영구적인 저장소에 스파크 작업 내역을 저장 하고 사용자의 요청에 정보를 반환
- 확인 가능 정보
- 스테이지와 태스크 목록
- RDD 크기와 메모리 사용량
- 환경변수 정보
- 익스큐터의 정보
5.2 REST API(at 스파크 히스토리 서버)
- 작업 애플리케이션에 대해 REST API를 이용해 json 형태의 정보를 제공.
curl -s http://$(hostname -f):18080/api/v1/applications
curl -s http://$(hostname -f):18080/api/v1/[app-id]/jobs
6. 스파크 애플리케이션
- 스칼라, 자바, 파이썬, R 로 구현가능
- 각 언어로 스파크 SQL을 실행가능
- jar 파일로 묶어서 실행하거나, REPL 환경에서 실행
6.1 실행 방법
6.1.1 방법 1 - 각 언어별 실행 스크립트
스파크를 다운로드 받아서 bin 디렉토리에 들어가면 각 언어를 실행할 수 있는 스크립트가 있음.
# bin 폴더 실행 파일
-rwxr-xr-x 1 root root 1099 Nov 16 2016 beeline
-rw-r--r-- 1 root root 2143 Nov 16 2016 load-spark-env.sh
-rwxr-xr-x 1 root root 3265 Nov 16 2016 pyspark // 파이썬
-rwxr-xr-x 1 root root 1040 Nov 16 2016 run-example
-rwxr-xr-x 1 root root 3126 Nov 16 2016 spark-class
-rwxr-xr-x 1 root root 1049 Nov 16 2016 sparkR // R
-rwxr-xr-x 1 root root 3026 Nov 16 2016 spark-shell // scala repl
-rwxr-xr-x 1 root root 1075 Nov 16 2016 spark-sql // spark on sql
-rwxr-xr-x 1 root root 1050 Nov 16 2016 spark-submit // scala jar
6.1.2 방법 2 - 스파크 애플리케이션 제출(spark-submit)
- 스칼라나, 자바로 작성한 스파크 애플리케이션을 jar 파일로 실행
- 파이썬, R 파일 실행
# spark-submit 실행 옵션
$ ./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<app jar | python file | R file> \
[application-arguments]
# 클러스터 매니저가 yarn인 경우 실행 방법
# JAR 파일 실행
$ spark-submit --master yarn \
--queue spark_queue \
--class sdk.spark.SparkWordCount \
--conf spark.shuffle.service.enabled=true \
./Spark-Example.jar
# 파이썬 파일 실행
$ spark-submit --master yarn \
--queue spark_queue \
./py01.py
6.1.2.A 설정값
spark-submit을 이용시 설정값.
--master | 클러스터 매니저 설정 |
--deploy-mode | 드라이버의 디플로이 모드 설정 |
--class | main 함수가 들어 있는 클래스 지정 |
--name | 애플리케이션의 이름 지정. 스파크 웹 UI에 표시 |
--jars | 애플리케이션 실행에 필요한 라이브러리 목록. 콤마로 구분 |
--files | 애플리케이션 실행에 필요한 파일 목록 |
--queue | 얀의 실행 큐이름 설정 |
--executor-memory | 익스큐터가 사용할 메모리 바이트 용량. 512m. 1g 등도 사용 가능 |
--driver-memory | 드라이버 프로세스가 사용할 메모리 바이트 용량. 512m. 1g 등도 사용 가능 |
--num-executors | 익스큐터의 개수 설정 |
--executor-cores | 익스큐터의 코어 개수 설정 |
6.1.2.B 디플로이 모드
디플로이 모드에 대한 설정값
client | 프로그램을 실행하는 노드에서 드라이버 실행 |
cluster | 클러스터 내부의 노드에서 드라이버 실행 |
6.1.2.C 클러스터 매니저 설정
클러스터 매니저 설정
spark://ip:port | 스파크 스탠드얼론 클러스터 사용 |
mesos://ip:port | 아파치 메조스 사용 |
yarn | 하둡 얀 클러스터. HADOOP_CONF_DIR 설정 참조하여 처리 |
local | 로컬모드에서 싱글코어로 실행 |
local[N] | 로컬모드에서 N개 코어로 실행 |
6.1.3 스파크 애플리케이션 REPL 처리
- REPL 환경을 이용한 작업 처리도 지원합니다.
- spark-shell, pyspark를 실행하면 REPL 환경에서 인터랙티브하게 작업을 처리
- spark-shell은 스칼라, pyspark는 파이썬을 지원
- 각 쉘을 실행할 때 옵션으로 클러스터 매니저 지정 가능
6.1.3.A spark-shell 환경(Scalar)
- 스칼라를 이용한 처리는 spark-shell 을 이용
- 쉘 명령 : https://wikidocs.net/31530
$ spark-shell --master yarn --queue queue_name
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
19/01/03 08:40:29 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/01/03 08:40:36 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.2
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
설정값 변경 : 설정값은 spark-submit과 같음
# jar 파일 추가
$ spark-shell --master yarn \
--queue queue_name \
--jars a.jar,b.jar,c.jar \
--conf spark.shuffle.service.enabled=true
익스큐터 개수 설정 : 스파크 쉘을 실행할 때 실행할 익스큐터의 개수와 메모리 설정 가능
$ spark-shell \
--master yarn \
--deploy-mode client \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 1 \
--num-executors 5
6.1.3.B pyspark 환경(python)
파이썬을 이용한 처리는 pyspark를 이용
$ pyspark --master yarn --queue queue_name
Python 2.7.12 (default, Sep 1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
19/01/03 08:46:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.0.2
/_/
Using Python version 2.7.12 (default, Sep 1 2016 22:14:00)
SparkSession available as 'spark'.
>>>
6.1.4 log4j 로그출력 : https://wikidocs.net/31530
6.1.5 관련 여러 설정 : https://wikidocs.net/107317
6.2 구현 방법
> SQL과 데이터셋, 데이터프레임을 이용한 처리는 동일한 엔진을 이용하기 때문에 사용자에게 편리한 API를 이용
- 스파크 애플리케이션 구현 방법
- RDD를 이용 : 스파크 v1에서 발표
- 스파크 컨텍스트(SparkContext)를 이용
- 데이타셋(Dataset)과 데이터프레임(DataFrame)을 이용: 스파크 v2에서 RDD의 단점으로 개선하여 발표
- 스파크 세션(SparkSession) 객체를 이용
- 스파크 세션에서는 SQL을 이용하여 데이터 처리 가능.
- RDD를 이용 : 스파크 v1에서 발표
6.2.1 RDD(2014년, 스파크 v1)
- 기존 기술에 비해 처리 속도 상승 : 인메모리 데이터 처리
- 최적화 어려움 : 테이블 조인 효율화 같은 처리를 사용자가 직접 해야 함
// RDD 예제
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.map(x => if(x >= 3) x else 0).reduce((x, y) => x + y)
- 데이터 처리 : 파티션 단위로 분리하여 작업
- 외부 데이터를 읽어서
- 자체적인 컬렉션 데이터를 생성
6.2.2 데이타셋, 데이터 프레임, SQL(스파크 1.3 ~1.6~ 2.0)
6.2.2.A DataFrame(스파크 1.3 ~ 스파크 2.0)
- 프로젝트 텅스텐의 일부로 소개 : 처리 속도 증가 프로젝트
- 데이터 추상화 : 스키마 형태로..
- 쿼리 최적화 : 카탈리스트 옵티마이저가..
// 데이터프레임 예제
val df = spark.read.json("examples/src/main/resources/people.json")
df.select($"name", $"age").filter($"age" > 20).show()
df.groupBy("age").count().show()
6.2.2.B Dataset(스파크 1.6~, 스파크 2.0)
- RDD와 유사
- 데이터의 타입체크
- 데이터 직렬화를 위한 인코더
- 자바의 기본 시리얼라이제이션이나 kyro를 사용하지 않음
- 스파크의 인코더(Encoder)를 이용하여 RDD 보다 빠른속도.
- 카탈리스트 옵티마이저 지원
- 스파크 2.0에서 데이터프레임과 데이터셋을 통합
- 스칼라 API에서 Dataset[Row]는 DataFrame을 의미 함
// 데이터셋 예제
val path = "examples/src/main/resources/people.json"
case class Person(name:String, age:Long)
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
6.2.3 RDD 구현
6.2.3.A 초기화 : 스파크 컨텍스트, RDD
- RDD는 스파크 컨텍스트(SparkContext) 객체를 이용하여 생성
- 스파크컨텍스트 초기화
- SparkConf 객체를 이용해서 설정값을 생성하고, 이를 이용해서 초기화
- 스파크 쉘(spark-shell)을 이용할 경우 REPL쉘이 스파크 컨텍스트 객체를 생성
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
// SparkContext 객체 초기화
// 클러스터 매니저의 타입 지정
val conf = new SparkConf().setAppName("sample").setMaster("yarn")
val sc = new SparkContext(conf)
$ spark-shell --master yarn --queue queue_name
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
Spark session available as 'spark'.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3c910acd
- RDD 초기화
- 내부 데이터 이용(Parallelized Collections)
- 스파크 컨텍스트의 parallelize() 메소드를 이용하여 처리
- 사용자가 직업 데이터를 입력하여 생성.
- 생성한 객체는 RDD 타입
- 해당 객체를 map(), reduce(), filter() 등의 RDD 연산이용하여 처리
- 외부 데이터 이용
- 스파크 컨텍스트의 textFile() 메소드를 이용하여 처리 : HDFS, S3, hbase 등 파일시스템 지원
- 생성한 객체는 RDD 타입이므로 RDD 연산을 이용하여 처리
- 내부 데이터 이용(Parallelized Collections)
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val distData = sc.parallelize(data, 5) // 파티션 개수 지정
// 리듀스 처리 - 모든 수 더하기
scala> distData.reduce((a, b) => a + b)
res26: Int = 15
// 맵으로 3이상일때만 모든수 더하기
scala> distData.map(x => if(x >= 3) x else 0).reduce((x, y) => x + y)
res27: Int = 12
// 필터를 이용하여 4이상일때만 더하기
scala> distData.filter(_ >= 4).reduce(_ + _)
res29: Int = 9
// 로컬파일을 지정하면 워커노드도 동일한 위치에 파일이 있어야 함
val distFile = sc.textFile("data.txt")
// s3의 파일도 지정가능
val distFile = sc.textFile("s3://your-bucket/data.txt")
// hdfs의 파일도 지정가능
val distFile = sc.textFile("hdfs:///user/data.txt")
// 디렉토리를 지정하면 하위의 모든 파일을 처리
val distFile = sc.textFile("hdfs:///user/")
// 와일드 카드 사용 가능
val distFile = sc.textFile("hdfs:///user/*.txt")
// 압축파일도 지정 가능
val distFile = sc.textFile("hdfs:///user/*.gz")
// RDD 객체 생성 확인
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
6.2.3.B RDD 연산
- RDD 연산
- 연산 API : https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.rdd.RDD
- 트랜스포메이션(transformation), 액션(action) 두가지 타입
- 트랜스포메이션(RDD 생성, 데이터 변환 = 작업 구성)
- 필터링 같은 작업 : RDD에서 새로운 RDD 반환
- 액션(RDD로 작업을 처리하여 결과를 반환. = 실제계산)
- 액션이 실행될 때마다 새로운 연산을 처리
- 작업의 처리 결과를 재사용하고 싶으면 persist() 메소드를 사용하여 결과를 메모리에 유지
- 함수 전달 : 매번 작업을 구현하지 않고, 함수로 구현가능
- 순수 함수를 이용하는 것이 좋음(외부의 변수를 이용하지 않는)
- 캐쉬이용
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
- 처리 결과를 메모리나 디스크에 저장하고 다음 계산에 이용 가능
- 반복작업 : 캐싱 사용시 처리 속도 상승
- 단일작업 : 데이터 복사를 위한 오버헤드가 발생
- 처리시간이 더 느려질 수 있음
- 메소드 : persist(), cache()
- 캐싱 데이터에 문제 발생시 자동 복구
- 저장 방법 설정 가능 : 메모리나 디스크에 저장가능
- 키, 밸류를 이용한 처리 : flatMap, reduceByKey, groupByKey, mapValues, sortByKey
- 어큐뮬레이터(Accmulator) : 맵리듀스의 카운터와 유사한 역할
- 클러스터 모드 : 클러스터에서 처리하므로, 클로져 이용 시 다른 결과 가능
- 로컬 모드
- 브로드캐스트(broadcast) : 모든 노드에서 공유되는 읽기 전용 값
- broadcast()
- 셔플 : 스파크의 조인, 정렬 작업은 셔플(Shuffle) 작업
- 임시 파일의 복사, 이동이 있기 대문에 많은 비용
- 트랜스포메이션(RDD 생성, 데이터 변환 = 작업 구성)
- 스파크의 지연 처리(lazy evalution) : 트랜스포메이션을 호출할 때는 작업을 처리하지 않고, 액션을 호출하는 시점에 작업을 처리함 -> 작업의 효율성 제공
트랜스포메이션 함수 | |
map(func) | _func_로 처리된 새로운 데이터셋 반환 |
filter(func) | _func_에서 true를 반환한 값으로 필터링 |
flatMap(func) | _func_는 배열(혹은 Seq)을 반환하고, 이 배열들을 하나의 배열로 반환 |
distinct([numPartitions]) | 데이터셋의 중복을 제거 |
groupByKey([numPartitions]) | 키를 기준으로 그룹핑 처리. (K, V) 쌍을 처리하여 (K, Iterable)로 반환 |
reduceByKey(func, [numPartitions]) | 키를 기준으로 주어진 _func_로 처리된 작업 결과를 (K, V)로 반환 |
sortByKey([ascending], [numPartitions]) | 키를 기준으로 정렬 |
액션 함수 | |
reduce(func) | _func_를 이용하여 데이터를 집계(두 개의 인수를 받아서 하나를 반환). 병렬처리가 가능해야 함 |
collect() | 처리 결과를 배열로 반환. 필터링 등 작은 데이터 집합을 반환하는데 유용 |
count() | 데이터셋의 개수 반환 |
first() | 데이터셋의 첫번째 아이템 반환(take(1)과 유사) |
take(n) | 데이터셋의 첫번째 부터 _n_개의 배열을 반환 |
saveAsTextFile(path) | 데이터셋을 텍스트 파일로 지정한 위치에 저장 |
countByKey() | 키를 기준으로 카운트 반환 |
foreach(func) | 데이터셋의 각 엘리먼트를 _func_로 처리. 보통 Accmulator와 함께 사용 |
캐싱 | |
MEMORY_ONLY | RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부만 저장하고 필요할 때마다 계산. 기본값 |
MEMORY_AND_DISK | RDD를 메모리상에 저장. 메모리보다 용량이 크면 일부는 메모리, 일부는 디스크에 저장 |
DISK_ONLY | RDD를 디스크에 저장 |
ex1) 작업 처리 타이밍
// RDD 객체 생성
scala> val lines = sc.textFile("/user/cctv_utf8.csv")
lines: org.apache.spark.rdd.RDD[String] = /user/shs/cctv_utf8.csv MapPartitionsRDD[7] at textFile at <console>:24
// map() 액션 호출시에는 반응 없음
scala> val lineLengths = lines.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:26
// reduce 호출시 작업 처리
scala> val totalLength = lineLengths.reduce((a, b) => a + b)
[Stage 1:> (0 + 0) / 2]
totalLength: Int = 18531244
ex2) cctvRDD를 이용한 transformation. RDD 반환. take 액션 실행시 작업 진행
// RDD 생성
scala> val cctvRDD = sc.textFile("/user/cctv_utf8.csv")
cctvRDD: org.apache.spark.rdd.RDD[String] = /user/cctv_utf8.csv MapPartitionsRDD[1] at textFile at <console>:24
// 라인을 탭단위로 분리하여 첫번째 아이템 반환
scala> val magRDD = cctvRDD.map(line => line.split("\t")(0))
magRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:26
// 중복 제거
scala> val distRDD = magRDD.distinct()
distRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at distinct at <console>:28
// 중복 제거한 데이터를 10개만 출력
scala> distRDD.take(10).foreach(println)
성남둔치공영주차장
울산 동구청
ex3) cctvRDD를 이용하여 처리한 액션은 결과를 드라이버(스파크쉘)에 반환하거나, 파일로 저장
// RDD 생성
scala> val cctvRDD = sc.textFile("/user/cctv_utf8.csv")
cctvRDD: org.apache.spark.rdd.RDD[String] = /user/cctv_utf8.csv MapPartitionsRDD[1] at textFile at <console>:24
// 첫번째 라인 반환
scala> cctvRDD.first()
res0: String = 관리기관명 소재지도로명주소 소재지지번주소 설치목적구분 카메라대수 카메라화소수 촬영방면정보 보관일수 설치년월 관리기관전화번호 위도 경도 데이터기준일자 제공기관코드 제공기관명
// 10개의 라인을 출력
scala> cctvRDD.take(10).foreach(println)
관리기관명 소재지도로명주소 소재지지번주소 설치목적구분 카메라대수 카메라화소수 촬영방면정보 보관일수 설치년월 관리기관전화번호 위도 경도 데이터기준일자 제공기관코드 제공기관명
제주특별자치도 제주특별자치도 제주시 동문로9길 3 제주특별자치도 제주시 건입동 1120 생활방범 1 청은환타지아 북측 4가 30 064-710-8855 33.5132891 126.5300275 2018-04-30 6500000 제주특별자치도
// 텍스트 파일로 지정한 위치에 저장
scala> cctvRDD.saveAsTextFile("/user/cctvRDD")
[Stage 7:> (0 + 0) / 2]
// 저장한 파일을 확인
$ hadoop fs -ls /user/cctvRDD/
Found 3 items
-rw-r--r-- 2 hadoop hadoop 0 2019-01-22 04:05 /user/cctvRDD/_SUCCESS
-rw-r--r-- 2 hadoop hadoop 15333006 2019-01-22 04:05 /user/cctvRDD/part-00000
-rw-r--r-- 2 hadoop hadoop 15332503 2019-01-22 04:05 /user/cctvRDD/part-00001
ex4) 함수 구현
// RDD에 map, reduce 함수를 람다함수로 전달
scala> cctvRDD.map(line => line.length).reduce((a, b) => a + b)
res12: Int = 18531244
// 함수 구현체
object Func {
// line의 길이를 반환하는 함수
def mapFunc(line: String): Int = { line.length }
// a, b의 합을 반환하는 함수
def reduceFunc(a:Int, b:Int): Int = { a + b }
}
// RDD에 mapFunc, reduceFunc를 전달
scala> cctvRDD.map(Func.mapFunc).reduce(Func.reduceFunc)
res11: Int = 18531244
ex5) 캐싱
val txts = sc.textFile("/user/sample.txt")
val pairs = txts.flatMap(line => line.split(" ")).map(word => (word, 1))
// 각 단계의 결과를 캐슁
scala> pairs.persist()
res39: pairs.type = MapPartitionsRDD[36] at map at <console>:26
val counts = pairs.reduceByKey(_ + _)
scala> counts.persist()
res38: counts.type = ShuffledRDD[37] at reduceByKey at <console>:28
6.2.4 DataFrame/Dataset 구현
- 데이터셋, 데이터 프레임은 스파크 세션을 이용하여 처리
A. 스파크세션 초기화
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
- 스파크 쉘(spark-shell) 이용 시,
- REPL쉘이 스파크 세션 객체를 초기화.
- 쉘 실행시 스파크 컨텍스트와 스파크 세션을 생성했다는 메세지를 확인 가능
$ spark-shell --master yarn --queue queue_name
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
Spark session available as 'spark'.
scala> spark
res40: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@21840920
- 하이브 메타스토어 연결
- 세션 생성시에 hive.metastore.uris 값을 설정하여 메타스토어와 연결
- 세션은 단독 or 하이브 메타스토어와 연결 가능
// hive.metastore.uris 옵션에 하이브 메타스토어 접속 주소를 입력한다.
val spark = SparkSession.builder().appName("sample").config("hive.metastore.uris", "thrift://hive_metastore_ip:hive_metastore_port").enableHiveSupport().getOrCreate()
// 데이터베이스 조회
scala> spark.sql("show databases").show()
+-------------+
| databaseName|
+-------------+
| test_db1|
| test_db2|
B. 데이터프레임 초기화
- 예시 : https://wikidocs.net/28555
- B.1 스파크 세션의 read 메소드 이용
- json, parquet, orc, text 등 읽기 가능
- B.2 RDD 이용 : 스키마구조를 지정할 수도 있고, 지정하지 않으면 스파크에서 임시 칼럼명을 지정
- 배열 RDD : 단일 데이터
- 복합구조 RDD : 칼럼이 여러개인 데이터
- 스키마 이용 : 데이터를 검증하면서 초기화
- 외부데이터 read
- json : 구조가 있으므로, 자동 스키마 생성
- txt : 스키마 생성하여 초기화
C. 데이터프레임 연산
- 데이터 프레임은 스키마 구조를 가지므로, 쿼리를 날리는 것처럼 작업가능.
- 명령어 체인 이용
- 스키마 확인 : printSchema()
- 조회
- select() : 칼럼 선택시 $ 사용
- show() 함수 설정 : 데이터 출력설정. 라인 개수, 칼럼 사이즈
- 필터링 : filter()
- 그룹핑 : groupBy()
- 칼럼추가 : withColumn()
- SQL 이용 : 뷰생성 -> SQL 실행
- 뷰 생성 : createOrReplaceTempView("view Name")
- SQL 사용 : sql(). DB사용시와 동일
- 명령어 체인 이용
D. 데이터셋 초기화
- 내부데이터 이용
- 케이스 클래스 이용 : case class People(name: String, salary: Int)
- RDD를 데이터셋으로 초기화 : RDD -> 데이터 프레임 변경 -> 데이터셋 변환
- RDD.map -> dataframe.as
- 데이터프레임을 데이터셋으로 초기화
- 데이터프레임에 정적데이터로 변경 필요 : as 이용해서 클래스를 지정
- 스키마와 클래스의 이름이 동일하면 자동 바인딩
- 외부데이터 이용
E. 데이터셋 연산 : 데이터프레임과 동일
- 명령어 체인 이용
- 스키마 확인 : printSchema()
- 조회
- select() : 칼럼 선택시 $ 사용
- show() 함수 설정 : 데이터 출력설정. 라인 개수, 칼럼 사이즈
- 필터링 : filter()
- 그룹핑 : groupBy()
- 맵연산 : RDD와 동일
- SQL 이용 : 뷰생성 -> SQL 실행
- 뷰 생성 : createOrReplaceTempView("view Name")
- SQL 사용 : sql(). DB사용시와 동일
F. 저장/불러오기 : 데이타프레임, 데이타셋 동일
- 저장
- 데이터 저장 : save()
- 저장 포멧 지정 : format()
- 압축 포멧 지정 : option(). gzip, snappy..
- 저장모드
- 테이블저장 : saveAsTable()
- 버켓팅, 정렬, 파티셔닝 : bucketBy(), sortBy(), partitionBy(),
- 임시테이블에서는 불가
- 버켓팅, 정렬, 파티셔닝 : bucketBy(), sortBy(), partitionBy(),
- 데이터 저장 : save()
- 데이터 불러오기 : load() 혹은 read의 파일로더 함수(read.json() 등...)
- 불러오기 포멧, 옵션 : format(), option()
저장모드 | |
SaveMode.ErrorIfExists | 파일이 있으면 에러 처리 |
SaveMode.Append | 다른 이름으로 파일 추가 |
SaveMode.Overwrite | 기존 파일을 삭제하고 추가 |
SaveMode.Ignore | 파일이 있으면 저장하지 않고, 에러 처리도 하지 않음 |
G. DB 저장 : MySQL 등의 RDB에 직접 저장가능
- 데이터 저장 : DB 저장시 format("jdbc)로 설정
7.스파크 SQL과 하이브
스파크 SQL을 하이브의 메타스토어 정보를 이용하여 처리하기
- 메타스토어의 데이터베이스, 테이블 정보를 이용하여 작업을 진행
- RDD, 데이터프레임, 스파크 SQL을 이용하여 작업
- 스파크 SQL
- 하이브의 쿼리를 스파크에서 그대로 이용할 수 있게 함.
- 스파크 SQL을 이용하여 조회한 정보는 데이터프레임으로 반환 -> 데이터셋으로 변환
- 스칼라를 이용하여 UDF를 생성 or 자바로 작성한 UDF 사용
- 스파크 SQL, 데이터프레임은 내부적으로 동일한 최적화 과정
- 하이브 메타스토어 연결
- 스파크 세션 초기화 시 hive.metastore.uris 정보를 제공하여 하이브 메타스토어와 연결
- 스파크 SQL 쿼리
- 조회 : select sql문 이용
- 조인
- SQL 이용 : 표준 SQL의 조인 명령
- 데이터 프레임 이용 : 명령어 체인. select()
- 저장 : write()
- 데이터 포멧, 저장모드, 저장위치 전달 필요
- coalesce로 파티션 갯수(최종 파일의 갯수) 지정 가능
- UDF(User Defined Function)
- 스파크 세션에 함수 등록하여 사용 : register()
- 메소드 체인에서 사용하려면
- UDF 함수로 변환필요
- udf()로 함수를 감싸주기
- 메소드 체인에서 사용하려면
- 하이브 UDF(하이브 쿼리) 그대로 사용가능
- 버전에 따라 ADD JAR 명령에서 오류가 발생가능
- 스파크 실행시에 --jars 옵션을 이용하여 jar 파일을 추가
- 버전에 따라 ADD JAR 명령에서 오류가 발생가능
- 스파크 세션에 함수 등록하여 사용 : register()
8.스파크 스트리밍
- 스파크 스트리밍은 실시간 데이터 분석을 위한 스파크 컴포넌트
- 실시간 데이터 분석은
- 페이지 뷰에 대한 통계를 추적하거나,
- 머신러닝 모델을 학습시킬 수 있습니다.
- 로그 분석을 통해서 실시간으로 에러를 감지할 수도 있고,
- 현재 사용자의 구매 패턴을 분석하여 물품 추천을 할 수도 있습니다.
8.1 디스트림
8.2 스트리밍 컨텍스트
8.3 스트리밍 워드 카운트
8.4 윈도우 트랜스포메이션
8.5 디스트림 연산
9. PySpark
'Back > Bigdata' 카테고리의 다른 글
Docker - 2: 세팅 연습 (1) | 2023.05.21 |
---|---|
Spark -2 : 세팅 연습 (0) | 2023.05.21 |
Scalar (0) | 2023.05.15 |
Apache Hive & HiveQL (0) | 2023.05.13 |