[kafka]멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머
inflearn강의(아파치 카프카 애플리케이션 프로그래밍-데브원영 DVWY)를 들으며 타이핑했던 내용들을 블로그에 기록용으로 남기고자 합니다.강의를 들으며 개인적으로 타이핑한 내용으로 오타나 맞춤법, 띄어쓰기 등이 엉망일 수 있습니다.
멱등성 프로듀서
- 멱등성이란 여러 번 연산을 수행하더라도 동일한 결과를 나타내는 것이다. 프로듀셔는 동일한 데이터를 여러번 저장하더라도 카프카에 단 한번만 저장됨을 의마한다.
- 본 프로듀서 동작 방식은 적어도 한번 전달을 지원한다. 적어도 한번 전달이라는 건 중복이 발생할 수 있기 때문에 단 한번만 전달하도록 하고 싶으면 멱등성 프로듀서를 사용해야 한다.
- 데이터의 중복 적재를 막기 위해 0.11.0이후 부터 enable.idempotence옵션을 이용하여 딱 한번만 전달하는 걸 지원한다.
- 네트워크 이슈가 발생했을 때 데이터를 중복 전송할 수 있다. 정확히 한번을 위해 true로 옵션값을 변경해서 운영할 수 있다.
- 카프카 3.0.0부터는 옵션값의 기본값이 true이다. 2.x 버전은 false였지만 3.x 부터는 true로 모두 변경된다.
- enable.idempotence=true, acks=all로 하게 되면 큰 문제가 발생할 수 있는데 모두 데이터가 적재되었을때 알려주는 걸 acks=all인데 이걸로 하면 처리가 무척 느려지기 떄문에 이렇게 할 경우 유의해야 한다.
동작방식
- 기본 프로듀서와 달리 데이터를 브로커로 전달할때 브로듀서 PID(프로듀서의 고유한 ID)와 시퀀스 넘버(레코드의 전달 번호ID)를 함께 전달한다. 이 두 개의 조합으로 적재하고자 하는 브로커에 중복적재 되었는지 확인을 한다.
- ProducerID:1, SID:1가 한번 적재하고 나면 브로커는 이 아이디에 대해서 중복 적재하지 않는다.
멱등성 프로듀서가 아닌경우
- send()메소드를 통해 특정 파티션에 데이터를 보냈는데 어떠한 사유로 인해 리더 파티션에 데이터 적재가 완료되었음에도 불구하고 응답값을 적재적소에 받지 못한다면 프로듀서는 다시 한번 retry하게 되어 중복 적재하게 된다.
멱등성 프로듀서인 경우
- send()할때 v1(seq=0,pid=0)를 보내게 되면 한번 적재된 데이터에 대해서 이미 적재되어있으면 send()를 여러번 호출해서 동일한 레코드를 보내더라도 이미 적재된 데이터를 기본으로 해서 응답한다.
멱등성 프로듀서의 한계
- 멱등성 프로듀서는 동일한 세션에서만 정확히 한번 전달을 보장한다. 프로듀서가 비정상적인 종료일 경우 보장하지 못한다.
- 프로듀서가 pid, sid를 보내게 되는데 만약 프로듀서가 비정상적으로 종료되어 재생성되면 pid가 달라지게 된다. 달라진 pid, sid로 동일한 레코드를 보내더라도 브로커에선 다른 레코드로 인식하여 한번만 적재하지 않는다.
멱등성 프로듀서로 설정할 경우 옵션
- retires 기본값은 Integer.MAX_VALUE로 설정되고 acks=all로 설정된다.
- 프로듀서가 여러번 브로커에 데이터를 보내더라도 브로커에서 단 한번만 데이터를 적재하도록 하는 것이 멱등성 프로듀서의 동작이다.
- 네트워크를 여러번 받고 pid, sid정보를 메모리에서 갖고 있기 때문에 이 메모리 정보를 읽고 쓰다 보니 브로커 자체에서도 부하가 생긴다.
- 카프카 3.0부터는 이러한 부분에 대해서 많은 최적화가 되었다.
멱등성 프로듀서 사용시 오류 확인
- 기존 sid가 파티션의 레코드가 얼마나 순서대로 적재되는지 보장할 수 있다.
- 멱등성 프로듀서의 시퀀스 넘버는 0부터 1씩 더한 값이 전달되는데, 1이 사라지고 2가 먼저 전송되는 경우 OutOfOrderSequenceException이 발생하게 된다. 따라서 순서가 중요한 데이터일 경우 이 exception에 대해서 처리하게 되면 된다.
트랜잭션 프로듀서의 동작
- 개개별의 앨리먼트를 한개의 아토믹하게 동작하는 방식이다. 트랜잭션 프로듀서를 동작하도록 설정해야 한다.
- 다수의 파티션에 데이터를 저장하는 경우에 하나의 atomic을 만족하기 위해 사용한다.
- 사용자가 보낸 데이터를 레코드를 파티션에 저장할 뿐만 아니라 커밋이라는 과정을 통해 컨슈머에서 처리할지 말지 선택하게 하게 한다. 커밋을 수행해야지만 컨슈머에서는 이 커밋한 데이터에 대해서 처리를 하게 된다.
- 즉 프로듀서에서 여러 개의 레코드 전송이 완료되면 커밋을 수행하게 된다. 그때 컨슈머에서 이 여러 개의 레코드를 하나의 트랜잭션으로 가져가 처리하게 된다.
트랜잭션 컨슈머의 동작
- 트랜잭션 프로듀서가 보낸 데이터 같은 경우에는 이미 레코드는 쌓이기 된다. 이미 레코드가 쌓이게 되더라도 트랜잭션 컨슈머는 커밋이 완료된 레코드만 가져가기 원하기 떄문에 대기 상태로 기다리게 된다.
- 트랜잭션이 커밋할때 레코드들이 원자성을 가지게 되는데 이떄 컨슈머가 처리를 하게 된다.
트랜잭션 프로듀서 설정
- transaction.id를 설정하게 되면 트랜잭션 프로듀서로 설정된다. 프로듀서별로 고유한 ID값을 사용해야 한다. UUID와 같은 고유한 값.
- initTransaction(): 운영하는 프로듀서가 준비되었다
- beginTransaction(): 레코드를 실질적으로 보내기 전
- commitTransaction(): 레코드를 보낸 후 커밋
- beginTransaction()과 commitTransaction() 사이에 복수의 레코드를 전송한다.
- 안전하게 종료하기 위해 close()
트랜잭션 컨슈머 설정
- config컨슈머 설정 옵션 중 isolation.level을 read_committed로 설정하면 된다. 즉 커밋된 레코드만 읽어라 라는 의미다.
- 기본값은 read_uncommitted로 커밋이 안된것도 처리하게 된다.
주의점:
java 공식 라이브러리에서 트랜잭션을 지원. 다른 언어에 대해서는 확인하고 사용하는 것이 중요하다.