home

카프카 프로듀서

  • 다양한 요구사항이 존재
    • 신용카드 트랜잭션 처리 시스템: 메시지의 어떠한 유실, 중복 허용 X, 처리율은 높아야함
    • 웹사이트 클릭 정보: 어느정도 중복, 유실 괜찮고, 지연율도 늦어져도 괜찮다.

alt text

  • 키와 객체를 직렬화햇거 바이트 배열로 변환
  • 데이터를 파티션을 결정하는 파티셔너에게 전달
  • 같은 토픽 파티션으로 전ㄴ송될 레코드들을 모은 레코드 배치에 추가 -> 전송

프로듀서 생성

  • bootstrap.server
    • 브로커의 host:post 목록
    • 최소 2개 이상의 브로커에 지정하도록 권장
  • key.serializer
    • 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 시리얼라이저 지정
  • value.serialzier
    • 벨류 값을 직렬화 하기 위해 사용하는 시리얼라이저 지정
  • 메시지 전송 방법
    • 파이어 앤 포켓: 메시지를 서버에 전송만하고 성공 혹은 실패 여부 신경 X
    • 동기적 전송
    • 비동기적 전송

카프카로 메시지 전달하기

  • 동기적 전송
    • 성능이 낮아져서 잘 사용되지는 않는다.
    • 메시지 크기가 너무 클 경우와 같은 에러일 때는 재시도를 하지 않고, 파티션이 리더가 아니거나 연결 에러 등은 재시도가 가능하다.
  • 비동기적 전송
    • 메시지 전송이 완전히 실패했을 때는 callback 함수가 필요하고, send에 같이 보낸다
      producer.send(record, new DemoProducerCallback());
    

프로듀서 설정하기

  • client.id: 프로듀서 식별값, 트러뷸 슈팅에 장점
  • acks
    • acks = 0: 성공적으로 전달되었다고 간주, 응답을 기다리지 않는다.
    • acks = 1: 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.
    • acks=all: 3.0 기본 값, 모든 인-싱크 레플리카에 전달된 뒤에야 성공
    • 이 세가지의 종단 지연은 모두 똑같다. 일관성 유지를 위해 모든 인-싱크 레플리카에 복제가 완료되어야 컨슈머가 소비 가능하기 때문
  • linger.ms: 배치를 전송하기 전까지 대기하는 시간, 0보다 큰 값으로 설정하면 처리율을 크게 늘릴 수 있다.
  • buffer.memory: 전송하기 전 메시지를 대기시키는 버퍼의 크기 결정
  • compression type: 압축 한 뒤 브로커에 보냄 Snappy 추천
  • batch.size: 배치에 사용될 메모리 양을 결정, 크게 해도 가득 찰 때 까지 기다리진 않아서 큰게 작은 것 보다 좋다
  • max.request.size: 프로듀서가 전송하는 쓰기 요청의 크기 결정
  • enable.idempotence: 정확히 한번

시리얼라이저

  • 라이브러리를 사용하지 않으면, int -> Long으로 변환할 때 모든 팀이 시리얼라이저 코드를 수정해야하는 사단이 발생
  • Avro를 사용하여 직렬화
    • 데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경해도 어떠한 예외나 에러 발생 X

    alt text

    • 스키마 레지스트리를 사용하여 고유 식별자를 심어주면 컨슈머는 역직렬화할 때 레지스트리에서 스키마를 가져온다.

파티션

  • 키값은 토픽의 여러 파티션 중 해당 메시지가 저장될 파티션을 정하는 기준점
  • 같은 키값은 모두 파티션에 저장
    • 근데 파티션을 추가하면 이것이 깨진다. 그래서 처음부터 충분한 파티션 수를 고려
  • 키값이 없으면 라운드 로빈으로 파티션에 저장
  • 기본 파티셔너 말고도 종류가 많다. UniformStickyPartitioner를 사용할 경우 균등한 분포
  • 커스텀 파티셔너를 통해서 특정 키값만 한 파티션에 몰고, 다른 키값은 해당 파티션에 들어오지 않게 해줄 수 있다.

헤더

  • 카프카 레코드의 키/밸류값을 건드리지 않고 추가 메타데이터를 심을 때 사용

인터셉터

  • 코드를 고치지 않고 작동을 변경해야 할 때 사용
  • onSend는 프로듀서가 브로커에게 보내기 전, onAcknowledgement는 컨슈머가 받았을 때 호출
  • 인터셉터를 만들고 CLASSPATH로 jar 경로를 등록한 뒤, 애플리케이션을 실행시킬 때 설정파일을 포함해서 실행시킨다.

쿼터, 스로틀링

  • 쓰기 쿼터
  • 읽기 쿼터
  • 요청 쿼터

  • 쓰기 읽기 속도를 제한할 수 있다.
  • JMX 메트릭을 통해서 스로틀링 작동 여부를 알 수 있다.
  • 설정 변경하면 브로커를 재시작, 그래서 kafka-configs.sh나 AdminClient API로 동적 설정 기능을 사용
  • 프로듀서와 컨슈머의 속도를 맞춰야 TimeoutException 등을 방지할 수 있음.
    • 프로듀서가 너무 많은 메시지를 보내면 컨슈머는 메모리의 큐에 적재하고 넘치면 블록
    • 버퍼의 공간이 확보되지 않으면 TimeoutException을 내고, 배치에 올라간 레코드는 시간이 지나면서 무효화