LINEヤフー Tech Blog

LINEヤフー株式会社のサービスを支える、技術・開発文化を発信しています。

データレイクのデータスキャン量を25%削減する方法

こんにちは。ヤフー広告でデータエンジニアをしている長峯です。

LINEヤフー株式会社では、Yahoo! JAPANと関連企業が所有するデータを活用することでお客様のマーケティング課題の発見と解決を実現するサービスYahoo! JAPAN データマーケティングソリューションを展開しています。私は本サービスを通じてお客様がさまざまなデータを活用するためのデータ分析基盤となるデータレイクの構築・運用を担当しています。このデータレイクは、2.5ペタバイトのデータを保有しており、年間1,000万クエリが実行される大規模な環境となっています。

今回、私たちはAWS上に構築されたデータレイクのデータスキャン量を4人月という工数で25%削減することに成功しました。この記事では、その削減プロセスを紹介いたします。このデータレイクは、クエリエンジンとしてAmazon Athenaを主に使用しており、Amazon S3へのデータスキャン量の削減を目指しました。そのため本記事ではAmazon Athenaを使用した手順に焦点を当てていますが、提案する方法は他のクエリエンジンや商用製品にも応用可能です。データ分析環境の管理を担当されている方や、データ分析を行っている方々にとって、少しでも参考になれば幸いです。

課題:多種多様なクエリが実行されるデータレイク

データレイクは、データアナリストやデータサイエンティストが日々膨大なデータにアクセスし、データ分析や機械学習モデル開発を行う環境です。しかし、データレイク利用者のデータエンジニアリングのスキルには大きなばらつきがあり、常に計算リソースが最適化されているわけではありません。計算リソースの過剰な使用は、データレイクの維持費用の増加につながる可能性があります。そのため、計算リソースの使用量を削減し、最適な状態を維持することは、コスト効率の良いデータレイク運用において非常に重要です。

しかし、データレイクでは多種多様な分析が日常的に行われており、全てのプロセスを最適化することは現実的ではありません。そのため、計算リソースの消費が特に大きい処理群を特定し、そこに焦点を当てて効率的に最適化を進めることが重要です。次の章では、私たちが実際に採用した「計算リソースを大量に消費する処理群の特定方法」と「計算リソースを最適化する方法」についてご紹介します。

手順1:計算リソースを大量に消費している処理郡の特定

特定のためには、まず実行されたクエリごとの計算リソース消費量を記録しているログデータを分析する必要があります。ただし、単にログデータを用いて計算リソースを多く消費するクエリを見つけ出すだけでは、場合によっては不十分です。例えば、以下に示すパターンBのように、個々の処理は小規模でも、複数回実行されることで全体としての計算リソース消費が大きくなるケースがあります。このような状況を見逃さずに特定することも、最適化の過程で重要です。

パターン1クエリ毎のデータスキャン量1日毎のクエリ実行数1日毎の合計データスキャン量
A100 GB1 回100 GB
B1 GB1000 回1000 GB

以上の点を踏まえると、類似処理をグルーピングし、グループごとの総計算リソース消費量を比較することで、改善すべき処理群をより効率的に特定できます。私たちは、類似処理のグルーピングに際して、以下の3つの要素を用いました。

  • 実行されたクエリのユーザ
  • クエリで参照されたテーブルのリスト
  • 各クエリによる計算リソースの消費量

これらの情報を用いたグルーピングにより、クエリ文字列が若干異なるものであっても、同一の処理として比較を行うことが可能になります。実際に私たちが行った手順は以下の通りです。

  1. クエリ実行ログの収集
  2. 収集したクエリ実行ログから参照テーブルリストを抽出
  3. 分析可能にするためのログテーブルの作成
  4. グルーピングと最適化を検討すべき処理群の特定

「クエリ実行ログの収集」から「分析可能にするためのログテーブルの作成」を実現するPythonのサンプルコードを以下に示します。

#!/usr/bin/env python

from datetime import date, timedelta
from sqlfluff.core import Linter
import awswrangler as wr
import pandas as pd

yesterday = date.today() - timedelta(1)
QUERY = """
SELECT date '{date}' AS Date,
    CAST(JSON_EXTRACT(JSON_PARSE(responseelements), '$.queryExecutionId') AS varchar) AS QueryExecutionId,
    MIN(COALESCE(useridentity.username, useridentity.sessioncontext.sessionissuer.username)) AS Username
FROM aws_cloudtrail_logs
WHERE date = DATE_FORMAT(timestamp '{date}', '%Y/%m/%d')
  AND region = '{region}'
  AND eventsource = 'athena.amazonaws.com'
  AND eventname = 'StartQueryExecution'
  AND JSON_EXTRACT(JSON_PARSE(responseelements), '$.queryExecutionId') IS NOT NULL
GROUP BY 1, 2
"""

# Amazon Athenaを用いてAWS CloudTrail LogsからQueryExecutionIdと実行ユーザを取得。
region = '<your region>'
database = '<your database name>'
query_execution_id = wr.athena.start_query_execution(
    sql=QUERY.format(
        date=yesterday.strftime("%Y-%m-%d"), region=region
    ),
    database=database
)
wr.athena.wait_query(query_execution_id=query_execution_id)
cloudtrail_log_df = wr.athena.get_query_results(query_execution_id=query_execution_id)

# Amazon AthenaのGetQueryExecution APIを用いて、クエリや実行メトリクスを取得。
# 参考: https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryExecution.html
query_execution_ids = cloudtrail_log_df.QueryExecutionId.tolist()
athena_log_df = wr.athena.get_query_executions(query_execution_ids=query_execution_ids)

# SQLFluffを用いて各クエリの参照先テーブル一覧を抽出。
# 参考: https://sqlfluff.com
linter = Linter(dialect='athena')
athena_log_df['ReferencedTables'] = athena_log_df.apply(
    lambda x: sorted(linter.parse_string(x.Query).tree.get_table_references()), axis=1
)

# これらのDataFrameを結合し、Amazon Athena用のテーブル形式で出力。
bucket = '<your bucket name>'
database = '<your database name>'
table = 'amazon_athena_execution_logs'
output_df = pd.merge(cloudtrail_log_df, athena_log_df, on='QueryExecutionId')
wr.s3.to_parquet(
    df=output_df, dataset=True, path=f's3://{bucket}/warehouse/{database}/{table}',
    database=database, table=table, mode="overwrite_partitions", partition_cols=['Date']
)

上記Scriptによって以下のようなAmazon Athenaのクエリ実行ログテーブルが作成されます。

DateUsernameReferenced
Tables
Statistics_
DataScannedInBytes
Query
2024-01-01IAMUserA[table_x]10select col_a from table_x
2024-01-01IAMRoleB[table_x, table_y]100select col_a from table_x join table_y using(col_b)

次に行うことは「グルーピングと最適化を検討すべき処理群の特定」です。これを実現するサンプルクエリは下記の通りです。

WITH tmp_table AS (
    SELECT *,
        CASE
            WHEN scaned_giga_bytes < 250 THEN 'scanedBytes < 250 GB'
            WHEN 250 <= scaned_giga_bytes AND scaned_giga_bytes < 500 THEN '250 GB <= scanedBytes < 500 GB'
            WHEN 500 <= scaned_giga_bytes AND scaned_giga_bytes < 1000 THEN '500 GB <= scanedBytes < 1000 GB'
            WHEN 1000 <= scaned_giga_bytes AND scaned_giga_bytes < 2000 THEN '1000 GB <= scanedBytes < 2000 GB'
            WHEN 2000 <= scaned_giga_bytes THEN '2000 GB <= scanedBytes'
            ELSE 'Others'
        END AS scaned_bytes_range
    FROM (
            SELECT *,
                statistics_datascannedinbytes / POW(1000, 3) AS scaned_giga_bytes
            FROM amazon_athena_execution_logs
            WHERE statistics_datascannedinbytes > 0
        ) t
)
SELECT username,
    referencedtables,
    scaned_bytes_range,
    SUM(statistics_datascannedinbytes) AS sum_bytes,
    AVG(statistics_datascannedinbytes) AS avg_bytes,
    COUNT(1) AS cnt,
    ARRAY_AGG(query)[1] as sample_query1,
    TRY(ARRAY_AGG(query)[2]) as sample_query2
FROM tmp_table
GROUP BY username,
    referencedtables,
    scaned_bytes_range
ORDER BY sum_bytes DESC
LIMIT 20

上記クエリによって出力される結果のサンプルは以下です。

UsernameReferencedTablesScaned_Bytes_RangeSum_BytesSample_Query1Sample_Query2
IAMUserA[table_x]scanedBytes < 250 GB1000select col_a from table_x where col_a = 1select col_a from table_x
IAMRoleB[table_x, table_y]scanedBytes < 250 GB100select col_a from table_x join table_y using(col_b)

scaned_bytes_rangeは実際の利用状況によって適切な値は変化するため、sample_query[1-2]を参考に、ある程度類似した処理がまとまっていることを確認しながら調整してみてください。

手順2:計算リソース削減案の検討と実施

削減するべきクエリ郡が特定できたら、クエリのどの部分で計算リソースを多く消費しているかを特定していきます。Amazon AthenaはEXPLAIN ANALYZEステートメントを用いることで、クエリのどの処理でどの程度の計算リソースを消費しているかを確認することが可能です。この機能を用いることで簡単に高負荷となっている部分を簡単に特定することができます。クエリエンジンや商用製品の多くは、このような機能が搭載されているため、Amazon Athena以外を利用している場合は各製品の公式ドキュメントを確認してみてください。

Amazon Athenaはデータスキャン量毎の従量課金となっているため、今回はデータスキャン量を減らす方法を検討しました。下記表は、データスキャン量を減らすために私たちが検討したアイテムの一覧です。「検討優先度」は、工数に比べて削減効果が大きかったアイテムから順に記載しています。環境や利用実態によって異なると思いますので、参考程度に活用ください。その他にもAmazon Athenaの公式ドキュメントにさまざまなチューニング方法が記載されてるので併せて確認することをおすすめします。

検討優先度タイトル検討事項
★★★★参照不要列をクエリから削除分析や処理上不要な項目を削除する。
稀にWHERE句やCASE文でのみ参照している項目で必ずTrueになる条件式になっているケースがあるので確認してみましょう。
※ 例えば、notNullが保証されるカラムに対してnotNullを確認するなど。
★★★★参照不要行の読み飛ばし参照テーブルがパーティション化やバケット化されている場合は、それらを適用できるWHERE句に変更する。
※ 例えば、INNNER JOINなどの結合によって結果的に除外されるケースは、WHERE句やON句で明示的にレコードを除外することで計算リソース量が削減されるケースがあります。
★★★参照先テーブルのフォーマット変更Apache ORCやParquetなどの列指向フォーマットにする。
★★★参照先テーブルの圧縮GZIPやZLIBなどの圧縮フォーマットを適用する。
★★参照先テーブルの複合データ型を展開して保存Maps型やStructs型として保存せず、各キーを別カラムとして格納する。
★★参照先テーブルのパーティション化やクラスタ化WHERE句で指定されることが多いカラムを対象としたパーティションやバケットを追加する。
★★実行クエリの要件変更
  • 実行頻度を日次から週次へ変更する。
  • テーブルのフルスキャンから特定パーティションの参照に変更する。
  • TABLESAMPLE句などを用いてテーブルの一部をサンプリングした分析に変更する。
★★一時テーブルやデータマートの作成同一処理を複数回実行する場合は一度テーブルを作成し、そのテーブルを参照するよう変更する。
※ 例えば、複数回呼び出されるWITH句のサブクエリをテーブルとして事前に作成することで、計算リソースが削減されるケースがあります。
キャッシュ機能の利用Amazon AthenaのQuery Result Reuseのようなキャッシュ機能を活用する。
※ Query Result Reuseはクエリエンジン側で参照テーブルの変更が無いことを保証しない点には注意が必要です。
参照先テーブルのファイルレベルの最適化特定の項目でソートしてデータ格納することにより、効率的に圧縮してファイルサイズを小さくする。

おわりに

この記事では、データレイクの計算リソース最適化の手順をご紹介しました。データ分析環境は、さまざまなスキルセットのユーザがさまざまな分析を行っているため、全てを適切にコントロールすることは困難です。類似処理をグルーピングして、計算リソースの消化が大きいグループ毎に最適化の検討を進めてみてください。