「AWS上のPySparkを使用したビッグデータでの機械学習の活用」

Using PySpark on AWS to leverage machine learning with big data

注:Suman Debnathは、2023年8月22日から23日にかけて行われるODSC APACのスピーカーです。彼のトーク「AWS上でSparkを使用して分類および回帰モデルを構築する」もぜひご覧ください。

データサイエンスの絶え間ないダイナミックな領域では、正しいツールを見極めて適用することが機械学習の取り組みの結果に大きな影響を与える可能性があります。データサイエンスの愛好家の皆さんに心からの挨拶を!私は2023年8月22日に予定されているODSC APACカンファレンスで話す機会を得ることができて幸運だと思っています。私のプレゼンテーションは、PySparkを使用したAWS上での分類および回帰モデルの開発に焦点を当てます。

セッションの理解

この魅力的でインタラクティブなセッションでは、機械学習の分野で貴重なリソースであるPySpark MLlibについて探求し、AWS Glue/EMRをプラットフォームとして使用してさまざまな分類アルゴリズムを実装する方法を探ります。

このセッションでは、実践的な機械学習の概念とアルゴリズムの理解と応用に重点を置いて、ハンズオン形式で進めます。参加者はさまざまな機械学習アルゴリズムに触れ、バイナリ分類問題の解決における強力な教師あり学習手法であるロジスティック回帰に焦点を当てます。

しかし、このセッションは概念とアルゴリズムに留まりません。効果的な機械学習モデルの作成には、重要なデータ前処理技術も必要です。セッションの終わりまでに、参加者は欠損値の処理や列データ型の変更、データのトレーニングセットとテストセットへの分割などのスキルを身につけることができます。このハンズオン体験は、多機能なAWS Glue/EMR環境内で行われます。

何が得られるか

このセッションでは、参加者が以下のことを深く理解するのを支援することを目的としています:

  • PySpark MLlib
  • 教師なし学習手法
  • さまざまなタイプの分類アルゴリズム
  • ロジスティック回帰分類器の実装
  • PySparkを使用したAWS上でのデータ前処理(AWS GlueおよびAmazon EMRを使用)
  • PySparkを使用したAWS上でのモデル構築

データエンジニア、データサイエンティスト、または機械学習愛好家で、Apache Sparkを使用したAWSでの機械学習を始めたい方には、このセッションが最適です。

さあ、今から予定されている内容の一部をご紹介しましょう(GitHubのコードリポジトリはこちらでご覧いただけます)。

私たちは、20,057の料理名からなるデータセットを選択しました。各料理名には、材料リスト、栄養成分、および料理のカテゴリを表す680の列が詳細に記載されています。私たちの共通の目標は、料理がデザートであるかどうかを予測することです。これはシンプルでほとんど明解な質問です – 私たちのほとんどは、料理の名前を読んで簡単にデザートかどうかを分類できるでしょう。これはシンプルな機械学習モデルの優れた候補となります。

ステップ1:必要なライブラリのインポート

最初のステップでは、PySpark SQLの関数と型など、必要なライブラリをインポートします。

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler

ステップ2:データの前処理とEDA(探索的データ解析)

Sparkのread.csv関数を使用して、食品レシピのCSVデータセットをロードします。inferSchemaパラメータはTrueに設定され、列のデータ型を推測し、headerはTrueに設定され、最初の行をヘッダーとして使用します。

# データのロード
dataset = 's3://fcc-spark-example/dataset/2023/recipes_dataset/epi_r.csv'
food = (
          spark
              .read
              .csv(dataset, inferSchema=True, header=True)
      )
     
# 列名の正規化
def sanitize_column_name(name):
  answer = name
  for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
      answer = answer.replace(i, j)
  return "".join(
      [
          char
          for char in answer
          if char.isalpha() or char.isdigit() or char == "_"
      ]
  )
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])

このスクリプトの一部では、列名に含まれるスペース、ダッシュ、スラッシュ、アンパサンドをアンダースコアに置換し、非英数字文字を削除して列名を正規化します。

# データのフィルタリング
food = food.where(
  (
      F.col("cakeweek").isin([0.0, 1.0])
      | F.col("cakeweek").isNull()
  )
  & (
      F.col("wasteless").isin([0.0, 1.0])
      | F.col("wasteless").isNull()
  )
)

ここでは、cakeweekとwastelessの列が0.0または1.0の値、またはnullの行のみを保持するためにデータをフィルタリングします。

# 識別子、連続変数、ターゲット変数、バイナリ変数の定義
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
  "rating",
  "calories",
  "protein",
  "fat",
  "sodium",
]
TARGET_COLUMN = ["dessert"]
BINARY_COLUMNS = [
  x
  for x in food.columns
  if x not in CONTINUOUS_COLUMNS
  and x not in TARGET_COLUMN
  and x not in IDENTIFIERS
]

このセクションでは、どの列が識別子、連続変数、ターゲット変数、バイナリ変数であるかを定義します。

# 欠損値の処理
food = food.dropna(
  how="all",
  subset=[x for x in food.columns if x not in IDENTIFIERS],
)
food = food.dropna(subset=TARGET_COLUMN)
food = food.fillna(0.0, subset=BINARY_COLUMNS)

欠損値は、全てのnullを持つ行(識別子列を除く)を削除し、ターゲット列のnullを持つ行を削除し、バイナリ列のnullを0.0で埋めることで処理します。

# 文字列の数値を浮動小数点数に変換し、連続変数を制限する
from typing import Optional

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
  if not value:
      return True
  try:
      _ = float(value)
  except ValueError:
      return False
  return True
for column in ["rating", "calories"]:
  food = food.where(is_a_number(F.col(column)))
  food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
maximum = {
  "calories": 3203.0,
  "protein": 173.0,
  "fat": 207.0,
  "sodium": 5661.0,
}
for k, v in maximum.items():
  food = food.withColumn(
      k,
      F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
          F.least(F.col(k), F.lit(v))
      ),
  )

この部分では、文字列を浮動小数点数に変換できるかどうかをチェックするユーザー定義関数is_a_numberを作成します。この関数を使用して、”rating”と”calories”の列の非数値をフィルタリングし、それらをdouble型にキャストします。

次に、指定された最大値で連続変数”calories”、”protein”、”fat”、”sodium”の値を制限します。

# 各バイナリ列の合計を計算する
inst_sum_of_binary_columns = [
  F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]
# バイナリ列の合計を選択し、結果を辞書に変換する
sum_of_binary_columns = (
  food.select(*inst_sum_of_binary_columns).head().asDict()
)
# 行の合計数を数える
num_rows = food.count()
# レアな特徴を特定する
too_rare_features = [
  k
  for k, v in sum_of_binary_columns.items()
  if v < 10 or v > (num_rows - 10)
]
# バイナリ列からレアな特徴を除外する
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))

次に、各バイナリ列の合計を計算し、その結果を辞書に変換します。その後、真の回数が10未満または真の回数が(総行数-10)未満の「レア」な特徴を特定し、バイナリ列から除外します。

# 新しい特徴の作成
food = food.withColumn(
  "protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
  "fat_ratio", F.col("fat") * 9 / F.col("calories")
)
# 新しい特徴の欠損値の処理
food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])
# 新しい特徴を連続変数に追加する
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]

ここでは、新しい特徴量「protein_ratio」と「fat_ratio」を作成します。これらの特徴量は、それぞれカロリーに対するタンパク質と脂肪の比率を表しています。これらの新しい特徴量の欠損値は0.0で埋め、連続カラムに追加します。

# 連続カラムの欠損値を補完する
OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]
imputer = Imputer(
  strategy="mean",
  inputCols=OLD_COLS,
  outputCols=NEW_COLS,
)
imputer_model = imputer.fit(food)
# 連続カラムの更新
CONTINUOUS_COLUMNS = (
  list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS
)
# データにインピューターモデルを適用する
food = imputer_model.transform(food)

このセクションでは、SparkのImputerを使用して、「calories」、「protein」、「fat」、「sodium」のカラムの欠損値を平均値で補完します。その後、連続カラムのリストを更新して、補完されたものを含めます。

# 連続特徴量を1つのベクトルに結合する
CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]
continuous_assembler = VectorAssembler(
  inputCols=CONTINUOUS_NB, outputCol="continuous"
)
food_features = continuous_assembler.transform(food)

次に、VectorAssemblerを使用して、連続特徴量を1つのベクトルカラム「continuous」に結合します。

# 連続特徴量のスケーリング
continuous_scaler = MinMaxScaler(
  inputCol="continuous",
  outputCol="continuous_scaled",
)
food_features = continuous_scaler.fit(food_features).transform(
  food_features
)

最後に、MinMaxScalerを使用して連続特徴量を範囲[0、1]にスケーリングします。データに適合させ、データを変換します。この時点で、データセットは機械学習のタスクに使用できる状態です!

ここで、機械学習のトレーニングジョブを実行する準備が整いました。

ステップ3:モデルのトレーニング、テスト、評価

データが処理され、変換されたら、トレーニングセットとテストセットに分割できます。モデルをトレーニングした後、さまざまなメトリックを使用してパフォーマンスを評価できます。このセクションでは、デザート予測の特徴準備プログラムに使用した推定器を組み合わせたMLパイプラインを構築し、モデリングステップを追加します。

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
imputer = MF.Imputer(
                              strategy="mean",
                              inputCols=["calories", "protein", "fat", "sodium"],
                              outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
                             )
continuous_assembler = MF.VectorAssembler(
                                                        inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"],
                                                        outputCol="continuous",
                                                       )
continuous_scaler = MF.MinMaxScaler(
                                                         inputCol="continuous",
                                                         outputCol="continuous_scaled",
                                                        )
food_pipeline = Pipeline(
                                          stages=[imputer, continuous_assembler, continuous_scaler]
                                         )

ベクトルカラム型で最終的なデータセットを組み立てることができます。

preml_assembler = MF.VectorAssembler(
                                                                                           inputCols=BINARY_COLUMNS
                                                                                           + ["continuous_scaled"]
                                                                                           + ["protein_ratio", "fat_ratio"],
                                                                                           outputCol="features",
                                                                                          )
food_pipeline.setStages(
                                          [imputer, continuous_assembler, continuous_scaler, preml_assembler]
                                         )
food_pipeline_model = food_pipeline.fit(food)
food_features = food_pipeline_model.transform(food)

機械学習のためのデータフレームが準備されました!各レコードには、

  • デザートのバイナリ入力(レシピがデザートの場合は1.0、それ以外の場合は0.0)が含まれるターゲット(またはラベル)カラムdessert
  • 機械学習モデルのトレーニングに使用する情報がすべて含まれる特徴量のベクトルfeatures

予測結果を表示できます:

food_features.select("title", "dessert", "features").show(30, truncate=30)

ロジスティック回帰分類器を使用して、MLモデルをトレーニングしましょう:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
                          featuresCol="features", labelCol="dessert", predictionCol="prediction"
                      )
food_pipeline.setStages(
  [
      imputer,
      continuous_assembler,
      continuous_scaler,
      preml_assembler,
      lr,
  ]
)
# トレーニングとテスト用にデータフレームを分割します
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)
モデルを評価し、混同行列を確認しましょう
results.select("prediction", "rawPrediction", "probability").show(3, False)
# pivot()を使用してモデルの混同行列を作成します
results.groupby("dessert").pivot("prediction").count().show()
最後に、モデルの精度と再現率を計算できます:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"モデルの精度: {metrics.precisionByLabel[1]}")
print(f"モデルの再現率: {metrics.recallByLabel[1]}")

このチュートリアルの目的のために、フルスクリプトは簡略化されています。データの準備からモデルの展開までの詳細なコードの解説を含む、実用的な応用の包括的な理解については、ODSC APACカンファレンス2023に参加してください。

この簡単なチュートリアルでは、ODSCセッションでカバーされる内容の一部を紹介しました。セッションに参加することで、これらのトピックをより深く探求し、PySpark MLlibの複雑さを理解することができます。主な目的は、データサイエンスの愛好家や専門家が、自身の機械学習プロジェクトでSpark MLlibのフルポテンシャルを活用できるようにすることです。

覚えておいてください。どんなスキルをマスターするにも、一貫した学習と実践的な実装が重要です。ODSCカンファレンスでのSpark on AWSを使った魅力的な機械学習の世界に深く潜り込む準備をしましょう。そこでお会いできることを楽しみにしています!

著者について:

Suman Debnathは、主にデータエンジニアリング、データ分析、機械学習に焦点を当てたAmazon Web Servicesの主任デベロッパーアドボケート(データエンジニアリング)です。彼は大規模な分散システムに情熱を持ち、Pythonの熱狂的なファンです。彼のバックグラウンドはストレージパフォーマンスとツール開発であり、さまざまなパフォーマンスベンチマーキングやモニタリングツールを開発しています。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more