개발/kafka

[kafka] Consumer

namni 2022. 12. 29. 22:10

inflearn강의(아파치 카프카 애플리케이션 프로그래밍-데브원영 DVWY)를 들으며 타이핑했던 내용들을 블로그에 기록용으로 남기고자 합니다.강의를 들으며 개인적으로 타이핑한 내용으로 오타나 맞춤법, 띄어쓰기 등이 엉망일 수 있습니다.


컨슈머

- 기본적으로 토픽을 만들고 토픽에 데이터를 넣어야지만 컨슈머 애플리케이션에서 사용할 수 있는 것.

- 데이터 베이스에 넣는다던가 다른 어플리케이션과 통신을 한다던가의 역할을 한다.

- 만약 마케팅 문자를 보내는 기능이 있다면 특정 리스트를 프로듀서가 토픽에 넣고 토픽에 있는 데이터를 컨슈머가 가져와서 문자 발송을 처리한다.

 

컨슈머 내부구조

- 카프카 클러스터에서 리더 파티션에 있는 브로커가 데이터를 보내면 컨슈머 어플리케이션 내에 Fetcher가 우선적으로 데이터를 받게 된다. completedFatches라고해서 데이터를 어느정도 받게되면 내부에서 poll메소드를 통해서 데이터를 처리할 ConsumerRecords로 데이터를 받게 된다. list로 array로 데이터를 병렬로 처리할 수 있다. 

- poll이라는 함수를 처리하기 전에 데이터를 가져온다는 점인데 레코드의 처리속도가 늦더라도 데이터를 이미 가져왔기 때문에 (컨슈머) 처리하는 만큼 속도를 유지해서 처리해도 된다는 점이다. 

- 리더파티션에서 데이터를 가져올떄 배치로 데이터를 미리 가져오는 형태기 때문에 데이터 처리속도가 빠르더라도 크게 걱정할 필요가 없다. 

- ConsumerRecoreds는 레코드의 모음이라고 하는데 레코드에는 브로커에 저장될때 offset이 있는데 ConsumerRecords에서 처리가 완료되면 commit이라는 과정(어디까지 수행했어!)을 수행한다. 처리가 완료한 레코드에 대해서는 commit을 수행해야 컨슈머가 정상적으로 처리가 완료되었다 라는 것을 보장할 수 있는 것

 

컨슈머 그룹

- 특정 토픽에 대해서 목적에 따라 데이터를 처리하는 컨슈머를 묶은 그룹이 컨슈머 그룹이다. 컨슈머 어플리케이션을 실행할때 정의하게 되는데(?) 동일한 컨슈머를 가진 컨슈머 그룹은 다 동일한 로직을 가지고 있다. 

- A라는 컨슈머 그룹이 있고 B라는 컨슈머 그룹이 있을때 토픽의 데이터를 온전히 컨슈머 그룹이 다 쓰고 싶을때 처럼 목적에 따라서 나누는 것을 컨슈머 그룹이라고 한다. 

- 토픽의 저장된 데이터는 컨슈머가 데이터를 가져간다고 해서 데이터를 삭제하지 않는다. 그렇기 떄문에 다른 목적의 컨슈머 그룹이 나타나면 이 데이터를 다시 온전히 가져간다.  

- subscribe라는 메소드를 통해서 토픽의 모든 파티션 데이터를 가져가고 할당하는 것이 일반적이다. assign이라는 메소드를 활용하면 각각의 파티션에 대해서 직접 할당하고 데이터를 가져갈 수 있다. 이런 경우는 아주 흔하지 않고 특수한 경우에만 사용된다.

- 토픽의 파티션이 3, 컨슈머 그룹의 컨슈머가 2일때 동일한 컨슈머 그룹으로 된 컨슈머가 적절하게 분산, 할당되어 가져간다.

- 컨슈머를 하나더 추가하고 싶으면 파티션 하나당 컨슈머가 일대일 매칭되어 처리한다는 것을 볼 수 있다. 최대의 성능을 가질 수 있기 때문에 토픽의 파티션 개수만큼 컨슈머 개수를 늘려서 운영하는 것이 좋다. 

 

컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우

- subscribe에서 파티션은 최대의 한개의 컨슈머만 할당되기 때문에 일대일로 매칭되고 남은 컨슈머는 Idle상태가 된다. 컨슈머가 thread라고 가정했을때 컨슈머 0,1,2는 파티션에 할당이 되어 일을 하고 있는데 컨슈머3은 아무것도 안하고 있는 상태를 유휴 상태라고 한다. 따라서 파티션 개수만큼만 컨슈머를 띄워야 한다. 

 

컨슈머 그룹을 활용하는 이유

- 운영 서버에는 주요 리소스인 CPU, 메모리 정보, 디스크, 네트워크 용량 등을 수집해야한다. 이런 데이터를 대용량으로 수집하기 위해선 리소스 수집 에이전트가 필요하다. 그리고 로그에 대해서 긴 시간 데이터를 저장하기 위해서 하둡에 저장해야하고 엘라스틱 서치에 저장해서 시계열로 데이터를 보여준다거나 alert를 걸어 메모리 알림을 해줘야한다. 

- 이 운영서버가 1~10대 사이라면 문제가 되지 않지만 서버의 갯수가 100대 이상일 경우 에이전트에서 수집하는 데이터양이 어마어마하게 많아진다. 하둡이랑 엘라스틱 서치에 저장되는 데이터양도 엄청나게 많다. 문제는 지금처럼 각각의 서버에 이 에이전트가 리소스 수집하고 엘라스틱 저장하고 하둡에 저장하는 이런 싱크 방식을 사용한다면 문제가 생긴다. 만약 엘라스틱 서치에 문제가 생긴다고 가정했을때 리소스를 엘라스틱 서치에 저장하기 위해 기다려야 된다. 이를 위해 엘라스틱과 에이전트 사이에 강하게 연결된 커플링을 끊어 내야 하는데 컨슈머 그룹을 활용할 수 있다. 

- 일단 카프카에 데이터를 보내고 카프카에 저장된 데이터를 용도에 따라 서로 다른 그룹으로 운영한다. 카프카는 특정 이벤트에 대해서 우선적으로 저장하고 각각의 목적을 가진 컨슈머 그룹이 컨슈머를 토대로 데이터를 가져가다가 만에 하나 있을 장애에도 조금 늦게 쌓으면 된다. 또 데이터도 유실되지 않는다. 토픽에 데이터가 충분히 들어가기 때문에 엘라스틱 서치가 복구될때 까지 기다리면 된다.

- 강하게 구성된 커플링을 끊고 유연하게 구조가 될 수 있다. 이런식으로 하게 되면 토픽의 개수만큼 컨슈머 개수를 늘려서 데이터 처리량을 늘릴 수 도 있다. 

- 엘라스틱 서치는 컨슈머를 하나, 하둡은 컨슈머를 세 개 생성하여 필요에 따라 리소스 사용량도 컨슈머 그룹에 따라 다르게 설정할 수 있다.

- mysql, mongodb 등에도 데이터를 저장하고 싶을 경우 각각의 데이터를 저장하는 컨슈머 그룹을 만들어 기존의 엘라스틱이나 하둡 파이프라인을 건들지 않고 생성할 수 있다는 장점이 있다.
- 데이터 파이프 라인을 만들고 수집하고 싶다면 일단 데이터를 토픽을 만들어서 넣는다. 그리고 어디로 적재할지에 대한 컨슈머 그룹을 생성하는 것이 좋다.

 

리밸런싱

- 컨슈머에는 리밸런싱이라는 failover 방식이 있다. 리밸런싱은 토픽에 있는 컨슈머에 할당과정이 변경되는 것이다. 

- 파티션과 컨슈머가 일대일 매칭되어있는 관계에서 일부 컨슈머(컨슈머2)에 장애가 발생하면 이 파티션(파티션2)에 있는 데이터가 지속적으로 처리 되야 하는데 데이터 처리가 지연이 오래되면서 데이터가 유실될 가능성이 있다. 따라서 장애가 컨슈머를 빼버리고 다시 재할당 과정을 거쳐서 파티션2의 데이터도 지속적으로 처리가 되야한다. 이를 리밸런싱이라 한다.

- 컨슈머가 추가되는 상황과 컨슈머가 제외되는 상황 두 가지 경우가 있다. 컨슈머가 새로 떠서 추가되는 상황이라면 다시 할당해서 일대일 매칭으로 다시 매칭하는 과정이 있다.

- 언제든지 발생할 수 있기 때문에 리밸런싱에 대한 메소드로 구현할 수 있는데, 이를 리밸런스 리스너로 제공된다. 

- 참고로 토픽에 있는 파티션 개수에 굉장히 큰 영향을 미치게 된다. 파티션이 3개 있을때는 리밸런싱 시간이 짧다. 파티션 개수가 100개나 1000개 같이 많을 경우 내부적으로 재할당하는 과정이 몇 십초에서 몇분까지 일어나기 떄문에 장애와 비슷한 상황으로 간주할 수 있다 

- 자주 발생하지 않도록 하는 것이 목적. 그럼에도 발생할 수 있기 때문에 로직을 구성해둬야 한다.

 

커밋

- 카프카 브로커에서 리더 파티션에 있는 레코드를 가져가게 되면 poll을 통해서 데이터를 처리하고 그 데이터를 완료했다는 것이 커밋이다.  

- 카프카 브로커 내부에 있는 __consumer_offsets에 기록된다. 

- 컨슈머 동작에 이슈가 발생했을 경우 커밋이 정상적으로 처리 되지 않았을 경우에는 이전 커밋까지 기록이 되기 떄문에 데이터 처리에 중복이 발생할 수 있다. 데이터를 처리하고 나면 commit을 반드시 완료해야지만 데이터 처리할 때 안전하게 다음 데이터를 처리한다.

- 내부적으로도 기록하지만 컨슈머를 운영할때 commit이라는 메소드를 호출함으로써 기록을 대신할 수 있다. 컨슈머 로직을 짤때 반드시 커밋을 해야한다.

 

Assignor 어사이너

- 컨슈머와 파티션의 할당 정책이다. 카프카에서는 세가지 어사이너를 제공한다. RangeAssigner, RoundRobinAssigner, StickyAssigner

-  2.5.0에서 RangeAssigner를 기본으로 사용한다. RangeAssigner는 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 순서대로 정렬해서 할당한다. 

ex

0 - a
1 - b
2 - c

- RoundRobinAssigner는 모든 파티션을 컨슈머에게 번갈아 가며 할당, StickAssigner는 파티션을 균등하게 배분해서 할당.

- 대부분의 환경에서는 토픽과 컨슈머 그룹의 컨슈머와 일대일 매칭이 되어서 운영되어 크게 걱정할 필요가 없다.

 

컨슈머 주요 옵션

필수옵션 - 디폴트X

- bootstrap.servers: 어떤 카프카 클러스터를 연동할지 정하는 설정
- key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스를 지정
- value.deserializer: 레코드의 메시지 값을 역직렬화하는 클래스를 지정
프로듀서에서 토픽에 데이터를 serializing해서 저장하기 떄문에 그 데이터를 받아서 역직렬화 하여 사용해야 한다. 이 부분은 프로듀서를 개발할때 미리 약속을 해야한다. 주로 string으로 사용해서 json데이터를 사용하곤 했음

 

선택옵션 - 디폴트O

- group.id: subscribe()메소드로 토픽을 구독할때는 옵션이 필수로 바뀐다. 디폴트는 null값이다. assign과 같이 토픽을 할당해서 사용할때는 필수는 아니다. 컨슈머 로직을 운영할때는 subscribe를 사용해서 개발하기 떄문에 알아 두자!

- auto.offset.reset: 컨슈머 그룹을 대상으로 특정 파티션을 읽을때 컨슈머 오프셋이 없을 경우(한번이라도 커밋을 해본적이 없는) 어떤 오프셋부터 읽을지 선택하는 옵션이다. 가장 최신부터 읽을지 가장 오래된 데이터부터 읽을지 정하는 것. 기본값은 latest이다. 

- enable.auto.commit: 자동으로 할지 수동으로 할지 정하는 것. 기본값은 true

- auto.commit.interval.ms: 자동 커밋일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초). 직접 설정하지 않는다면 poll()해서 records를 가져올때 5초마다 커밋이 된다는 것. 
레코드를 처리할때마다 하거나 커밋간격을 바꾸고 싶다면 enable을 false로 변경하면 된다.
- max.poll.records: poll()메소드로 리턴되는 리스트의 사이즈를 지정. 기본값은 500이다. 
- session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 최대시간. 기본값은 1000(10초).   
- heartbeat.interval.ms: 하트비트를 전송하는 시간 간격이다. 기본값은 3000(3초).

* 위의 두 개(session.timeout.ms, heartbeat.interval.ms)는 카프카 브로커와 컨슈머 사이의 처리가 정상적인지 판단하는 여부. 
컨슈머에 문제가 있는지 여부를 판단하는 설정이다. 하트비트를 일정시간마다 전송을 하는데 이 하트비트가 이 브로커에 더이상 전송되지 않으면 문제가 있다. 리밸런싱을 시작하자, 하는 옵션이다. 하트비트가 3초마다 가다가 하트비트가 마지막으로 오고나서 10초(session.timeout.ms)가 지날 경우 문제가 있다고 판단한다. 
session.timeout.ms 값이 너무 길면 데이터가 지연될 가능성이 있고 너무 짧으면 문제가 있다고 판단하여 리밸런싱이 자주 일어날 수 있다. 따라서 위의 두개의 옵션은 네트워크 사용환경에 따라서 적절히 조절하는 것이 중요하다.

- max.poll.interval.ms: poll()메소드를 호출하는 간격의 최대 시간. 기본값 300000(5분).
poll()메소드를 호출하고 나서 레코드를 처리하고 있는데 5분(기본값)동안 poll()메소드가 호출되지 않을 경우 문제가 있다고 판단하여 리밸런싱이 일어나게 된다. 이 레코드 하나당 데이터 처리량이 높다면 이 시간을 높이는 게 좋다. 만약 처리량이 낮다면 장애 상황을 빠르게 판단하기 위해 이 시간을 낮추는게 좋다. 
- isolation.level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. 트랜잭션 프로듀서가 레코드를 트랜잭션으로 묶어서 atomic하게 사용될 떄 사용한다. 

 

auto.offset.reset

- 컨슈머 오프셋이 있는지 없는지 여부에 따라서 어떤 오프셋부터 읽을지 설정하는 옵션이다. 파티션에는 0~999까지의 오프셋이 있다면 어떤 데이터부터 읽을지 결정한다. 새로 띄운 컨슈머 그룹의 오프셋 기록이 있다면 이 옵션은 무시된다.

- latest: 설정하면 가장 높은(가장 최근에 넣은) 오프셋 부터 읽기 시작한다. 999
- ealiest: 설정하면 가장 낮은(가장 오래된) 오프셋 부터 읽기 시작한다. 0
- none: 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 
보통은 latest, ealiest중 하나를 사용한다.

- 항상 설정하는 것이 아닌 새로 만든 컨슈머를 운영해야 하는데 어떤 오프셋 부터 읽을지 활용한다. 그 이후는 전혀 의미가 없다.

 

리밸런스 리스너를 가진 컨슈머

- 카프카 라이브러리 ConsumerRebalaceListener인터페이스를 지원한다. onPartitionAssigned()메소드와 onPartitionRevoked()메소드로 이루어져있다.

- onPartitionAssigned(): 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출된다.
- onPartitionRevoked(): 리밸런스가 시작되기 전에 호출된다.
마지막 처리하고 있는 레코드를 기준으로 어떤 토픽이 할당되고 있는지 확인할 수 있다.
마지막 기준으로 커밋을 하기 위해서는 리밸런스 시작직전에 커밋을 하면되므로 메소드에 커밋을 구현하여 처리할 수 있다.

 

컨슈머 애플리케이션의 안전한 종료

- 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게 된다. 해당 컨슈머가 종료되지 않았는지 명시적으로 알리지 않으면 리밸런싱 시간이 늦어지기 때문에 안전하게 종료해야한다. 

- 안전한 종료를 위해 wakeup()메소드를 지원하는데 poll()안에서 WakeupException이 발생하게 된다. poll호출때마다 wakeup()를 호출되었는지 확인하고 wakeup()이 발생했으면 poll()호출시 WakeupException을 타게 된다.

- ShutdownThread에 지정해서 명시적으로 애플리케이션의 종료를 호출하고 그 이후에는 리소스를 해제하고 컨슈머를 클로즈 시켜야 한다. 그래야 데이터의 유실이나 중복이 발생하지 않는다.

멀티스레드 컨슈머 어플리케이션

- 컨슈머는 기본적으로 1쓰레드에 1컨슈머를 운영하는 것이 기본이다. 컨슈머 쓰레드를 여러개 가진 프로세스를 하나 띄울 것인지,
한 프로세스당 하나의 스레드를 가진 여러개의 프로세스를 띄우는 방식을 선택해야 한다. 

- 즉 파티션 3개를 대응하기 위해서 프로세스(하나의 쓰레드를 가진)를 3개 띄울 것인지 프로세스(세 개의 쓰레드를 가진)를 1개 띄울 것인지 결정한다. 

- 프로세스(하나의 쓰레드를 가진)를 띄우게 되면 특정 쓰레드에서 장애가 발생해도 다른 프로세스에는 영향이 없어서 좋으나 각각 배포해야 해야하는 단점이 있다. 따라서 배포 자동화가 잘되어있다면 이 방식을 사용해도 좋다.

- 반면에 컨슈머 스레드를 3개가 동시에 띄어진 프로세스를 하나만 운영하는 방식(scale-up)은 프로세스를 하나만 운영하기 때문에 배포가 간소화 될 수 있다. 다만 쓰레드 장애가 크리티컬하면 다른 쓰레드에게 영향을 미칠 수 있다는 단점이 있다.

 

컨슈머 랙(LAG)

- 컨슈머 랙은 파티션의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT_OFFSET)간의 차이다. 
- 프로듀서는 데이터를 많이 보내고 컨슈머는 이를 바로바로 처리할 수 없기 때문에 발생하는 지연을 컨슈머 랙이라 한다. 컨슈머가 정상 동작하는지 확인할 수 있고 전체 데이터 파이프라인에서 지연이 발생하는지 확인하는 지표이다.

- 컨슈머 랙은 컨슈머 그룹, 토픽, 파티션별로 생성된다. 1개의 토픽에 3개의 파티션이 있고 1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 된다. 

- 컨슈머 그룹이 2개인 경우 이 토픽을 subscribe 하고 있을때 총 컨슈머 랙은 6개다. 각각의 컨슈머 그룹마다 이 파티션에 대해 측정이 다르기 때문이다. 컨슈머 그룹과 파티션별로 묶인다. 컨슈머 그룹이 많아질 수 록 모니터링 양이 많아진다. 

 

컨슈머 랙 - 프로듀서와 컨슈머의 데이터 처리량

- 프로듀서가 초당 3개의 레코드를 발행한다고 한다. 컨슈머는 초당 1개의 레코드를 처리한다면? 초당 2개씩 늦어지기 때문에 컨슈머 랙이 지속적으로 많아지게 되서 지연이 발생한다. 따라서 실시간 데이터가 아니라 지연이 발생한다는 것을 알 수 있다.

- 반대로 프로듀서가 초당 1개 발행하고 컨슈머는 초당 3개의 레코드를 처리한다면 가장 최신의 오프셋 레코드를 지속적으로 처리하게 될 것이다. 컨슈머 랙을 모니터링 하면 0~10까지 아주 낮은 수치로 왔다갔다 하는 것을 볼 수 있다. 이게 가장 이상적.

 

컨슈머 랙 모니터링

- 카프카를 통한 데이터 파이프라인을 운영하는데 핵심적인 역할을 한다. 이게 없다면 컨슈머 장애를 확인할 수 없고 파티션 개수를 정하는데 참고 할 수 있다. 

- 예를들어 네비게이션의 사용자 데이터를 전송하는 프로듀서가 있다고 가정할 경우 추석, 설날 같이 갑자기 사용량이 많아질 떄 랙이 늘어나게 될 것이다. 즉 지연이 발생한다. 

- 이때 파티션 개수를 늘리고 컨슈머 개수를 늘려서 여기에 대한 지연을 해소 시킬 수 있다. 이를 해결하기 전에 파악해야 하는 건 컨슈머 랙이여야 한다. 모니터링 시스템을 구축한 뒤에야만 컨슈머 어플리케이션을 구축해야 한다 할 정도로 중요하다. 

컨슈머 랙 모니터링 - 처리량 이슈

- 프로듀서의 데이터양이 늘어날 경우 컨슈머 랙이 늘어날 수 있다. 지연이 발생하면 파티션 개수와 컨슈머를 늘려 병렬처리량을 늘려 지연을 해소 시킨다. 

컨슈머 랙 모니터링 - 파티션 이슈

- 리더 파티션에 장애가 났다는 걸 알 수 있다. 컨슈머가 정상적으로 일을 하는데 리더 파티션의 특정 브로커에 데이터가 일을 못한다는 걸 컨슈머 랙을 확인하면 더이상 커밋되지 않아서 컨슈머 랙이 늘어나는 상황을 볼 수 있다.

- 또는 프로듀서가 데이터를 보냄에도 새로운 오프셋의 데이터가 추가가 안되는 경우가 있다. 이런 정체 현상을 보면서 문제가 있다는 것을 인지할 수 있다.

 

컨슈머 랙을 확인하는 방법 3가지

카프카 명령어 사용

- kafka-consumer-group.sh를 통해서 컨슈머 그룹에 대해서 상태를 조회해볼 수 있다. 이를 통해 컨슈머 랙도 확인할 수 있따. 
LOG-END-OFFSET - CURRENT-OFFSET을 뺀 LAG이 있다는 걸 확인할 수 있다.  그렇지만 일회성으로 그친다. 

- 이 모니터링을 기록하고 적재해야 하기 떄문에 이 명령어 방식은 부족한다. 수집 아키텍쳐가 따로 있어야 한다. 

metrics() 메서드 사용

- 자바 라이브러리 metrics()를 활용하면 모니터링 지표를 확인할 수 있다. max, records-lag, records-lag-avg 값을 확인할 수 있다.  

- 그러나 문제가 있다.  첫번째는 컨슈머가 정상 동작해야 하는 경우에만 확인할 수 있다. 컨슈머 랙을 확인하는 이유는 컨슈머의 장애나 처리량을 확인하기 위해서 사용하는데 컨슈머 어플리케이션이 장애가 발생할 경우에는 더 이상 컨슈머 랙을 모니터링 할 수 없다. 

- 두번째는 모든 컨슈머 어플리케이션에 컨슈머 랙 모니터링 코드를 중복해서 작성해야 한다. 어플리케이션이 수집하는 컨슈머 랙은 자기 자신 컨슈머 그룹에 대한 컨슈머 랙만 한정되기 때문에 모든 컨슈머 랙을 확인할 수 없다. 

- 세번째는 컨슈머 랙을 모니터링 하는 코드를 추가할 수 없는 서트파티 어플리케이션의 컨슈머 랙 모니터링이 불가능하다. 

외부 모니터링 툴을 사용

- 가장 최선의 방법이다. 제일 유명한건 데이터 독, 컨플루언트 컨트롤 센터.

- 컨슈머 랙 모니터링만을 위한 오픈소스로 공개되어있는 버로우를 사용할 수 있다. 

 

버로우 

- 링크드인에서 개발해서 오픈소스로 공개한 컨슈머 랙 체크툴이다. 다수의 카프카 클로스터와 동시에 연결해서 확인할 수 있다. 기업들은 dev, prodcut 등으로 서버를 나누는데 함께 모니터링 하는 버로우를 운영하는 것에 적합하다. 

REST API

- 더 많은 api는 카프카 버로우 깃허브 페이지에서 확인하는 것을 추천

컨슈머 랙 이슈 판별

- 가장 큰 장점은 컨슈머 랙의 이슈를 판별하는 evalutaion로직이 있다. 

- 1번 컨슈머 랙은 지속적으로 아래에서 머물고 0번 컨슈머 랙은 특정시간에 컨슈머 랙이 늘어 난걸 볼 수 있다. 임계치가 있는데 이 임계치를 넘어서면 알람을 줄 수 있다.

- 이렇게 임계치를 잠깐 넘었다가 내려오는 경우는 일시적이기 떄문에 컨슈머 랙에 대해서 대응할 필요가 없다. 컨슈머 어플리케이션이 운영할때 특정 임계치에 도달할떄마다 알람을 받는 것은 무의미 한 일이다. 

- 따라서 버로우에서는 evalution을 사용하는데 임계치가 아닌 슬라이딩 윈도우 방식을 활용한다. 컨슈머 랙 평가라고 부른다. 
- 컨슈머 랙과 파티션의 오프셋을 슬라이딩 윈도우로 계산하면서 상태를 정한다. 파티션의 상태를 OK, STALLED, STOPED로 표한하고 컨슈머의 상태를 OK, WARNING, ERROR로 표현한다. 

컨슈머 처리량 이슈

- 최신오프셋이 지속적으로 늘어나는데 컨슈머 오프셋이 못따라 갈 경우 컨슈머 랙이 리니어(선형적)으로 늘어나는 것을 확인한다.

- 컨슈머가 처리량을 못따라간다는 걸 의미한다. 파티션은 OK, 컨슈머는 WARNING상태로 나타난다. 

- 이럴 경우 파티션 개수를 늘리고 컨슈머 개수를 늘림으로써 전체적인 처리량을 늘리는 것이 제일 좋은 방법중 하나다. 

컨슈머 이슈

- 최신 오프셋이 증가하고 있으나 컨슈머 오프셋이 중단되었을때, 커밋이 일어나지 않는 걸로 보인다. 

- 컨슈머 랙이 빠른속도로 증가한다. 이 경우 파티션은 STALLED, 컨슈머는 ERROR상태로 나타낸다.

- 이때는 이메일, SMS, 슬랙을 통해서 개발자가 알람을 받고 즉각적으로 조치해야 한다.

컨슈머 랙 모니터링 아키텍처 

- 카프카 버로우 - 텔레그래프 - 엘라스틱 서치에 데이터를 수집한다. 

- 텔레그래프를 활용하면 restApi를 호출해서 엘라스틱에 데이터를 시간 순서로 넣어 그라파나를 활용해서 시각화 할 수 있다. 

- 설치 방법: https://blog.voidmainvoid.net/279

 

아파치 카프카 Lag 모니터링 대시보드 만들기

kafka-lag-dashboard Kafka lag을 모니터링하는 확실한 방법 Kafka Consumer의 처리시간이 지연되면 topic 내부의 partition lag이 증가합니다. lag 모니터링을 통해 어느 partition이 lag이 증가하고 있는지, 어느 컨

blog.voidmainvoid.net

 

실습

- kotlin으로 작성한 예제입니다.

https://github.com/namoomi/inflearn-kafka-basic