본문 바로가기

책/견고한 데이터 엔지니어링

Part 1 - [Chapter 2] . 데이터 엔지니어링 수명 주기 (2)

반응형

쨱쨱

 

 

 

2.1.3 데이터 저장 

데이터를 저장할 공간이 필요하다, 스토리지 설루션을 선택하는 것은 나머지 데이터 수명주기에서 성공을 거두기 위한 열쇠이면서,

다음과 같은 다양한 이유로 데이터 수명 주기에서 가장 복잡한 단계의 하나다 

 

 

첫째, 클라우드의 데이터 아키텍처는 종종 여러 스토리지 설루션을 활용함 

둘째, 복잡한 변환 쿼리를 지원하는 데이터 스토리지 설류션은 순수하게 스토리지로만 작동하는 경우가 거의 없으며 많은 설루션이 복잡한 쿼리를 지원한다 심지어 객체 스토리지 설루션도 Amazon S3 Select와 같은 강력한 쿼리 기능을 지원할 수 있다 

셋째, 저장은 데이터 엔지니어링 수명 주기의 한 단계이지만 변환 및 서비스 제공과 같은 다른 단계에서도 자주 관여한다 

 

 

둘쨰가 참 흥미로운데 이게 보면 대표적으로 S3를 예를 들었는데 

S3 가 많은걸 함께 할 수 있고 독단적인 설루션으로 활용하는 게 거의 드물다 대표적으로 내가 알고 있는 건 이렇게 같은데..

  1. Spark -> S3 -> Athana
  2. S3 -> Athana 
  3. S3 -> Glue
  4. S3 -> RedShift
  5. API or Streaming Service -> kafka -> spark-streaming -> kafka -> kafka_sinkconnector -> S3 -> Superset(Athena연결 가능) 또는 RedShift 등등  여러 갈래 분기 가능

 

이것 말고도 많은 것이다 이렇게 많은데... 내가 한 프로젝트에서는 5번을 사용했다 다음은 Github 주소이다 

https://github.com/SeoulCongestionTraffic

 

  1. API or Streaming Service -> kafka -> spark-streaming -> kafka -> kafka_sinkconnector -> S3 -> Superset(Athena연결 가능) 이거를 이용했는데 다음과 같이 이용을 하였다 

서울시 공공 API를 비동기로 가지고 온 후 -> kafka 적재 -> spark -> kafka -> sink_connect_rest로 설정 -> s3로 보내서 -> Superset으로 분석하는 프로젝트를 만들었다 

 

실시간 서비스라고 보면 될꺼같은데 물론 내가 실시간을 너무 좋아해서 ㅎㅎ.... 물론 이걸 완성이라고 하기에는 민망하긴 하다 

 

이런 식으로 pyhton에서 connect driver를 설정해 주고 Post로 요청을 보내 적용 후 

def sink_connection(topics: list[str], name: str, tasks: str, typed: str) -> None:
    KAFKA_CONNECT_URL = "http://localhost:8083"

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    connector_config = {
        "name": f"s3-sink-connector-region-seoul-injection-{name}",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": tasks,  # 병렬 처리를 위한 태스크 수
            "topics": topics,  # 콤마로 구분된 토픽 리스트
            "s3.bucket.name": "de-06-01-sparkcheckpointinstruction",
            "topics.dir": f"{typed}/{name}",
            "s3.region": "ap-northeast-2",
            "flush.size": "300",  # S3에 쓰기 전에 버퍼에 쌓을 레코드 수
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "key.converter.schemas.enable": False,
            "value.converter.schemas.enable": False,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
            "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
            "locale": "ko-KR",
            "timezone": "Asia/Seoul",
            "partition.duration.ms": 60000,
            "bootstrap.servers": BOOTSTRAP_SERVER,
        },
    }

    response = requests.post(
        f"{KAFKA_CONNECT_URL}/connectors",
        headers=headers,
        data=json.dumps(connector_config),
        timeout=10,
    )

    return print(response.json())


topic_gender = sink_connection(
    topics=",".join(normal_topic_gender),
    name="nonmal_gender_pred",
    tasks=2,
    typed="normal",
)
.....


avg_topic_age = sink_connection(
    topics=AVG_AGE_TOPIC,
    name="avg_topic_age",
    tasks=1,
    typed="avg",
)
.....

superset에 Athena를 연결하여 이용하여 분석을 이용할 수 있다

이것 또 또한 데이터 수집 후 분석 단계까지 가는 것으로 말할 수 있지만 이 또한 수집에서 다른 단계에 자주 관여를 할 수 있다는 사실을 알 수 있다 또한 실시간이니 이 또한 심해질 것 아닌가?

 

저장은 종종 데이터 파이프라인의 여러 위치에서 발생하여 원천 시스템, 수집, 변환 및 서빙과 교차하는 스토리지 시스템을 통해 전체 데이터엔지니어링 수명 주기에 걸쳐 실행된다 

데이터 저장 방식은 데이터 엔지니어링 수명 주기의 모든 단계에서
데이터 사용 방식에 여러 가지로 영향을 미친다 

아파치 카프카와 펄사 같은 스트리밍 프레임워크는 데이터 전송을 위한 표준 계층인 객체 저장돠 함께 메시지의 수집, 저장, 쿼리 시스템으로 서 동시에 작동할 수 있다 



스토리지 시스템 평가: 주요 엔지니어링 고려 사항 

다음은 데이터 웨어하우스, 데이터 레이크하우스, 데이터베이스 또는 객체 스토리지를 위한 스토리지 시스템을 선택할 때 확일할 몇 가지 핵심 엔지니어링 질문이다 

 

  • 이 스토리지 솔류션은 아키텍처에서 요구하는 쓰기 및 읽기 속도와 잘 맞는가?
    • 현 상황에 맞춰서 써야하지 않을까 싶다.. 
  • 스토리지가 다운스트림 프로세스의 병목 현상을 초래하지는 않는가?
  • 이 스토리지 기술이 작동하는 방식을 인지하고 있는가? 스토리지 시스템을 최적으로 활용하는가? 아니면 부자연스러운 행동을 하는가? 예를 들어 객체 스토리지 시스템에 높은 비율의 임의 접근을 적용하고 있지는 않는가? (성능 오버헤드가 큰 안티 패턴이다)
  • 이 스토리지 시스템은 향후 예상되는 확장을 처리할 수 있는가? 사용 가능한 총 스토리지,. 읽기 작업 속도, 쓰기 볼륨 등 스토리지 시스템의 모든 용량 제한을 고려해야 한다. 
  • 다운 스트림 사용자와 프로세스가 필요한 서비스 수준 협약에 따라 데이터를 취득할 수 있는가? 
  • 스키마 진화, 데이터 흐름, 데이터 계보 등에 대한 메타데이터를 캡쳐하고 있는가 메타데이터는 데이터의 활용성에 큰 영향을 미친다  
    • 메타데이터는 미래에 대한 투자로, 검색 가능성과 제도적 지식을 획기적으로 향상해 미래의 프로젝트 및 아키텍처 변경을 간소화한다 
  • 순수 스토리지 솔류션인가? 아니면 복잡한 쿼리 패턴을 지원하는가 
  • 스토리지 시스템이 스키마에 구애받지는 않는가? 유연한 스키마인가? 강제 적용된 스키마 인가? 

 

데이터 접근 빈도 이해 

모든 데이터가 같은 방식으로 액세스되지는 않는다. 검색 패턴은 저장 및 쿼리되는 데이터에 따라 크게 달라진다

그에 따라 데이터읜 온도라는 개념이 나타냈는데.. 데이터 접근 빈드에 따라 데이터 온다가 결정된다. 

 

가장 자주 엑세스되는 데이터를 핫  데이터 라고 한다 핫 데이터는 일반적으로 하루에 여러번 검색된다 

이러한 데이터는 빠른 검색용으로 저장되더야하는데 여기서 빠른은 사용 사례에 따라 달라진다

 

미온적 데이터는 가끔 액세스되는 데이터 

콜드 데이터는 거의 쿼리되지 않으며 아카이브 시스템에 저장하는 데 적합하다. 

콜드 데이터는 규정 준수의 목적으로 보관되거나, 다른 시스템에 심각한 장애가 발생했을때 보관되는 경우가 많음 

 

2.1.4 데이터 수집 

데이터 원천과, 사용 중인 원천 시스템의 특징 및 데이터 저장 방법을 이해한 뒤에 데이터를 수집해야함 

데이터 엔지니어링 수명 수기의 다음 단계는 원천 시스템에서 데이터를 수집하는 것 

 

지금까지의 경험으로 미루어 볼 때 원천 시스템과 데이터 수집은 데이터 엔지니어링 수명 주기에서 가장 큰 병목 현상을 나타냄 

원천 시스템은 보통 직접 관리할 수 없으며, 임의로 응답하지 않거나 품질이 낮은 데이터를 제공할 수 있다 

또는 여러 가지 이유로 데이터 수집 서비스가 작동하지 안을 수도 있다 그 결과 데이터 흐름이 멈추거나 저장, 처리 및 서비스에 필요한 불충분한 데이터가 제공됨 

 

신뢰할 수 없는 원천 및 수집 시스템은 데이터 엔지니어링 수명 주기 전반에 걸쳐 파급 효과를 가지고올 수 있다 

하지만 원천 시스템에 대한 중요한 질문에 대한 답을 찾았다면 상황이 좋은 편이다 .

 

 

수집 단계에서의 주요 엔지니어링 고려 사항 

시스템 설계 또는 구축을 준비할 때 수집 단계에 대한 몇 가지 주요 질문은 다음과 같다

  • 수집 중인 데이터의 사용 사례는 무엇인가? 같은 데이터셋의 여러 버전을 생성하는 대신 이 데이터를 재사용할 수 있는가?
  • 시스템이 이 데이터를 안정적으로 생성하고 수집하고 있는가? 필요할 때 해당 데이터를 사용할 수 있는가?
  • 수집후 데이터의 목적지는 어디인가? 
  • 데이터에 얼마나 자주 접근해야 하는가?
  • 데이터는 보통 어느 정도의 용량을 도착하는가?
  • 데이터 형식은 무엇인가? 다운 스트림 스토리지 및 변환시스템에서 이 형식을 처리할 수 있는가? 
  • 원천 데이터는 다운 스스티림에서 즉시 사용할 수 있는 양호한 상태인가? 그렇다면 얼마나 오래 사용할 수 있으며, 사용할 수 없게 되는 요인은 무엇인가?
  • 데이터가 스트리밍 소스에서 전송된 경우, 목적지에 도달하기 전에 데이터를 변환해야하는가 ? 데이터가 스트림 자체 내에서 변환되는 형태의 변환이 적절할까? 

이러한 질문은 수집할 때 고려할 요소의 예에 불과하며 

배치 vs 스트리밍과 푸시 vs 풀이라는 두 가지 주요 데이터 수집 개념을 보자

 

 

배치 vs 스트리밍

우리가 다루는 대부분의 데이터는 본질적으로 스트리밍이다 데이터는 거의 항상 원천에서 지속해 생성되고 갱신된다

배치 수집은 이 스트림은 큰 청크로 처리하는 전문적으로 편리한 방법이다 

 

스트리밍 수집을 사용하면 다른 애플리케이션이나 데이터베이스 또는 분석 시스템 등의 다운 스트림 시스템에 데이터를 실시간으로 연속해 제공할 수 있다 여기서 실시간( 또는 실시간에 가까운)이란 데이터가 생성된 지 얼마 지나지 않는 짧은 시간(예: 1초 미만)에 다운 스트림 시스템에서 데이터를 사용할 수 있음을 의미한다 

 

자세한 내용은 여기를 봐주세요!! 

https://sky-develop.tistory.com/32

 

Batch 와 Streaming

Batch Processing 배치 (Batch) == 일괄 배치 프로세싱(Batch Processing) == 일괄 처리 많은 양의 데이터를 정해진 시간에 한꺼번에 처리하는 것 한정된 데이터 특정 시간 일괄 처리 실시간 보장이 없을때 데

sky-develop.tistory.com

 

반응형