데이터 분석가가 직접 정의, 배포, 관리하는 뱅크샐러드 데이터 파이프라인

데이터 분석가가 직접 정의, 배포, 관리하는 뱅크샐러드 데이터 파이프라인

안녕하세요. 저는 뱅크샐러드 Core Data팀 Tech Lead Manager 김민수 입니다. 이번 글에서는 데이터 사용자가 직접 정의, 배포, 관리하는 뱅크샐러드 데이터 파이프라인에 대해 얘기해보려고 합니다.

뱅크샐러드 데이터 인프라 훑어보기

우선 뱅크샐러드의 데이터 분석 인프라를 먼저 소개 드리겠습니다. 뱅크샐러드 프로덕션 서비스는 십수개의 마이크로서비스와 각 서비스의 저장소로 구성됩니다. 데이터 분석 인프라에서는 각 서비스 저장소와 모바일 이벤트, 서버 이벤트, 3rd-party 광고 데이터, 공공 데이터 등을 수집하고 있습니다. 수집된 데이터는 S3, Glue 기반 데이터 레이크에 적재되, Athena, Spark를 통해 데이터 분석에 사용됩니다. 데이터 사용자는 BI툴인 Metabase와 Jupyter Notebook을 사용해서 데이터를 분석합니다. Airflow로 관리되는 데이터 파이프라인은 매 시간, 매일 Spark 작업을 실행하여 데이터를 계산하고 적재합니다.

분석 인프라 개괄 <분석 인프라 개괄>

데이터 파이프라인 개발은 어렵다

뱅크샐러드 서비스가 성장하면서 앞선 설명처럼 사용자 데이터를 오프라인 분석하기 위한 인프라가 자연스럽게 필요하게 되었습니다. 뱅크샐러드 최초의 데이터 파이프라인은 단순히 데이터베이스에 특정 쿼리를 주기적으로 실행하고 적재하는 형태였습니다. 이런 초기의 데이터 파이프라인은 활성 사용자 수, 매출 등 기본적인 앱 지표 리포트를 가능케 했었습니다. 데이터 분석에 대한 요구사항이 점점 복잡해지면서 파이프라인도 늘어나게 되었고 이를 관리하기 위해 Airflow와 같은 워크플로우 관리 도구를 도입하게 되었습니다. 이 당시 데이터 파이프라인은 데이터 엔지니어만 개발, 관리가 가능했었습니다. 예를 들어 새로운 서비스 테이블 분석이 필요하면, 데이터 엔지니어링 팀에 새 테이블 ETL을 요청했습니다. 데이터 엔지니어링 팀의 엔지니어는 매번 새로운 Spark 작업을 작성하여 새 테이블을 적재하도록 배포하고 이를 데이터 사용자에게 전달했습니다. 만약 데이터 사용자가 실행하는 쿼리를 주기적으로 실행하여 적재할 필요가 있을 경우, 매번 새 테이블을 어떻게, 어떤 형태로 적재해야 할지 협의, 설계, 구현, 배포하는 복잡한 협업이 필요했었습니다.

데이터 파이프라인 요청 사례1 <데이터 파이프라인 요청 사례1>

데이터 파이프라인 요청 사례2 <데이터 파이프라인 요청 사례2>

이 때의 데이터 파이프라인을 통해 뱅크샐러드 A/B 테스팅 시스템과 사용자 분석 태그 기능이 구현되는 등 이전에는 구현할 수 없었던 복잡한 워크플로우 개발할 수 있었습니다. 하지만 점점 더 많고 복잡한 데이터 파이프라인 요구사항이 생기면서 유일하게 데이터 파이프라인을 개발할 수 있는 데이터 엔지니어링 팀이 병목이 되는 경우가 계속 발생했습니다. 따라서 데이터 엔지니어가 꼭 필요하지 않은 작업을 데이터 사용자가 직접할 수 있게 새로운 파이프라인 구조를 만들어야겠다는 공감대가 생겼습니다.

누구나 개발할 수 있는 데이터 파이프라인

그리하여 앞서 설명 드린 문제 의식과 공감대에서 “누구나 개발할 수 있는 데이터 파이프라인” 구조를 설계하게 되었습니다.

Tech Spec

설계 단계에서 어떤 문제를 풀어야 할지 많은 고민과 상상이 있었습니다. “누구나 데이터 파이프라인을 개발” 하려면 얼마나 쉬워야 할지, 모든걸 Web UI에서 클릭으로 해결할 수 있게 해야 할지 상상했지만, 문제를 너무 어렵고 크게 풀기 전에 문제 지점과 요구사항을 파악해보았습니다:

• 데이터 파이프라인을 구현하기 위해 알아야 하는 것, 써야 하는 코드가 너무 많다. — 하나의 테이블을 적재하기 위해서 EMR 클러스터를 띄우는 방법, Airflow DAG를 만드는 방법, MySQL 드라이버에 넘겨야 하는 인자, …를 모두 이해하고, 코드로 써야 했습니다.
→ 데이터 사용자는 데이터 처리 로직만 신경 쓸 수 있어야 한다.

• 데이터 파이프라인을 직접 배포할 수 없고 배포된 파이프라인을 직접 조작할 수 없다.
→ 데이터 사용자가 뱅크샐러드 표준 배포 방식으로 배포하고 개개인의 권한으로 파이프라인을 조작할 수 있어야 한다.

• 데이터 파이프라인을 배포 전에 실행할 수 없다. 또한 파이프라인 코드 동작을 검증하는 테스트가 없다.
→ 개발 단계에서 데이터 파이프라인 통합 테스트를 할 수 있고, 유닛 테스트 역시 실행되어야 한다.

위와 같은 요구사항에서 뱅크샐러드 데이터 파이프라인 명세, 데이터 적재 과정, 스케줄링을 구현하는 datapipe 프로젝트를 설계했습니다. 글의 남은 부분에서는 datapipe는 위 문제를 어떻게 풀었는지 설명하겠습니다.

문제 1 - 데이터 사용자는 데이터 처리 로직만 신경 쓸 수 있어야 한다.

데이터 사용자가 데이터 파이프라인을 만들면서 신경을 쓰는(또는 잘 아는) 부분은 테이블 명세, 의존성, 적재 로직 입니다. 반대로 데이터 파이프라인 개발을 위해 꼭 필요하지만 사용자 입장에서 이해하기 어려운 부분은 명세, 의존성, 적재 로직 아래의 데이터 인프라나 Airflow와 같은 프레임워크가 어떻게 구성되는지 이겠습니다. datapipe에서는 로직 아래 인프라를 사용자가 신경 쓰지 않고 데이터 자체 비즈니스 로직만 구현할 수 있도록 추상화와 몇가지 도구를 구현했습니다.

아래는 새로운 테이블을 적재하는 파이프라인을 만드는 경우 필요한 코드 입니다.

# 테이블 메타 데이터 객체 선언
s_user_index = MartTable(
    dag_id="core_analysis_dag",
    database="core_user",
    table_name="s_user_index",
    ddl="sql/core_user/s_user_index_ddl.sql",
    pyspark_job_path="sql.core_user.s_user_index_dml",
    start_date="2022-12-14",
)

# DAG 디펜던시 설정
core_user.s_user_index.task.set_upstream([
    lake_user.user.get_daily_sensor(), # 외부 daily 테이블에 의존하는 경우
    lake_event.user_event.get_daily_to_hourly_sensor(), # 외부 hourly 테이블에 의존하는 경우
    core_user.f_signup.task, # 같은 DAG 테이블에 의존하는 경우
])

먼저 새 테이블 메타 데이터 객체를 선언합니다. 메타 데이터 객체는 데이터베이스, 테이블 이름과 명세 경로, 적재 로직(PySpark 작업 또는 SQL 경로) 등 속성을 가집니다. 그리고 이 테이블 적재 로직이 실행되는데 필요한 디펜던시를 DAG에서 설정합니다. 만약 다른 daily 또는 hourly DAG task에 의존한다면 역시 메소드를 사용하여 설정할 수 있습니다. 이 때 해당 DAG 와의 시간 차이 등은 고려하지 않을 수 있게 구현되어 있습니다. 새로운 파이프라인을 구축하기 위해 데이터 사용자가 알아야 하는 데이터 엔지니어링 지식은 위 코드 스니펫 정도가 전부 입니다. 이렇게 코드를 작성하면 명세는 Glue를 통해, 적재 작업은 Spark를 통해 실행하여 테이블이 매일 적재 됩니다.

PySpark 작업은 PySparkOperator라는 Airflow operator를 구현하여 실행합니다. 아래는 PySparkOperator를 사용하는 예시 입니다.

all_asset_job = PySparkOperator(
    task_id="all_asset_job",
    pyspark_job_path="dag.adhoc.all_asset.save_all_asset",
    resource=SparkPool.common().medium,
    task_params=dict(
        dt="{{ execution_date.in_timezone('Asia/Seoul').strftime('%Y-%m-%d') }}",
    ),
)

PySparkOperator는 작업 경로와 리소스, 작업 파라미터를 입력 받습니다. Operator가 트리거되면 작업 경로의 PySpark job을 render 된 파라미터와 함께 제출하여 실행합니다. 이 때 입력한 리소스 만큼 Spark 자원을 요청하고, 또한 Airflow pool, pool_slots로 동시 실행 작업을 제한합니다. 이것이 데이터 사용자가 알아야 하는 전부 입니다. 데이터 사용자는 PySpark job이 제출되는 클러스터에 대한 배포나, 클러스터에 제출되는 각종 파라미터들, 심지어는 어떤 클러스터에서 실행되는지 전혀 신경쓰지 않고 코드를 작성합니다. (물론 필요한 경우 더 아래 레이어에서 작업을 할 수도 있습니다.)

한편, 몇가지 추상화로 작업 실행을 단순화했기에 EMR 기반 Spark에서 Kubernetes 기반으로 옮기는 대규모 구조 변경도 데이터 사용자 입장에서 경험을 전혀 바꾸지 않고 진행할 수 있었습니다.

정리하자면 datapipe에서는 최소한의 코드로 데이터 파이프라인을 구현할 수 있게 하여 데이터 사용자가 데이터 엔지니어링에 대한 깊은 지식 없이 데이터 처리 로직에만 신경쓸 수 있도록 하였습니다.

문제 2 - 데이터 사용자가 직접 데이터 파이프라인을 배포하고 조작할 수 있어야 한다.

구현된 파이프라인은 데이터 사용자가 직접 배포하고 실행할 수 없다면 여전히 데이터 파이프라인 개발, 운영에서 데이터 엔지니어의 도움이 필요합니다. 따라서 datapipe에서 사용자가 직접 배포할 수 있고, 조작할 수 있게 하는 것은 중요한 스펙이었습니다.

뱅크샐러드에는 테크 조직이 공통으로 사용하는 Slack ChatOps 배포 워크플로우가 존재합니다. 따라서 datapipe에서도 이 배포 방식을 그대로 따르기로 결정했습니다. 다만 기존 방식처럼 쿠버네티스 리소스가 업데이트 되는 것과는 다르게 DAG와 프로젝트 파일들을 Airflow Scheduler에 sync할 수 있도록 구현이 필요했습니다.

데이터 사용자가 직접 배포 <데이터 사용자가 직접 배포>

현재도 매일 많으면 수십번씩 데이터 사용자가 직접 데이터 파이프라인을 배포하고 있습니다.

한편 파이프라인을 운영하면서 배포된 DAG를 Airflow Web UI에서 켜주거나 실패한 작업을 재시작시키는 등 직접 Web UI를 조작해야 하는 경우가 빈번하게 있습니다. datapipe에는 모든 데이터 파이프라인이 관리되고 있기도 하고, 맥락을 충분히 알지 못 하는 상태에서 실행하면 위험한 작업도 종종 있습니다. 그렇기 때문에 사용자 별로 적합한 권한을 부여하고 권한 안에서만 DAG를 조작할 수 있어야 했습니다.

뱅크샐러드 백오피스와 인프라는 Fusion Auth라는 인증 플랫폼을 사용합니다. 따라서 datapipe에서는 Airflow의 AirflowSecurityManager를 Fusion Auth를 백엔드로 사용하도록 구현하고, Fusion Auth의 Role을 통해 사용자 DAG 권한을 조정했습니다. datapipe에 새로운 사용자를 추가하거나, 사용자의 권한을 수정하는 경우 Fusion Auth를 통해 역할을 조정하며, 추가된 사용자는 Fusion Auth로 datapipe Web UI에 로그인하여 직접 DAG를 조작할 수 있게 되었습니다.

마지막으로, 파이프라인 실행이 실패할 경우 실행 정보와 로그를 담은 alert 슬랙 메시지를 DAG owner를 멘션하여 보내고 있습니다. 해당 파이프라인를 운영하는 팀은 이 메시지를 시작으로 직접 실패 대응 작업을 진행하게 됩니다.

파이프라인 실패 메시지 <파이프라인 실패 메시지>

데이터 파이프라인 운영(장애시 영향도, 대응 정도, 온콜 정책, …)은 데이터 파이프라인을 구현한 팀이 가장 잘 할 수 있을텐데요, 이렇게 데이터 파이프라인을 데이터 사용자가 직접 조작할 수 있게 되었기에, 뱅크샐러드에서는 데이터 엔지니어는 가장 코어의 데이터 파이프라인 운영에만 집중하고 각자 파이프라인은 직접 관리, 운영하고 있습니다.

문제 3 - 개발 중인 데이터 파이프라인을 단위 및 통합 테스트할 수 있어야 한다.

개발 단계에서 데이터 파이프라인이 잘 동작하는지, 결과 데이터가 정합한지, DAG가 제대로 그려지는지 확인하는 것은 중요합니다. PR 리뷰시 PR 작성자의 테스트 방법을 상호 검증 하는게 리뷰 중 중요한 과정이기 때문입니다. 그런데 데이터 파이프라인 실행 환경에는 S3, Glue, Spark cluster 등 많은 컴포넌트가 상호작용하기 때문에 데이터 파이프라인을 배포 전에 테스트 하는 건 꽤 어렵습니다. 또한 로컬에서 띄운 Airflow는 프로덕션의 것과 설정이 다를 수도 있습니다.

datapipe는 PR을 생성하면 해당 PR만을 위한 테스트 서버 구성을 띄워줍니다. 테스트 서버 구성에는 독립된 Airflow 스케줄러, 웹서버와 namespace 등이 포함됩니다. PR 작성자는 테스트 Airflow 서버에 접속하여 DAG가 제대로 그려지는지 확인하고 파이프라인을 직접 실행할 수 있습니다.

PR을 생성하면 테스트 서버가 자동으로 뜹니다

테스트 서버는 프로덕션과 (거의) 동일한 환경입니다. 프로덕션과 같은 Airflow connection, pool 등을 가지고 있고, 프로덕션 S3, Glue에 접근할 수 있습니다. 다만 데이터 출력은 테스트 namespace로 쓰게 됩니다. 예를 들어, A, B 테이블을 조합하여 C 테이블을 출력하는 쿼리는 테스트 서버에서 실행할 경우 입력은 프로덕션 S3, Glue에서 가져오지만 출력은 테스트 용 버킷과 테스트 용 데이터베이스로 하게 됩니다. 따라서 프로덕션에 영향을 주지 않으면서, 프로덕션과 거의 같은 세팅으로 파이프라인을 실행하는 것이 가능합니다.

테스트 서버로 대부분 테스트 시나리오를 커버할 수 있지만 매번 작업자의 테스트 방법에 의존하게 됩니다. 따라서 자동화된 테스트 모음을 관리하고 매번 실행하는 것도 필요합니다. datapipe에서는 테스트 서버와 별개로 pytest 기반의 유닛 테스트를 작성할 수 있습니다.

CI에서 실행되는 유닛 테스트

모든 테스트 케이스는 커밋마다 실행되고 PR에서는 테스트 결과를 요약해서 메시지를 달아주게 workflow를 구성하여 PR 작업자와 리뷰어가 확인할 수 있습니다.

정리하자면 datapipe는 테스트 서버와 유닛 테스트를 통해 개발 단계에서 데이터 파이프라인을 테스트하고 테스트 결과를 리뷰어와 상호 검증하여 안정성, 정합성을 보장하고 있습니다.

번외 - 데이터 퀄리티

충분한 테스트와 리뷰를 거친 데이터 파이프라인이라고 하더라도 원본 데이터 변경이나 확인하지 못 한 이슈 등 문제로 예상치 못한 데이터 문제가 발생할 수 있습니다. 그렇기 때문에 주기적으로 데이터 통계를 확인해야 하는데요, 매번 데이터 통계를 확인하거나 대시보드를 만들기는 어렵습니다. datapipe에서는 테이블 별로 간단한 데이터 퀄리티를 확인하는 방법을 제공하여 데이터 파이프라인과 검수 절차를 통합시켰습니다.

앞서 봤던 테이블 메타 데이터 객체입니다. 메타 데이터에 추가적으로 데이터 퀄리티 속성을 설정할 수 있게 구성되어 있습니다. 데이터 파이프라인이 매번 실행될 때 데이터 퀄리티 속성의 테이블 통계를 계산합니다.

  • fail_conditions - 테이블 통계가 기준에 맞지 않은 경우 테이블 적재를 실패시킵니다. 중요한 completeness, uniqueness 문제의 경우 설정합니다.
  • dq_metrics - 테이블 통계와 기준을 확인하고 리포트합니다. 기준에 맞지 않다고 하더라도 테이블 적재가 실패하지는 않습니다.
# 테이블 메타 데이터 객체 선언
s_user_index = MartTable(
    ...
    # 실패 조건
    fail_conditions=[
        # 만약 null 인 id 가 있다면 실패합니다.
        DqCriteria("banksalad_user_id_should_not_be_null", 1.0, "SUM(CASE WHEN id IS NULL THEN 0 ELSE 1 END)"),
        # id 중복이 있다면 실패합니다.
        DqCriteria("banksalad_user_id_should_be_unique", 1.0, "COUNT(DISTINCT id)"),
    ],
    # DQ 지표
    dq_metrics=[
        # korean_age 컬럼의 null 비율이 5% 보다 크면 안 됩니다. 하지만 임계치를 넘더라도 테이블 적재가 실패하지는 않습니다.
        DqCriteria("korean_age_null_ratio_should_be_lower_than_5%", 0.95, "SUM(CASE WHEN korean_age IS NULL THEN 0 ELSE 1 END)"),
    ],
)

이렇게 설정된 데이터 퀄리티는 매일 슬랙으로 리포트가 됩니다. 데이터 퀄리티 기준에 맞지 않거나 아직 계산 되지 않은 데이터 퀄리티 지표가 있다면 이 리포트로 확인할 수 있습니다.

데이터 퀄리티 문제와 후속 논의 사례 <데이터 퀄리티 문제와 후속 논의 사례>

데이터 퀄리티를 손쉽게 설정할 수 있게 되어 주요 데이터 문제로 하위 데이터 파이프라인이나 데이터 제품에 문제가 생기는걸 예방할 수 있었습니다. 예를 들어 “사용자 탈퇴” 테이블 로직 문제로 사용자 분석의 중심이 되는 “유저 인덱스” 테이블에서 유저가 중복되는 데이터 문제가 있었는데요, 유저 인덱스 테이블의 fail_conditions를 통해 발견하고 하위 파이프라인이 돌아가는 것을 막아, 많은 분석 테이블들이 정합하지 않은 데이터로 계산되고 사용되는 것을 사전에 막을 수 있었습니다. 그리고 모든 데이터 제품 테이블에는 테이블이 비었는지 확인하는 데이터 퀄리티 속성이 설정되어 있기에 무언가 이슈로 빈 데이터가 사용자에게 노출되는 것을 방지하고 있기도 합니다. 또한, 데이터 퀄리티 속성을 코드로 관리하게 되어 PR에서 리뷰할 수 있게 된 소소한 장점도 있었습니다.

결론 - 데이터 분석가가 직접 정의, 배포, 관리하는 뱅크샐러드 데이터 파이프라인

datapipe 사내 출시된 후 빠르게 뱅크샐러드 모든 데이터 파이프라인 명세, 데이터 적재 과정, 스케줄링, 그리고 데이터 퀄리티 등 데이터 파이프라인 구성 요소가 통합 되었습니다. datapipe가 출시된 이후로도 물론 데이터 엔지니어의 리소스가 많이 필요하긴 했지만, 데이터 엔지니어들은 주로 코어 파이프라인과 데이터 인프라 문제에 더 집중할 수 있었습니다. 반면에 각 제품팀 별 지표나 데이터 제품은 제품팀 데이터 분석가가 직접 개발, 운영할 수 있게 되었는데요, 데이터 엔지니어가 아닌 직군이 직접 데이터 파이프라인을 개발, 수정, 배포하는 사례가 연간 600건으로 이전 대비 6배 이상 늘어나게 되었습니다. 무엇보다, 데이터 파이프라인 개발과 운영이 더 쉬워졌기에 A/B 테스팅 플랫폼, 코어 데이터 테이블, 전사 및 제품팀 별 지표, 데이터 프로덕트 등 다양한 데이터 문제를 뱅크샐러드에서 풀 수 있게 되었습니다.

물론 아직도 풀어야 하는 데이터 파이프라인 문제가 많이 있습니다. 소개 드렸던 데이터 인프라를 직접 관리하지 않으면서도 더 간단하게 파이프라인을 만드는게 최근 인프라 팀 내 화두인데요, 동시에 인프라 비용도 줄일 방법도 많이 고민하고 있습니다. 뱅크샐러드 데이터 파이프라인은 앞으로도 더욱 더 진화할 예정입니다. 긴 글 읽어주셔서 감사합니다.

보다 빠르게 뱅크샐러드에 도달하는 방법 🚀

지원하기

Featured Posts

post preview
post preview
post preview
post preview
post preview

Related Posts