勾配ブースティングモデルの1つであるLightGBMを分散処理させるライブラリに、mmlspark
があります。Microsoftが提供しているライブラリで、Spark上で動かすことで並列分散処理を実現します。既存のLightGBMライブラリでも、推論フェーズにおいては分散処理ができる(はず)ですが、学習フェーズではこのmmlsparkを利用することで実現できます。
現在はRC版ではありますが、安定してきたという口コミもあり、実際にSpark上で動作するか試しました。Hadoopのリソースを増やしていくことで、Lighgbmの学習フェーズの処理速度がスケールすることが確認できました。
LightGBMはGPUを利用して高速に処理できますが、Hadoop環境に大規模データを蓄積している企業などは、既存のクラスタ環境でSparkを利用して分散処理し、データローカリティの恩恵を受けて処理速度が上がれば嬉しいケースがあると思います。
検証環境
- Python3
- Spark: v2.4.3
- mmlspark: 1.0.0-rc3
- Jupyter Notebook上で実行
検証データ
Sparkの恩恵をうけるために、比較的大きめのデータが必要です。データが小さすぎると、オーバーヘッドコストの方が大きくなります。今回は、KaggleのMicrosoftマルウェアコンペのtrainデータを利用して検証しました。約900万レコードあり、ファイルサイズが約4Gです。
Requirements
mmlsparkのインストール
spark-shellやpysparkコマンドの--packages
オプションで、mmlsparkのパッケージを指定し、jarファイルを取得します。私の場合は、Jupyter Notebook環境で試したため、PYSPARK_SUBMIT_ARGS
の環境変数にpysparkの起動オプションを指定しています。この際に、mavenのレポジトリからmmlsparkのパッケージをインストールをしようとすると失敗します。ログを見る限り、mavenの誤ったpathに取りに行っているように見えました。mmlsparkのレポジトリ(https://mmlspark.azureedge.net/maven
)を明示的に指定し、そこからパッケージをインストールすると、うまくPysparkを起動することができました。
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master yarn-client --num-executors 5 --executor-cores 2 --executor-memory 10g --packages ,com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc3 --repositories 'https://mmlspark.azureedge.net/maven' pyspark-shell"
実装
まずPysparkの起動とデータ読み込み、前処理として特徴量とターゲット変数の設定を行っていきます。今回はあくまで速度検証のため、14個の連続データカラムをピックアップして試しました。
from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext, HiveContext from pyspark.ml.feature import VectorAssembler from pyspark.sql.functions import col APP_NAME = 'lightgbm' conf = SparkConf().setAppName(APP_NAME) sc = SparkContext(conf=conf) sqlContext = SQLContext(sc) df = sqlContext.read.csv("./train.csv", header=True) featureCols = ["Census_OSBuildNumber", "Census_OSBuildRevision", "Census_OSInstallLanguageIdentifier", "Census_OSUILocaleIdentifier", "Census_IsPortableOperatingSystem", "Census_IsFlightsDisabled","Census_ThresholdOptIn", "Census_FirmwareManufacturerIdentifier", "Census_FirmwareVersionIdentifier", "Census_IsSecureBootEnabled", "Census_IsWIMBootEnabled", "Census_IsVirtualDevice", "Census_IsTouchEnabled", "Census_IsPenCapable"] assembled_feature = VectorAssembler(inputCols=featureCols, outputCol='features') df_new = assembled_feature.setHandleInvalid("skip").transform(df).select(["HasDetections", "features"]) df_new = df_new.na.drop() df_new.show(4)
次のように、ターゲット変数はマルウェアかどうかの2値データであるHasDetections
です。特徴量はfeatureCols
をVector形式にしてfeatures
にまとめています。
+-------------+--------------------+ |HasDetections| features| +-------------+--------------------+ | 1|(14,[0,1,2,3,7,8]...| | 1|(14,[0,1,2,3,7,8]...| | 0|(14,[0,2,3,7,8],[...| | 0|(14,[0,1,2,3,7,8]...| +-------------+--------------------+ only showing top 4 rows
次のLightGBMClassifier
の処理速度をexecutorとcore数を変更しながら確認します。(executorとcore数の設定は、PYSPARK_SUBMIT_ARGS
の環境変数で実施します。)
from mmlspark.lightgbm import LightGBMRegressionModel from mmlspark.lightgbm import LightGBMClassifier from mmlspark.lightgbm import LightGBMClassificationModel import time start = time.time() model = LightGBMClassifier(boostingType='gbdt', objective='binary', labelCol="HasDetections", featuresCol="features", numIterations=100, numLeaves=31).fit(train) print(time.time() - start)
なお、検証外ですが、作成したモデルを利用した検証の実施や、特徴量の重要度の算出は以下のコードで実行します。
model.saveNativeModel("mymodel") model = LightGBMClassificationModel.loadNativeModelFromFile("mymodel") scoredData = model.transform(test) print("FeatureImportance: ", model.getFeatureImportances()) scoredData.limit(10).toPandas()
FeatureImportance: [155.0, 664.0, 246.0, 202.0, 21.0, 0.0, 1.0, 545.0, 896.0, 134.0, 0.0, 43.0, 21.0, 72.0]
検証リソース
下記クラスタリソースで試しました。
- Executor memory: 10G
- Executor数:1~4
- Executor Core数:1 ~ 3
検証結果
最も処理速度が速かったのは、Executo4、コア2の時でした。ローカルモードと比較して、6倍以上に処理速度が向上したことがわかります。
Executorを増加させることで、LightGBMClassifierの学習フェーズの処理速度が上がっていることが確認できます。またコア数を増やしても同様に処理速度は向上しましたが、コアを3にすると速度は低下しました。今回のデータサイズでは、オーバーヘッドコストの方が上回ってしまっていることがわかります。
コア数:1 | コア数:2 | コア数:3 | |
---|---|---|---|
ローカルモード | 377.22(s) | - | - |
Executor数:1 | 327.38(s) | 178.96(s) | 136.83(s) |
Executor数:2 | 164.98(s) | 96.42(s) | 140.06(s) |
Executor数:3 | 114.76(s) | 156.84(s) | 171.50(s) |
Executor数:4 | 94.65(s) | 56.64(s) | 161.60(s) |
まとめ
mmlsparkを利用して、分散環境でLightGBMを行うことで、クラスタのリソースにより処理速度をスケールさせることができました。そして、超大規模データで勾配ブースティングを実施したい場合には、選択肢の1つに入ってくる可能性があります。ただし、通常のLightGBMと同様の精度が出るかの確認が必要です。