본문 바로가기

핀테크프로젝트/코인

코인 모듈 그 첫번째 각 거래소 API 정형화 하기 (1)

반응형

콬ㅋ콬코코인

거래소 선택

API를 정립을 하기 위해서 현 우리나라에서 제공하는 대한민국 4대 거래소를 선택했다

  1. 업비트
  2. 빗썸
  3. 코빗
  4. 코인원 

 

 

REST와 좀 더 정확하게 실시간 데이터를 받을 수 있는 Websocket를 가져가기로 했다 

REST로 받을 수 있는데 이미 응답을 받고 데이터를 넘겨받는 순간 몇 초 뒤 일 수도 있어
좀 더 명확하게 실시간 데이터를 받으려면 끊기지 않는 웹소켓을 얻는 것이 가장 좋다
하지만 나는 혹시 몰라 둘 다 하기로 했다 

 

 

 

 

4대 거래소 정형화의 시작 

일단 나는 거래소의 일관된 형태의 코드를 만들어 관리하기 편하게 하고 싶은 욕망이 매우 컸기 때문에

다음과 같은 상속 관계를 만들어 시도하였다 

 

관리하기 편하도록 팩토리 패턴을 사용하였고 market을 생성자를 주어 util에 따로 만들어놓은 매칭 패턴을 이용하여 해당 마켓을 작성하면 매칭하여 URL을 넘겨주는 방향으로 선택하였다 

 

def get_symbol_collect_url(market: str) -> str:
    """URL matting

    Depandancy:
        -  possible python 3.10 \n
    Args:
        -  market (str): market name \n
    Raises:
        - ValueError: Not Fount market is ValueError string \n
    Returns:
        str: market url
    """
    match market:
        case "upbit":
            return UPBIT_URL
        case "bithum":
            return BITHUMB_URL
        case "korbit":
            return KORBIT_URL
        case "coinone":
            return COINONE_URL
        case _:
            raise ValueError("Not Found market")

 

from typing import Any
from abc import abstractmethod, ABC
from coin.core.util.util_func import get_symbol_collect_url


class CoinSocketAndRestAbstract(ABC):
    def __init__(self, market: str) -> None:
        self.url: str = get_symbol_collect_url(market)

    @abstractmethod
    def get_socket_parameter() -> list[dict[str, Any]]:
        """
        Returns:
            list[dict[str, Any]]: 각 거래소 socket parameter
        """
        raise NotImplementedError()

    @abstractmethod
    async def get_present_websocket(symbol: str) -> None:
        """
        Subject:
            - 코인 현재가 실시간 \n
        Args:
            - uri (str): 소켓주소
            - subscribe_fmt (list[dict]): 인증파라미터 \n
            - symbol (str) : 심볼
        Returns:
            - 무한루프 \n
        """
        raise NotImplementedError()

    @abstractmethod
    def get_coin_present_price(self, coin_name: str) -> dict[str, Any]:
        """
        Subject:
            - 코인 인덱스 가격 정보 \n
        Parameter:
            - coin_name (str) : 코인이름\n
        Returns:
            - market 형식
        """
        raise NotImplementedError()

    @abstractmethod
    def get_coinsymbol_extraction(self) -> list[str]:
        """
        Subject:
            - 코인 심볼 추출 \n
        Input:
            - market API 형식 \n
        Returns:
            >>> list[str]: ["BTC", "ETH" ....]
        """
        raise NotImplementedError()

 

 

이렇게 함으로써 메서드를 강제할 수 있으니 구현하기 매우 편해진 거 같다

물론 규모가 커질수록 복잡도의 위험성이 커지는 것도 사실이다 일단 우리나라 거래소의 기준으로 진행되기 때문에 복잡성은 높아지지 않을 거지만 만약 더 높아질 수 있는 가능성이 있으면 전제적으로 리팩토링을 하는 것도 방법일 것이다.

 

 

그러면 무엇을 얻어내고 무엇을 하고 싶은 건가?

내가 데이터를 얻으려는 큰 목적성은 3가지가 있다 

  1. 코인 전체 가격데이터를 얻음으로써 해당 코인의 거래의 역사를 보고 싶은 것 
  2. 현재 코인 거래가의 거래소의 평균을 구해서 새로운 차트를 그려내는 것 선택 (1)
  3. 각각의 거래소의 데이터를 각각 가지고 와 하나의 그래프 안에 거래소의 가격을 각각 그려내는 것 (2)
    1. 현재로서는 선택 (1)을 최선으로 두고 있다 
  4. 데이터를 직접 가지고 와 분 시간 월 별로 데이터를 정제하여 새로운 차트를 만들려는 것 

가격은 다음과 같이 6개를 선택하였다 

  • opening_price(시작가), max_price(고가), min_price(저가), closeing_price(종가)
    • 이 가격은 해당 시간대의 거래가 시작될 때의 가격을 나타내므로, 가격 변동을 추적하는 데 중요한 기준으로 판단
    • 이 부분에서 고가, 저가, 종가를 보면서 현시점의 코인의 가치를 판단할 수 있다고 생각
  • prev_closing_price(전일 종가)
    • 이 가격이 나올 때까지의 거래자들의 행동을 반영하므로, 최근 24시간 동안의 가격 변동을 파악하는 것은
    • 현재 코인 가격의 상황을 이해하고 미래의 가격 움직임이 현재가의 기준에서 움직일 수 있다고 하는 데 도움이 될 수 있다고 판단
  • acc_trade_volume_24h(24시간 동안 거래된 개수)
    • 특정 기간 동안 거래된 코인의 총량을 나타내며, 이는 시장의 활동성과 관심도를 나타내는 중요한 지표라고 판단했고
    • 높은 거래량은 해당 코인에 대한 강한 관심을 나타내며, 가격의 변동성이 높을 가능성을 시사한다고 판단
    1. 이는 특정 시간대에 코인이 거래된 시작 가격입니다. 이 가격은 해당 시간대의 거래가 시작될 때의 가격을 나타내므로, 가격 변동을 추적하는 데 중요한 기준으로 판단
      1. 이 부분에서 고가와 저가를 보면서 코인의 가치를 판단할 수 있다고 생각

 

 

그렇다면 결국 가격을 얻어와야 한다 각각의 거래소의 Docs의 사용법을 일일이 나열하기에는 너무 스압과 쓸데없는 정보를 가지고 온다고 생각하기에 다음과 같이 코드로서 첨부하겠다
주석을 써놓았기 때문에 읽기 쉬울 것이다.

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/blob/main/pipe/coin/core/ubkc_market.py

 

 

그러면 데이터를 가지고 어떻게 정형화를 시키고 싶은 것인가?

데이터는 다음과 같이 정형화를 진행하였다 JSON을 선택하였고 

  1. 스키마 통일화: 거래소별로 통일되지 않은 API 디자인을 Pydantic을 활용하여 통일, 일관된 데이터 관리
    1. 내가 진행한 코인 모듈의 가장 큰 목적은 위에서 말한 선택 (1)의 과정을 진행하고 있기에 실시간으로 데이터를 가지고 오는 것이 가장 중요한 포인트로 작용하였음  
    2. 데이터 구조를 설계하기 위한 선택지는 dataclass와 pydantic이 있었는데 그중 pydantic으로 고른 가장 큰 이유는 데이터 유효성 검사를 진행해 준다는 측면이 가장 컸다
      1. dataclass인 경우 간단한 데이터구조를 설계할 때 사용하면 좋기 때문에 고려대상이 있었으나 유효성 검사를 따로 진행해주지 않는다는 측면에서 제외
    3. 위에서 언급드렸던 스키마 정형화를 위해 pydantic을 사용하였으며 코드는 너무 길어 링크를 첨부
      1. 나의 총목적은 하나의 dictionary에 4개의 거래소 데이터를 모두 넣는 게 핵심이었으며 최종 스키마는 이런 식으로 설계하는 것이 목표 결괏값은 아래와 같이 생겼다 

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/blob/main/pipe/coin/core/util/data_format.py

{
    "upbit": {
        "name": "upbit-ETH",
        "timestamp": 1689633864.89345,
        "data": {
            "opening_price": 2455000.0,
            "max_price": 2462000.0,
            "min_price": 2431000.0,
            "prev_closing_price": 2455000.0,
            "acc_trade_volume_24h": 11447.92825886,
        }
    }... Bithum, korbit, coinone
}

 

 

 

위에서 만든 팩토리 패턴으로 만든 API 합친 것으로으로 REST와 WebSocket을 만들었고 

팩토리 패턴으로 사용한 이점이 이경우에 너무 좋다

REST 방식 

만든 API의 함수중 get_coin_present_price를 이용하여  symbol 값을 주어 해당 심벌의 REST 한 값을 불러온 후 

pydantic으로 진행한 스키마에 맞춰서 호출값을 넣었다 

 

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/blob/main/pipe/coin/core/coin_rest_interaction.py

WebSocket 방식 

websock은 각각 거래소마다 호출하는 방법이 제각각이고 한번 시작하면 끊지 않고 계속 들어가는 패턴이기에 비동기로 사용하였다

    async def process_exchange(self, market: str, message: dict) -> dict:
        """
        message 필터링

        Args:
            market (str):
                -> 거래소
            message (dict):
                -> 데이터

        Returns:
            dict: 각 거래소당 dictionary가 달라 저렇게 항목으로 접근
        """

        if market == "bithumb":
            if message.get("resmsg") == "Filter Registered Successfully":
                pass
            else:
                return message["content"]

        if market == "korbit":
            if message.get("event") == "korbit:subscribe":
                pass
            else:
                return message["data"]

        if market == "coinone":
            if message.get("response_type") == "SUBSCRIBED":
                pass
            else:
                return message["data"]
        return message

 

websocket은 기본적으로 승인받는 패턴이 있기 때문에 다음과 같이 이루어진다 

웹소켓 승인 dictionary -> 확인이 되면 확인 이 되었다는 JSON or None -> 연결 

 

그러므로 데이터를 보내거나 할 때 확인 메시지까지 같이 보내지는 수가 있어 제외하고 보내는 방식을 선택하였다 

메시지를 받고 남에 있어서 정형화까지 함께 들어가기 때문에 각 하나의 클래스에 너무 많은 책임을 짊어주기는 싫어 

2개의 클래스로 나누었으나 서로 간에 의존성이 너무 강해 이걸 어떻게 풀어야만 의존성이 풀리는지 조언을 구합니다.. 살려주세요..

 

자바는 인터페이스로 할  수 있을 거 같은데 python에서 새로운 클래스나 함수를 만들어서 최대한 종속성을 떨어뜨려야 할지 고민 중이다 

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/blob/main/pipe/coin/core/coin_socket_interaction.py

 

 

정형화된 데이터를 만들고 어떻게 전달할 건가?

Kafka를 이용할 것이다.

이미 정형화를 끝냈고 데이터를 보내기만 하면 되기 때문에 각각 생성되는 데이터의 용량이  어느 정도인지 확인이 필요하였다 

 

2024-01-29 19:23:55,619 - success_data - INFO - Message delivered to: btcSocketDataInBithumb --> counting --> 10 size --> 2743
 
2024-01-29 19:23:57,437 - success_data - INFO - Message delivered to: btcSocketDataInUpbit --> counting --> 10 size --> 2793
 
2024-01-29 19:27:25,346 - success_data - INFO - Message delivered to: btcSocketDataInCoinone --> counting --> 10 size --> 2803
 
2024-01-29 19:27:32,806 - success_data - INFO - Message delivered to: btcSocketDataInKorbit --> counting --> 10 size --> 2783
 

 

4대 거래소의 평균 데이터 size는 근삿값으로 2850

aiokafka의 기본값으로는 

max_request_size=1048576   # 10 KB
max_batch_size=16384     # 16KB

현 값보다 현저히 높아 조금 더 힘들게 잡아서 전송속도를 높이는데 초점을 맞췄다 

하지만 이렇게 맞췄다고 해서 네트워크 I/O와 웹소켓을 제공하는 측의 성능에 따라 
데이터가 보내지는 것이 확연히 차이가 날 텐데 굳이 필요한가에 자답을 해보면..
필요 없는 성능을 두는 것보다 
최대한 타이트하게 두어 메모리를 조금이라도 낭비하는 게..

라고 생각하지만 구지라는 생각도 많이 든다.. 

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/blob/main/pipe/coin/core/data_mq/data_interaction.py

 

 

 

이렇게 만든 모듈들을 효율적으로 관리하기 위해 팩토리 패턴을 사용했으며 각각의 필요한 파라미터를 JSON으로 관리하는데 용이가 편해 JSON으로 요청할 파라미터를 관리하였고 REST, WebSocket 별도로 관리하였으며 이를 JSON을 불러오는 값으로 Dict에 담겨있는 클래스와 같이 JSON을 조합해 넘겨주는 방향으로 채택하여 유지보수성을 높였다 

class __MarketAPIFactory:
    """Factory for market APIs."""

    _create: dict[str, dict[str, Any]] = {
        "upbit": UpbitRestAndSocket,
        "bithumb": BithumbRestAndSocket,
        "korbit": KorbitRestAndSocket,
        "coinone": CoinoneRestAndSocket,
    }

    @classmethod
    def market_load(cls, conn_type: str, *args, **kwargs):
        """
        거래소 API의 인스턴스를 생성합니다.
        """
        if conn_type not in cls._create:
            raise ValueError(f"잘못된 연결 유형: {conn_type}")

        creator = cls._create[conn_type]
        return creator(*args, **kwargs)
        
        
 
 def load_json(
    conn_type: str,
) -> ExchangeSocketDataTypeHints | ExchangeRestDataTypeHints:
    """
    Open the file and load market information.

    ExchangeRestDataTypeHints(Type): dict[str, ExchangeRestConfig]
    - from coin.core.util._typing import ExchangeRestDataTypeHints

    ExchangeSocketDataTypeHints(Type): dict[str, ExchangeSocketConfig]
    - from coin.core.util._typing import ExchangeSocketDataTypeHints


    """
    with open(
        file=f"{path}/config/_market_{conn_type}.json", mode="r", encoding="utf-8"
    ) as file:
        market_info = json.load(file)

    # ExchangeSocketDataTypeHints | ExchangeRestDataTypeHints
    # JSON에 저장되어 있는 값 + API 클래스 주소
    market_info = {
        market: {**info, "api": __MarketAPIFactory.market_load(market)}
        for market, info in market_info.items()
    }
    return market_info

 

 

전체 코드는 다음과 같다 
https://github.com/PropertyCoinDashboard/CoinInstrucstureStream/tree/main

 

GitHub - PropertyCoinDashboard/CoinInstrucstureStream

Contribute to PropertyCoinDashboard/CoinInstrucstureStream development by creating an account on GitHub.

github.com

 

 

설계 측면에서 따지만 다음과 같은 설계도가 나왓다 

 

 

 

 

코드 피드백 있으시면 부탁드리겠습니다 좀 더 나은 코드 좀 더 나은 설계를 하고싶습니다 

 

반응형