For Your ISHIO Blog

データ分析や機械学習やスクラムや組織とか、色々つぶやくブログです。

個人名や住所のダミーデータを作成するPythonライブラリFaker

Fakerというライブラリは、個人名や住所などのダミーデータを作成してくれます。テストデータを作る際に便利です。だいぶ前からあるようですが、最近知りました。

faker.readthedocs.io

ライブラリのインストール

pip install faker

実行してみる

from faker import Faker

fake = Faker()
print(fake.name())
print(fake.address())

# Michael Williams
# PSC 2239, Box 6124
# PO AE 65633

日本語にしたい場合

fake = Faker('jp-JP')
print(fake.name())
print(fake.address())
print(fake.company())
print(fake.job())
print(fake.phone_number())
print(fake.profile())
# 林 晃
# 大分県川崎市幸区土呂部30丁目8番13号 大中コート597
# 佐藤水産合同会社
# 気象予報士
# 070-0055-2190
# {'job': '寿司職人', 'company': '吉田運輸有限会社', 'ssn': '628-86-2152', 'residence': '福井県香取郡東庄町芝公園28丁目20番17号 シャルム長間854', 'current_location': (Decimal('-41.5135285'), Decimal('143.657825')), 'blood_group': 'AB-', 'website': ['http://ishii.jp/', 'http://endo.jp/'], 'username': 'yamamotoasuka', 'name': '井上 稔', 'sex': 'M', 'address': '石川県中央区隼町23丁目20番1号 卯の里アーバン120', 'mail': 'nendo@gmail.com', 'birthdate': datetime.date(1937, 10, 12)}

積読メモ:『AIエンジニアのための機械学習システム デザインパターン』を読んだ

仕事で外部の人に機械学習などの講習をする機会があったのですが、「機械学習モデルの作り方はわかったけど、それ以外のシステム実装などについても知りたい」という声がありました。いわゆるMLOps的な話の包括的な理解を得るために、積読していた下記書籍を読了。

www.amazon.co.jp

読んだ感想

  • とても読みやすく、(コード実装の細かい部分などは読み飛ばしましたが)2-3日程度で読み終えました。
  • 本書では、様々な「デザインパターン」が記載されています。自身に関連するデザインパターンを注意深く読むとよいと思います。(逆に他は結構読み飛ばしました)
  • 読み終えた上で、下記のような知識を別でつけたいと思いました。
    • チャンピオン-チャレンジャーモデルのような、実際のモデル更新のロジック/デザインについて
    • データ変化を捉える指標についてなど
      • Population stability index
      • Kolmogorov-smirnov index とか

機械学習システムを作るときの振り返りリスト

自分への備忘録として、本書内での学びを振り返りのために記載します。私自身は、MLシステムを作る際にリストを眺めて、思考の抜け漏れがないかをチェックしたいと思います。

パート1:機械学習とMLOps

  • MLOpsでは、機械学習のモデルのみならず、モデルを学習し、推論するためのワークフローやシステムを開発し運用する。
  • MLがプロダクトとして人間の役に立つためには、MLと人間の作業を住み分けることが重要。
  • Google社は、人間とAIのかかわり方を『PAIR(People + AI Research)』として公開している。PAIRでは、MLが人間に提供する価値を自動化と拡張と定義づけている。
  • MLのコアである推論の品質は、「データの品質」に左右される。
  • MLでは再現性のある学習が必要。再現性で求められることは、自分以外の人間が同じデータと同じ学習コード、同じアルゴリズムとパラメータ、同じライブラリとバージョンで学習を実行し、自分と大差ない結果を得ることができること。
  • 多くの場合、Productionを模したテスト用のStaging環境を用意して稼働試験するのがよい。
  • 推論結果が想定通りかどうかの確認に加えて、パフォーマンス試験や負荷試験も実施するとよい。
  • 推論結果から、アクションをおこす必要のある関係者やステークホルダーを含めて受け入れ試験をするのがよい。
  • 推論結果が社内プロセスに組み込まれ、人間の介入が必要になる「ヒューマンインザループ」のワークフローを作ることがある。この場合は、人間の作業が可能な範囲で実施する設計にするべき。例えば、すべてを専門家が確認するプロセスは破綻する。
  • 最初からすべてのリクエストを推論器に流すことは避けたほうがよい。徐々にリクエストを上げていく。
  • すでに推論器が稼働している更新リリースの場合には、A/Bテストして、両方のモデルを稼働させて比較するのがよい。
  • 「どう学習させるか」と同じくらい「いつ学習するか」は重要な検討事項。データの傾向の変化が毎日ほど頻繁でない場合、毎日学習する必要はない。
  • 推論器のモデルを再学習するタイミングは、モデルのパフォーマンスを計測し評価すべき。なぜなら、推論モデルのパフォーマンスは、時間とともに劣化していくことがあるため。
  • エッジサイドでモデルを推論させる場合には、モデルのインストールが必要となるため、モデルに問題があっても簡単にモデルを入れ替えることは難しい。
  • モデルの精度とスピード/コストはトレードオフ
  • 機械学習の品質は、リリース前のテストと、リリース後の劣化補修に分けられる。
  • ヒューマンインザループのプロセスの場合、人間がタスクを実行可能でない量の推論では、ビジネスにネガティブな影響を与える可能性さえある。
  • 推論器の評価には、システムとしての「リリース判定」が必要。
  • モデルの推論を定期的に評価するシステムと、評価結果を通知し、異常事態が発生しているようであればアラートを鳴らす監視システムが必要。

パート2:機械学習システムを作る

  • 機械学習の評価がよいモデルと、本番システムで使えるモデルは別。
  • ヒューマンインザループによって、機械学習の推論結果を人間が使うような仕組みの場合、使用者に推論結果を評価してもらうことによって、モデルに有用かを判断できる。使う視点で評価することが重要。
  • 推論では、レスポンススピードも重要。推論システムのパフォーマンステストや負荷テストも実施する。
  • モデル開発と本番システム(推論)で共通のOS、言語、依存ライブラリのバージョンをそろえて開発する。
  • 機械学習のプロジェクトには、名称が必要。
  • モデルのバージョン管理方法を決める。
  • 学習に利用したデータも管理する。評価時に、学習済みのデータを使ってしまい、正当に評価されていないモデルがリリースされるリスクがある。
  • 機械学習を学習フェーズをパイプラインで開発すると、段階的に実行可能になり、プロセスをテストしやすい。エラー箇所も特定しやすい。逆に、個別jobのリソース管理やコード管理が複雑になる。また、個別にリソースを確保してしまうため、jobが完了したらリソースを開放するのがよい。
  • MLモデルの多くは、学習直後がもっとも推論精度がよく、時間の経過とともに精度が劣化する。最新のデータで学習することでその劣化を回避できる。
  • バッチ実行パターンでは、推論に失敗した場合に、推論をリトライするか、運用者に通報する仕組みが必要。サービスレベルとの相談。
  • 学習環境と推論環境は別物。目的やコスト構造が異なる。モデルをリリースする際に、最初に直面する課題は、学習環境や学習コードと、推論で必要とするシステムに大きな隔たりがあること。
  • 障害や問題があった場合に、その障害を検知し、トラブルシューティングして復旧できるようにする必要がある。
  • モデルファイルは、通常学習、推論両方で同じものを使う。入出力のデータ型が変わっていないように注意する。
  • モデルファイルの配布で難しいのは、モデルファイルサイズや配布先の推論器との互換性。推論器にインストールされているライブラリのバージョンがモデルと一致しないと、推論器を正常に稼働させることができない場合がある。
  • 本番システムですでに稼働している場合には、モデルの配布や更新でシステムを止めないこと(または止めてもよい時間に対応する)を検討する必要がある。
  • OS、ライブラリ、バージョン、稼働モデル、入力データ形式、モデルの目的は一元管理する
  • 開発が終了しているライブラリや脆弱性があるライブラリのバージョンを使用することは避ける。
  • より良いモデルができたら、容易にモデルをリリースできる状態にすることが理想。
  • バッチ推論パターンは、リアルタイムor準リアルタイムに推論をする必要がないときや、過去のデータをまとめて推論したい場合に利用する。
  • バッチ推論パターンでは、バッチジョブを起動するジョブ管理サーバが必要。決められたルール(時間や条件)をトリガーにして、推論のバッチジョブを起動する。
  • バッチ推論パターンは、時間に余裕をもってスケジュールすることができれば、サーバ障害等で推論に失敗してもリトライが可能。 バッチ推論パターンでは、1回のバッチジョブで推論対象とするデータの範囲を定義する必要がある。データ量の多寡によって、推論時間が上下するため、推論結果が必要となるときまでに推論が完了するように、データ量やバッチの実行頻度の調整が必要。
  • バッチ処理が失敗した場合の方針を決めておくことが重要(全件リトライ/一部リトライ/放置)
  • ベースとなる推論器は、テンプレートから作り、必要な箇所のみ変更することで、推論器開発を効率化できる。同一入出力の推論器を大量開発、リリースしたいときにメリットがある。

パート3: 品質・運用・管理

  • MLがビジネスにインパクトを与えるのは、その推論結果がスピード、量、精度において人間の予測よりも優れているとき。
  • システムが正常に稼働しているかを評価するためにログを収集する。
  • 推論器そのものが異常な場合(推論結果が一定時間発生しない/推論数が通常と比較して異常に少ない等)と、推論結果が想定外の場合がある。推論傾向を監視することが必要。
  • 推論を視覚化するダッシュボードなどを用意すると便利
  • ログデータは、一般的にシステムの利用不可に比例して増大する。全ログデータを記録し続けるが望ましいが、定期的に圧縮してコスト削減することも重要。
  • システムにログがないと、エラー検知や障害対応、システム改善を行うことができない。最低限、Fatal, Error, Warning, Infoのログを取得する。
  • 機械学習システムが正常であるというためには、推論が正常に動作する(学習時と同様のパフォーマンス)ことと、推論器自体が正常に動作する(レイテンシーや可用性がサービスレベルを保持)が求められる。
  • 機械学習の正常性
    • 評価管理
      • 学習評価、バージョン管理
      • モデルの不備検知と修正できる仕組み
    • データ管理
      • 学習で用いるデータの妥当性管理(必要なデータ構成、評価データに含まれていないこと)
  • ソフトウェアの正常性
    • 変更管理(ソフトウェアバージョンが、モデルのバージョンと一致していることを評価する)
    • 可用性管理(正常かつ特定時間内にレスポンスされる)
  • 運用管理
    • 運用システムの用意(ログ収集、監視通知、モニタリング等)
    • 運用体制
  • 複雑なモデルの方がMLとしての評価は高くなるが、計算量が増え速度は遅くなる。スピードと負荷耐性が求められる。
  • A/Bテストパターンでは、新モデルを現行モデルと並列させて有効性を評価する。モデルが現在の本番システムのデータで有効かどうかは、リリースしてみないとわからない。

SlackのSlash CommandでサーバレスにEC2インスタンスを起動する

ChatOpsとは「Chat」と「Ops」を掛けあわせた造語で、Chatをベースにシステム運用(Ops)を行うことを指します。日々のチームコミュニケーションで利用しているSlackのSlashコマンドを利用して、AWSのEC2インスタンスの起動・停止を行えるようにしたので、その手順を記載していきます。

使ったサービスは、AWS LambdaAmazon API Gateway、そしてSlackです。

課題意識

弊チームでは、社内の検証用/分析用途で、AWS上にEC2インスタンスでClouderaのCDH(Cloudera Distribution Including Apache Hadoop)を構築しています。それなりにハイスペックなインスタンスを使っているため、利用していない時は停止してコストを抑える運用をしています。

最近では下記のような課題が出てきました。

かかわる人が増えてきた

Hadoop環境を触れる人が増えてきました。その結果、「誰が立ち上げたのかわからない。使用中なのかわからない」自体が発生するようになりました。誰が使っているのか可視化したい、また新しいメンバーも簡単に操作できた方が楽だなと思いました。

オペレーションミスを防ぐ

MasterやWorker、Gatewayノード、CDHの場合がCloudera ManagerやCloudera Atlas Directorといった関連EC2インスタンスが多く存在しました。間違って他のインスタンスを立ち上げてしまったり、一部のインスタンスを停止し忘れることがでてきました。

AWSのマネジメントコンソールへのアクセス面倒

そもそも、AWSのマネジメントコンソールに都度アクセスして、6つか7つのEC2インスタンスをチェックして起動する、という操作は面倒でした。

最終ゴール

Slackのチャネル上で、つぎのSlash Command (スラッシュコマンド) を実行し、Hadoopクラスタ関連の複数インスタンスを一度に操作(起動・停止)できるようにします。Slash Commandは、Slack のメッセージ入力欄に / からはじまるコマンドを入力することでAPIを実行できます。私の場合では、以下のようにコマンドを入力することで、EC2インスタンスを操作できるようにします。

/cdh-dev start
/cdh-dev stop

f:id:ishitonton:20201115163814p:plain

全体構成

今回はAWSの次のサービスを利用します。

AWS Lambdaは、イベントが発生した際にスクリプトを実行できる、サーバレスコンピューティングサービスです。リクエストが飛んできたときだけ、EC2インスタンスを起動/停止するスクリプトを実行してくれます。

Amazon API Gatewayは、AWS上で簡単にAPIの作成ができるサービスです。SlackからGETリクエストを取得して、それをLambdaに渡す役割を果たします。

f:id:ishitonton:20201115134616p:plain

参考

今回私は、次のブログを参考にさせて頂きました。

Slack の Slash Command で AWS の EC2 と RDS の起動と停止を実現してみた (1) 導入

ロールの作成

まず最初に、EC2の起動・停止ができるポリシーとロールを作成します。AWSマネジメントコンソールより、IAMにアクセスし、ロールの画面に飛びます。

f:id:ishitonton:20201115140203p:plain

f:id:ishitonton:20201115140309p:plain

ロールの作成をクリックし、新しいロールを作成していきます。 ロールの作成画面では、

  • 信頼されたエンティティの種類を選択:AWSサービス
  • ユースケースの選択:Lambda

をクリックし、次のステップをクリックします

f:id:ishitonton:20201115140412p:plain

ポリシーを選択する画面になりますので、ここではEC2Controlという新しいポリシーを作成しましょう。ポリシーを作成をクリックします f:id:ishitonton:20201115140745p:plain

ビジュアルエディタで、次の設定をしてください。

  • サービス1
    • サービス:EC2
    • アクション
      • 書き込み:StartInstances, StartVpcEndpointServicePrivateDnsVerification, StopInstances
    • リソース:すべてのリソース
  • サービス2
    • サービス:CloudWatch Logs
    • アクション
      • 書き込み:すべて
    • リソース:すべてのリソース

設定が完了したら、ポリシーの確認をクリックします。

f:id:ishitonton:20201115141327p:plain

ポリシーの名前をEC2Controlと入力しポリシーの作成をクリックします。これでポリシーが作成できました。 f:id:ishitonton:20201115141451p:plain

先程のロールの作成画面に戻り、ポリシー名を検索すると、今作成したポリシーが確認できます。EC2Controlにチェックを入れて、次のステップに進みます。

f:id:ishitonton:20201115141651p:plain

ロールの作成の確認画面に進みますので、ロール名をLambdaEC2Controlとしてロールの作成をクリックして完了です。 f:id:ishitonton:20201115142106p:plain

AWS Lambdaの設定

次にLambdaの設定を行っていきます。ここでは、イベント発生時に実行するスクリプトを登録します。 AWS マネジメントコンソールより、AWS Lambdaにアクセスし、右上の関数の作成をクリックします。

f:id:ishitonton:20201115142624p:plain

関数の作成では、一から作成をクリックし、関数名、ランタイム、アクセス権限を指定していきます。

  • 関数名:slack-cdh-dev(※私の場合、Dev環境にあるCDHのEC2インスタンスをSlackから操作するため、このような名前にしました)
  • ランタイム:Node.js 12.x
  • アクセス権限
    • 既存のロールを使用するをクリックし、先程作成したLambdaEC2Controlを指定します。

設定が完了したら、右下の関数の作成をクリックします f:id:ishitonton:20201115143133p:plain

作成したLambda関数の具体的な設定を行っていきます

f:id:ishitonton:20201115143334p:plain

関数コード

実行するスクリプトを登録します。私の用途としては、Hadoopクラスタに関連するインスタンスを一度に起動・停止するユースケースを想定していたため、インスタンスIDリストで全てハードコードしました。この部分は、実際は、のちに説明する環境変数に登録した方が良い情報かもしれません。

またスクリプト内では、イベント発生時にevent.tokenとevent.textをAPI Gatewayから受け取り、処理を行います。event.tokenはSlackから受け取ったtoken情報です。event.textはSlashコマンドの後ろに続くテキスト情報であり、私の場合が/cdh-dev [start/stop]のstartやstopなどの文字列を表しています。その文字列によって、Node.jsスクリプト内の異なる関数を呼び出し、各APIを実行しています。

'use strict';

const AWS = require('aws-sdk');

// 操作したいEC2インスタンスIDを入力する。※各自が入力する
const instance_list = ["i-xxxxxxxxxxxxxxxxx", "i-xxxxxxxxxxxxxxxxx"]

// EC2 インスタンスを起動する
function startEC2Instance(region, instanceId) {
    const ec2 = new AWS.EC2({ region: region });
    const params = {
        InstanceIds: instance_list,
        DryRun: false,
    };
    return new Promise((resolve, reject) => {
        ec2.startInstances(params, (err, data) => {
            if (err) reject(err);
            else     resolve(data);
        }); 
    });
}

// EC2 インスタンスを停止する
function stopEC2Instance(region, instanceId) {
    const ec2 = new AWS.EC2({ region: region });
    const params = {
        InstanceIds: instance_list,
        DryRun: false,
    };
    return new Promise((resolve, reject) => {
        ec2.stopInstances(params, (err, data) => {
            if (err) reject(err);
            else     resolve(data);
        }); 
    });
}

// EC2 インスタンスのステータスを確認する
function describeStatusEC2Instance(region, instanceId) {
    const ec2 = new AWS.EC2({ region: region });
    const params = {
        InstanceIds: instance_list,
        DryRun: false,
    };
    return new Promise((resolve, reject) => {
        ec2.describeInstanceStatus(params, (err, data) => {
            if (err) reject(err);
            else     resolve(data);
        }); 
    });
}

// 関数指定してインスタンスを制御します。
function executeControl(ec2Function) {
    const result = { EC2: null};
    const a = ec2Function(process.env.EC2_REGION, process.env.EC2_INSTANCE_ID)
        .then(data => {
            result.EC2 = { result: 'OK', data: data };
        }).catch(err => {
            result.EC2 = { result: 'NG', data: err };
        });
    const b = null
    return Promise.all([a, b]).then(() => result );
}

function getSuccessfulResponse(message, result) {
    return {
        "response_type": "in_channel",
        "attachments": [
            {
                "color": "#32cd32",
                "title": 'Success',
                "text": message,
            },
            {
                "title": 'Result',
                "text": '```' + JSON.stringify(result, null, 2) + '```',
            },
        ],
    };
}

function getErrorResponse(message) {
    return {
        "response_type": "ephemeral",
        "attachments": [
            {
                "color": "#ff0000",
                "title": 'Error',
                "text": message,
            },
        ],
    };
}

exports.handler = (event, context, callback) => {
    if (!event.token || event.token !== process.env.SLASH_COMMAND_TOKEN)
        callback(null, getErrorResponse('Invalid token'));
    if (!event.text)
        callback(null, getErrorResponse('Parameter missing'));
    if (event.text.match(/start/)) {
        executeControl(startEC2Instance)
        .then(result => { callback(null, getSuccessfulResponse('Starting...', result)); });
    } else if (event.text.match(/stop/)) {
        executeControl(stopEC2Instance)
        .then(result => { callback(null, getSuccessfulResponse('Stopping...', result)); });
    } else if (event.text.match(/status/)) {
        executeControl(describeStatusEC2Instance)
        .then(result => { callback(null, getSuccessfulResponse('Checking statuses...', result)); });
    } else {
        callback(null, getErrorResponse('Unknown parameters'));
    }
};

コードの作成が完了したらDeployボタンをクリックして、スクリプトを保存します。

環境変数

次の環境変数を入力し保存してください。各環境変数は、上のスクリプト内で利用します。なお、SLASH_COMMAND_TOKENについては、後々Slack側でSlash Commandの設定をする際に発行されるので、現在がひとまずhogehogeなどと入力しておきます。後ほど更新します。

キー
EC2_REGION ap-northeast-1
SLASH_COMMAND_TOKEN hogehoge

API Gatewayの設定

次に、AWSのマネジメントコンソールから、Amazon API Gatewayの画面に進んでください。このサービスでは簡単にAPIを作成することができます。ここでのAPI Gatewayの役割は、Slackからコマンドを受け取り、GETメソッドでLambda関数にリクエストをイベントとして引き渡すことです。APIを作成ボタンをクリックしてください。

f:id:ishitonton:20201115145451p:plain

REST APIを構築する選択に進むと、新しいAPIの設定が行えます。

  • API名:slack-cdh-dev ※Lambda関数と同じ
  • エンドポイントタイプ:リージョン

とし、APIの作成をクリックします。s f:id:ishitonton:20201115153052p:plain

まだ、何もAPIメソッドが登録されていないので、GETメソッドを作成します。アクション >> メソッドの作成をクリックし、GETを選択します。右側の✔ボタンをクリックすると、GETメソッドのセットアップに進みます。

f:id:ishitonton:20201115153637p:plain f:id:ishitonton:20201115154107p:plain

Lambda関数には、先程作成した関数名を入力してください。私の場合はslack-cdh-devです。その他の設定がそのままで保存します。Lambda 関数に権限を追加するというダイアログが出ますので、そのままOKをクリックします。

f:id:ishitonton:20201115154317p:plain

次に、メソッドリクエストと統合リクエストの設定を行っていきます。

f:id:ishitonton:20201115154609p:plain

メソッドリクエス

画面のメソッドリクエスをクリックしてください。ここでは、Slackから受信可能なパラメータの情報を指定します。

まず、リクエストの検証をクリックし、クエリ文字列パラメータおよびヘッダーの検証を選択してください。次に、URL クエリ文字列パラメータのタブを開くと、Slackから受け取るパラメータを指定できます。クエリ文字列の追加をクリックし、以下の情報を追加してください。

名前 必須
text 必須✔
token 必須✔

textは、Slackで/cdh-dev startと入力した際のstart部分の情報が受信できます。tokenは、Slackから受け取るトークン情報です。

統合リクエス

次に一画面前に戻り、統合リクエスをクリックします。ここでは、メソッドリクエストで取得したtextとtokenの情報をlambda関数(のevent引数)に渡す役目を果たします。一番下のマッピングテンプレートをクリックします。

f:id:ishitonton:20201115155651p:plain リクエスト本文のパススルーの設定では、 テンプレートが定義されていない場合 (推奨)を選択し、Content Typeとしてapplication/jsonと入力します。表示されたテンプレート部分に、次のコードを入力して保存してください。この指定で、URL パラメーターの text と token がそれぞれ Lambda 関数側にevent引数のtextとtokenとして渡されます。

{
   "token": "$input.params('token')",
   "text": "$input.params('text')"
}

f:id:ishitonton:20201115160210p:plain

これでAPIの設定は完了です。

APIのデプロイ

作成したAPIをデプロイします。アクション >> APIのデプロイを選択します。

f:id:ishitonton:20201115165950p:plain

APIのデプロイダイアログが表示されます。ここでは、ステージの設定を行えますので、dev環境へのデプロイを行いましょう。

  • デプロイされるステージ:[新しいステージ]
  • ステージ名:dev

そのままデプロイをクリックしてください。

f:id:ishitonton:20201115160850p:plain

次の画面のように、APIのエンドポイントURLが発行されます。これでAPI Gatewayの設定は完了です。 f:id:ishitonton:20201115161029p:plain

Slackの設定

Slack側で、Slash Commandの登録を行っていきます。Slackの管理画面より、その他管理項目 >> アプリを管理するに進みます。検索窓でSlash Commandと検索してもらうと、Slash Commandのアプリが表示されるはずです。Slackに追加をクリックしてください。

f:id:ishitonton:20201115161623p:plain

コマンドを選択するの項目では、実際に利用するSlash Commandを入力します。執筆者の場合には、/cdh-devと入力します。Slash Commandは、その名のとおり、スラッシュ/から始まるコマンドです。入力が終わったらスラッシュコマンドインテグレーションを追加するを押下してください。

f:id:ishitonton:20201115161854p:plain

最後にコマンドの各設定を行っていきます。

  • URL
    • API Gatewayで発行されたURLを入力します。
  • メソッド
    • GET(取得する)メソッドを指定します。
  • トーク

f:id:ishitonton:20201115162425p:plain

一番下のインテグレーションの保存をクリックすれば、Slash Commandは完成です。

lambda関数のバージョンの発行

最後に、残っていることが2つあります。1つはlambda関数の環境変数SLASH_COMMAND_TOKENの編集です。現在hogehoeという値になっていますので、Slash Commandの設定で取得したトークン情報に書き換えてください。

f:id:ishitonton:20201115162640p:plain

2つ目に、Lambda関数の発行を行います。Lambda関数の設定画面上段のアクション >> 新しいバージョンを発行をクリックしてください。これで、最新のlambda関数が利用できるようになりました。

f:id:ishitonton:20201115162912p:plain

Slackから試してみる

任意のチャネルで、/cdh-dev startと入力してください。(※このコマンドは私が指定したコマンドです)。レスポンスとして、Successというメッセージと、各インスタンスの起動状態がjson形式で返ってきます。/cdh-dev stopと入力してあげれば、同様にインスタンスを停止してあげることができます。

f:id:ishitonton:20201115163814p:plain

LightGBMを並列分散処理させるmmlsparkのスケーラビリティを検証した

勾配ブースティングモデルの1つであるLightGBMを分散処理させるライブラリに、mmlsparkがあります。Microsoftが提供しているライブラリで、Spark上で動かすことで並列分散処理を実現します。既存のLightGBMライブラリでも、推論フェーズにおいては分散処理ができる(はず)ですが、学習フェーズではこのmmlsparkを利用することで実現できます。

現在はRC版ではありますが、安定してきたという口コミもあり、実際にSpark上で動作するか試しました。Hadoopのリソースを増やしていくことで、Lighgbmの学習フェーズの処理速度がスケールすることが確認できました。

github.com

LightGBMはGPUを利用して高速に処理できますが、Hadoop環境に大規模データを蓄積している企業などは、既存のクラスタ環境でSparkを利用して分散処理し、データローカリティの恩恵を受けて処理速度が上がれば嬉しいケースがあると思います。

検証環境

  • Python3
  • Spark: v2.4.3
  • mmlspark: 1.0.0-rc3
  • Jupyter Notebook上で実行

検証データ

Sparkの恩恵をうけるために、比較的大きめのデータが必要です。データが小さすぎると、オーバーヘッドコストの方が大きくなります。今回は、KaggleのMicrosoftマルウェアコンペのtrainデータを利用して検証しました。約900万レコードあり、ファイルサイズが約4Gです。

www.kaggle.com

Requirements

  • MMLSpark requires Scala 2.11, Spark 2.4+, and Python 3.5+.

mmlsparkのインストール

spark-shellやpysparkコマンドの--packagesオプションで、mmlsparkのパッケージを指定し、jarファイルを取得します。私の場合は、Jupyter Notebook環境で試したため、PYSPARK_SUBMIT_ARGS環境変数にpysparkの起動オプションを指定しています。この際に、mavenのレポジトリからmmlsparkのパッケージをインストールをしようとすると失敗します。ログを見る限り、mavenの誤ったpathに取りに行っているように見えました。mmlsparkのレポジトリ(https://mmlspark.azureedge.net/maven)を明示的に指定し、そこからパッケージをインストールすると、うまくPysparkを起動することができました。

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master yarn-client --num-executors 5 --executor-cores 2 --executor-memory 10g --packages ,com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc3 --repositories 'https://mmlspark.azureedge.net/maven' pyspark-shell"

実装

まずPysparkの起動とデータ読み込み、前処理として特徴量とターゲット変数の設定を行っていきます。今回はあくまで速度検証のため、14個の連続データカラムをピックアップして試しました。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

APP_NAME = 'lightgbm'
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv("./train.csv", header=True)

featureCols = ["Census_OSBuildNumber", "Census_OSBuildRevision", "Census_OSInstallLanguageIdentifier", "Census_OSUILocaleIdentifier", "Census_IsPortableOperatingSystem", "Census_IsFlightsDisabled","Census_ThresholdOptIn", "Census_FirmwareManufacturerIdentifier", "Census_FirmwareVersionIdentifier", "Census_IsSecureBootEnabled", "Census_IsWIMBootEnabled", "Census_IsVirtualDevice", "Census_IsTouchEnabled", "Census_IsPenCapable"]

assembled_feature = VectorAssembler(inputCols=featureCols, outputCol='features')
df_new = assembled_feature.setHandleInvalid("skip").transform(df).select(["HasDetections", "features"])
df_new = df_new.na.drop()
df_new.show(4)

次のように、ターゲット変数はマルウェアかどうかの2値データであるHasDetectionsです。特徴量はfeatureColsVector形式にしてfeaturesにまとめています。

+-------------+--------------------+
|HasDetections|            features|
+-------------+--------------------+
|            1|(14,[0,1,2,3,7,8]...|
|            1|(14,[0,1,2,3,7,8]...|
|            0|(14,[0,2,3,7,8],[...|
|            0|(14,[0,1,2,3,7,8]...|
+-------------+--------------------+
only showing top 4 rows

次のLightGBMClassifierの処理速度をexecutorとcore数を変更しながら確認します。(executorとcore数の設定は、PYSPARK_SUBMIT_ARGS環境変数で実施します。)

from mmlspark.lightgbm import LightGBMRegressionModel
from mmlspark.lightgbm import LightGBMClassifier
from mmlspark.lightgbm import LightGBMClassificationModel
import time

start = time.time()
model = LightGBMClassifier(boostingType='gbdt', objective='binary', labelCol="HasDetections", featuresCol="features", numIterations=100, numLeaves=31).fit(train)
print(time.time() - start)

なお、検証外ですが、作成したモデルを利用した検証の実施や、特徴量の重要度の算出は以下のコードで実行します。

model.saveNativeModel("mymodel")
model = LightGBMClassificationModel.loadNativeModelFromFile("mymodel")
scoredData = model.transform(test)

print("FeatureImportance: ", model.getFeatureImportances())
scoredData.limit(10).toPandas()
FeatureImportance:  [155.0, 664.0, 246.0, 202.0, 21.0, 0.0, 1.0, 545.0, 896.0, 134.0, 0.0, 43.0, 21.0, 72.0]

f:id:ishitonton:20201108234436p:plain

検証リソース

下記クラスタリソースで試しました。

  • Executor memory: 10G
  • Executor数:1~4
  • Executor Core数:1 ~ 3

検証結果

最も処理速度が速かったのは、Executo4、コア2の時でした。ローカルモードと比較して、6倍以上に処理速度が向上したことがわかります。

Executorを増加させることで、LightGBMClassifierの学習フェーズの処理速度が上がっていることが確認できます。またコア数を増やしても同様に処理速度は向上しましたが、コアを3にすると速度は低下しました。今回のデータサイズでは、オーバーヘッドコストの方が上回ってしまっていることがわかります。

  コア数:1 コア数:2 コア数:3
ローカルモード 377.22(s) - -
Executor数:1 327.38(s) 178.96(s) 136.83(s)
Executor数:2 164.98(s) 96.42(s) 140.06(s)
Executor数:3 114.76(s) 156.84(s) 171.50(s)
Executor数:4 94.65(s) 56.64(s) 161.60(s)

f:id:ishitonton:20201108231259p:plain

まとめ

mmlsparkを利用して、分散環境でLightGBMを行うことで、クラスタのリソースにより処理速度をスケールさせることができました。そして、超大規模データで勾配ブースティングを実施したい場合には、選択肢の1つに入ってくる可能性があります。ただし、通常のLightGBMと同様の精度が出るかの確認が必要です。

SparkでLasso回帰のハイパーパラメータλをグリッドサーチして特徴量選択する

はじめに

Lasso回帰は、正則化された線形回帰手法の1つで、線形回帰にL1正則化項を追加したモデルです。正則化により過学習を防ぐとともに、不要と判断される説明変数の偏回帰係数がゼロになる性質があります。この性質を利用して、目的変数により影響が高い説明変数のみを選択する特徴量選択を自動で行う手法でもあります。

特徴量選択の方法は、より精度が高いものから、より複雑なものまで様々な方法が用意されています。ビジネス現場では、数百万の中から迅速に変数選択を行う必要がある場合に、Lasso回帰のようなシンプルな方法論が利用されるケースがあります。

この記事では、Sparkを利用してLasso回帰を行い特徴量選択を行います。また、最適なハイパーパラメータλを探索するためにグリッドサーチを行います。

正則化とは

正則化とは、過学習を防いで汎化性能(未知のデータに対する予測力)を得るためのに追加情報を導入するプロセスです。通常、データの複雑さに対するペナルティまたは制約の形で行われます。モデルに正則化項を追加することで、データの外れ値が原因で係数が任意に引き伸ばされる可能性を減らし、モデルの過学習を防ぐことができます。

L1正則化

L2正則化項は、線形回帰の損失関数(最小二乗損失関数)に重みの絶対値和を足し合わせた項です。係数はゼロに向かって収束し、変数の選択に役立つように一部の係数は正確にゼロになります。 f:id:ishitonton:20201010160531p:plain

L2正則化

L2正則化項は、線形回帰の損失関数(最小二乗損失関数)に重みの平方和を足し合わせた項です。ハイパーパラメータλの値が増えるとL2正則化項が大きくなります。正則化を強くし、モデルの重みを小さくするように調整されます。つまり説明変数の影響が大きくなりにくいように抑えてくれます。

Elastic Net

リッジ回帰とLasso回帰の両方のペナルティ制約の効果を組み合わせたモデルです。下記の損失関数が与えられます。 f:id:ishitonton:20201010160624p:plain

Elasticパラメータのα = 1の場合、損失関数はL1正則化(Lasso)になります。α= 0の場合、損失関数はL2正則化(Ridge)となります。αが0〜1の間にある場合、損失関数は、係数にL1(ラッソ)制約とL2(リッジ)制約の両方を組み合わせて実装します。 ハイパーパラメータλが大きくなると、損失関数は切片以外の係数にペナルティを課します。

一般に、過学習を回避するために正則化を使用するため、λが異なる複数のモデルをトレーニングし、テストエラーが最小となるモデルを選択する必要があります。そこで今回はグリッドサーチでλを変動させて、最適なλを探索し、そのモデルの係数を利用して特徴量選択を行います。

Sparkでの実装

  • ハイパーパラメータα = 1とし、lasso回帰を実行します。
  • ハイパーパラメータλを0~1の間でグリッドサーチし、探索します。
  • クロスバリデーション(Folds=5)で実行します。

Lasso回帰に実行にはSparkMLを利用してデータフレーム形式で実行します。label(目的変数)のカラムとfeatures(説明変数)のカラムを用意し、featuresはVectorに変換します。

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

df_raw = spark.read.parquet("/user/test")

featureCols = df_raw.columns
featureCols.remove("label")

assembled_feature = VectorAssembler(inputCols=featureCols, outputCol='features')
df = assembled_feature.transform(df_raw).select(["label", "features"])
df.show()

データフレームの出力結果は以下の通りです。

+--------------------+--------------------+
|                 label|            features|
+--------------------+--------------------+
| 0.15088482464765735|[-0.10498780858,-...|
| 0.15088482464765735|[-0.10498780858,-...|
| 0.44954344663064794|(15,[0,1,2,3,4,5,...|
| 0.21725340731054388|[-0.10498780858,0...|
|-0.23073452566394168|[1.01488214956,1....|
| -0.3302873996582715|[-0.10498780858,-...|
| -0.9607889349556958|[1.01488214956,1....|
|  0.7150177772821946|[-1.22485776671,-...|
| 0.44954344663064794|[-1.22485776671,-...|
|-0.14777379733533322|[-0.10498780858,-...|
| -0.3800638366554367|[-0.10498780858,-...|
| -0.6123538759755407|[1.01488214956,0....|
|-0.46302456498404515|[1.01488214956,0....|
| -0.8114596239642009|[1.01488214956,0....|
|  -1.607882615918842|[1.01488214956,1....|
|  -1.607882615918842|[1.01488214956,1....|
| -0.8944203522928094|[1.01488214956,1....|
|    2.04238943053993|(15,[0,1,2,3,4,5,...|
|   1.710546517225496|(15,[0,1,2,3,4,5,...|
|  2.2912716155257553|(15,[0,1,2,3,4,5,...|
+--------------------+--------------------+
only showing top 20 rows

次に、lasso回帰を実行します。ハイパーパラメータλはグリッドサーチで探索を行い、Folds=5でクロスバリデーションを行います。また、メトリックスにはRMSEを選択します。

# grodsearch parameter scope
regParam_list = [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]

lr = LinearRegression(featuresCol="features",
                      labelCol="label",
                      predictionCol="predict",
                      maxIter=20,
                      elasticNetParam=1,
                      tol=1e-06)
paramGrid = ParamGridBuilder().addGrid(lr.regParam, regParam_list).build()
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(predictionCol='predict',
                                                        labelCol='label',
                                                        metricName='rmse'),
                          numFolds=5)

cvModel = crossval.fit(df)

最後にベストモデルを選択し、ハイパーパラメータλを確認します。今回は0.1が採用されました。

model = cvModel.bestModel
# Results of regParam
print(model._java_obj.getRegParam()) # 0.1

偏回帰係数と切片を取得します。いくつかの係数はゼロとなっており、変数選択されていることが確認できます。

print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
Coefficients: [-0.1997601362638407,0.0,-0.20024756083461823,0.03626482379996047,-0.43624005401234084,0.0,0.0,-0.05147738465330625,0.0,0.0,0.0,0.0,0.0,-0.05600529685189293,0.0]
Intercept: 0.048066352404

Sparkでファイル形式や圧縮形式について実験した

大規模データを処理するために、Sparkを活用しています。 先日、Twitter上での投稿に対して、もみじあめさんから、下記コメントをもらいました。

最後に述べている通り、実験が悪く、圧縮率とスループットの関係があまりつかめなかったのですが、せっかくなので今回実験した結果を共有します。

分析基盤で取り扱う3つのフォーマット

分析基盤で扱うファイルフォーマットには大きく3つのフォーマットがあると思います。

  1. テキストフォーマットCSVJsonなど)
  2. 行指向フォーマット(AVROなど)
  3. 列指向(カラムナ)フォーマット(Parquet、ORCなど)

どのように使い分けるか

chieさんのブログがわかりやすかったです。

engineer.retty.me

テキストフォーマットは、人間にとっての可読性が高いことや、アプリケーション間の連携が取りやすいことがメリットだと思います。また、データベース用途では、人間の可読性よりも保存時の圧縮効率や機械による処理のしやすさが重視されます。そこで、行指向列指向のフォーマットが登場します。

行指向フォーマットは、行方向に連続してデータを格納する方式です。従来のデータベースは、この方式を取っており、OLTP(Online Transaction Processing)向きです。逆に列指向フォーマットは、列方向に連続してデータを格納する方式であり、列単位でデータを取り出し分析するOLAP(Online Analytical Processing)向けです。

検証内容

  • 約5億レコードのデータをSparkでHDFSに書き込む処理について、HDFSに書込むファイル形式や圧縮形式を変更した。スループットHDFS上でのファイルサイズがどのように違うかを確認した。

検証環境

  • CDH6(マスター1台、スレーブ4台構成)
    • マスター:4vcpu、メモリ16G、ディスク100GB
    • スレーブ:8vcpu、メモリ32G、ディスク100GB
  • パラメータ(固定)
    • Executor数:4
    • core数:3
  • ファイル形式
    • AVRO、CSV、Parquet
  • 圧縮方法
    • 非圧縮、snappy、deflate、Gzip

検証結果

HDFS書き込み速度

  • ファイル形式によって処理速度に差が出た。parquet形式が全体的に最も処理が早く、CSV、AVROという結果になった。
  • 圧縮方法別では、非圧縮の場合が最も早かった。

f:id:ishitonton:20200917225900p:plain

ここからは雑な仮説になってしまうのですが、今回はHDFSへの書き込む処理のみなので、「圧縮時間」「各ファイル形式へのデシリアライズにかかる時間」「デシリアライズ後のファイルサイズ」が処理時間い影響していそうな気がします。

ファイルの圧縮率

  • parquetが特にファイルサイズが小さくなった。非圧縮でもファイルサイズは小さいが、特にGzipやsnappyにするとかなりの圧縮率になっている。
  • CSV形式で圧縮しないのとでは、約10倍サイズが変わった。 f:id:ishitonton:20200917230514p:plain

考察

私の実験環境、実験データでは、parquetのスループットが非常に速い結果になりました。これは、chieさんのブログにもあるとおり、parquetは、構造的に分散処理に適していることが理由と言えそうです。

また、parquetは非圧縮でも非常にファイルサイズが小さくなっていることがわかりました(対CSV)。これに関しては、parque自体が圧縮に優れていることが言えそうです。ここについては、もう少し考察を述べていきたいと思います。

parquetのエンコーディング

parquetは、列の圧縮に優れているのが特徴です。これが、snappyやGzipで圧縮をしなくても、parquet形式で保存するだけで十分に小さなファイルサイズになった要因だと考えています。では、そのparquetの圧縮についてみていきます。 下の図は、列Xの値を示しています。値が頻繁に繰り返される場合、圧縮しやすい構造です。列指向フォーマットは、列単位で保存を行いますので、同じ値が入っている可能性が高いです。

データウェアハウスで効果的な圧縮の方法に、ビットマップエンコーディングがあります。列XでN個のユニークな値が存在している場合、N個の分離されたビットマップにします。Nが非常に小さい場合には、これらのビットマップは行ごろに1つのビットですみます。Nが非常に大きい場合には、ビットマップの大部分はゼロが並んでいるだけであり、ランレングスエンコーディングを加えることでコンパクトになります。

f:id:ishitonton:20200917233102p:plain

反省

今回の検証では、圧縮率とスループットの関係をあきらかにしたかったのですが、そこまで行きつきませんでした。実際に運用する中では、圧縮形式により必要なCPUリソースが結構変わってきており、OOMになったりスループットが非常に遅くなる事象がありました。次は、もう少しCPU効率を明らかにする実験をしたいと思います。

製造現場における特徴量選択について

https://twitter.com/Ishitonton

最近、個人的に製造業現場でのデータ活用事例を調査しており、特徴量選択(Feature Selection)の重要性を感じるので、観測範囲での状況を記したいと思います。

※製造業と言っても、様々な業態・事業があると思いますので、必ずしも働いている皆様の現場の実態と一致するものではありません。

製造プロセスでは、製品の歩留まりの向上・改善に繋げるために、良品・不良品に影響する要因を明らかにします。非常に長い製造プロセスにおける、様々な製造機器・センサーの品質データを特徴量として、良品/不良品の2値や良品率といった連続値をターゲット変数としてモデリングを行い、要因分析を行います。

そしてこれらを実現するために、データの収集から現場での活用までには、大きく2段階での特徴量選択がありそうです。

  1. 高次元データ(数百万カラム)に対する特徴量選択
  2. 小~中次元データ(数千~数万)に対する特徴量選択(現場)

高次元データ(数百万カラム)に対する特徴量選択

製造プロセスには、大量の機械やセンサーが介在しており、これらのデータは数百万レベルで存在するということです。

一般的に良品や不良品の判別に、これら全てのデータを特徴量としてモデリングに活用することは現実的ではありません。

そこで、最初のフィルタリングステップとして、相関係数カイ二乗検定などの単変量統計量を利用し、数百万レベルの特徴量からモデルリング可能なサイズまでフィルタリングをかける、のが一般的です。

(線形関係のみを考慮した)厳密な精度が求められない単変量統計が活用されているのは、精度とデータ処理時間がトレードオフにある中で、ある意味データセットに対して緩いフィルタリングをかけるという立ち位置であるからです。

数百万レベルのカラムを持つデータに対する処理にはなかなか難しさがあり、各社様々な工夫を行っていることが考えられます。

一般的にはテラバイトレベル以上のデータセットになることから、HadoopやSpark等の分散処理技術が活用されるケースがありそうです。また、今後IoTセンサーの数はますます増えていき、データサイズは今後も飛躍的に増加していきそうですので、スケールアウト可能な環境の方が重宝されると思います。

小~中次元データ(数千~数万)に対する特徴量選択

小~中規模データセットに対する特徴量選択やモデリングは、「製造業 特徴量選択」で検索するとDataRobotの事例記事が多く引っかかります。この場面では、数万レベルの特徴量から、さらに良品・不良品に影響する数百~数千規模での特徴量選択が行われ、勾配ブースティングモデルにおける特徴量の重要度などが利用されたりしているようです。

製造業:センサデータを機械学習に使う l DataRobot

製造業における機械学習の活用事例 l DataRobot

今後の方向性

今後の方向性としては、高次元データにおける特徴量選択の精緻化・高速化が挙げられます。ある種、大量すぎるデータに対して雑でもよいので活用できるサイズに絞る、ということが現場で行われているとも言えます。今後は大規模データセットの環境においてもより高速に重要な特徴量を絞りこめるよう、勾配ブースティングモデル on Sparkの活用なども進んでいくのではないかと思います。

例えば、Sparkでは、勾配ブースティングモデルをデフォルトで提供しています。また、Microsoft社が「LigtGBM on Spark」のRC版をリリースしたりもしています。こういった技術を活用し、初回フィルタリングで重要な特徴を落としてしまうリスクを減らす方向にも進むのではないかと思います。

github.com

そもそも製造業では、データサイエンティストが不足しているらしい

そもそも、『企業が求めるデータサイエンティスト人材像 -データサイエンティスト国内企業採用動向調査(2019)より』の調査によると、現在日本のデータ分析者の在籍傾向として

  • 情報・通信業にデータサイエンティストが偏っている。
  • 製造業における同ロールの人材は圧倒的に不足している

らしいです。製造業ではデータサイエンティストが不足している環境下にあり、一部のスーパーデータサイエンティストの業務を民主化させていくような方向にも進んでいくのではなかと思います。転職するなら、製造現場での高度化・効率化は引き続き重宝される気がします。