10TB超えのBigQuery巨大データを高速にS3に同期する

ogp

こんにちは。SRE部MA基盤チームの川津です。

私たちのチームでは今年サービスを終了した「IQON」の10TBを超える大規模データをBigQueryからS3へ移行しました。本記事ではデータ移行を行った際に検討したこと、実際にどのようにデータ移行を行ったかを紹介します。

データ移行の経緯

IQONは2020年4月6日をもってサービスを終了しました。そのIQONではデータ分析にBigQueryを利用していましたが、Amazon Web Services(AWS)上にもIQONに関するリソースが存在します。そのため、IQONはGCPとAWSの2つのクラウドで運用していました。

しかし、サービス終了に伴いGCP・AWSどちらかにリソースを統一する必要が出てきました。統一する意図としては、終了したサービスが利用する取引先を減らし、請求対応などの事務的なコストを減らしたい意図がありました。そのためGCPとAWSの両方にあったデータをどちらか片方に寄せて、もう片方を解約することにしました。

解約するためにGCP・AWSどちらがIQONのリソースを多く利用しているのか確認を行いました。その結果、AWSではクローズ告知ページ用Webサーバーやドメインの管理を行っており、GCP側はBigQueryのみリソースを使用していました。移行の手間を考えた結果、AWSではなくGCPのリソースを消すことにしました。

以上の経緯からBigQueryのデータをAWS上に移行を行うことに決めました。

AWSにデータを移行するにあたり以下の要件を満たす必要がありました。

  • データにアクセスする頻度は年に数回程度なので、維持費を抑えたい

  • データ量が多く、ローカルにダウンロードしてクライアントPCで検索すると時間がかかるため、BigQueryのようにSQLを使いクラウド上でデータ検索をしたい

以上の要件よりデータを保存するリソースとしてS3を選択しました。S3にはS3の料金表にある通り、低頻度アクセス向けの料金プランがあり、データの取り出しに料金がかかる代わりにストレージの料金を抑えるプランがあります。

またAmazon Athenaを利用することでS3に保存したデータに対してSQLを利用して検索できます。Amazon AthenaはAmazon Athenaの概要にある通り、CSV、JSON、ORC、Parquetなどのデータフォーマットに対応しています。加えて圧縮されたファイル形式に対してもSQLが実行でき、データを確認できます。現在はドキュメントに記載のあるSnappy、zlib、LZO、gzip、bzip2形式がサポートされています。

事前準備

アーキテクチャの選定

まずBigQueryからS3へ移行するにあたりどのような方法があるか調べることにしました。

アーキテクチャを選定する際、下記の項目を考慮しました。

  • 費用をなるべく抑える
  • 導入コストをなるべく小さくする
  • データの欠損が起こらないようにする

以上を踏まえて検討をした結果、以下のようなアーキテクチャを採用しました。

architechture

下記の手順で作業を行います。

  1. bq extractコマンドを用いてGCSへテーブルデータを転送する
  2. gsutil rsyncコマンドを利用しGCSからS3へ転送する
  3. GCSからS3へ移行できたか確認する

ポイントはgsutil rsyncコマンドを使いS3へデータを移行する点です。

gsutilコマンドはGoogle Cloud Storage(GCS)へアクセスできるコマンドラインツールで、Google Cloud PlatformがOSSとして公開されています。そして、gsutil rsyncコマンドはgsutilコマンドで提供されているコマンドの1つです。

gsutil rsyncコマンドはバケット間の同期ができる機能であり、この機能はGCP間のバケットの同期だけでなく、S3とGCS間のバケットの同期もサポートしています。また、オプションに-mをつけることにより並列でデータを同期することもできます。

gsutil rsyncコマンドを使う際には気をつける点があります。gsutil rsyncの注意点にも記載があるので引用して紹介します。

Note 2: If you are synchronizing a large amount of data between clouds you might consider setting up a Google Compute Engine account and running gsutil there. Since cross-provider gsutil data transfers flow through the machine where gsutil is running, doing this can make your transfer run significantly faster than running gsutil on your local workstation.

この注釈の文脈から、gustil rsyncコマンドを利用する際、クラウド間の転送を行うと一度コマンドを実行した環境内にデータ転送される仕組みであることがわかります。そのため、転送するデータのサイズが大きい場合には、手元のPC環境から転送する際に注意する必要があると言えます。

IQONのデータサイズをBigQueryのinformation schemaから取得したところ、およそ10TBのデータ量がありました。データ量的に手元のPCで転送を行うのは難しいのでAWS環境のEC2インスタンスを利用し転送を行うことにしました。GCPではなくAWS環境を利用した理由は、会社としてAWSを活用しており、契約面においてEC2の利用料金を抑えることができるからです。

AWSのアーキテクチャの選定

次に、gsutil rsync実行のためのEC2インスタンスを用意するためのインフラ構成について考えます。

まずGCS→EC2、EC2→S3の2つの経路に分けてどのような構成にするか考えました。

前半の経路:GCS→EC2

GCS→EC2の経路に関して考えられる経路としては下記の経路が考えられます。

  1. GCS → Internet → Internet Gateway → EC2(public subnet)
  2. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet)

1つ目の経路はEC2をpublic subnetに配置する構成です。

price_ec2_1

この構成にかかる費用はGCPネットワーク料金を元に算出しています。

EC2をpublic subnetに置く場合、グローバルIPを直接割り当てることができるのでインターネットと通信を行うことができます。しかし不特定多数のサーバーから通信を行うことが可能になるため、EC2のセキュリティグループの設定に気をつける必要があります。

2つ目の経路はEC2をprivate subnetに配置する構成です。

price_ec2_2

こちらの構成では、GCPネットワーク料金NAT Gateway料金を参考にして費用を参考にしています。

private subnetにEC2を配置することで外部のトラフィックを遮断できます。しかし、EC2単体だとGCSへ通信を行えないのでNAT Gatewayを配置してGCSへ通信を行うことができるようにしています。NAT Gatewayを配置することで外向きの通信を行うことができます。そしてprivate subnetにEC2を配置しているので内向きの通信を遮断できます。よって外部からの通信を必要とする攻撃を防ぐことができます。

ここで、10TBのデータを転送すると仮定してGCS→EC2の経路でかかる費用を算出してみます。

1つ目の経路を図と照らし合わせるとGCS → Internet Gateway間で料金が発生します。GCS → Internet Gateway間の料金はGCPネットワーク料金表より0.11(USD/GB)なので1つ目の経路は約1100USDかかることがわかります。

2つ目の経路を図と照らし合わせるとGCS → Internet Gateway、NAT Gatewayで料金が発生します。NAT Gatewayの通信はNAT Gatewayの料金表から0.062(USD/GB)と確認できるので約600USDかかることになります。1つ目の経路で計算したGCS → Internet Gateway間の料金と合計すると約1700USDです。

後半の経路:EC2→S3

次にEC2→S3の経路です。こちらは下記の経路が考えられます。

  1. EC2(public subnet) → Internet Gateway → S3
  2. EC2(public subnet or private subnet) → VPC Endpoint → S3
  3. EC2(private subnet) → NAT Gateway → Internet Gateway → S3

1つ目の経路としてはInternet Gatewayを通る経路です。

price_s3_1

この構成にかかる費用はAWSネットワーク料金を元に算出しています。

2つ目の経路はVPC Endpointを経由する経路です。こちらはEC2をpublic subnetかprivate subnetに置く2つの方法が存在します。通信経路の部分はこの2つに相違点がありますが、後述する料金に関しては同じなのでまとめて考えます。

price_s3_2

この構成にかかる費用はVPC Endpoint料金を元に算出しています。

VPC Endpointを利用することで料金を抑えながらEC2 → S3の経路を内部の通信で完結できます。VPC EndpointはInterface EndpointとGateway Endpointの2種類存在します。今回はAmazon S3で利用可能なGateway Endpointを利用しています。

料金に関してはGateway Endpointの料金説明に記載のある通り追加料金なしで利用できます。しかし、用途次第ではVPC Endpointの料金とは別に、通常のAWSデータ転送料金が発生します。今回の用途の場合、AWSのデータ転送料金に説明があるので引用します。

Data transferred between Amazon S3, Amazon Glacier, Amazon DynamoDB, Amazon SES, Amazon SQS, Amazon Kinesis, Amazon ECR, Amazon SNS or Amazon SimpleDB and Amazon EC2 instances in the same AWS Region is free.

つまり同一リージョンにあるEC2とS3間の通信は無料です。今回はEC2のリージョンとS3のリージョンを同じにしているので0USDで利用できます。

3つ目の通信はNAT Gatewayを経由し、Internet Gatewayを経てS3へ通信する経路です。

price_s3_3

この構成にかかる費用はNAT Gateway料金AWSネットワーク料金を元に算出しています。

3つ目の経路に使っているNAT GatewayはGCS→EC2に用いたNAT Gatewayと同じ用途で使っています。

EC2→S3の部分でも10TBを転送すると仮定してEC2→S3の経路までの費用を算出してみます。

1つ目の費用を図から辿るとEC2 → Internet Gatewayの間で料金が発生します。そのためAWSネットワーク料金によると0.114(USD/GB)発生することになります。合計すると約1200USD発生することがわかります。

2つ目はVPC Endpointを経由してS3と通信します。Gateway Endpointの料金説明からEC2がpublic subnet、private subnetに配置しても0USDであることがわかります。

3つ目の費用はNAT Gateway、NAT Gateway → Internet Gatewayで発生していることがわかります。NAT Gatewayの料金表からNAT Gatewayを通る際0.062(USD/GB)発生し、AWSネットワーク料金も別途発生するのでAWSネットワーク料金表から0.114(USD/GB)かかることがわかります。合計すると約1700USDです。

経路全体:GCS→S3

単純に考えるとGCS→EC2とEC2→S3の経路の組み合わせで6通り考えられますがpublic subnet or private subnetの条件で以下の4つの経路の組み合わせに絞られます。

  1. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet) → VPC Endpoint → S3
  2. GCS → Internet → Internet Gateway → NAT Gateway → EC2(private subnet) → NAT Gateway → Internet Gateway → S3
  3. GCS → Internet → Internet Gateway → EC2(public subnet) → VPC Endpoint → S3
  4. GCS → Internet → Internet Gateway → EC2(public subnet) → Internet Gateway → S3

各経路で発生する費用をGCS→EC2、EC2→S3の2つの経路に分けて計算した費用を組み合わせると下記の通りです。

  1. 1700USD
  2. 3400USD
  3. 1100USD
  4. 2300USD

最後にNAT GatewayとVPC Endpointの有無での組み合わせを表にまとめます。

VPC Endpointあり VPC Endpointなし
Private Subnet, NAT Gatewayあり 料金:1700USD
セキュリティ:内向きの通信を遮断できる
通信経路: 1つ目
料金:3400USD
セキュリティ:内向きの通信を遮断できる
通信経路: 2つ目
Public Subnet, NAT Gatewayなし 料金:1100USD
セキュリティ:アクセス元をより厳格に管理する必要がある
通信経路: 3つ目
料金:2300USD
セキュリティ:アクセス元をより厳格に管理する必要がある
通信経路: 4つ目

上記の表より、1番コストが低いのは左下の項目です。懸念点はセキュリティなのですが、今回用意したEC2インスタンスはgsutil rsyncを実行するだけで、内向きの通信はオペレーション用のSSHしかありません。public subnetに置く際、アクセス元を限定したSSHだけを許可して露出を最低限にしました。

これらの考察から、コストが一番安く、セキュリティも設定をしっかりすれば担保できる3つ目の経路の構成を採用することにしました。最終的な構成は下記の図の通りです。

architecture

移行手順

今回移行するBigQueryのデータはテーブルの数とテーブルサイズが大きいので、スレッドプールを作ってJOBを効率的に処理するためRubyを用いて自動化しています。

データ転送:BigQuery → GCS

まずgsutil rsyncを扱うにはBigQueryに存在するデータをGCSに移行する必要があります。GoogleはRubyに対してBigQueryのSDKを提供しており、extract_jobメソッドを使うことによって対象のテーブルをGCSに転送できます。extract_jobを使う際にポイントが2つあります。

1つ目のポイントとして、extract_jobメソッドは転送するデータの圧縮形式が指定できる点です。圧縮形式はCSVであれはgzip形式がサポートされています。今回のIQONのデータは10TB以上あることがわかっています。そのためファイルを圧縮して転送できれば先程計算したデータ転送の料金を抑えることができます。また最終的にAmazon Athenaを利用する際もgzip形式でクエリを実行することが可能です。しかしgzipでどれだけ料金が抑えられるかわからないので、いくつかのファイルをgzipで圧縮し確認しました。適当にCSVのファイルを5ファイルほど用意しgzipで圧縮しました。下記の表が圧縮結果です。

圧縮前(Byte) 圧縮後(Byte) 圧縮率(%)
81920 7168 92
11264 2048 82
1266989 49177 62
23552 6843 71
57344 11787 80

gzipに圧縮するとおよそ70〜80%ほど圧縮できました。そのため、10TBのデータも7〜8割ほど圧縮できると予想できます。結論として、データ移行の料金は転送したデータの量に比例するので7〜8割ほどgzipで料金コストを削減できることがわかりました。他にgzipで圧縮した際に起こるデメリットはAmazon Athenaでクエリを実行する際、gzipを解凍する必要があるので速度低下が考えられます。しかしgzipにすることでクエリを実行する際のデータ量を削減できるのでAmazon Athenaの利用料金を抑えることができます。クエリを実行する頻度として年1〜2回程度実行する程度なのでS3の利用料金を抑える利点やAmazon Athenaの利用料金を抑える点を考慮するとgzipで圧縮するメリットが大きいのでgzipで転送しました。

2つ目のポイントはextract_jobを用いてファイルを転送する際、転送するファイルを分割する必要がある点です。分割する必要があるファイルの条件はサイズが1GB以上あるファイルです。そのため転送する元データのサイズが1GB以上の場合は別名を付けてファイルを分割する必要があります。公式ドキュメントによるとワイルドカードで指定ができます。今回は下記のようなURIで分割を行いました。

定義するURI
gs://hoge/file-*.csv

出力されるファイル名
gs://hoge/file-000000000000.csv
gs://hoge/file-000000000001.csv
gs://hoge/file-000000000002.csv
.
.
.

実際に移行で利用したコードを以下に示します。これをEC2上でバックグラウンド実行しました。

require "google/cloud/bigquery"
require "google/cloud/storage"
require "logger"
require "parallel"
def import project_id = ""
  bigquery = Google::Cloud::Bigquery.new(project: project_id)
  storage = Google::Cloud::Storage.new(project: project_id)
  bucket_name = ""
  bucket = storage.bucket("")
  bq_table_name = []
  bigquery.datasets.all.each do |dataset|
    # 100並列で転送を行う
    Parallel.map(dataset.tables.all, in_threads: 100) do |table|
      if (table.bytes_count / (1024.0 * 1024.0 * 1024.0)) < 1
        import_gcs(table, bucket_name, dataset.dataset_id, "-*.csv.gz", "CSV")
      else
        import_gcs(table, bucket_name, dataset.dataset_id, "-*.csv.gz", "CSV")
      end
    end
  end
end

def import_gcs(table, bucket_name, dataset_name, extend, extension)
  log = Logger.new("log.txt")
  uri = "gs://#{bucket_name}/#{dataset_name}/#{table.table_id}/#{table.table_id}#{extend}"
  extract_job = table.extract_job uri, compression: "GZIP", format: extension do |config|
    config.location = "US"
  end
  extract_job.wait_until_done!
  if extract_job.failed?
    log.debug("#{table.table_id} failed") 
    log.debug("#{extract_job.error}")
  end
  return extract_job.failed?
end

import()

実装のポイントはParallelを用いて並列で転送を行っている点です。最初は並列に行わず直列で処理を行っていたのですが1日経っても終わりませんでした。CloudWatchでEC2のメトリクスを確認するとネットワークの帯域やCPU使用率は余裕がありそうでした。BigQuery → GCSの転送自体はGCP側で行っているので100並列で様子を見ながら転送を行いました。その結果、半日かからず終了させることができました。

また、念のためRubyのloggerで簡単なログを取っています。extract_jobの戻り値のfailed?でJobの成功、失敗を確認できます。最初はログを取っておらず、途中でプログラムが落ちた際どのテーブルで失敗したのかがわからず原因を突き止めるのに苦労しました。結論としては、特にJobが失敗したログは発生しませんでした。

最終的にgzipで転送した結果、元のデータサイズと比較すると7〜8割ほどデータを圧縮できました。さらに、料金に関しても7〜8割コストを削減できました。

データ転送:GCS → S3

GCS→S3に関してはgsutil rsyncコマンドを使い転送を行いました。

S3のディレクトリは下記の構成にしました。

├── BigQueryのdataset名
│   ├── BigQueryのtable名
│       ├── (BigQueryのtable名).csv.gz

gsutil rsyncで転送する際は、ルートprefixからgsutil rsyncを行うとエラーが出た際に始点が最初からになってしまうので、今回はdatasetのprefix毎に分けて転送します。

転送には時間がかかるので、ログを残す点やバックグラウンドで動かす点などに注意し、下記のコードを実装しました。

require "google/cloud/bigquery"
require "google/cloud/storage"
require "parallel"
require "logger"
def gcs_to_s3 project_id = ""
  storage = Google::Cloud::Storage.new(project: project_id)
  log = Logger.new("log.txt")
  bucket = storage.bucket("")
  s3_bucket_name = ""
  gcs_bucket_name = ""
  directory_name = bucket.files(delimiter: "/")
  directory_name.prefixes.each do |directory|
    log.debug("start #{directory}")
    success = system("gsutil -m rsync -r gs://#{gcs_bucket_name}/#{directory.gsub("/", "")} s3://#{s3_bucket_name}/#{directory.gsub("/", "")}")
    if not success
      log.debug("failed #{directory}")
      next
    end
    log.debug("success #{directory}")
  end
end

gcs_to_s3()

上記のプログラムで想定通りにgsutil rsync側で転送が行われているか確認を行いました。CloudWatchでEC2のメトリクスを確認するとCPU使用率が飽和している状態でした。CPU使用率が飽和している場合の対策としてEC2インスタンスのインスタンスタイプを上げたり、EC2インスタンスを複数作成して処理を分散する対策が考えられます。しかし、S3とGCPバケットの総データ量を都度確認しファイルの転送速度を確認すると対策をするほど遅くなかったのでこのままの状態で転送を行いました。

ファイルの確認作業

最後の作業として、GCSからS3にデータを転送する際、欠損が起きていないか確認を行います。

確認する項目は以下の通りです。

  • ファイルの存在確認

  • GCSとS3のチェックサム検証

  • GCSとS3のサイズ比較

上記の各項目の確認方法について説明します。

ファイルの存在確認

GCSとS3にあるファイルの存在確認をするためにはコンソール上で確認する方法があります。しかし、ファイル数が数千ファイル存在するのでファイルを1つずつ確認するためには時間と労力が必要です。

そのため、GCS、S3に対象のprefixが存在するか比較し存在の有無を確認します。GCSに存在するファイルはすでにBigQueryから全て転送できていることが確認できているのでGCSのprefixを起点としてS3のprefixを確認します。objectメソッドの戻り値のexits?メソッドで対象のファイルが存在するか確認できます。

GCSとS3のチェックサム検証

チェックサムを確認することによってGCSから送られてきたファイルはGCSと同一のファイルであるか確認できます。hash値の確認に関しては手元に対象のファイルをダウンロードして確認する方法でも可能ですが、こちらも時間と労力が必要です。hash値はGCS、AWSのSDKを使用して確認可能なので各環境のSDKを使用し確認します。

S3に関してはAws::S3::Objectクラスのetagメソッドで確認できます。

GCPではGoogle::Cloud::Storage::Fileクラスのmd5メソッドで確認できます。こちらはbase64でエンコードされた値が返ってくるのでmd5で比較するために一度デコードして比較します。デコードした値はbinaryなのでunpackを行う必要があります。

GCSとS3のサイズ比較

GCSとS3のサイズ比較はgsutil duコマンドを用いることで確認できます。下記のようなコマンドを入力するとbyte表記でバケットの合計サイズを確認できます。

# GCSのバケットの容量の確認する場合
$ gsutil du -s gs://bucket_name
123456
# S3のバケットの容量の確認する場合
$ gsutil du -s s3://bucket_name
123456

GCSとS3のサイズ比較に関してはバケット単位での比較なのでコマンドを複数回実行すれば確認できます。しかし、ファイルの存在確認とチェックサムの検証はファイル単位なので下記のスクリプトを利用して確認します。

require "google/cloud/storage"
require "aws-sdk"
require "parallel"
require "logger"
require "google/cloud/bigquery"
require "digest/md5"
require "base64"

def check_file project_id = "iqon-data-mining"
  storage = Google::Cloud::Storage.new(project: project_id)
  resource = Aws::S3::Resource.new(region: "ap-northeast-1")
  log = Logger.new("log.txt")
  s3 = resource.bucket("iqon-backup")
  bucket = storage.bucket("export-s3-failed")
  files = bucket.files()
  Parallel.map(files.all, in_threads: 100) do |obj|
    dataset_name = obj.name.sub(/\/.+/, "")
    file_name = obj.name.sub(/.+\//, "")
    directory_name = file_name.sub(/-\d+.csv.+/, "").sub(/.csv.+/, "")

    s3_directory = "#{dataset_name}/#{directory_name}/#{file_name}"

    s3object = s3.object(obj.name)

    if (s3object.exists?)
      # gcsのetagとs3のetagを比較
      if (s3object.etag.gsub("\"", "") == Base64.decode64(obj.md5).unpack("H*")[0])
        puts("checked")
      else
        log.debug("file etag validation failed at gcs: #{obj.name}/s3: #{s3_directory}")
      end
    else
      log.debug("file not found gcs: #{obj.name}/s3: #{s3_directory}")
    end
  end
end

check_file()

今回、上記のプログラムで確認作業を行った際、GCSとS3のファイルのhash値が合わない問題に遭遇しました。原因としては、GCS → S3へ転送済みのテーブルに対してBigQuery → GCSへファイル転送を再び行いGCSのファイルを上書きしてしまったことでした。BigQuery → GCSへ転送する場合、CSVの行の順序が保証されていません。そのためBigQuery → GCSへ転送するたびにhash値が変わってしまいます。結局S3に保存されているhash値が一致しないファイルを削除し、削除したファイルをGCSから再転送を行いました。

最後にAmazon Athenaを使ってS3に転送完了したファイルに対してクエリを実行してみました。結果としてgzipで圧縮されたファイルでも問題なく中身を確認できました。

まとめ

BigQueryからS3に移行するまでの手順を紹介しました。S3へデータ移行が完了したのでGCP側のリソースを削除できました。今回のデータ移行は転送するデータ量がかなり多く、転送完了するまで数日かかりました。また移行するデータ量が多い場合は転送時に発生する料金も多く発生し、選定するアーキテクチャによって料金も大きく変わることがわかりました。そのため、たった1回のデータ転送でもデータを移行する前に移行でかかる時間と料金を見積もることが重要になると感じました。

移行が完了した後、継続的にS3の料金が発生します。現在S3の料金プランはS3標準プランに設定していますが徐々にプランを変更し最終的にS3 Glacier Deep Archiveプランへ移行する予定です。気をつけるべき点としてAmazon AthenaがS3にアクセスできるプランはストレージタイプがスタンダードかスタンダードIAであることがあげられます。S3 Glacierプランまで変更するとAmazon Athenaでアクセスするにはファイルをrestoreする必要があります。そのため今後Amazon Athenaでクエリを打つ必要がなくなったタイミングでS3 Glacier Deep Archiveプランへ移行します。

aws.amazon.com

料金を比較するとBigQueryの場合長期保存プランは0.010(USD/GB)に対してS3のS3 Glacier Deep Archiveプランは0.002(USD/GB)です。そのためS3 Glacier Deep Archiveプランに移行すると月およそ82USD削減できます。

MA基盤チームではデータ転送に関わる業務が多く、他のチームと連携しながら仕事をすることがあります。今回のタスクをこなすことで他部署と関わりながらデータの転送方法について知ることができました。

最後に

ZOZOテクノロジーズではより良いサービスを提供するための基盤作りを開発したい仲間を募集中です。以下のリンクからご応募ください。

tech.zozo.com

カテゴリー