origin video: https://inf.run/pR7q
origin posting : https://blog.voidmainvoid.net/179
자료 : https://www.slideshare.net/WonyoungChoi2/ss-236018398
ref : https://blog.voidmainvoid.net/343
목차
1. 아파치 카프카 기초
- 카프카가 무엇인지?
- 왜 사용하는 지?
1.1 아파치 카프카 개요
A. LinkedIn에서 최초로 만들고 opensource화 한 확장성이 뛰어난 분산 메시지 큐(FIFO : First In First Out)
B. source Application <-> Target Application 사이 coupling을 약하게 하기 위해 등장
> 단방향(source -> target 전송) 구성에서, 전송 라인이 추가될 때 마다 많아지고, 배포 장애 대응이 어려워 짐. 이 복잡함을 해결
C. Topic 이란 것을 담는 queue
D. json, tsv, avro 등등 포멧 지원
E. fault tolereant : 고 가용성, 서버 전원이 내려가도 데이터를 손실 없이 복구
F. 높은 데이터 처리량 : low latency, high throughput -> 빅데이터에서 필수요소로 자리잡음
1.2 Kafka Topic
A. 기본
: Kafka에서 데이터가 들어가는 공간. AMQP와 동작이 다름
- DB의 테이블, filesystem의 folder와 유사
- Topic에다 Producer가 데이터를 넣고, Topic에서 Consumer가 가져간다.
- 각 Topic은 이름을 명명 할 수 있음. ex)click_log, send_sms, location_log 등 보관하는 데이터 성격 표현 가능
B. 내부
B.1 파티션
토픽 = {파티션,파티션,파티션}, 0번부터 시작. 파티션 끝 indext 부터 쌓음
consumer는 가장 오래된 데이터 부터 가져감
consumer가 데이터 가져가더라도 데이터를 삭제하지 않는다.
-> 다른 consumer가 붙었을 때 재새용
-> consumer 그룹이 달라야하고, auto.offset.reset = earlist 설정 필요
-> 동일데이터 두번 처리 가능해짐 : 카프카를 사용하게 되는 중요 기능 중 하나.
>> click log 분석 & 시각화를 위해 elastic search에 저장 or 백업을 위해 hadoop에 저장
B.2 파티션 키
- key 지정 시 : key-value로 할당
- key = null : round-robin으로 할당(순차적으로 돌려서)
B.3 파티션 늘릴 떄 주의사항
- 늘리는 것만 가능하고 줄일 수 는 없음
- 파티션을 늘리면 conusmer 갯수를 늘려서 data 처리를 분산시킬 수 있음
파티션의 데이터는 언제 삭제됨?
> option에 따라 다름
>> 최대 보존시간(log.retention.ms), 최대 보존 크기 (log.retention.byte)로 만료정책 설정
1.3 Broker, Replication, ISR(In-Sync-Replication)
Replication : 카프카 아키텍쳐의 핵심. 파티션의 복제를 의미
cluster에서 서버 장애가 생길때, 가용성을 보장하는 가장 좋은 방법
replication = 1 : partition 1개 존재(원본)
replication = 2 : partition 2개 존재(원본, 복제본)
replication = 3 : partition 3개 존재(원본 1개, 복제본 2개)
브로커 개수에 맞춰서, replication 개수는 제한 됨
원본 = leader partition, 복제본 = follower partition
leader : producer로 부터 데이터 전달 받는 주체
> producer에 ack라는 상세 옵션이 있음. ack통해 고가용성 유지(파티션의 replication과 관련)
> ack : 0,1,all
>> 0 일 때 : producer는 전송만 하고, response 안 받음. 결과 확인 불가 but 속도 빠름
>> 1 일 때 : 전송하고 leader partition의 response도 받음. follower에 partition에 복제 여부는 모름
>> All 일 때 : 1 옵션 + follower에 replicata 응답도 받음.
follower :
replication 사용 이유 : 고가용성 위해서. 남은것으로 고장난 것 복구
-replication 갯수 많아지면 브로커의 리소스 사용량이 늘어남
> 데이터 량과, retention date(저장시간) 을 잘 고려해서 replication 갯수 정할 것
>> 3개 이상 브로커 사용시 replication = 3 으로 설정하는 것 추천.
Broker : 카프카가 설치되어 있는 서버 단위
보통 3개 이상의 broker로 구성하여 사용 권장
-파티션이 1개, replication 이 1인 topic이 존재하고, 브로커가 3대라면 브로커 3대중 1대에 토픽의 정보(데이터)가 저장됩니다.
ISR : In-Sync-Replication(replica)
- leader + follower 파티션 합친 것
1.4 파티셔너(Partitioner) of Producer
- Producer의 중요 개념 : 데이터 전송시 무조건 Partitioner를 거쳐서 브로커로 데이터 전송 됨
- 데이터를 어떤 파티션에 넣을 지 결정하는 역할
- 메세지에 포함된 메세지 키 or 값에 따라 파티션 위치 결정
- producter에서 파티셔너 따로 설정하지 않으면 : UniformStickyPartitioner로 설정
> 동작
>> 메세지 키가 있을 때 : 메시지 키로 hash값 생성 = 파티션 번호. 항상 동일한 파티션에 들어가는 것 보장
>>> 순서를 지켜서 데이터 처리 할 수 있는 장점. ex) 서울 날씨 처리 > Key=서울 > 파티션 고정되어 순서대로 들어감
>> 메세지 키가 없을 때 : 쪼금 다른 라운드 로빈 처리 > producer에서 배치로 모을 수 있는 최대의 레코드를 모은 후 파티션으로 보냄. == 적절한 양으로 파티션에 분배.
- 커스텀 파티셔너 사용 가능 : Kafka에서 제공하는 Partitioner 인터페이스 사용 > custom partitioner class 제작
> 메세지 키 or 값, topic 이름 등등 파티션 결정 로직을 입맛에 맞게 짤 수 있음
> ex) VIP 데이터 처리를 좀 더 빠르게 처리하는 로직 : 80% 파티션 vip고객, 20% 파티션 : 일반고객 으로 분배
> AMQP 시스템에서 우선순위 큐 만드는 것과 비슷
1.5 컨슈머 랙(Consumer Lag) = kafka lag
- kafka 운영 시 중요한 모니터링 지표
- producer offset과 consumer offset의 차이이다.
- lag은 여러개 존재할 수 있다.
- 파티션에 데이터가 들어갈 때, 각 데이터에 오프셋이라는 숫자가 붙음.
- 프로듀서가 데이터 넣는 속도가 컨슈머가 가져가는 속도보다 빠르면, producer ~ consumer 간의 offset 차이가 발생
- lag숫자를 통해 producer & consumer 상태 유추 가능.
- 주로 consumer 모니터링에 사용. consumer 성능 문제 생기면 lag이 필연적으로 발생 한다.
- records-lag-max : lag 중 숫자가 가장 큰 것
1.6 컨슈머 랙 모니터링 애플리케이션, 카프카 버로우(Burrow)
: lag 모니터링 용 오픈소스. linkedin에서 lag 모니터링을 위해 공개. goLang으로 작성. Consumer lag 모니터링 용 독립 application. 외부 application으로 kafka 모니터링 하는 것이 답이었다.
- kafka client library(java, scalar 등등..) 사용해서 kafka consumer 구현 가능
- kafka consumer 객체에서 현재 lag 정보 수집 가능
- lag 실시간 모니터링 : 데이터 > elasticSearch(or InfluxDB) 에 넣기 > grafana 대시보드로 보기
- consumer 단위에서 lag 모니터링 위험, 운영요소 많음 : consumer 상태에 dependancy가 걸림.(즉, lag값을 받아야 나머지가 돌아감). consumer 추가 개발 시, 특정 저장소에 저장하는 로직 포함해야 함.
특징
1. 멀티 클러스터 카프카 지원
2. sliding window 통한 consumer status(error, warning, ok) 확인
> error : 데이터 양이 많아지는 데, consumer가 데이터 가져가지 않을 때
> warning : 데이터 양이 일시적으로 많아져, consumer offset 증가 중일 때
3. http api 제공
- 위 1,2를 http api 통해서 조회 가능 -> 다른 application에서 활용 가능
1.7 카프카, 레빗엠큐, 레디스 큐의 차이점
메세징 플랫폼의 두 종류 : 메세지 브로커, 이벤트 브로커(메세지 브로커 역활로도 쓸 수 있음)
메세지 브로커 : 삭제한다
> 대규모 메세지기반 미들웨어 아키텍쳐에서 사용
cf) 미들웨어 : 서비스하는 애플리케이션들의 아키텍쳐를 효율적으로 연결해주는 소프트웨어. 메세징 플랫폼, 인증 플랫폼, DB 등등..
> 프로듀서와 컨슈머로 메세지를 통신하고, 네트워크를 맺는 용도.
특징 : 메세지를 받아서 적절히 처리하면, 즉시 or 짧은 시간 내에 삭제되는 구조
> 레디스 큐, 레빗엠큐
이벤트 브로커 : 삭제 안한다.
> 레코드(= 이벤트 or 메세지) 장부를 하나만 보관하고 인덱스로 개별 엑세스 관리
> 업무상 필요한 시간 동안 이벤트를 보존 가능
> 이벤트이기 때문에 보존한다.
>> 단일 진실 공급원으로 사용
>> 장애 발생시 장애지점부터 재처리 가능
>> 많은 실시간 스트림 데이터를 효과적으로 처리 가능.
>> 카프카, AWS의 키네시스
>> 이벤트 기반 MSA에서는 사용하기.
2. 아파치 카프카 개발
2.1 AWS에 카프카 클러스터 설치, 실행하기
https://blog.voidmainvoid.net/325
homebrew,1개노드 -> 3개 이상 브로커로 이루어진 카프카 클러스터 구축(고가용성의 진가)
AWS EC2 서버 3대 발급 -> 카프카 설치 -> console producter, console consumer 연동 실습
준비물 : 주키퍼 & 카프카
주키퍼 : 카프카 관련 정보 저장
EC2 : aws에서 발급가능한 가상머신
- t2.micro 선택하기: 1 cpu, 1g ram. 테스트 목적이므로 간단한 것
- 인스턴스 개수 : 3 입력
- 인스턴스 시작 > 서버 실행 됨
- 접속 후 wget으로 주키퍼 다운 > 압축 풀기
- 주키퍼 앙상블 구축 : 각 서버마다 주키퍼 설정
- hostname 통신을 위해 /etc/hosts 수정
- 주키퍼 연동을 위한 방화벽설정(보안그룹설정)
> AWS Security Group Inbound/outbound rule)
> 주키퍼는 2181,2888,3888 포트 사용 > 이 포트에 anywhere 조건 open + 카프카 통신을 위해 9092도 같이 열기
>> ecu instance list 하단 > 보안그룹 > 현재 방화벽 설정 확인 가능
>> inbound 방화벽 설정 수정
>> 앞 과정 완료 후 각 서버에서 주키퍼 실행
>> 방화벽 정상설정시 아래처럼 local 컴퓨터의 주키퍼 cli로 aws의 주키퍼 연결 가능
- 카프카 설치 : 2.1.0 버전
- wget으로 카프카 다운 > 압축 풀기
- 카프카 클러스터 구성을 위해 브로커 별, 카프카 설정 : config/server.properties
>> (가장 중요) 브로커 아이디를 서버별로 다른 숫자로 설정하기
>> listener & advertise listener 설정
>> 주키퍼의 hostname, port 넣기
- 카프카 실행하기
- 카프카 테스트
> test_log 토픽 생성 > console-producer로 데이터 넣기 > console-consumer로 데이터 확인하기
2.2 카프카 프로듀서 애플리케이션
https://blog.voidmainvoid.net/193
- 데이터를 카프카에 보내는 역할 : 엄청난 클릭로그, 큰 단위로, 실시간으로, 카프카에 적재가능
> 데이터를 kafka topic에 생성
- 역할
> topic에 해당하는 메세지를 생성한다.
> 특정 topic으로 데이터를 publish(전송) 할 수 있다.
> broker로 데이터 전송 시, 전송 성공여부를 알 수 있음, 실패시 재시도 할 수 있음
- 카프카 클라이언트(consumer,producer) 사용하기
> apache kafka library 추가해야 함. 버전 입력 주의(브로커 - 클라이언트간 버전 하위 호환성 완벽하지 않음)
- 카프카 프로듀서 코드
: 토픽, 데이터, 브로커의 호스트, 포트 정보 필요
> 자파 프로퍼티 객체(Properties configs) : 프로듀서의 설정 define
>> 부트스트랩 서버설정(=카프카 브로커 설정) -> localhost의 kafka 바라보기
>>> 카프카 브로커의 주소목록에 2개 이상의 브로커정보(ip, port)를 설정하도록 권장(하나가 장애일 경우 다른 브로커에 연결)
>> 나머지 key&value에 StringSerializer로 직렬화 설정
>>> serializer는 key or value를 직렬화하기 위해 사용. byte array, string, integer 시리얼라이즈 사용 가능
>> key? : 메세지를 보내면, 토픽의 파티션이 지정될 때 쓰임
> KafkaProducer : config 객체 사용해, 카프카 프로듀서 인스턴스 생성
> 전송할 객체 생성 : ProducerRecord(카프카 클라이언트 제공 클래스)
>> ProducerRecord 생성시, 어느 토픽에 넣을 것인지, 어떤 key,value를 담을 것인지 선언 가능
>> 여기서는 key없이, click_log 토픽에 login이라는 value를 전송할 계획
>>> key를 포함하고 싶다면 아래처럼 작성하기
new ProducerRecord <String,String> ("click_log", "1", "login");
> 전송하기 : send()메서드에 Record 넣으면 됨 : topic에 record의 value가 들어감
> 전송 완료 시 producer.close()로 종료해주기
- 동작 디테일
> 키 설정 안했을 때, 그냥 쌓이고, partition이 여러개면 round-robin으로 데이터 저장 함
> 키는 파티션 갯수에 dependancy가 있어서, 파티션 갯수 고정이면 hash처리한 결과에 따라 1:1 매핑 되는데, 파티션 추가되는 순간 어디로 갈지 모른다.(안정해시 방식인 듯??)
>> 키 사용시, 파티션 개수 생성 후 추가금지.
2.3 카프카 컨슈머 애플리케이션
- 데이터 파이프라인으로서의 동작 및 역할
- Topic->파티션에 저장된 data를 가져오는 역할
- polling : 파티션에서 데이터 가져오는 것
-컨슈머 역할
> Topic의 partition에서 data polling 하기
> Partition offset(파티션에 있는 데이터의 번호) 위치를 커밋(기록) 하기
> Consumer group을 통해 병렬처리 : consumer가 여러개일 경우 병렬처리
- 카프카 클라이언트(consumer,producer) 사용하기
> apache kafka library 추가해야 함. 버전 입력 주의(브로커 - 클라이언트간 버전 하위 호환성 완벽하지 않음)
- consumer 사용 코드
> 자파 프로퍼티 객체(Properties configs) : 프로듀서의 설정 define
>> 부트스트랩 서버설정(=카프카 브로커 설정) -> localhost의 kafka 바라보기
>>> 카프카 브로커의 주소목록에 2개 이상의 브로커정보(ip, port)를 설정하도록 권장(하나가 장애일 경우 다른 브로커에 연결)
>> 그룹 아이디(=consumer group, consumer들의 묶음) 지정
>> 나머지 key&value에 StringSerializer로 직렬화 설정
>>> serializer는 key or value를 직렬화하기 위해 사용. byte array, string, integer 시리얼라이즈 사용 가능
>> key? : 메세지를 보내면, 토픽의 파티션이 지정될 때 쓰임
> KafkaConsumer: config 객체 사용해, 카프카 컨슈머 인스턴스 생성
> 데이터 가져올 topic 설정 : consumer.subscribe(Arrays.asList("topicName"));
>> 여기서는 해당 topic의 모든 파티션의 데이터를 가져오는 설정
>> 파티션 선택해서 가져오고 싶을 경우 -> assign 메서드(키가 존재할 때 유용)
> 데이터 가져오기 : 폴링 루프 구문 : ConsumerRecords(카프카 클라이언트 제공 클래스)
>> 폴링 루프 = poll() 메서드가 포함된 무한루프. consumer api의 핵심로직
>> consumer API의 핵심 : 브로커로부터 연속적으로, 컨슈머가 허락하는 한 많은 데이터를 읽는 것
>> poll(waitTime) : 설정한 waitTime(ms)동안 데이터를 기다림. 시간 안에 데이터가 들어오지 않으면 empty record 반환
>> records 변수 : 데이터 배치로서, 레코드의 묶음 list
>> 카프카에서 실제 데이터 처리시, record 단위로 분해해서 처리하기.
>> record.value()가 실제 producer가 전송한 데이터
- 동작 디테일
> consumer가 데이터를 읽기 시작하면 offset을 commit(기록)한다
>> 데이터를 가져갈 때 마다, kafka의 __consumer_offset 라는 Topic에 저장(consumer Group별, Topic별 구분 저장)
>> 장애 시, __consumer_offset 정보를 토대로 데이터 읽기 재시작 가능
> 컨슈머 생성 허용갯수 : 파티션 갯수 이하
>> 동일 컨슈머 그룹 case
>>> 컨슈머가 남아돌 경우, 파티션 당 컨슈머 1개씩 1:1로 할당한다.
>> 다른 consumer 그룹 case : 다른 그룹에 영향 없음
2.4 카프카 스트림즈 애플리케이션(부가기능 설명)
: 폭발적인 성능의 실시간 데이터 처리
> 이미, 분산 이벤트 스트리밍 플랫폼으로 많이 쓰이고 있음. 컨슈머 사용해서, 데이터 파이프라인 구축 많이 함.
- 카프카 스트림즈 : 카프카 지원 자바 라이브러리. 이벤트 프로세싱 도구. JVM기반 언어 선택
> 컨슈머 보다 안전한 기술. 낮은지연, 빠른속도 지원
- 장점
> 카프카와 완벽 호환된다. 대부분 카프카 = 이벤트 저장소로 쓰고, 데이터 -> 스파크, 로그스태시 등에 연동.
>> 이런 외부툴 연결시 카프카 버전 발전 따라오지 못하게 됨. 스트림즈는 카프카와 거의 동시 릴리즈. 데이터 유실, 중복처리 없음
>> 스케쥴링 도구가 필요없음 : 스케일 인 스케일 아웃 자유로움.
>> 보통, 스파크 스트리밍, 스파크 구조적 스트림 사용 이벤트 데이터 애플리케이션 제작
>>> : 카프카 연동 > 마이크로 배치 처리. 클러스터 관리자 or 리소스 매니저, 대규모 장비 필요함(yarn)
>> 스트림즈DSL과 프로세서 API 제공
>> 로컬 상태저장소 사용
2.5 카프카 커넥트(부가기능 설명)
3. 아파치 카프카의 미래
3.1 클라우드 기반 아파치 카프카 서비스
3.2 빅데이터 플랫폼 아키텍처와 카프카
3.3 아파치 카프카의 미래