본문 바로가기

책/실시간 데이터 파이프라인 아키텍처

Chapter 2. 클라이언트에서 데이터 가져오기: 데이터 수집

반응형

 

 

 

우리가 다루는 첫 번째 단계인 수집 단계(Collection tier)는 우리의 스트리밍 시스템으로 데이터를 입수하는 지점이다.

수집 단계를 강조한 스트리밍 데이터 아키텍처는 다음 과 같다.

 

 

 

2.1 일반적인 통신 패턴 

오늘날, 클라이언트에서 생성되는 데이터를 시스템에 입수하기 위해(또는 수집 단계에서 서버가 직접 데이터를 Pulling 하기 위해) 몇 안되는 프로토콜을 사용한다 

만물 인터넷의 등장으로 다양한 통신 패턴들이 존재하겠지만. 다음과 같은 몇 가지 통신 패턴 중 한 가지 패턴을 선택하여 통신하는 것이 일반적이다.
  • 요청/응답 패턴 (Request/response pattern) 
  • 발행/구독 패턴 (Publish/subscribe pattern)
  • 단방향 패턴 (One-way pattern)
  • 요청/확인 응답 패턴 (Request/acknowledge pattern)
  • 스트림 패턴 (Stream pattern)

 

데이터를 모으고 사용할 때 어떤 패턴을 적용할지 논의해보자.

2.1.1 요청/응답 패턴 

가장 간단한 패턴이다. 서버가 지연 없이 즉각적으로 응답하여 작업을 완료하는 것을 목표로 할 때 사용된다.

우리가 웹 브러우저에서 정보를 찾기 위해 인터넷을 하고나 스마트폰을 사용하는 동안 이 패턴이 항상 활용한다

동작하는 방향은 다음과 같다.

 

  1. 클라이언트 어플리케이션이 서버에 명령 (메시지 전송, 구직 지원, 항공권 구매 등)을 내리거나 데이터를 요청(구글링, 현재 위치 날씨 조회)을 한다.
  2. 서버는 클라이언트로 응답을 보낸다.

 

동기로 동작하는 요청/응답 패턴은 간단하게 구현할 수 있으나 서버에서 응답을 받기 위해 클라리언트는 기다려야 하는 시간이 반드시 필요하다는 단점이 있다. 최근에 구현된 서버들이 이런 단점을 그대로 가지고 있다면 사용자들에게 매우 불쾌한 경험으로 이어질 수 있다.

인터넷 등 데이터를 요구하는 곳에 모든 리소스가 동기로 처리되면? 끔직하다 마치 지구가 동기적으로 움직였으면 지금쯤 삼국시대도 못왔을꺼다..

 

이런 한계를 해결하기 위해 일반적으로 사용되는 3가지 전략이 있다.

  1. 클라이언트에서
  2. 서버에서
  3. 두 부분 모두에서 취하는 전략이다 

우선 클라이언트에서 취할 수 있는 전략을 알아보자 적용할 수 있는 가장 간단한 전략은 요청을 비동기로 수행하는 것이다.

import aiohttp
import asyncio
from typing import Coroutine, Any


async def fetch_data(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
        

async def main() -> None:
    url = "https://jsonplaceholder.typicode.com/posts/1"  # 예제 URL, 원하는 URL로 변경 가능
    
    # gather 사용 5번 요청 
    tasks: list[Coroutine[Any, Any, str]] = [fetch_data(url) for _ in range(5)]
    results: Any = await asyncio.gather(*tasks)
    
    # 결과 출력
    for i, result in enumerate(results, 1):
        print(f"Response {i}: {result}")
        


if __name__ == "__main__":
    asyncio.run(main())

 

python으로 간단하게 서버에 비동기로 요청하는 클라이언트 코드를 설계해봤다 

url에 5번을 요청하여 gather로 요청하는 방식으로 진행하였다 아주 간단한 예제이므로 실 상황과는 전혀 무관한 코드이지만 예제로서는 괜찮은 코드가 되었다. tasks에서 fetch_data 를 5번을 비동기로 날리면 5번을 생성하면서 생성과 동시에 요청하니 

 

클라이언트는 지속적으로 서버로 (이 코드에서는 5번) 요청을 날리고 서버에서 응답이 오기 전까지 다른 일을 수행한다.

현대 브라우저는 이 방식을 주로 사용함 브라우저는

많은 수의 리소스를 받기 위해 비동기 요청을 서버로 날리고 응답이 도착할 때까지 기다린다. 

 

클라이언트는 이를 통해 서버로부터 응답을 기다리는 시간을 최소화할 수 있다. 

 

 

결과적으로 클라이언트는 단위 시간동안 최대한의 일을 할 수 있게 되는 것이다.
전 chapter 1에 보여줬었던 그 코드 또한 비동기로 요청하는 클라이언트라고 할 수 있을것이다.


 

"""
실시간 테스트 
"""

import asyncio

from coin.streaming.coin_rest_interaction import CoinPresentPriceReponseAPI
from coin.core.config.properties import BTC_TOPIC_NAME, ETH_TOPIC_NAME


async def btc_present_start() -> None:
    """
    bitcoin kafak stream
    """
    await CoinPresentPriceReponseAPI().total_pull_request("BTC", BTC_TOPIC_NAME)


async def eth_present_start() -> None:
    """
    ethereum kafak stream
    """
    await CoinPresentPriceReponseAPI().total_pull_request("ETH", ETH_TOPIC_NAME)


async def be_present_gether() -> None:
    """
    kafka async stream
    """
    tasks = [
        asyncio.create_task(btc_present_start()),
        asyncio.create_task(eth_present_start()),
    ]
    await asyncio.gather(*tasks, return_exceptions=True)


async def data_sending_start() -> None:
    await be_present_gether()


if __name__ == "__main__":
    asyncio.run(data_sending_start())

 

이것또한 위의 예제와 같다 다만 함수가 좀 더 나누어져 있을 뿐 본래 기능이 서버에 비동기로 요청하는 클라이언트의 일종으로 볼 수 있다.

https://github.com/PropertyCoinDashboard/CoinInstrucstureStream

 

GitHub - PropertyCoinDashboard/CoinInstrucstureStream

Contribute to PropertyCoinDashboard/CoinInstrucstureStream development by creating an account on GitHub.

github.com

 

 

 

오늘날 사용되는 언어들과 많은 수의 프레임워크는 비동기로 요청할 수 있도록 설계되었기 떄문에
구현하는 것도 어려운 일이 아니다 예제 코드를 보면 답이 나온다 이것도 구현 못하면 ... ㅠㅠ 때려쳐야지..

이런 요청과 응답이 비동기로 이루어지기 때문에 반비동기(Half-Async)라고 부른다

 

 

반 비동기 패턴으 지원하는 서버는 클라이언트에서 요청한 처리를 수행하는 동안, 처리가 완료된 데이터를 클라이언트에 응답으로 반환

이런 방식으로 서버를 구현하게 되면 더욱 확장성이 높아지고, 더 많은 클라이언트의 응답을 처리할 수 있게됨  

 

오늘늘 언어들과 많은 프레임워크들은 반비동기 요청/응답 패넡을 기본 기능으로 지원함

 

 

이 패턴의 마지막 종류는 완전비동기 요청/응답 패턴이다 이 패턴은 클라이언트와 서버 둘다 완전히 비동기로 동작하는 것을 뜻한다 

오늘날 사용되는 많은 클라이언트와 서버들은 완전 비동기 요청/응답 패턴으로 운영된다.

 

2.1.2 요청/확인응답 패턴 

요청/응답 패턴과 유사하게 통신을 해야 하지만 서버에서 반환하는 응답이 필요하지 않는 경우가 있다. 

대신에 서버가 우리의 요청을 정상적으로 받았는지 확인만 해야할 때가 있는데. 이 경우에는 요청/확인응답 패턴이 적합하다.

확인 응답으로 전달되는 데이터는 현재 요청 상태를 확인하거나 마지막 응답을 받는 용도로 사용되기도 한다 

 

아주 간단한 예제를 만들어봤다 물론 너무 극단적이여서 ... 이게 맞는지는 모르겠는데 

Socket을 여러서 확인을 하면 응답하는 형태로 만들었다.

 

클라이언트

import socket

def send_request():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = ('localhost', 12345)

    try:
        client_socket.connect(server_address)
        message = "안녕하세요"
        client_socket.sendall(message.encode('utf-8'))

        data = client_socket.recv(1024).decode('utf-8')
        print("서버 응답:", data)
    finally:
        client_socket.close()

if __name__ == "__main__":
    send_request()

 

서버

import socket


def start_server() -> None:
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("localhost", 12345))
    
    server_socket.listen(1)
    
    print("서버 시작 클라이언트 연결을 기다립니다")
    
    while True:
        connection, client_address = server_socket.accept()
        
        try:
            print("클라이언트 연결 완료", client_address)
            data = connection.recv(1024).decode("utf-8")
            if data:
                print("client request", data)
                response = "요청 확인"
                connection.sendall(response.encode("utf-8"))
            else:
                break
        finally:
            connection.close()
            
    
if __name__ == "__main__":
    start_server()

 

다음과 같이 응답을 나오는 형태로 진행해보았다

 

 

요청/확인 응답 패턴을 구현한 데이터 수집 방식은 예를 들어 이커머스라고 가정하면 고객이 처음 접속할때부터 사용된다

첫 요청이 들어오게 되면 수집 단계에서 당므 요청에 사용할 데이터르 확인 응답으로 넘겨준다 

성공이나 실패오 응답을 주는 패턴과 다르게 확인응답 패턴에서는 다음 요청으로 사용할 데이터를 넘겨주는것이다 

확인 응답데이터로 중요한 역할은 고유 식별자를 넘겨준다 
 이 고유 식별자는 고객이 이후에 방문하는 페이지들을 추적하는 데 사용한다
고객의 고유 식별자는 고갹 성향을 분석하는 서버로 넘어가게 되는데, 고유 식별자를활용하면 
고객이 누구고 어떤 알고리즘을 적용하는지 성향 분석 등 점수를 측정할 수 있을것이다..

유튜브 추천 알고리즘 
쿠팡 추천 알고리즘
등등.. 왜 내가 좋아하는것만 나올까의 대한 궁금증은 여기있을 수도 있다
결국 내가 뭘 클릭하고 뭘 활동하는지의 데이터가 남는거고 이 또한 확인 응답 데이터로 계속 주고 응답하는 형태가 있을 수 있으니 이 또한 실시간이라고 할 수 있지 않는가? 

 

 

2.1.3 발행/구독 패턴 

발행/구독 패턴은 메시지 기반 데이터 시스템에서 일반적으로 사용된다 (대표적으로 카프카 RabbitMQ)

 

발행/구독 패턴은 프로듀서가 브로커에게 메시지를 전달하는 것으로 시작된다, 메시지는 논리적인 그룹으로 나눌 수 있는 토픽으로 전송되며, 전송한 다음에는 토픽을 구독하는 모든 컨슈모에게 메시지가 전달된다 

 

브로커가 컨슈머에게 데이터를 푸시하여 데이터를 전달하는 것으로 부면 된다 일부 관련 기술을 살펴보면 컨슈머가 데이터를 브로커로부터 풀 하여 가져가는 경우도 있다 여기서 넘어갈것이 있다면 

프로듀서가 메시지를 브로커로 전달한다고 해서 반드시 컨슈머가 토픽을 꼭 구독하고 있어야한다는점은 아니다
한 컨슈머가 메시지를 프로듀스를 해야만 하는 것은 아니다 

현재 진행하고있는 실시간 코인 이 발행 구독 패턴을 이용하고 있다 

모든 코드를 설명할 수 없지만 

import asyncio

from coin.streaming.coin_rest_interaction import CoinPresentPriceReponseAPI
from coin.core.config.properties import BTC_TOPIC_NAME, ETH_TOPIC_NAME


async def btc_present_start() -> None:
    """
    bitcoin kafak stream
    """
    await CoinPresentPriceReponseAPI().total_pull_request("BTC", BTC_TOPIC_NAME)


async def eth_present_start() -> None:
    """
    ethereum kafak stream
    """
    await CoinPresentPriceReponseAPI().total_pull_request("ETH", ETH_TOPIC_NAME)


async def be_present_gether() -> None:
    """
    kafka async stream
    """
    tasks = [
        asyncio.create_task(btc_present_start()),
        asyncio.create_task(eth_present_start()),
    ]
    await asyncio.gather(*tasks, return_exceptions=True)


async def data_sending_start() -> None:
    await be_present_gether()


if __name__ == "__main__":
    asyncio.run(data_sending_start())

해당 코드로 실행을 할려고 반비동기 패턴으로 클라이언트가 비동기로 서버에 요청을 하는동시에 pydantic으로 연산을 진행한 후 각 토픽에 맞춰서 데이터를 보낸다고 할 수 있다. 

 

 

 

코드 및 발췌

    async def total_pull_request(self, coin_symbol: str, topic_name: str) -> None:
        """
        Total Pull request

        Args:
            coin_symbol (str): coin symbol
            topic_name (str): topic name
        """
        while True:
            # await asyncio.sleep(1)
            try:
                tasks: list[Coroutine[Any, Any, dict[str, Any]]] = [
                    self.__get_market_present_price(
                        market=market, coin_symbol=coin_symbol
                    )
                    for market in self.market_env
                ]
                market_result = await asyncio.gather(*tasks, return_exceptions=True)
                # 스키마 정의
                schema: dict[str, dict[str, Any]] = CoinMarket(
                    **dict(zip(self.market_env.keys(), market_result))
                ).model_dump(mode="json")
                print(schema)
                await kafka_sending(topic_name, message=schema)
            except (TimeoutError, CancelledError, ValidationError) as error:
                self.logging.error_log("Data transmission failed: %s", error)

 

이렇게 해서 카프카 로 보내면 다음과 같이 토픽에 데이터가 쌓이는것이 확인될 수 있다 

 

{
  "upbit": {
    "market": "upbit-BTC",
    "time": 1703242612684,
    "coin_symbol": "BTC",
    "data": {
      "opening_price": "59479000.000",
      "trade_price": "59452000.000",
      "max_price": "60419000.000",
      "min_price": "59094000.000",
      "prev_closing_price": "59479000.000",
      "acc_trade_volume_24h": "5956.112"
    }
  },
  "bithumb": {
    "market": "bithumb-BTC",
    "time": 1703242614113,
    "coin_symbol": "BTC",
    "data": {
      "opening_price": "59060000.000",
      "trade_price": "59325000.000",
      "max_price": "60080000.000",
      "min_price": "58835000.000",
      "prev_closing_price": "59060000.000",
      "acc_trade_volume_24h": "4397.073"
    }
  },
  "coinone": {
    "market": "coinone-BTC",
    "time": 1703242601483,
    "coin_symbol": "BTC",
    "data": {
      "opening_price": "59576000.000",
      "trade_price": "59300000.000",
      "max_price": "60000000.000",
      "min_price": "58804000.000",
      "prev_closing_price": "59571000.000",
      "acc_trade_volume_24h": "340.768"
    }
  },
  "korbit": {
    "market": "korbit-BTC",
    "time": 1703242536754,
    "coin_symbol": "BTC",
    "data": {
      "opening_price": "59507000.000",
      "trade_price": "59325000.000",
      "max_price": "60075000.000",
      "min_price": "58878000.000",
      "prev_closing_price": "59325000.000",
      "acc_trade_volume_24h": "107.255"
    }
  }
}

 

토픽에 저장되어 있는 JSON 일부를 가지고 왔다  

 

발행 구독은 이런식으로 활용이 가능하다

 

전체코드는 아래에서 확인해주세요!!

https://github.com/PropertyCoinDashboard

 

PropertyCoinDashboard

PropertyCoinDashboard has 5 repositories available. Follow their code on GitHub.

github.com

 

반응형