안녕하세요. 저희는 뱅크샐러드 Data Platform 팀 김민수, 김태일 입니다. 이번 글에서는 뱅크샐러드 데이터 분석환경 컴퓨팅을 EMR, YARN 기반 Spark에서 Self-hosted Kubernetes 기반으로 옮겨가게 된 모험에 대해 소개드리려고 합니다.
본격적인 문제 소개에 앞서 저희 데이터 분석 환경에 대해 먼저 설명드리겠습니다. 뱅크샐러드 데이터 분석 환경은 Amazon S3, Glue, Spark(on EMR) 기반의 data lake 형태로 구축되어 있습니다. 저희가 주로 분석 대상으로 하는 데이터 소스는 프로덕션 데이터베이스 스냅샷과 사용자 이벤트 입니다. 매일/매시간 수백 종의 테이블 스냅샷과 사용자 이벤트가 S3에 적재되며 Glue 메타스토어에 등록됩니다. 또한 분석을 목적으로 정제된 테이블인 data mart 쿼리와 결과들도 lake 테이블들을 참조하여 계산되며, 역시 S3, Glue에 저장됩니다. 이렇게 data lake에 모인 데이터들은 data mart로 배치 분석을 하거나, BI 도구로 애드혹 분석을 하는데 사용되고, 데이터 제품 API로 활용되기도 합니다.
각각 ‘테이블’을 만드는 컴퓨팅은 Spark를 사용하여 구현됩니다. 다시 말해서, S3와 Glue로 적재되는 테이블은 그것을 적재하는 테이블 만의 Spark job이 있거나 Spark DML 쿼리로 기술되어 있어 실행되는 구조 입니다. 따라서 저희는 매일 수백~천 종의 제각기 다른 사이즈의 Spark job을 실행하고 있습니다. 이를 위해 매우 큰 AWS EMR 클러스터를 하나 띄우고 스케줄러가 클러스터로 job을 제출하는 방식을 선택했습니다. EMR에서는 Spark 실행을 위한 구성요소들이 잘 갖춰져 있고 제출되는 작업에 따라 core, task node가 autoscaling 되기에 제각기 다른 사이즈 작업을 던져도 적당히 잘 실행할 수 있었습니다.
하지만 문제가 없는 시스템은 존재하지 않습니다. 약 1년 반 정도 EMR 클러스터를 본격적으로 운영하게 되면서 크고 작은, 그리고 많은 문제들을 만났습니다. 몇 가지만 예를 들자면 이런 것들입니다. 👇
나열한 문제들은 EMR와 YARN 만의 문제가 아닌 것도 있습니다. 일부는 리전 자체의 문제고 또 다른 일부는 더 좋은 설정을 찾아서 해결할 수도 있습니다. 그런데 분석 환경을 제외한 뱅크샐러드 인프라에서는 Hadoop, EMR을 쓰는 경우가 전무하기에 사내 전문가들께 도움을 받기 어려웠고, 무엇보다 데이터 엔지니어들이 복구를 우선으로 하다 보니 노하우가 쌓이는 것이 아니라 A라는 문제에서는 B로 대응한다.
같은 풀이 과정이 없는 답안 식의 지식만 늘어갔습니다.
DevOps분들에게 도움을 요청하기도 해 보았지만 뱅크샐러드의 DevOps분들은 모두 k8s 환경이 더 익숙하신 분들이라 큰 도움이 되지는 못 했습니다.
DE: DevOps님들 이거 어떻게 하면 되나요? 징징 😭
DevOps: 이거 저렇게 하면 안 되나요?
DE: 엇 이거 EMR에서는요?
DevOps: …?
따라서 저희는 비록 managed service 기능을 일부 재구현하게 되더라도 DevOps에서 잘 만들어둔 self-hosted Kubernetes 기반으로 컴퓨팅을 옮기는 방향이 장기적으로 이득일거라 판단하게 되었습니다. 다행히 저희의 모든 컴퓨팅 워크로드는 Spark job 형태였기에 이번 장에서는 Spark job을 Kubernetes에서 띄우고 실행하게된 여정에 집중해서 더 소개드리도록 하겠습니다.
뱅크샐러드의 배치 파이프라인 서비스에서는 파이프라인을 구현하고자 하는 사용자를 위해 Spark job, 내부적으로 설정한 resource level를 정의하기만 하면 Spark를 통해 job이 실행되도록 하는 인터페이스를 제공하고 있습니다. 따라서 이 인터페이스는 그대로 유지하는 한편 내부적인 구동 시스템만 EMR에서 Kubernetes로 변경해야 할 필요가 있었습니다.
일반적으로 YARN을 통해서 client모드로 Spark job을 실행시켜보면 master node에서 driver가 동작하고 worker node에서 executor들이 동작하게 됩니다. 물론 cluster모드라면 driver 또한 worker node어딘가에서 동작할테지요.
Kubernetes에서는 driver가 뜨고 이 driver가 executor pod들을 실행시키게 되는데요. client모드로 Spark job을 실행하면 실행하고자 하는 pod이 driver가 되고 executor pod들이 새로 뜨게 됩니다. cluster 모드로 실행하면 실행한 pod과는 별개로 driver pod이 새로 뜨게되며 새로 뜬 driver pod이 executor pod들을 띄우게 되는 구조입니다.
뱅크샐러드의 Kubernetes 클러스터는 self-hosted Kubernetes 이기 때문에 spot instance를 위한 Auto Scaling Group을 추가하고 aws-cluster-autoscaler를 통해 autoscaling을 관리했습니다.
또한 이렇게 추가된 node에 spark job이 실행된 pod을 할당하기 위해서 job이 제출될 때, node selector와 toleration 설정을 해주게 되면 Spot instance들로 이루어진 ASG에 Spark job을 제출하는 것이 가능해집니다.
Spark driver pod이 executor pod을 생성하는 구조로 인해 서로간 통신을 위해서는 driver pod를 위한 service를 만들어야 driver를 특정할 수 있습니다. 특히나 주로 client 모드로 Spark job을 실행하기 때문에 headless service를 Spark driver가 실행되기 전에 생성하고, [spark.driver.host](http://spark.driver.host)
와 spark.driver.port
설정을 통해 driver를 알려줘야 합니다.
EMR에서는 기본적으로 만들어진 AMI를 이용하였다면, Kubernetes에서 Spark를 실행하기 위해서는 이미지가 필요합니다. Spark에서 Kubenetes는 이미 3.1버전부터 GA가 되어 본격적으로 지원하기 시작했고, Spark repository에 이미지를 만들기 위한 Dockerfile들이 존재합니다. 순서는 Spark에서 기본적으로 제공하는 build script에 Kubernetes옵션을 추가하여 Kubernetes관련 값들이 포함되도록 build 하고, 산출물을 image로 만들고 만들어진 image에 Python을 끼얹어서 한번 더 image로 만드는, 기본적으로 Spark에서 권장하는 순서를 그대로 따라가고 있습니다.
다만 image를 build할 때 필요한 Python library들을 추가하거나, 원하는 Python 버전으로 변경하는 작업들을 추가해주고 있습니다. 물론 여기에 필요한 package들을 설치해주는 일은 기본적으로 따라오는 일이겠네요. 이 외에도 뱅크샐러드의 모든 테이블 메타데이터는 Glue catalog를 기반으로 하기 때문에 Glue catalog를 참조하도록 해야 했는데요. 이 내용은 아래에서 추가적으로 설명해 보도록 하겠습니다.
KubernetesPodOperator
라는 operator를 지원합니다. 딱 이름만 들어도 Spark driver로 만들기에 적절하다는 느낌이 오지 않으신가요?KubernetesPodOperator
를 사용하여 Spark 이미지의 spark submit ...
을 실행하면 Airflow에서 Spark job을 손쉽게 돌릴 수 있습니다.앞서 설명드린대로 뱅크샐러드 분석환경의 모든 테이블들은 Glue를 통해 관리됩니다. EMR에서는 어찌보면 당연하게 확인할 수 있었던 테이블 메타데이터들을 Spark on K8s에서는 어떻게 접근할 수 있을까요?
저희가 찾아낸 답은 awslabs에서 제공하는 aws-glue-data-catalog-client-for-apache-hive-metastore
repo에 있었습니다. 이 repo를 build한 산출물 jar들을 Spark image에 포함시키면 Spark job에서 Glue 테이블들을 볼 수 있었습니다.
다만, Spark 3.1 이후의 기능들과 충돌이 있기 때문에 3.1 미만으로 버전을 낮춰야 했습니다.
앞의 내용과 비슷하다면 비슷한 내용인데요. EMR에서는 s3://
protocol을 통해 S3 object들에 접근이 가능하지만 EMR외부에서는 그렇지 않습니다. 그렇지만 Glue에는 이미 모든 테이블들의 데이터 위치가 s3://
로 정의되어 있기 때문에 INSERT OVERWRITE TABLE ... PARTITION ...
등의 쿼리를 실행하게 되면 문제가 발생하게 됩니다.
이 문제는 앞서 언급한 aws-glue-data-catalog-client-for-apache-hive-metastore
repo를 fork 해서 s3a://
로 변환되도록 만들어 놓으신 코드를 반영하여 문제를 해결했습니다.
이 자리를 빌어 해당 repo를 작성해주신 분에게 감사의 말씀을 올립니다. 🙇
EMR에서는 EBS를 기반으로 하는 EMRFS를 사용하여 Spark로 처리된 결과물들이 빠르게 S3로 옮겨 갈 수 있었습니다. 하지만 Spark on k8s에서는 S3를 사용하기 때문에, 그리고 S3는 파일 시스템이 아니기 때문에 file commit 로직에 따라 object copy에 시간이 매우 오래 걸릴 수 있습니다. 그렇기에 Spark는 “magic” 이라는 S3A committer를 제공하고 있습니다. 이 committer를 사용하면 object 복사 없이 바로 S3에 파일을 업로드하게 됩니다.
한편 뱅크샐러드의 모든 mart 테이블들이 INSERT OVERWRITE TABLE ... PARTITION ...
쿼리로 만들어지는 로직으로 구현이 되어 있었는데, 매우매우매우매우 느려서 magic committer도 느리다고 생각하고 있었던 와중, 팀원분께서 Spark SQL에는 magic committer가 적용되지 않는 것을 발견하였습니다! 문제의 해결을 위해 Spark job을 실행할 때 committer옵션을 정확하게 넣어줘야 했습니다.
Magic committer가 제대로 설정되면 Spark job이 끝난 후에 생성되는 _SUCCESS
파일의 내용이 바뀌는데, 제대로 적용되는지를 알기 위해서 옵션 수정 -> Spark job 실행 -> 결과 확인 의 무한 루프를 돌아야 했습니다. 물론 중간에 발생하는 로그도 엄청 꼼꼼하게 샅샅이 살펴보았던 것은 안 비밀입니다. 😭
최종적으로는, 아래의 옵션들이 추가가 되어야 했습니다.
spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true
spark.hadoop.fs.s3a.committer.name=magic
spark.hadoop.fs.s3a.committer.magic.enabled=true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=10
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
또 다른 케이스로는 Spark job의 결과로 작은 파일들이 많이 생성되는 경우 위 file commit 로직에 따라 시간이 매우 오래걸리게 되는데 이를 방지하기 위해 강제 shuffle hint를 넣기도 했습니다. 이렇게 되면 파일 갯수로 맞춰주기 때문에 용량이 좀 더 크고 적은 갯수의 파일들이 생성됩니다.
솔직히 이 때만큼은 EMR로 돌아갈까 하는 생각을 잠깐 했었고, EMR 비용의 대부분은 EMRFS의 편리함이 아닐까 느낄 정도로 EMR이 그리워졌었습니다.
막상 글로 옮겨 적어보니 꽤 짧고 별 것 없이 느껴지지만 사실은 셀 수 없는 오류들과 휴-먼 에러들을 만났고, 하나하나 손으로 해결해야 하는 과정들이었습니다.
하지만 덕분에 이렇게 Spark on Kubernetes로 옮기고 나니 잘 모르는 YARN과 씨름하던 autoscaling에 비해 DevOps 선생님들의 도움을 받은 autoscaling은 그 성능의 궤를 달리 했습니다. 전보다 훨씬 민감하게 늘어나고 줄어드는 EC2 instance들을 눈으로 확인할 수 있었습니다.
당연하게도 EMR을 사용하던 때와 비교하여 피크 시간 기준 약 45%의 비용 절감 효과를 확인할 수 있었습니다.
이걸로 끝!…이면 좋겠지만 아직 넘어야 할 산이 너무나 많습니다.
Spark on Kubernetes로 Spark job의 실행 환경을 옮긴 것은 많이 쳐 줘야 4~50% 정도의 공정률로 볼 수 있겠습니다. 그 만큼 앞으로 해야 할 일들이 많이 남았는데요. 대충 나열만 해보자면, 먼저 아직 우리는 Spark가 실제로 pod안에서 어떻게 리소스를 사용하는지 등의 metric들을 자세히, 통계적으로 관찰 할 수 없습니다. 또, Spark log들을 관리 가능한 외부 저장소에 지속적으로 남기지도 않아 observability 측면에서의 작업들도 아직 많이 남아 있습니다. Observability가 어느정도 챙겨지면 Spark job을 좀 더 잘 돌릴 수 있도록 resource optimization이나, 항상 Spark job을 돌릴 수 있도록 availability 측면의 개선도 이루어져야 할 것입니다. 그리고 지금 사용중인 Spark의 버전은 3.0.3이기에 Spark on Kubernets가 정식으로 GA되기 전의 버전을 사용하고 있어 개인적으로는 조금 불안한 느낌이 드는 것도 사실입니다. 버전 업그레이드를 위해서는 Glue와 Hive의 dependency들을 함께 해결해야 할 필요도 있겠습니다.
휴…큰 산을 넘어야 했고 넘어왔다고 생각했지만 아직도 많은 일들이 남아 있었네요. 하지만 괜찮습니다. 뱅크샐러드의(비단 뱅크샐러드 뿐은 아니겠지만) 데이터 엔지니어들은 새로운 문제를 해결 할 생각에 벌써 몸이 근질거리거든요.
지금 까지 이 글을 읽어주셔서 감사합니다. 만약 지금 이 글을 보시는 분도 몸이 근질거리시다면(?) 뱅크샐러드에 합류하여 함께 문제를 풀어보는 것은 어떨까요?
보다 빠르게 뱅크샐러드에 도달하는 방법 🚀
지원하기