본문 바로가기

핀테크프로젝트/코인

다시 돌아온 코인 프로젝트 (2)

반응형

 

 

 

스토리가 계속 이어집니다 이전 편을 보고 오시면 더욱 이해가 좋습니다! 

https://sky-develop.tistory.com/99

 

다시 돌아온 코인 프로젝트 (1)

스토리가 계속 이어집니다 이전 편을 보고 오시면 더욱 이해가 좋습니다! https://sky-develop.tistory.com/98 다시 돌아온 코인 프로젝트 (0)오랜만에 포슽잉 이예요 리프레쉬 이후 다시 씽나게 저의 목

sky-develop.tistory.com

 

 

 

다시 한번 가보시죠! 

 

 

 

 

kafka로 전송로 전처리 한 데이터를 다시 생성해서 보내고 나서 spark-streaming으로 처리 분석을 시작하였습니다 

이때 제가 원했던 처리 분석은 다음과 같아요 

 

Ticker 거래 분석

1. 거래소별 평균 

2. 지역별 평균 

3. 지역 간 거래 차익 계산

4. 거래소 별 차익 계산

5. 가격 변동성 과 거리량 급증 감지 

 

orderbook 거래 분석

1. 각 지역별 통계

2. 모든 지역별 총합 통계 계산 

 

 

카프카의 토픽을 소모에서 spark-streaming에서 처리를 하는 것이 최종 목표였기 때문에 

spark-streaming에서 kafka를 연결해 주기 위해 두 가지 작업을 진행하였습니다 

1. Spark-struct-streaming 스키마 인식 하기위해서 Schema 진행 

2. 환경 설정 진행하였습니다 

 

 

1. Schma 제작 

거래소에서 들어오는건 시간 또는 배치 개수가 넘겼을 때 넘어오는 것이기 때문에 list로 감싸져 있습니다 

[
    {
      "region": "Asia",
      "market": "OKX",
      "coin_symbol": "BTC",
      "timestamp": 1729678813578.0,
      "data": [
        {
			.... 데이터 스키마
        }
      ]
    }
    ... 다수개
]
[
	{
          "region": "Asia",
          "market": "OKX",
          "coin_symbol": "BTC",
          "highest_bid": 66609.22,
          "lowest_ask": 66619.98,
          "spread": 10.759999999994761,
          "total_bid_volume": 0.0016,
          "total_ask_volume": 0.259115,
          "timestamp": "2024-10-23 10:25:00.050791+00:00"
	}
    ...
]

 

위에서부터 ticker 와 orderbook으로 하였습니다 

 

Spark-struct-streaming 은 Schema를 만들고 그 스키마로 처리해야 하며 spark-streaming는 

단일 쿼리 안에서 전부터 처리해야합니다 다중 조인 불가능 

 

 

스키마를 하기 전 데이터 스키마를 보았을때 list안에 담겨 있으니 ArrayType를 사용하여 다음과 같은 스키마를 만들었습니다 

1. data 안에 있는 데이터가 따로 있으니 .. ArrayType -> StructType ([ 스키마들... data(ArrayType)]) 이렇게 만들었고 

2. orderbook 또한 Array(StructType)) 으로 만들었습니다!! 

 

자세한건 다음 코드를 참고해 주세요..!!

https://github.com/MajorShareholderClub/CryptoStreamSpark/blob/main/src/schema/data_constructure.py

 

CryptoStreamSpark/src/schema/data_constructure.py at main · MajorShareholderClub/CryptoStreamSpark

Contribute to MajorShareholderClub/CryptoStreamSpark development by creating an account on GitHub.

github.com

 

 

2. 환경 설정 

spark-streaming을 kafka를 연결하기 위해서 topic를 연결할 환경 설정과 connect를 구비해줘야 합니다 

config("spark.jars.packages", f"{SPAKR_PACKAGE}") 환경 설정을 진행해줘야 하며 

저의 프로젝트에서는 kafka연결과 mysql 저장하는 것이 목표였기 때문에 2가지의 connection을 준비해주어야 했으며 

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j

두 가지의 package를 로드를 gradle로 받아주고 설정하였습니다 

    spark = (
        SparkSession.builder
        .appName("coin")
        .master("local[*]")
        .config("spark.jars.packages", f"{SPARK_PACKAGE}")
		...
        .config("spark.streaming.backpressure.enabled", "true")
        .config("spark.streaming.kafka.consumer.config.auto.offset.reset", "earliest")
		....
        .getOrCreate()
    )

backpressure와 카프카에서 데이터를 읽어올 때 어떻게 잃어올 것인지 등등 configuration을 작성하였습니다 

 

 

 

 

3. kafka와 mysql 설정 

1. 카프카와 연결을 위해서 토픽과 부트스트랩 설정을 해주었습니다 

2. 데이터 처리 후 어떻게 보내고 checkpoint와 함께 처리해 주는 함수를 작성하였습니다 

    2.1 checkpoint는 중단하였을 때 checkpoint를 통해서 재시작할 때 어디서부터 했는지 기록할 수 있기 때문에 지속성이 좋죠! 

    2.2 mysql 설정 또한 똑같이 진행해 주었고 trigger를 1분으로 하여 지속성을 어필해 보았습니다..!! 

https://github.com/MajorShareholderClub/CryptoStreamSpark/blob/main/src/setting/streaming_connection.py

 

CryptoStreamSpark/src/setting/streaming_connection.py at main · MajorShareholderClub/CryptoStreamSpark

Contribute to MajorShareholderClub/CryptoStreamSpark development by creating an account on GitHub.

github.com

 

 

 

 

4. 그래서 분석은?

spark-streaming은 본래 배치를 위해서 만들어졌고 배치를 잘게 쪼개서 결국 Mirobatch로 보내 실시간처럼 엄청 빠른 거뿐이지 본적은 batch입니다 streaming을 흉내 낸 거처럼 사람들의 편의성을 위해 확장판 같은 존재 이죠 

그럼에서 쓰이는 이유는 기존 spark 쿼리로 움직일 수 있다는 점 
코드 이식성이 높아 높은 지연율에도 사용할 수 있었지 않았나라고 생각이 듭니다 

 

 

4.1 kafka에서 가져올 때..

spark-streaming에서 kafka에 연결 후 토픽을 가지고 올 때 topic, key, value로 스키마가 형성되어 들어옵니다 

이 중에서 제 스키마 같은 경우 ArrayType으로 되어 있기 때문에 explode로 풀어주어 안에 있는 data 스키마를 평탄화 작업을 해주는 것이 최종 목적이었죠 

 

1. from_json으로 value를 등록하고 schama 적용해 주고 

2. select(F.explode(~~~)~~~) 진행하여 Array를 풀어주어 평탄화를 진행하였고 

평탄화한 작업들을 각 필요한 분석에 맞춰서 쿼리를 만들었습니다 

 

4.2 그래서 쿼리는?

공통 사항 

지연된 데이터까지 처리하기 위해서 withWaterMark를 5분으로 설정하였습니다 

그다음 시간 별 window를 1분으로 30초 단위로 움직이면서 처리할 수 있도록 적용하여 처리하였습니다 

Ticker

4.2.1 데이터 평탄화: 거래소 데이터를 분석에 적합하게 준비하기

첫 번째 단계로, 거래소의 중첩된 데이터 구조를 평탄화하여 접근성을 높입니다. ticker_flatten_exchange_data 함수는 지역, 마켓, 코인 심벌, 타임스탬프, 가격 데이터를 추출하여 다음 단계에서 사용할 수 있는 형태로 변환합니다. Unix 타임스탬프를 가독성 높은 일반 시간 형식으로 변환하여 시간 기반의 집계 작업을 더욱 쉽게 할 수 있도록 합니다.

 

4.2.2 시간 기반 지표 계산: 주요 가격 및 거래량 지표 계산

calculate_time_based_metrics 함수에서는 각 코인 심볼 및 지역별로 시간 창을 설정하여 평균 가격, 최고 및 최저 가격, 총 거래량, 가격 변동성, 이동 평균 가격 및 거래량과 같은 지표를 계산합니다. 이러한 지표들은 주로 가격 동향을 분석하고, 시장의 전반적인 변동성을 체크하였습니다

 

4.2.3 지역 간 차익 거래 기회 계산: 지역별 가격 차이를 통한 차익 계산

다음으로는 calculate_arbitrage 함수에서 암호화폐 가격의 지역 간 차이를 계산하여 차익 거래 기회를 탐색합니다. 예를 들어, 한국(KR), 아시아, 글로벌 지역 간의 평균 가격을 비교하여 차이를 계산하며, 이를 통해 지역 간의 거래 차익 가능성을 평가합니다. 각 지역의 평균 가격을 바탕으로 두 지역 간 가격 차익과 퍼센트 차이를 구하여 차익 기회를 판단하였습니다

 

4.2.4 거래소별 차익 거래 기회 계산: 거래소 간 가격 차이를 통한 차익 기회 탐색

이와 유사하게, calculate_market_arbitrage 함수는 거래소별로 거래 가격의 차이를 계산합니다. 여기서는 특정 거래소, 예를 들어 업비트와 바이낸스 또는 빗썸과 바이낸스의 가격 차익을 파악합니다. 이를 통해 한국의 주요 거래소 간 가격 차이뿐만 아니라, 글로벌 거래소와의 차익 기회도 식별할 수 있습니다. 각 거래소별 가격을 비교하여 차익 기회가 있는지 확인하고, 거래소 간 가격 차이를 퍼센트로 나타내어 차익 가능성을 더욱 명확하게 드러냅니다.

 

4.2.5 가격 및 거래량 신호 감지: 가격 변동 및 거래량 급증을 실시간으로 알림

마지막으로 detect_price_volume_signals 함수는 거래 가격과 거래량의 변화를 분석하여 특정 임계값 이상일 때 알림을 생성합니다. 가격의 경우 변동률이 ±2% 이상일 경우 ‘PRICE_ALERT’를, 거래량이 2.8% 이상 급증할 경우 ‘VOLUME_SURGE’를 생성하여 거래에 대한 즉각적인 인사이트를 제공하였습니다. 이러한 신호는 거래자에게 중요한 가격 급등 또는 급락, 거래량 급증 등의 트렌드를 실시간으로 파악할 수 있게 하였습니다

Orderbook

4.2.1 데이터 평탄화: 거래소 데이터를 분석에 적합하게 준비하기

ticker 데이터와  똑같이 분석하기 편하게 평탄화를 진행하였습니다 

 

4.2.2 데이터 평균 계산: orderbook 평균 계산 

bid_ask_avg 함수로 평균을 계산하고 0 보타 큰 값을 고려해서 반올림합니다 

 

4.2.3 지역별 통계 계산

각 지역별로 통계를 계산 1분 윈도 사용하여 최고 입찰가 최저 입찰량 총 요청량의 평균 구하고 특정 조건을 만독하는 데이터만 필터링

 

4.2.4 모든 지역의 통계 계산 

모든 지역의 통합 통계 계산 합니다 

 

자세한 함수를 보고 싶으면 아래 링크를 타주세요!

https://github.com/MajorShareholderClub/CryptoStreamSpark/tree/main/src/preprocess

 

CryptoStreamSpark/src/preprocess at main · MajorShareholderClub/CryptoStreamSpark

Contribute to MajorShareholderClub/CryptoStreamSpark development by creating an account on GitHub.

github.com

 

지연된 데이터에서 처리되지 못한 노이즈 데이터일 경우 
timestamp가 1970 년 또는 2024년 0101로 나왔기에 
특정 값이 아닌 모든 값이 isnull이 아니거나 timestamp가 현 날짜와 맞는지를 
filter로 check 하여 노이즈를 필터링하였습니다 

5. 그래서 어떻게 보냈는데?

이거는 깔끔하게 5줄 이내로 설명할 수 있습니다 

1. spark-streaming으로 처리한 스키마를 정의 

2. to_json으로 JSON화 진행 

3. mysql로 저장할 경우 -> from_json과 전처리한 스키마와 함께 전송 

4. kafka는 보낼 토픽 명과 함께 전송 

5. checkpoint 생성 

https://github.com/MajorShareholderClub/CryptoStreamSpark/blob/main/src/setting/streaming_connection.py

 

CryptoStreamSpark/src/setting/streaming_connection.py at main · MajorShareholderClub/CryptoStreamSpark

Contribute to MajorShareholderClub/CryptoStreamSpark development by creating an account on GitHub.

github.com

 

 

 

 

모든 코드는 여기 있습니다..!!

https://github.com/MajorShareholderClub/CryptoStreamSpark

 

GitHub - MajorShareholderClub/CryptoStreamSpark

Contribute to MajorShareholderClub/CryptoStreamSpark development by creating an account on GitHub.

github.com

 

 

 

 

이미 프로젝트는 다 끝난 상태입니다 미리 코드를 보고 싶으시면 다음 URL를 타주세요
많은 피드백 질타 부탁드리겠습니다

읽어주셔서 감사합니다..!! 

 

정말 감사합니다..!!

 

 

https://github.com/MajorShareholderClub

 

MajorShareholderClub

MajorShareholderClub has 5 repositories available. Follow their code on GitHub.

github.com

 

반응형