오랜만에 포슽잉 이예요 리프레쉬 이후 다시 씽나게 저의 목적을 향해 다시 글을 써보려고 합니다
완성은 한 상태이고 하나하나 차근차근 복기를 해보며 지나가려고 합니다
거래소 선택
거래소를 기존 4개에서 9개로 늘렸습니다 이유는 스케일을 늘려 우리나라 와 다른 지역 간의 거래소 가격차이가
얼마나 나는지 지역 간 거래를 할 때 어떤 거래소에서 진행해야 수익이 나는지를 관찰하고 싶었습니다
- 대한민국
- 업비트
- 빗썸
- 코빗
- 코인원
- 아시아
- OKX
- Bybit
- GateIO
- 유럽
- Binance
- Kraken
어떻게 진행했을까?
websocket으로 Ticker와 Orderbook으로 진행하였습니다 도중에 끊기면 rest api로 호출하도록 만들었어요
Ticker -> 현재가, Orderbook -> 거래 영수증(어떤 가격에 얼마나 사고팔았는지)
제가 가장 궁금했던 건 가격데이터였습니다 그래서 다음과 같이 가지고 오도록 해보았습니다 (업비트 기준인데요)
업비트와 빗썸은 websocket 호출 이 같아 구축하기가 매우 편했어요 (갬동..)
timestamp | 타임스탬프 |
opening_price | 시가 |
trade_price | 현재가 |
high_price | 고가 |
low_price | 저가 |
prev_closing_price | 지난 종가 |
acc_trade_volume_24h | 24시간 평균 볼륨 |
signed_change_price | 전일 대비 값 |
signed_change_rate | 전일 대비 등락율 |
이런 식으로 9개의 변수들을 yml 파일에 담아 관리하도록 쉽게 만들었습니다
# ticker
upbit:
parameter:
- timestamp
- opening_price
- trade_price
- high_price
- low_price
- prev_closing_price
- acc_trade_volume_24h
- signed_change_price
- signed_change_rate
bithumb:
parameter:
- timestamp
- opening_price
- trade_price
- high_price
- low_price
- prev_closing_price
- acc_trade_volume_24h
- signed_change_price
- signed_change_rate
.... 9개의 거래소
어떻게 호출했을까?
크게 9개의 거래소에서 진행해야 하니 크게 집단을 만들었습니다 위에 보셨다시피 대한민국, 아시아, 유럽 이렇게요
클라이언트 -> connection으로 가는 구조로 이루어지게 했습니다 각각마다의 connector 가 필요 하였고
이런 식으로 진행하였습니다
1. 커넥션을 진행해 주는 모체 클래스를 만들고
2. 그 뒤에 자식 클래스가 상속을 받음에 있어 진행해 주는 방식으로 진행하여 확장성과 유지보수성을 높일 수 있었습니다
2.1 그래야 나중에 거래소를 늘릴예정일때 클래스 하나만 추가해 주면 되니까요!
class CoinExchangeSocketConnection(AbstractExchangeSocketClient):
async def get_present_websocket(self, symbol: str, socket_type: str) -> None:
from protocols.connection.coin_socket import (
KoreaWebsocketConnection as WCM,
)
"""소켓 출발점"""
return await WCM().websocket_to_json(
uri=self._websocket,
subs_fmt=self.socket_parameter(symbol=symbol, req_type=socket_type),
symbol=symbol,
socket_type=socket_type,
)
class UpbitSocket(CoinExchangeSocketConnection):
def __init__(self) -> None:
super().__init__(
target="upbit",
location="korea",
socket_parameter=upbithumb_socket_parameter,
)
async def price_present_websocket(self, symbol: str) -> None:
return await super().get_present_websocket(symbol, socket_type=self.ticker)
async def orderbook_present_websocket(self, symbol: str) -> None:
return await super().get_present_websocket(symbol, socket_type=self.orderbook)
... 이런식으로 모든 지역마다 구성되어 있습니다
여기서 1차 아쉬운 점은..
모든 지역에 CoinExchangeSocketConnection 사용하고 있습니다
순환 참조로 인해서 import는 함수 안에다가 두어서 방지를 했는데요
프로토콜 안에 있는 connection 이 지역마다 있어서 (KoreaConnect.. AsiaConnect 이렇게..)
이걸 그냥 하나로 통일하는 게 맞았나? 생각이 들어요
지역마다 connection이 다르게 적용할 거 같아서 그랬는데
지금 보니깐 굳이 그럴 필요가 있었나 생각이 드네요..
확장 가능성을 염두에 뒀지만 유지보수성이 올라간 거 같습니다...
이렇게 해서 각각 지역마다 클래스를 만들어 protocol 폴더에 고이 모셔놓았습니다..!!
Connection은 어떻게 진행했을까?
위에 보이면 coin_socket.py 에 각가 지역마다의 Connection이 들어있고 그 connection은 WebsocketConnectionManager를 상속받았고 그건 common 폴더에 넣어놨습니다 각 지역마다 같은 걸 사용하기 때문에 common file에 넣어 논 거죠!!
원본 코드는 아래 URL를 타주세요..!!
ExchangePipeline/common/client/market_socket/websocket_interface.py at main · MajorShareholderClub/ExchangePipeline
Contribute to MajorShareholderClub/ExchangePipeline development by creating an account on GitHub.
github.com
websocket에서 최종 카프카 까지 전송하기 위해 다음과 같은 로직을 진행하였습니다 모든 걸 설명할 수 없으니 다음과 같은 다이어그램으로 설명할 수 있을 거 같아요..!
이렇게 진행한 메시지를 비동기 큐에 넣고 다음과 같이 진행하였습니다
크게 common 안에 있는 socket 클래스는 네 가지로 구분할 수 있으며
1. MeaageQueueManager --> 메시지 큐 적재
2. KafkaService --> 카프카에 데이터 송신
3. MessageProcessor --> 웹소켓 데이터 처리
4. WebsocketConnectionManager --> 웹소켓에서 데이터 송수신
데이터를 적재하기 위해 dictionary를 사용하였으며 TypeDict를 사용해 타입 안정성을 키웠습니다
전체적인 다이어 그램을 보면 다음과 같다고 보시면 될꺼같아요..!!
이렇게 한 후 연결을 편하게 진행하기 위해 Factory 패턴을 사용하여 각 클래스를 묶어놓아 선언하기 쉽게 만들었습니다
class MarketAPIFactory:
"""Factory for market APIs."""
_create: ClassVar = WorldMarketsRequestType(
korea=KoreaMarketRequestType(
..... 지역마다 socket rest 있음
@classmethod
def market_load(
cls, conn_type: str, market: str, c: str, *args, **kwargs
) -> Result[str, ClassAddress]:
"""
거래소 API의 인스턴스를 생성합니다.
"""
...
class MarketLoadType:
def __init__(self, conn_type: str, location: str) -> None:
self.conn_type = conn_type
self.location = location
def load_json(self) -> RequestDict:
"""
JSON 파일 로드 (socket 또는 rest)
- 어떤 가격대를 가지고 올지 파라미터 정의되어 있음
"""
def _market_api_load(self, market: str) -> Result[str, ClassAddress]:
....
class SocketMarketLoader(MarketLoadType):
def __init__(self, location: str) -> None:
super().__init__(conn_type="socket", location=location)
def process_market_info(self) -> dict[str, Callable]:
....
이렇게 ticker와 orderbook의 websocket 파이프라인을 만들고 최종적으로 특정 시간 또는 배치가 쌓이면 카프카로 보내는 형태로 진행하였습니다..!
카프카로 어떻게 보냈니?
저는 고민하고 또 고민하였습니다 저에겐 두 가지가 있었죠
1. 각 거래소마다 토픽을 생성하고 거래 타입마다 파티션을 다르게 두자
--> upbit-socket-data-topic(partition1: ticker, partition2: orderbook)
2. 각 지역마다 토픽을 생성하고 지역마다 있는 거래소를 파티션을 다르게 두자
--> KoreaMarketTopic --> (partition1: upbit, partition2: bithumb....)
저는 2번을 선택하여 특정 지역의 partition은 offet이나 특정 용량이 넘어가면 디스크 자동저장이기 때문에
관리가 더 편하지 않을까 생각이 들었습니다 그리고 토픽의 복잡성도 생각해 보았는데요 기존 1번을 선택을 했을 시
각 거래소당 2개의 토픽 [9 x 2 = 18개]으로 복잡성이 너무 심해지고 또한 다른 전처리는 진행할 때 다시 소비하는 입장에서 매우
요점이 될 거 같아 지역을 3개의 지역으로 topic을 나누어 [3 x 2 = 6] 3배의 복잡성을 줄이고 안에 있는 partition을 각 거래소만큼 나누어 최적화를 진행하였습니다 각 거래소마다 키를 부여해 특정 지역과 거래소가 매핑이 되면 partition으로 적재하는 방식으로 cusctom 하였으며 코드는 아래에서 보시면 되겠습니다...!!!
https://github.com/MajorShareholderClub/ExchangePipeline/blob/main/mq/data_partitional.py
ExchangePipeline/mq/data_partitional.py at main · MajorShareholderClub/ExchangePipeline
Contribute to MajorShareholderClub/ExchangePipeline development by creating an account on GitHub.
github.com
그러고 나서 보내는 과정...
일단 용량부터 체크해 보았습니다
최고 크기는 569,262 바이트,
최저 크기는 1,509 바이트,
평균 크기는 약 77,471 바이트
이렇게 용량 차이가 큰 이유는
ticker만 전처리를 하고 order book은 가능한 많은 데이터가 있으면 좋기 때문에 전처리를 하지 않고 통으로 보냈습니다
카프카는 일정 용량을 넘기면 전송을 못하기에 넉넉하게 두었습니다..
압축 방식은 gzip으로 두었고 각 마다 key와 value를 serialization 하여 보냈으며
만약 브로커가 실패했거나 기타 등등으로 메시지를 보낼 수 없을때 defaultdict 를 사용하여 임시 저장하여 복구 했을때 다시 저장할 수 있게 만들었습니다 !! 비동기 방식으로 보내는 것이기 때문에 python의 AIO-Kafka 사용했습니다
근데 여기서도
그냥 속 편하게 linger_ms 사용했으면 오히려 좋았을 텐데 라는 아쉬움이 남습니다
max_batch_size와, linger_ms를 최적의 값으로 찾았으면 아니면 배치 지원도 되니
create_batch를 만들었을 것도 좋았을 거고...
유지보수도 좋을 테고..
카프카를 적극적으로 사용한 게 아닌
시간 or 배치 처리할 건데 카프카를 곁들인..
프로젝트를 유지보수할 때
충분히 고려해 볼 수 있을 거 같아요
이미 프로젝트는 다 끝난 상태입니다 미리 코드를 보고 싶으시면 다음 URL를 타주세요
많은 피드백 질타 부탁드리겠습니다
읽어주셔서 감사합니다..!!
https://github.com/MajorShareholderClub
MajorShareholderClub
MajorShareholderClub has 5 repositories available. Follow their code on GitHub.
github.com
'핀테크프로젝트 > 코인' 카테고리의 다른 글
다시 돌아온 코인 프로젝트 (2) (0) | 2024.11.17 |
---|---|
다시 돌아온 코인 프로젝트 (1) (1) | 2024.11.11 |
거래소별 API 분석 각 거래소 별 캔들 데이터는 무사한가 (0) | 2024.01.29 |
코인 모듈 그 첫번째 각 거래소 API 정형화 하기 (1) (0) | 2024.01.29 |