반응형
발생 경위: consumer에서 특정 partition을 소모하려고 할때 발생 코드는 다음과 같다
@handle_kafka_errors
async def initialize(self) -> None:
"""Kafka 소비자 및 생산자 연결 초기화"""
if self.consumer_topic:
self.consumer = AIOKafkaConsumer(
self.conuser_topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="latest",
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
await self.consumer.start()
logger.info(f"소비자가 초기화되었습니다: {self.consumer_topic}")
# 특정 파티션만 할당
if self.partition is not None:
partition = TopicPartition(self.consumer_topic, self.partition)
self.consumer.assign([partition]) # 특정 파티션 할당
logger.info(
f"{self.consumer_topic}의 파티션 {self.partition}을 소비합니다."
)
발생이유 : consumer 에서 토픽에 특정 파티션을 구독하려할때 이중으로 구독을 하기 때문에 발생
해결 방안 :
@handle_kafka_errors
async def initialize(self) -> None:
"""Kafka 소비자 및 생산자 연결 초기화"""
if self.consumer_topic:
self.consumer = AIOKafkaConsumer(
bootstrap_servers=self.bootstrap_servers,
group_id=self.group_id,
auto_offset_reset="latest",
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
await self.consumer.start()
logger.info(f"소비자가 초기화되었습니다: {self.consumer_topic}")
# 특정 파티션만 할당
if self.partition is not None:
partition = TopicPartition(self.consumer_topic, self.partition)
self.consumer.assign([partition]) # 특정 파티션 할당
logger.info(
f"{self.consumer_topic}의 파티션 {self.partition}을 소비합니다."
)
Consumer config에서 self.consumer_topic 지우기
반응형