본문 바로가기

오류모음집/kafka

aiokafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive

반응형

카픟하

발생 경위: consumer에서 특정 partition을 소모하려고 할때 발생 코드는 다음과 같다 

    @handle_kafka_errors
    async def initialize(self) -> None:
        """Kafka 소비자 및 생산자 연결 초기화"""
        if self.consumer_topic:
            self.consumer = AIOKafkaConsumer(
            	self.conuser_topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                auto_offset_reset="latest",
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            )
            await self.consumer.start()
            logger.info(f"소비자가 초기화되었습니다: {self.consumer_topic}")

            # 특정 파티션만 할당
            if self.partition is not None:
                partition = TopicPartition(self.consumer_topic, self.partition)
                self.consumer.assign([partition])  # 특정 파티션 할당
                logger.info(
                    f"{self.consumer_topic}의 파티션 {self.partition}을 소비합니다."
                )

 

 

 

발생이유 : consumer 에서 토픽에 특정 파티션을 구독하려할때 이중으로 구독을 하기 때문에 발생 

 

 

해결 방안 :

    @handle_kafka_errors
    async def initialize(self) -> None:
        """Kafka 소비자 및 생산자 연결 초기화"""
        if self.consumer_topic:
            self.consumer = AIOKafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                auto_offset_reset="latest",
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            )
            await self.consumer.start()
            logger.info(f"소비자가 초기화되었습니다: {self.consumer_topic}")

            # 특정 파티션만 할당
            if self.partition is not None:
                partition = TopicPartition(self.consumer_topic, self.partition)
                self.consumer.assign([partition])  # 특정 파티션 할당
                logger.info(
                    f"{self.consumer_topic}의 파티션 {self.partition}을 소비합니다."
                )

Consumer config에서 self.consumer_topic 지우기 

반응형