스토리가 계속 이어집니다 이전 편을 보고 오시면 더욱 이해가 좋습니다!
https://sky-develop.tistory.com/99
다시 한번 가보시죠!
kafka로 전송로 전처리 한 데이터를 다시 생성해서 보내고 나서 spark-streaming으로 처리 분석을 시작하였습니다
이때 제가 원했던 처리 분석은 다음과 같아요
Ticker 거래 분석
1. 거래소별 평균
2. 지역별 평균
3. 지역 간 거래 차익 계산
4. 거래소 별 차익 계산
5. 가격 변동성 과 거리량 급증 감지
orderbook 거래 분석
1. 각 지역별 통계
2. 모든 지역별 총합 통계 계산
카프카의 토픽을 소모에서 spark-streaming에서 처리를 하는 것이 최종 목표였기 때문에
spark-streaming에서 kafka를 연결해 주기 위해 두 가지 작업을 진행하였습니다
1. Spark-struct-streaming 스키마 인식 하기위해서 Schema 진행
2. 환경 설정 진행하였습니다
1. Schma 제작
거래소에서 들어오는건 시간 또는 배치 개수가 넘겼을 때 넘어오는 것이기 때문에 list로 감싸져 있습니다
[
{
"region": "Asia",
"market": "OKX",
"coin_symbol": "BTC",
"timestamp": 1729678813578.0,
"data": [
{
.... 데이터 스키마
}
]
}
... 다수개
]
[
{
"region": "Asia",
"market": "OKX",
"coin_symbol": "BTC",
"highest_bid": 66609.22,
"lowest_ask": 66619.98,
"spread": 10.759999999994761,
"total_bid_volume": 0.0016,
"total_ask_volume": 0.259115,
"timestamp": "2024-10-23 10:25:00.050791+00:00"
}
...
]
위에서부터 ticker 와 orderbook으로 하였습니다
Spark-struct-streaming 은 Schema를 만들고 그 스키마로 처리해야 하며 spark-streaming는
단일 쿼리 안에서 전부터 처리해야합니다 다중 조인 불가능
스키마를 하기 전 데이터 스키마를 보았을때 list안에 담겨 있으니 ArrayType를 사용하여 다음과 같은 스키마를 만들었습니다
1. data 안에 있는 데이터가 따로 있으니 .. ArrayType -> StructType ([ 스키마들... data(ArrayType)]) 이렇게 만들었고
2. orderbook 또한 Array(StructType)) 으로 만들었습니다!!
자세한건 다음 코드를 참고해 주세요..!!
https://github.com/MajorShareholderClub/CryptoStreamSpark/blob/main/src/schema/data_constructure.py
2. 환경 설정
spark-streaming을 kafka를 연결하기 위해서 topic를 연결할 환경 설정과 connect를 구비해줘야 합니다
config("spark.jars.packages", f"{SPAKR_PACKAGE}") 환경 설정을 진행해줘야 하며
저의 프로젝트에서는 kafka연결과 mysql 저장하는 것이 목표였기 때문에 2가지의 connection을 준비해주어야 했으며
https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10
https://mvnrepository.com/artifact/com.mysql/mysql-connector-j
두 가지의 package를 로드를 gradle로 받아주고 설정하였습니다
spark = (
SparkSession.builder
.appName("coin")
.master("local[*]")
.config("spark.jars.packages", f"{SPARK_PACKAGE}")
...
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.kafka.consumer.config.auto.offset.reset", "earliest")
....
.getOrCreate()
)
backpressure와 카프카에서 데이터를 읽어올 때 어떻게 잃어올 것인지 등등 configuration을 작성하였습니다
3. kafka와 mysql 설정
1. 카프카와 연결을 위해서 토픽과 부트스트랩 설정을 해주었습니다
2. 데이터 처리 후 어떻게 보내고 checkpoint와 함께 처리해 주는 함수를 작성하였습니다
2.1 checkpoint는 중단하였을 때 checkpoint를 통해서 재시작할 때 어디서부터 했는지 기록할 수 있기 때문에 지속성이 좋죠!
2.2 mysql 설정 또한 똑같이 진행해 주었고 trigger를 1분으로 하여 지속성을 어필해 보았습니다..!!
4. 그래서 분석은?
spark-streaming은 본래 배치를 위해서 만들어졌고 배치를 잘게 쪼개서 결국 Mirobatch로 보내 실시간처럼 엄청 빠른 거뿐이지 본적은 batch입니다 streaming을 흉내 낸 거처럼 사람들의 편의성을 위해 확장판 같은 존재 이죠
그럼에서 쓰이는 이유는 기존 spark 쿼리로 움직일 수 있다는 점
코드 이식성이 높아 높은 지연율에도 사용할 수 있었지 않았나라고 생각이 듭니다
4.1 kafka에서 가져올 때..
spark-streaming에서 kafka에 연결 후 토픽을 가지고 올 때 topic, key, value로 스키마가 형성되어 들어옵니다
이 중에서 제 스키마 같은 경우 ArrayType으로 되어 있기 때문에 explode로 풀어주어 안에 있는 data 스키마를 평탄화 작업을 해주는 것이 최종 목적이었죠
1. from_json으로 value를 등록하고 schama 적용해 주고
2. select(F.explode(~~~)~~~) 진행하여 Array를 풀어주어 평탄화를 진행하였고
평탄화한 작업들을 각 필요한 분석에 맞춰서 쿼리를 만들었습니다
4.2 그래서 쿼리는?
공통 사항
지연된 데이터까지 처리하기 위해서 withWaterMark를 5분으로 설정하였습니다
그다음 시간 별 window를 1분으로 30초 단위로 움직이면서 처리할 수 있도록 적용하여 처리하였습니다
Ticker
4.2.1 데이터 평탄화: 거래소 데이터를 분석에 적합하게 준비하기
첫 번째 단계로, 거래소의 중첩된 데이터 구조를 평탄화하여 접근성을 높입니다. ticker_flatten_exchange_data 함수는 지역, 마켓, 코인 심벌, 타임스탬프, 가격 데이터를 추출하여 다음 단계에서 사용할 수 있는 형태로 변환합니다. Unix 타임스탬프를 가독성 높은 일반 시간 형식으로 변환하여 시간 기반의 집계 작업을 더욱 쉽게 할 수 있도록 합니다.
4.2.2 시간 기반 지표 계산: 주요 가격 및 거래량 지표 계산
calculate_time_based_metrics 함수에서는 각 코인 심볼 및 지역별로 시간 창을 설정하여 평균 가격, 최고 및 최저 가격, 총 거래량, 가격 변동성, 이동 평균 가격 및 거래량과 같은 지표를 계산합니다. 이러한 지표들은 주로 가격 동향을 분석하고, 시장의 전반적인 변동성을 체크하였습니다
4.2.3 지역 간 차익 거래 기회 계산: 지역별 가격 차이를 통한 차익 계산
다음으로는 calculate_arbitrage 함수에서 암호화폐 가격의 지역 간 차이를 계산하여 차익 거래 기회를 탐색합니다. 예를 들어, 한국(KR), 아시아, 글로벌 지역 간의 평균 가격을 비교하여 차이를 계산하며, 이를 통해 지역 간의 거래 차익 가능성을 평가합니다. 각 지역의 평균 가격을 바탕으로 두 지역 간 가격 차익과 퍼센트 차이를 구하여 차익 기회를 판단하였습니다
4.2.4 거래소별 차익 거래 기회 계산: 거래소 간 가격 차이를 통한 차익 기회 탐색
이와 유사하게, calculate_market_arbitrage 함수는 거래소별로 거래 가격의 차이를 계산합니다. 여기서는 특정 거래소, 예를 들어 업비트와 바이낸스 또는 빗썸과 바이낸스의 가격 차익을 파악합니다. 이를 통해 한국의 주요 거래소 간 가격 차이뿐만 아니라, 글로벌 거래소와의 차익 기회도 식별할 수 있습니다. 각 거래소별 가격을 비교하여 차익 기회가 있는지 확인하고, 거래소 간 가격 차이를 퍼센트로 나타내어 차익 가능성을 더욱 명확하게 드러냅니다.
4.2.5 가격 및 거래량 신호 감지: 가격 변동 및 거래량 급증을 실시간으로 알림
마지막으로 detect_price_volume_signals 함수는 거래 가격과 거래량의 변화를 분석하여 특정 임계값 이상일 때 알림을 생성합니다. 가격의 경우 변동률이 ±2% 이상일 경우 ‘PRICE_ALERT’를, 거래량이 2.8% 이상 급증할 경우 ‘VOLUME_SURGE’를 생성하여 거래에 대한 즉각적인 인사이트를 제공하였습니다. 이러한 신호는 거래자에게 중요한 가격 급등 또는 급락, 거래량 급증 등의 트렌드를 실시간으로 파악할 수 있게 하였습니다
Orderbook
4.2.1 데이터 평탄화: 거래소 데이터를 분석에 적합하게 준비하기
ticker 데이터와 똑같이 분석하기 편하게 평탄화를 진행하였습니다
4.2.2 데이터 평균 계산: orderbook 평균 계산
bid_ask_avg 함수로 평균을 계산하고 0 보타 큰 값을 고려해서 반올림합니다
4.2.3 지역별 통계 계산
각 지역별로 통계를 계산 1분 윈도 사용하여 최고 입찰가 최저 입찰량 총 요청량의 평균 구하고 특정 조건을 만독하는 데이터만 필터링
4.2.4 모든 지역의 통계 계산
모든 지역의 통합 통계 계산 합니다
자세한 함수를 보고 싶으면 아래 링크를 타주세요!
https://github.com/MajorShareholderClub/CryptoStreamSpark/tree/main/src/preprocess
지연된 데이터에서 처리되지 못한 노이즈 데이터일 경우
timestamp가 1970 년 또는 2024년 0101로 나왔기에
특정 값이 아닌 모든 값이 isnull이 아니거나 timestamp가 현 날짜와 맞는지를
filter로 check 하여 노이즈를 필터링하였습니다
5. 그래서 어떻게 보냈는데?
이거는 깔끔하게 5줄 이내로 설명할 수 있습니다
1. spark-streaming으로 처리한 스키마를 정의
2. to_json으로 JSON화 진행
3. mysql로 저장할 경우 -> from_json과 전처리한 스키마와 함께 전송
4. kafka는 보낼 토픽 명과 함께 전송
5. checkpoint 생성
모든 코드는 여기 있습니다..!!
https://github.com/MajorShareholderClub/CryptoStreamSpark
이미 프로젝트는 다 끝난 상태입니다 미리 코드를 보고 싶으시면 다음 URL를 타주세요
많은 피드백 질타 부탁드리겠습니다
읽어주셔서 감사합니다..!!
https://github.com/MajorShareholderClub
'핀테크프로젝트 > 코인' 카테고리의 다른 글
다시 돌아온 코인 프로젝트 (1) (1) | 2024.11.11 |
---|---|
다시 돌아온 코인 프로젝트 (0) (1) | 2024.11.09 |
거래소별 API 분석 각 거래소 별 캔들 데이터는 무사한가 (0) | 2024.01.29 |
코인 모듈 그 첫번째 각 거래소 API 정형화 하기 (1) (0) | 2024.01.29 |