본문 바로가기

책/실시간 데이터 파이프라인 아키텍처

Chapter 1. 스트리밍 데이터 소개

반응형

치지직...!

 

 

 

1. 스트리밍 데이터 소개

2013년 5월, 스칸다니비아 연구 센터에서 발간한 보고서에 따르면

전 세계에 존재하는 90% 가량의 데이터는 자닌 2년 안에 생성되었다고 추정했다

 

 

2014년 EMC가 IDC와 함께 파트너십을 맺고 디지털 유니버스 연구를 발표 했는데 

디지털 유니버스의 규모가 2년마다 두 배로 증가하고 있으며
2013년부터 2020년 사이에 4조 4천억 기가바이트에서 44조 기가바이트로 10배 증가할 것으로 보고 했다 



ㄷ..딘땨류..

 

 

빅데이터라는개념은 오랜기간 존재했지만 수집, 적재, 분석하는 기술을 갖게 된 것은 그리 오래되지 않았다

이로써 빅데이터를 기반으로 빠른 의사 결정을 내리고 소비자와 기업 주변에서 일어나는일을 서비스에서 활용하여 사용할 수 있게 되었다

하지만 기술만 있다고 빅데이터를 잘 다룰 수 있는 것은 아니다.

 

 

소셜 미디어의 데이터부터 마트에 있는 고객의 위치 센서 데이터까지 지금 바로 생성되는 데이터를 다루는 세상이 왔다.

오늘날 실시간으로 생성된 데이터가 어떻게 사용되고 있는지는 사례로 추가로 말하지 않아도 다 알것이다..

 

그러나 현재뿐만 아니라 미래에도 사용될 이 실시간 서비스의 아키텍처를
어떻게 구성해야 할지 아는 사람은 거의없다

하지만 난 해낼꺼거든

 

 

1.1 실시간 시스템이란

실시간 시스템 Real-time systems과 실시간 컴퓨팅 Real-time computing이라는 단어는 수십 년 동안 사용되어왔고,

인터넷의 출현으로 더욱 활성화되었다 

그러나 실시간 시스템이라는 모호한 용어에 대한 논쟁이 종종이어졌다.
실시간 시스템이란 무엇일까?

 

실시간 시스템은 하드, 소프트 니어 로 구분된다

하드 리얼타임과 소프트 리얼타임 대한 정의는  Real-Time System에서 확인할수 있으며, 니어 리얼타임에 대한정의는 

Portland Pattern Repository의 위치에서 살펴볼 수 있다 

실시간 시스템의 정의는 '실시간 또는 실시간보다 약간 느린 데이터 처리 시스템이나 연관된 부분'

실시간 시스템의 분류는 다음과 같다
하드 리얼 타임 심박조율기 등 수 마이크로초 ~ 수밀리초 없음 - 전체 시스템 오류, 인명 손실 우려
소프트 리얼 타임 항공사 예약 시스템, 온라인 주식시세 수 밀리초 ~ 수초 낮음 - 시스템 오류 없음, 인명 손실 없음
니어 리얼 타임 스카이프 화상통화, 스마트 홈 수 초 - 수 분 높음-시스템 오류 없음, 인명 손실 우려 없음

 

 

하드 리얼타임은 상당히 쉽게 구분할 수 있는데, 보통 임베디드에서 많이 활용되고 있으며 시간 요구사항이 매우 엄격하다

만약 시간 요구사항이 만족되지 않는다면 전체 시스템의 장애 상황으로 판단한다,

하드 리얼타임 시스템의 설계 및 규현에 대한 내용은 이미 많은 곳에서 연구되고 있으며 이 책에서 다루고자 하는 내용은과는 벗어난다 

 

 

 

 

우리가 시스템을 만들 때 소프트 리얼타임 시스템으로 구현할지 아니면 니어 리얼타임 시스템으로 구현할지 
결정하는 것은 매우 중요하다 왜냐하면 이 정의가 겹치게 되면 종종 혼동이 발생하기 때문이다 

 

 

 

관련된 예시를 보자 

  1. 당신이 팔로우하고 있는 누군가가 트위터에 글을 올리면, 잠시 후 트위터 앱에서 그 글을 보게 된다.
  2. 실시간 항공편 추적 서비스인 플레이트웨어를 통해 뉴욕 주변의 항공편을 조회한다.
  3. 나스닥 실시간 주식 시세 어플리케이션을 사용하여 관심 있는 주식을 추적한다

이 시스템들은 비슷한 부분이 없어 보이는 서비스들이지만 그림 처럼 간단하게 표현할 수 있음

 

이전에 언급한 예시들은 모두 몇 초간 지연이 발생할 수 있다.

지연이 발생하더라도 누군가의 생명에 위험을 끼치지 않으면서도 일시적으로 몇 분간 늦어질 수 있다.

 

이런 지연 현상이 전체 시스템의 오류라고 판단하기 어렵지 않을까?

 

당신이 트위터에서 누군가가 올린 글을 본다면 그것은 소프트 리얼타임일까 니어 리얼타임일까?
실시간 항공편 현황이나 실시간 주식 시세는 어떨까?
또는 커피숍이나 항공기에서 사용하는 느린 와이파이 때문에 일부 데이터가 느리게 표시될 수도 있다 


결국 소프트 리얼타임과 니어 리얼타임 시스템의 경계는 매우 불명확하고 주관적이라는 것을 알 수 있다 
이에 더해 컨슈머의 상태에 따라 정의가 달라질 수도 있다.


데이터를 보야주는 컨슈머를 구성도에서 제외하여 예제를 약간 변경하면 다음과 같이 서비스가 정리됨 

  1. 트위터에 글이 게시된다.
  2. 플라이트웨어의 실시간 항공편 추적 서비스가 항공편을 추적하고 있다.
  3. 나스닥 실시간 시세 어플리케이션이 주식 시세를 추적한다

 

잠깐 생각해보면 ... 이런 시스템이 내부적으로 어떻게 동작하는지 알 수는 없겠지만 공통적으로 요구하는 본질에 대한 질문은 다음과 같다.

데이터가 처리되어 사용될 준비가 된 데이터가 있다면 
 이 시점은 소프트 리얼타임인가 니어 리얼타임인가..?

데이터 처리에만 집중하고 컨슈머를 제거하면 정답이 바뀔 수 있을까?
다음과 같이 분류하는 것을 예로 들 수 있다..


트위터로 작성된 글 

vs

내가 팔로우하는 사람과 트위터에서 보고 있는 글 

이렇게 다르게 분류한 이유는 뭘까? 트위터 앱에서 글을 볼 때는 지연이 발생했기 때문일까?

결국 소프트나 니어나 구분짓기 매우 어렵다는것임 이런 시스템을 실시간시스템으로 부르는 것..

 

 

1.2 실시간 시스템과 스트리밍 시스템의 차이점

소프트 리얼타임이나 니어 리얼타임으로 불리는 이 시스템은
기본적으로 데이터를 사용하는 애플리케이션이 일정 시점 이후에 데이터를 가져가는 모습을 보임

 

이렇게 서로 다른 시스템을 비교하고 구분할 때 많은 사람들이 혼란스러울 수 있음 

그림[1.3]

 

1.3 그림을 잘 보면 왼쪽에는 하드리얼타임이 아닌 서비스 또는 연산을 하고 있는 시스템이 있고 오른쪽에는 데이터를 소비하는 클라이어트가 있음

 

스트리밍 데이터 시스템이란?
대부분 시스템에서는데이터 산출 시스템을 하드 리얼 타임 방식으로 운영하지 않는다.
왜냐하면 클라이언트가 네트워크 상태의 이슈, 어플리케이션 설계의 영향으로 데이터를 소비하지 못할 수 있고 또는 클라이언트 어플리케이션이 종료된 상태일 수도 있기 때문이다

다시 말해

우리는 하드 리얼 타임이 아닌 서버의 실시간 데이터를 클라이언트가 원할때 사용하는 시스템을원한다 
이것이 바로 스트리밍 데이터 시스템이다 

 

 

 

다음의 내가 만든 예를 확인해보겠다 

다음 Orgnaization 의 Repository는 Socket 과 Rest를 두개 다 지원하는 3~4개의 거래소에서 실시간으로 데이터를 가지고 오는 형식이다 언어는 파이썬으로 제작했다

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream

 

GitHub - PropertyCoinDashboard/CoinInstrucstureStream

Contribute to PropertyCoinDashboard/CoinInstrucstureStream development by creating an account on GitHub.

github.com

 

 

모든 시스템을 리뷰는 못하지만 핵심만 봤을때 While True polling 방식을 살펴보자 물론 좀더 나은 가격대를 살펴볼꺼면 웹소켓이 낫다 

그 이유는 Rest는 호출하는 시점에서 확인이 되었을때 몇초 후 데이터가 될 수 있으니 진정하게 병목이 없으려면 웹소켓이 나을 수 있다 

"""
Coin async present price kafka data streaming 
"""
import asyncio
from pathlib import Path
from asyncio.exceptions import CancelledError
from typing import Any, Coroutine

from pydantic.errors import PydanticUserError
from pydantic_core._pydantic_core import ValidationError


from coin.core.market.data_format import CoinMarket, CoinMarketData
from coin.core.settings.properties import market_setting
from coin.core.settings.create_log import SocketLogCustomer
from coin.core.data_mq.data_interaction import produce_sending
from coin.streaming.java_code_intersection import kafka_sending

present_path = Path(__file__).parent


class CoinPresentPriceReponseAPI:
    """
    Coin present price market place
    """

    def __init__(self) -> None:
        self.market_env = market_setting("rest")
        self.logging = SocketLogCustomer()

    async def coin_present_architecture(
        self,
        market: str,
        time: str,
        coin_symbol: str,
        api: Any,
        data: tuple[str, str, str, str, str, str],
    ) -> Coroutine[Any, Any, dict[str, Any] | None]:
        """
        Coin present price architecture

        Args:
            market (str): marketname-coinsymbol
            coin_symbol (str): coinsymbol("BTC".."EHT"...)
            api (Any): coin_apis.py in class
            parameter (tuple[str * 6]): search parameter

        Returns:
            CoinMarketData: pydantic in JSON transformation
        """
        try:
            api_response = api.get_coin_present_price(coin_name=coin_symbol.upper())
            market_time: int = api_response[time]

            return CoinMarketData.from_api(
                market=market,
                time=market_time,
                coin_symbol=coin_symbol,
                api=api_response,
                data=data,
            ).model_dump()
        except (PydanticUserError, ValidationError) as error:
            self.logging.error_log("Exception occurred: %s", error)

    async def __get_market_present_price(
        self, market: str, coin_symbol: str
    ) -> Coroutine[Any, Any, dict[str, Any] | None]:
        """
        Get market present price

        Args:
            market (str): market name
            coin_symbol (str): coin symbol

        Returns:
            str: market data as a string
        """
        market_info = self.market_env[market]
        return await self.coin_present_architecture(
            market=f"{market}-{coin_symbol.upper()}",
            coin_symbol=coin_symbol,
            time=market_info["timestamp"],
            api=market_info["api"],
            data=market_info["parameter"],
        )

    async def total_pull_request(self, coin_symbol: str, topic_name: str) -> None:
        """
        Total Pull request

        Args:
            coin_symbol (str): coin symbol
            topic_name (str): topic name
        """
        while True:
            # await asyncio.sleep(1)
            try:
                tasks: list[Coroutine[Any, Any, dict[str, Any]]] = [
                    self.__get_market_present_price(
                        market=market, coin_symbol=coin_symbol
                    )
                    for market in self.market_env
                ]
                market_result = await asyncio.gather(*tasks, return_exceptions=True)
                # 스키마 정의
                schema: dict[str, dict[str, Any]] = CoinMarket(
                    **dict(zip(self.market_env.keys(), market_result))
                ).model_dump(mode="json")
                print(schema)
                await kafka_sending(topic_name, message=schema)
            except (TimeoutError, CancelledError, ValidationError) as error:
                self.logging.error_log("Data transmission failed: %s", error)

 

 

Pydantic으로 정형화 하는 데이터를 While True polling 방식으로 계속 호출하여 데이터를 정형화하는 연산을통해서 카프카로 보내고 있는형식이다 해당 연산되어 최종적으로 보내지는 데이터 형식은 다음과 같다 

{
    "market": "upbit-BTC",
    "time": 1689659170616,
    "coin_symbol": "BTC",
    "data": {
        "opening_price": 38761000.0,
        "trade_price": 38100000.0
        "high_price": 38828000.0,
        "low_price": 38470000.0,
        "prev_closing_price": 38742000.0,
        "acc_trade_volume_24h": 2754.0481778
    }
}

 

클라이언트가 필요로 하는 시점에 데이터를 가져가서 처리할 수 있도록 구현이 되어 있는 점이다 
이런 것을 소프트 리얼 타임이나 니어 리얼타임이 아닌 스트리밍이라고 부를 수 있다

하지만 지금 보여준것은 몇 초 후 데이터일 가능성도 있기 때문에 니어 리얼타임이라고 말할수 있겠는데
이것또한 
데이터를 생성하는 시점부터 필요할때마다 계속 호출해서 데이터를 가지고 오기 때문에
니어 리얼타임 또한 아니 소프트도 아니고 그렇기 때문에 통으로 잡아서
스트리밍 데이터 시스템 처리라고 하는것이다 

 

 

 

데이터가 필요한시점에 가져갈 수 있는 시스템 설계에 집중할 수 있게된다.

스트리밍 데이터 서버와 스트리밍 클라이언트를 구별하여 다시 처음 이야기한 예시에 스트리밍 에 대한 개념을 적용해보자 

 

  1. 누군가 트위터에 쓴 글을 쓰면 잠시 후 당신의 트위터 앱에서 글을 볼 수 있다.
    1. 스트리밍 시스템은 트위터에 작성된 글을 처리한다 그리고 잠시 후 또는 몇 초 또는 몇 시간 뒤 어플리케이션 클라이언트에서 글을 조회할 때 데이터를 가져간다.
  2. 플라이트에웨어에서 제공하는 실시간 항공 추적 서비스를 사용하여 뉴욕 주분의 항공기를 조회할 수 있다
    1. 스트리밍 시스템은 최근의 항공기 상태 데이터를 처리한다. 이후에 클라이언트에서 특정 공항 또는 항공기에 대해 조회할 때 데이터를 가져감
  3. 즐겨찾기 한 주식들을 나스닥 실시간 시세 어플리케이션에서 조회한다
    1. 스트리밍 시스템은 주식들의 실시간 시세를 처리함 이후에 클아이너트들에서 특정 주식을 조회할 때 데이터를 가져감 

 

결국 위에서도 누차 얘기했지만 소프트 니어 이런걸 더이상 걱정할 필요가 없다는것임 클라이언트가 데이터를 요청하는 그 시점에 서버에서 어떤 데이터를 어떻게 전달할지 구현하는 것에 집중하는것이 더 중요함 

 

이렇게 데이터가 필요한 시점에 전달이 가능한 시스템을 인더모먼트 시스템[In-the-moment system]이라고 할 수 있음 

내가 방금 소개한 이 프로젝트 또한 인더모먼트 시스템이라고 할 수 있다

반응형