점점 커지는 RDB Table, S3로 귀양 보내고 Athena로 불러오기 - feat. Optimization with Spark Bucketing

점점 커지는 RDB Table, S3로 귀양 보내고 Athena로 불러오기 - feat. Optimization with Spark Bucketing

안녕하세요, 뱅크샐러드 Core Infra 팀의 Data engineer 김문수 입니다. 점점 커지는 이력성 데이터를 MySQL에서 더 저렴한 저장소인 S3로 옮기면서도, 서비스에서 호출할 때 딱 필요한 데이터만 읽을 수 있도록 bucketing을 적용한 사례를 공유하려고 합니다.

MySQL 테이블이… 계속 커진다.

뱅크샐러드의 신용올리기 서비스는 신용평가에 반영되지 못한 자산 정보를 마이데이터와 공공데이터를 통해 신용평가사에 제출하는 서비스입니다. 신용점수가 오를만한 요소만 찾아 반영되고, 유저의 1년 치 마이데이터 정보를 신용정보회사에 보내야합니다.

마이데이터 서비스가 시작되고 최초 구성에서는 서비스 DB에 마이데이터 이벤트 로그를 모두 쌓고 있었습니다. 그래서 신용올리기 서비스가 개발될 때는 자연스럽게 DB의 데이터를 사용하도록 개발이 되었습니다. 그런데 서비스가 지속되면서 DB에 쌓이는 로그가 점점 늘어나고 비용이 늘어나고 우리의 걱정도 늘어났습니다.

우리는 S3로 간다.

그래서 저장 비용이 저렴한 저장소에 데이터를 저장하고 필요할 때 호출해서 쓰기로 결정했습니다. 뱅크샐러드에서는 앱 내 이벤트 데이터를 Kafka에 보내고 매 시간마다 Spark Job으로 Kafka에서 읽은 데이터를 S3에 Parquet 형식으로 쓰고 있는데요, partition은 dt=YYYY-MM-DD, hour=HH 2 depth 구조로 구성되어 있습니다. 마이데이터 연동 데이터도 같은 방식으로 이벤트를 발행하고 Kafka 쌓여있는 데이터를 주기적으로 S3에 쌓도록 변경했습니다.

이렇게 쌓인 S3에 있는 데이터를 읽을 때는 Spark Thrift Server 와 Athena를 Query Engine으로 사용해서 불러오고 있는데요, Athena가 Spark Thrift Server보다 비싸긴 하지만 비용 대비 안정성이 높기 때문에 서비스에서는 Athena를 호출해서 데이터를 가져가기로 했습니다.

간략하게 변경 전과 후의 데이터 흐름을 표현하자면 이렇게 됩니다.

As-Is
Mydata 연동 결과 -> MySQL <- 신용 올리기 서비스 요청

To-Be
Mydata 연동 결과 -> event 발행 -> Kafka --(Spark)--> S3 <- Athena <- 신용 올리기 서비스 요청 

이렇게 비싸다니

그런데 웬걸, 서비스가 Athena 사용하도록 변경하고나서 인프라의 비용이 예상 범위보다 훨씬 많이 늘었습니다. Athena의 비용은 스캔한 데이터 양에 비례해서 증가하는데요, 여기서 놓치면 안되는 부분은 Athena 비용으로 계산되는 스캔한 데이터뿐 아니라 S3 API 호출에도 비용이 있다는 겁니다. 당연히 호출하는 Object(S3의 파일 개념)가 많을 수록 비용이 늘어나는 구조입니다.

저희가 Athena로 호출하는 마이데이터 이벤트 테이블은 다른 이벤트 테이블처럼 날짜, 시간으로 두 개의 파티션 컬럼이 있었습니다. 만약 1번 유저가 신용올리기를 요청하면 유저의 이벤트가 어느 파일에 있는지 알 수 없기 때문에 Athena는 1년 전부터 지금까지 쌓여있는 모든 S3 Object를 호출하게 됩니다. 그래서 만약 파티션마다 30개의 Object가 있다면 30(개) * 24(시간) * 365(일) 의 S3 Object를 읽어야 합니다.

me-when-I-saw-the-price

그래서 일단 후다닥 롤백을 했습니다.

Optimization with Bucketing

해결책으로 1. 버켓팅을 적용하고 2. 파티션 구조를 변경했습니다.

파티셔닝과 버켓팅을 설명하자면, 파티셔닝은 같은 ‘디렉터리’에 데이터를 구성하는 방법입니다. 예를 들어, 2023년 5월 1일의 데이터와 2023년 5월 2일의 데이터를 다른 경로에 나눠서 저장하면 날짜 컬럼의 값이 5월 1일인 데이터를 읽을 때는 모든 경로의 데이터를 읽을 필요없이 5월 1일 경로의 파일만 읽으면 됩니다. 여기서는 날짜가 파티션 키가 됩니다. 일반적으로 날짜, 연, 월 등으로 파티션을 구성합니다. 쿼리에서 자주 사용되고 카디널리티가 낮은 값이 파티션키의 좋은 후보가 됩니다.

버켓팅은 같은 버킷 단위로 데이터를 구성하는 방법이고, 버킷을 ‘파일’로 치환해서 이해해도 좋습니다. user id를 10으로 나눠서 나머지가 같은 숫자끼리 같은 파일에 쓴다고 생각하시면 됩니다. user id가 10인 row를 찾고 싶은데 만약 파티션 경로에 파일이 30개라면, 버켓팅이 되어 있지 않으면 30개의 파일을 다 읽어야합니다. 어떤 값이 어떤 파일에 있는지 모르니까요. 그런데 버켓팅된 테이블에서는 요청 user id 값을 계산해서 이 row가 어느 파일에 위치하는지 바로 알 수 있기 때문에 필요한 파일만 읽으면 됩니다. (실제로는 hash 함수를 사용합니다.) 고유한 값이 많은 높은 카디널리티가 있고, 특정 값을 자주 조회하게 된다면 버켓팅 키의 좋은 후보가 됩니다.

우리는 서비스에서 쿼리할 때마다 특정 user id의 데이터만 조회를 요청하니까 버켓팅키로 user id를 설정하는 것이 아주 좋은 선택지였습니다.

  1. 버켓팅을 적용하면 이제 파티션마다 1개의 파일을 읽으면 됩니다.
    • Object 호출 수가 30 * 24 * 365 에서 1 * 24 * 365 가 됩니다.
  2. 시간 별로 파티션을 나누던 방식(dt, hour)에서 일별 파티션(dt)으로 바꿨습니다.
    • Object 호출 수가 1 * 24 * 365 에서 1 * 1 * 365 가 됩니다.

Athena는 만만하지 않았다…

하지만 Bucketing 설정할 때 어려움이 있었는데요. 뱅크샐러드에서는 데이터를 가공할 때 Spark Job을 제출해서 수행하는데, Presto 기반으로 만들어진 Athena는 Spark Bucketing 형식을 지원한다고 하지만 호환이 안되는 경우가 있었습니다. 버켓팅 설정한 데이터를 적재하고 Spark Thrift Server에서는 조회가 잘 되는데, Athena에서는 where 조건이 적용 되지 않는 버그가 있었습니다. 이 때 많은 고민을 했습니다. (STS(Spark Thrift Server) 써야하나? Delta convert 해야하나?)

여러가지 시도하다가 CTAS(CREATE TABLE AS)은 Athena에서 조회가 되는걸 확인했습니다. 그래서 파티션 경로에 임시 테이블을 만들고 지우는 방식으로 적용했습니다. 순서대로 보자면,

  1. 버켓팅이 설정된 Athena 테이블을 생성한다.
  2. CTAS로 1번 테이블 파티션 경로에 테이블을 생성한다. 이 때 버켓팅 설정은 1번 테이블과 같아야한다.
  3. 1번 Athena 테이블의 파티션을 2번에서 생성한 경로로 설정한다.
  4. 임시 테이블을 제거(DROP)한다.

1번은 한번만 실행되고, 파티션이 추가되는 주기마다 2~4 과정이 실행됩니다.
‘INSERT INTO’ 방식은 Spark Dataframe API에서 Bucketing 지원 하지 않습니다. (이 부분도 삽질을 조금 했는데, Spark 코드부터 봤어야했습니다. 😂)

Spark Bucketing은 Hive랑 다르다.

여기서 또 위기가 있었는데요, Hive와 달리 Spark는 MapReduce 방식이 아니라서 Reducer가 없습니다. 그래서 bucketing 할 때 Spark의 Executor가 Bucket마다 한개씩 파일을 만들어서 테이블에 쓰기 때문에 파일이 여러개가 생깁니다. 예를 들어서, bucket 10로 설정하고 Executor 10개로 작업하면 10 * 10 = 100 개의 파일이 생기게 됩니다. 파일이 많이 만들어지면 또 Athena가 Object 많이 불러서 비용 그만큼 비싸질 겁니다.

bucketing-compare-hive-with-spark Hive는 reducer가 있어서 bucket마다 파일이 1개 생성되지만, Spark는 executor가 bucket마다 파일을 1개씩 만듭니다. 이미지출처

그래서 bucketing에 쓰는 key와 bucket 숫자로 미리 repartition 을 수행해서 파일 수를 줄였습니다. 이러면 한 개의 Partition을 읽어서 그대로 쓰면 1개의 파일이 됩니다. (여기서의 파티션은 테이블의 파티션 개념이 아니고 Spark의 작업 단위 입니다. 1 Partition = 1 Task 가 됩니다. ) 가장 작은 작업 단위에 이미 모든 레코드가 들어있으니 이제 Bucket마다 단 한개의 파일이 만들어집니다. 대신 하나의 파티션이 모두 올라갈 만큼의 메모리가 Executor에 할당되어야 합니다. 간략화한 코드로 보자면 이렇습니다.

# Spark Dataframe API
dummy_table_name = "dummy"
spark.sql(f"DROP TABLE IF EXISTS {dummy_table_name}")

(
  df.repartition(F.expr(f"pmod(hash({','.join(bucket_key)}), {bucket_number})"))
  .write
  .mode("overwrite")
  .bucketBy(bucket_number, bucket_key)
  .sortBy(sort_key_1, sort_key_2)
  .option("path", partition_location)  # target table의 파티션 경로로 설정합니다. 
  .saveAsTable(f"dummy_database.{dummy_table_name}")
)

spark.sql(f"DROP TABLE {dummy_table_name}")

S3에 쌓여있는 CloudTrail log로 S3 object가 얼마나 많이 호출되었는지 확인할 수 있습니다. 아쉽게도 CloudTrail console에서는 S3 접근 log를 지원하지 않기 때문에, CloudTrail 로그가 쌓이는 경로에 Athena 테이블을 생성하고, 조회해야합니다.

/* athena에서 object 몇 번 불렀는지 확인하는 query */
SELECT
  eventid, 
  eventTime, 
  eventName, 
  eventSource, 
  sourceIpAddress, 
  userAgent, 
  json_extract_scalar(requestParameters, '$.bucketName') as bucketName, 
  json_extract_scalar(requestParameters, '$.key') as object
FROM
  s3_cloudtrail_events_db.cloudtrail_table
WHERE
  eventName = 'GetObject'
  AND eventsource = 's3.amazonaws.com'
  AND eventTime >= '2023-05-11T13:15:00Z'
  AND eventTime < '2023-05-11T13:20:00Z'

그래서 이런식으로 bucketing이 되어있으면 파일을 파티션당 단 한개만 호출하는 것을 확인했습니다.

그런데 그 한 개를 Athena가 여러 번 호출하기는 합니다. 이벤트 기록을 살펴봤는데, "bytesTransferredOut":16448 같은 값이 서로 다른 것을 보니 같은 object라도 header만 훑고 다시 데이터를 가져온다거나 하는 것으로 파악했습니다.

마무리

이런 과정을 통해 목표였던 엄청 큰 MySQL 테이블의 크기를 줄였습니다. 데이터를 호출하는 비용이 늘어나기는 했지만, 늘어난 호출 비용에 비해서 줄인 MySQL 데이터 저장 비용이 세 배 정도였기 때문에 무리 없이 진행했습니다.

Athena를 통해 데이터 호출하는 비용은 버켓팅과 변경된 파티션 구조를 적용하기 전에 비해서 약 700배 줄였습니다. 버켓팅이 되어 있는 테이블은 버켓팅 특성을 고려해서 쿼리한다면 더 효율적으로 데이터를 처리할 수 있는데요, (bucketed Atable - bucketd Btable, bucketed Atable - plain Btable 랑 파티션 수에 따라서 쿼리 플랜이 달라집니다.) 버켓팅이 적용되는 사례가 처음이기 때문에 다른 테이블에 버켓팅을 적용할 때나, 버켓팅 테이블을 다룰 때 다른 엔지니어나 분석가가 효율적으로 처리할 수 있도록 작업하는 과정이 남아있습니다.

긴 글 읽어주셔서 감사합니다.

보다 빠르게 뱅크샐러드에 도달하는 방법 🚀

지원하기

Featured Posts

post preview
post preview
post preview
post preview
post preview

Related Posts