For Your ISHIO Blog

データ分析や機械学習やスクラムや組織とか、色々つぶやくブログです。

LightGBMを並列分散処理させるmmlsparkのスケーラビリティを検証した

勾配ブースティングモデルの1つであるLightGBMを分散処理させるライブラリに、mmlsparkがあります。Microsoftが提供しているライブラリで、Spark上で動かすことで並列分散処理を実現します。既存のLightGBMライブラリでも、推論フェーズにおいては分散処理ができる(はず)ですが、学習フェーズではこのmmlsparkを利用することで実現できます。

現在はRC版ではありますが、安定してきたという口コミもあり、実際にSpark上で動作するか試しました。Hadoopのリソースを増やしていくことで、Lighgbmの学習フェーズの処理速度がスケールすることが確認できました。

github.com

LightGBMはGPUを利用して高速に処理できますが、Hadoop環境に大規模データを蓄積している企業などは、既存のクラスタ環境でSparkを利用して分散処理し、データローカリティの恩恵を受けて処理速度が上がれば嬉しいケースがあると思います。

検証環境

  • Python3
  • Spark: v2.4.3
  • mmlspark: 1.0.0-rc3
  • Jupyter Notebook上で実行

検証データ

Sparkの恩恵をうけるために、比較的大きめのデータが必要です。データが小さすぎると、オーバーヘッドコストの方が大きくなります。今回は、KaggleのMicrosoftマルウェアコンペのtrainデータを利用して検証しました。約900万レコードあり、ファイルサイズが約4Gです。

www.kaggle.com

Requirements

  • MMLSpark requires Scala 2.11, Spark 2.4+, and Python 3.5+.

mmlsparkのインストール

spark-shellやpysparkコマンドの--packagesオプションで、mmlsparkのパッケージを指定し、jarファイルを取得します。私の場合は、Jupyter Notebook環境で試したため、PYSPARK_SUBMIT_ARGS環境変数にpysparkの起動オプションを指定しています。この際に、mavenのレポジトリからmmlsparkのパッケージをインストールをしようとすると失敗します。ログを見る限り、mavenの誤ったpathに取りに行っているように見えました。mmlsparkのレポジトリ(https://mmlspark.azureedge.net/maven)を明示的に指定し、そこからパッケージをインストールすると、うまくPysparkを起動することができました。

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master yarn-client --num-executors 5 --executor-cores 2 --executor-memory 10g --packages ,com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc3 --repositories 'https://mmlspark.azureedge.net/maven' pyspark-shell"

実装

まずPysparkの起動とデータ読み込み、前処理として特徴量とターゲット変数の設定を行っていきます。今回はあくまで速度検証のため、14個の連続データカラムをピックアップして試しました。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

APP_NAME = 'lightgbm'
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv("./train.csv", header=True)

featureCols = ["Census_OSBuildNumber", "Census_OSBuildRevision", "Census_OSInstallLanguageIdentifier", "Census_OSUILocaleIdentifier", "Census_IsPortableOperatingSystem", "Census_IsFlightsDisabled","Census_ThresholdOptIn", "Census_FirmwareManufacturerIdentifier", "Census_FirmwareVersionIdentifier", "Census_IsSecureBootEnabled", "Census_IsWIMBootEnabled", "Census_IsVirtualDevice", "Census_IsTouchEnabled", "Census_IsPenCapable"]

assembled_feature = VectorAssembler(inputCols=featureCols, outputCol='features')
df_new = assembled_feature.setHandleInvalid("skip").transform(df).select(["HasDetections", "features"])
df_new = df_new.na.drop()
df_new.show(4)

次のように、ターゲット変数はマルウェアかどうかの2値データであるHasDetectionsです。特徴量はfeatureColsVector形式にしてfeaturesにまとめています。

+-------------+--------------------+
|HasDetections|            features|
+-------------+--------------------+
|            1|(14,[0,1,2,3,7,8]...|
|            1|(14,[0,1,2,3,7,8]...|
|            0|(14,[0,2,3,7,8],[...|
|            0|(14,[0,1,2,3,7,8]...|
+-------------+--------------------+
only showing top 4 rows

次のLightGBMClassifierの処理速度をexecutorとcore数を変更しながら確認します。(executorとcore数の設定は、PYSPARK_SUBMIT_ARGS環境変数で実施します。)

from mmlspark.lightgbm import LightGBMRegressionModel
from mmlspark.lightgbm import LightGBMClassifier
from mmlspark.lightgbm import LightGBMClassificationModel
import time

start = time.time()
model = LightGBMClassifier(boostingType='gbdt', objective='binary', labelCol="HasDetections", featuresCol="features", numIterations=100, numLeaves=31).fit(train)
print(time.time() - start)

なお、検証外ですが、作成したモデルを利用した検証の実施や、特徴量の重要度の算出は以下のコードで実行します。

model.saveNativeModel("mymodel")
model = LightGBMClassificationModel.loadNativeModelFromFile("mymodel")
scoredData = model.transform(test)

print("FeatureImportance: ", model.getFeatureImportances())
scoredData.limit(10).toPandas()
FeatureImportance:  [155.0, 664.0, 246.0, 202.0, 21.0, 0.0, 1.0, 545.0, 896.0, 134.0, 0.0, 43.0, 21.0, 72.0]

f:id:ishitonton:20201108234436p:plain

検証リソース

下記クラスタリソースで試しました。

  • Executor memory: 10G
  • Executor数:1~4
  • Executor Core数:1 ~ 3

検証結果

最も処理速度が速かったのは、Executo4、コア2の時でした。ローカルモードと比較して、6倍以上に処理速度が向上したことがわかります。

Executorを増加させることで、LightGBMClassifierの学習フェーズの処理速度が上がっていることが確認できます。またコア数を増やしても同様に処理速度は向上しましたが、コアを3にすると速度は低下しました。今回のデータサイズでは、オーバーヘッドコストの方が上回ってしまっていることがわかります。

  コア数:1 コア数:2 コア数:3
ローカルモード 377.22(s) - -
Executor数:1 327.38(s) 178.96(s) 136.83(s)
Executor数:2 164.98(s) 96.42(s) 140.06(s)
Executor数:3 114.76(s) 156.84(s) 171.50(s)
Executor数:4 94.65(s) 56.64(s) 161.60(s)

f:id:ishitonton:20201108231259p:plain

まとめ

mmlsparkを利用して、分散環境でLightGBMを行うことで、クラスタのリソースにより処理速度をスケールさせることができました。そして、超大規模データで勾配ブースティングを実施したい場合には、選択肢の1つに入ってくる可能性があります。ただし、通常のLightGBMと同様の精度が出るかの確認が必要です。