まめログ

Javaプログラマの歩み

LogstashのJdbc_streaming filterを使ってDBから取得した項目をログに付加する

この記事は、Elastic Stack Advent Calendar 22日目の記事となります。

qiita.com

皆さん、ログ可視化してますか?

ログにIDしか出力されておらず、名前がほしいなと思ったことはありませんか?
僕は、常々名前がほしいなと思ってました。
ログ出力処理をいじって、ログに出力されるようにするのが一番なのですが、
該当のシステムは外部ベンダーが作っており、なかなか難しかったりします。

そこで色々手段を検討していたのですが、
Logstash自身に、ログの一部をキーにDB検索をし結果をログに付与する機能があるのを見つけました。
それがJdbc_streaming filterです。
www.elastic.co

inputをJDBCで出来るのは知っていたけど、filter処理もできるとは知りませんでした。

環境

以下の環境で動作確認をしています。

OS : Windows 10 Pro
Java : Oracle JDK 8u144
Logstash : 6.1.1

下準備

DBと読み込む対象のCSVファイルを準備します。

JDBC Driverのダウンロード

使っているRDBMSに即したJDBC Driverが必要になります。
今回はPostgreSQLを使用するので、PostgreSQLJDBC Driverをダウンロードしてきます。

jdbc.postgresql.org

対象のテーブル

テーブルはこんな感じにデータを入れています。
f:id:mamepika:20171222210655p:plain

読み込む対象のCSVファイル

読み込むCSVファイルは以下のような感じです。

2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,1,Login
2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,2,Login
2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,3,Login
2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,4,Login
2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,5,Login
2017/12/22 00-00-00.000,hostap1,_07obfLIAdLHyL5VjcTWs8Zu_2xsN-QQ,1,Logout

Logstashの準備

設定ファイルを以下のように作成します。

上記の公式ページのjdbc_connection_string には "が2つ記述されていますが、
2つ書いたままだと動かなかったので、1つにしています。

JDBCの設定自体は普段Javaを書いている人には馴染みのあるものかと思います。

input{
    file{
      path => "C:/dev/members.txt"
      start_position => "beginning"
    }
}
filter{
     csv{
       columns => ["processingDate","hostName","sessionId","memberid","function"]
       separator => ","
     }
     date{
      match => ["processingDate" , "yyyy-MM-dd_HH-mm-ss.SSS"]
     }
     jdbc_streaming{
       jdbc_driver_library => "C:/dev/postgresql-42.1.4.jar"
       jdbc_driver_class => "org.postgresql.Driver"
       jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
       jdbc_user => "postgres"
       jdbc_password => "password"
       statement => "select * from members WHERE memberid = :memberid"
       parameters => { "memberid" => "memberid"}
       target => "member_details"
     }
     mutate{
       remove_field => ["@version","host"]
     }
}
output{
  stdout {codec => rubydebug}
}    

このファイルは、Logstashフォルダ配下のbinフォルダに保存します。

Logstashの実行

コマンドプロンプトから実行します。
Logstashフォルダ配下のbinフォルダまで下ってから以下のコマンドで実行できます。

logstash -f <設定ファイル名>

結果の確認

以下のように、ログには書かれていなかった項目がDBから取得され、付与されていることが確認できます。
尚、結果が複数返ってくるSQLの場合は配列になって格納されるようです。
今回は*で全カラムを取得していますが、カラムを指定すれば、そのカラムだけ配列に入ります。
f:id:mamepika:20171222211519p:plain

パフォーマンス

ローカルPC上で起動したLogstashから
RDS for PostgreSQLのt2.smallインスタンスに対して実行した際に、
秒間600~1000行程度のログを捌くことが出来ていたので、パフォーマンス的にはさほど問題にならないかなと思います。


Elastic Stackは進化のスピードが速く、どんどん痒いところにまで手が届いていく感じが好きです。
来年からはX-Packの機能も触っていけるので、blogに書く機会も増やしていきたいな~。