본문 바로가기

핀테크프로젝트/코인

다시 돌아온 코인 프로젝트 (0)

반응형

 

 

오랜만에 포슽잉 이예요 리프레쉬 이후 다시 씽나게 저의 목적을 향해 다시 글을 써보려고 합니다 

완성은 한 상태이고 하나하나 차근차근 복기를 해보며 지나가려고 합니다 

 

 

거래소 선택 

거래소를 기존 4개에서 9개로 늘렸습니다 이유는 스케일을 늘려 우리나라 와 다른 지역 간의 거래소 가격차이가
얼마나 나는지 지역 간 거래를 할 때 어떤 거래소에서 진행해야 수익이 나는지를 관찰하고 싶었습니다

  1. 대한민국
    1. 업비트
    2. 빗썸
    3. 코빗
    4. 코인원 
  2. 아시아
    1. OKX
    2. Bybit
    3. GateIO
  3. 유럽
    1. Binance
    2. Kraken 

 

어떻게 진행했을까?

websocket으로 Ticker와 Orderbook으로 진행하였습니다 도중에 끊기면 rest api로 호출하도록 만들었어요 

Ticker -> 현재가,  Orderbook -> 거래 영수증(어떤 가격에 얼마나 사고팔았는지)

 

제가 가장 궁금했던 건 가격데이터였습니다 그래서 다음과 같이 가지고 오도록 해보았습니다 (업비트 기준인데요)

업비트와 빗썸은 websocket 호출 이 같아 구축하기가 매우 편했어요 (갬동..) 

timestamp 타임스탬프
opening_price 시가
trade_price 현재가
high_price 고가
low_price 저가
prev_closing_price 지난 종가
acc_trade_volume_24h 24시간 평균 볼륨
signed_change_price 전일 대비 값
signed_change_rate 전일 대비 등락율

 

이런 식으로 9개의 변수들을 yml 파일에 담아 관리하도록 쉽게 만들었습니다 

# ticker 
upbit:
  parameter:
    - timestamp
    - opening_price
    - trade_price
    - high_price
    - low_price
    - prev_closing_price
    - acc_trade_volume_24h
    - signed_change_price
    - signed_change_rate

bithumb:
  parameter:
    - timestamp
    - opening_price
    - trade_price
    - high_price
    - low_price
    - prev_closing_price
    - acc_trade_volume_24h
    - signed_change_price
    - signed_change_rate

.... 9개의 거래소

 

 

어떻게 호출했을까?

크게 9개의 거래소에서 진행해야 하니 크게 집단을 만들었습니다 위에 보셨다시피 대한민국, 아시아, 유럽  이렇게요

클라이언트 -> connection으로 가는 구조로 이루어지게 했습니다 각각마다의 connector 가 필요 하였고

 

이런 식으로 진행하였습니다

1. 커넥션을 진행해 주는 모체 클래스를 만들고 

2. 그 뒤에 자식 클래스가 상속을 받음에 있어 진행해 주는 방식으로 진행하여 확장성과 유지보수성을 높일 수 있었습니다 

    2.1  그래야 나중에 거래소를 늘릴예정일때 클래스 하나만 추가해 주면 되니까요!

class CoinExchangeSocketConnection(AbstractExchangeSocketClient):
    async def get_present_websocket(self, symbol: str, socket_type: str) -> None:
        from protocols.connection.coin_socket import (
            KoreaWebsocketConnection as WCM,
        )

        """소켓 출발점"""
        return await WCM().websocket_to_json(
            uri=self._websocket,
            subs_fmt=self.socket_parameter(symbol=symbol, req_type=socket_type),
            symbol=symbol,
            socket_type=socket_type,
        )


class UpbitSocket(CoinExchangeSocketConnection):
    def __init__(self) -> None:
        super().__init__(
            target="upbit",
            location="korea",
            socket_parameter=upbithumb_socket_parameter,
        )

    async def price_present_websocket(self, symbol: str) -> None:
        return await super().get_present_websocket(symbol, socket_type=self.ticker)

    async def orderbook_present_websocket(self, symbol: str) -> None:
        return await super().get_present_websocket(symbol, socket_type=self.orderbook)
        
        
   ... 이런식으로 모든 지역마다 구성되어 있습니다

 

여기서 1차 아쉬운 점은.. 
모든 지역에 CoinExchangeSocketConnection 사용하고 있습니다 
순환 참조로 인해서 import는 함수 안에다가 두어서 방지를 했는데요 
프로토콜 안에 있는 connection 이 지역마다 있어서 (KoreaConnect.. AsiaConnect 이렇게..)
이걸 그냥 하나로 통일하는 게 맞았나? 생각이 들어요 
지역마다 connection이 다르게 적용할 거 같아서 그랬는데
지금 보니깐 굳이 그럴 필요가 있었나 생각이 드네요..

확장 가능성을 염두에 뒀지만 유지보수성이 올라간 거 같습니다... 

 

 

이렇게 해서 각각 지역마다 클래스를 만들어 protocol 폴더에 고이 모셔놓았습니다..!!

 

Connection은 어떻게 진행했을까?

위에 보이면 coin_socket.py 에 각가 지역마다의 Connection이 들어있고 그 connection은 WebsocketConnectionManager를 상속받았고 그건 common 폴더에 넣어놨습니다 각 지역마다 같은 걸 사용하기 때문에 common file에 넣어 논 거죠!! 

원본 코드는 아래 URL를 타주세요..!!

https://github.com/MajorShareholderClub/ExchangePipeline/blob/main/common/client/market_socket/websocket_interface.py

 

ExchangePipeline/common/client/market_socket/websocket_interface.py at main · MajorShareholderClub/ExchangePipeline

Contribute to MajorShareholderClub/ExchangePipeline development by creating an account on GitHub.

github.com

 

 

 

websocket에서 최종 카프카 까지 전송하기 위해 다음과 같은 로직을 진행하였습니다 모든 걸 설명할 수 없으니 다음과 같은 다이어그램으로 설명할 수 있을 거 같아요..!

 

이렇게 진행한 메시지를 비동기 큐에 넣고 다음과 같이 진행하였습니다 

크게 common 안에 있는 socket 클래스는 네 가지로 구분할 수 있으며 

1. MeaageQueueManager --> 메시지 큐 적재

2. KafkaService --> 카프카에 데이터 송신 

3. MessageProcessor --> 웹소켓 데이터 처리 

4. WebsocketConnectionManager --> 웹소켓에서 데이터 송수신 

 

데이터를 적재하기 위해 dictionary를 사용하였으며 TypeDict를 사용해 타입 안정성을 키웠습니다 

전체적인 다이어 그램을 보면 다음과 같다고 보시면 될꺼같아요..!!



이렇게 한 후 연결을 편하게 진행하기 위해 Factory 패턴을 사용하여  각 클래스를 묶어놓아 선언하기 쉽게 만들었습니다

class MarketAPIFactory:
    """Factory for market APIs."""

    _create: ClassVar = WorldMarketsRequestType(
        korea=KoreaMarketRequestType(
        ..... 지역마다 socket rest 있음

    @classmethod
    def market_load(
        cls, conn_type: str, market: str, c: str, *args, **kwargs
    ) -> Result[str, ClassAddress]:
        """
        거래소 API의 인스턴스를 생성합니다.
        """
        ...

class MarketLoadType:
    def __init__(self, conn_type: str, location: str) -> None:
        self.conn_type = conn_type
        self.location = location

    def load_json(self) -> RequestDict:
        """
        JSON 파일 로드 (socket 또는 rest)
            - 어떤 가격대를 가지고 올지 파라미터 정의되어 있음
        """

    def _market_api_load(self, market: str) -> Result[str, ClassAddress]:
        ....
        
class SocketMarketLoader(MarketLoadType):
    def __init__(self, location: str) -> None:
        super().__init__(conn_type="socket", location=location)

    def process_market_info(self) -> dict[str, Callable]:
       ....

이렇게 ticker와 orderbook의 websocket 파이프라인을 만들고 최종적으로 특정 시간 또는 배치가 쌓이면 카프카로 보내는 형태로 진행하였습니다..!

 

 

카프카로 어떻게 보냈니? 

저는 고민하고 또 고민하였습니다 저에겐 두 가지가 있었죠 

1. 각 거래소마다 토픽을 생성하고 거래 타입마다 파티션을 다르게 두자

             --> upbit-socket-data-topic(partition1: ticker, partition2: orderbook)

2. 각 지역마다 토픽을 생성하고 지역마다 있는 거래소를 파티션을 다르게 두자 

            --> KoreaMarketTopic --> (partition1: upbit, partition2: bithumb....) 

 

저는 2번을 선택하여 특정 지역의 partition은 offet이나 특정 용량이 넘어가면 디스크 자동저장이기 때문에 

관리가 더 편하지 않을까 생각이 들었습니다 그리고 토픽의 복잡성도 생각해 보았는데요 기존 1번을 선택을 했을 시 

 

각 거래소당 2개의 토픽 [9 x 2 = 18개]으로 복잡성이 너무 심해지고 또한 다른 전처리는 진행할 때 다시 소비하는 입장에서 매우 

요점이 될 거 같아 지역을 3개의 지역으로 topic을 나누어 [3 x 2 = 6] 3배의 복잡성을 줄이고 안에 있는 partition을 각 거래소만큼 나누어 최적화를 진행하였습니다 각 거래소마다 키를 부여해 특정 지역과 거래소가 매핑이 되면 partition으로 적재하는 방식으로 cusctom 하였으며 코드는 아래에서 보시면 되겠습니다...!!!

 

https://github.com/MajorShareholderClub/ExchangePipeline/blob/main/mq/data_partitional.py

 

ExchangePipeline/mq/data_partitional.py at main · MajorShareholderClub/ExchangePipeline

Contribute to MajorShareholderClub/ExchangePipeline development by creating an account on GitHub.

github.com

 

 

그러고 나서 보내는 과정...

일단 용량부터 체크해 보았습니다 

최고 크기는 569,262 바이트,

최저 크기는 1,509 바이트,

평균 크기는 약 77,471 바이트

 

이렇게 용량 차이가 큰 이유는 

ticker만 전처리를 하고 order book은 가능한 많은 데이터가 있으면 좋기 때문에 전처리를 하지 않고 통으로 보냈습니다 

카프카는 일정 용량을 넘기면 전송을 못하기에 넉넉하게 두었습니다..
압축 방식은 gzip으로 두었고 각 마다 key와 value를 serialization 하여 보냈으며 

 

만약 브로커가 실패했거나 기타 등등으로 메시지를 보낼 수 없을때 defaultdict 를 사용하여 임시 저장하여 복구 했을때 다시 저장할 수 있게 만들었습니다 !! 비동기 방식으로 보내는 것이기 때문에 python의 AIO-Kafka 사용했습니다 

 

근데 여기서도 
그냥 속 편하게 linger_ms 사용했으면 오히려 좋았을 텐데 라는 아쉬움이 남습니다
max_batch_size와, linger_ms를 최적의 값으로 찾았으면 아니면 배치 지원도 되니 
create_batch를 만들었을 것도 좋았을 거고... 
유지보수도 좋을 테고.. 


카프카를 적극적으로 사용한 게 아닌 
시간 or 배치 처리할 건데 카프카를 곁들인..

 

 

 

 

프로젝트를 유지보수할 때 
충분히 고려해 볼 수 있을 거 같아요 

 

 

이미 프로젝트는 다 끝난 상태입니다 미리 코드를 보고 싶으시면 다음 URL를 타주세요
많은 피드백 질타 부탁드리겠습니다

읽어주셔서 감사합니다..!! 

정말 감사합니다!

 

 

 

 

https://github.com/MajorShareholderClub

 

MajorShareholderClub

MajorShareholderClub has 5 repositories available. Follow their code on GitHub.

github.com

 

반응형