まめログ

Javaプログラマの歩み

WindowsでLogstashを使ってElasticsearchに既存のログを投入する #elasticsearch

Elasticsearch、Kibanaの環境を構築したら、
次はログを流し込んで可視化したくなるのが人情ですよね。

ということで出番なのがLogstash。

以下のURLからダウンロードできます。

www.elastic.co

通常の使い方はLogstashを各種サーバにインストールし、
リアルタイムでログを転送し可視化するのですが、
まずは手始めとして、手元にあるログをElasticsearchに転送してみようと思います。

動作させている環境は以下の通りです。

OS:Windows 10 Pro
Java: Oracle JDK 8u121
Logstash:5.2.1

Elasticsarch,Kibanaは以下の記事で作成したものを使っています。
mamelog.hatenablog.jp


Logstashとは

公式ページには以下の記述があります。

https://www.elastic.co/jp/products/logstash

  1. Logstashは、オープンソースのサーバーサイドデータ処理パイプラインで、
  2. 膨大な数のソースから同時にデータを取り込み、
  3. 変換して、指定された任意の「格納庫(スタッシュ)」(私たちの場合は当然、Elasticsearchです)に送信します。

つまるところ、いろんな形式のデータを一か所に集約できる凄いツールってことですね。

使い方

ダウンロードしてきたzipファイルを適当な場所に解凍します。
(C:\dev配下に解凍しました。)

f:id:mamepika:20170226144205p:plain

テストデータを用意する

こんな感じの:区切りのデータを用意しました。

2017-02-26_00-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,351:0.901
2017-02-26_03-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,351:0.901
2017-02-26_06-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,351:10.901
2017-02-26_09-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,251:0.901
2017-02-26_12-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,51:0.901
2017-02-26_15-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:DELETE:0,551:0.901
2017-02-26_18-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:CREATE:0,351:0.901
2017-02-26_21-00-00.123:hostap1:B01hkafjaglggkgngaJHFJHGlgkghwgh:00000123:OCC001:UPDATE:0,311:0.901

設定ファイルを書く

Logstashは大きく分けて、
input,filter,outputの3つの処理に分かれます。

こんな感じで書いてみました。

input{
    file{
      path => "C:\dev\request.txt"
      start_position => "beginning"
    }
}

filter{
     csv{
       columns => [
         "processingDate",
         "hostName",
         "sessionId",
         "merchantId",
         "functionId",
         "operateId",
         "internalProcessingTime",
         "externalProcessingTime"]
       separator => ":"
     }
     date{
      match => ["processingDate" , "yyyy-MM-dd_HH-mm-ss.SSS"]
      timezone => "Asia/Tokyo"
     }
     mutate{
       convert => {
          internalProcessingTime => float
          externalProcessingTime => float
       }
       remove_field => ["telegram","@version","host"]
     }
}
output{
    elasticsearch{
       hosts => ["<elasticsearchが動作しているサーバのIPアドレス>"]
       index => "request-%{+YYYYMMdd}"
    }
}    

input-file

pathに取り込みたいファイルのパスを書きます。
start_positionはファイルのどこから取り込み対象にするかを指定します。"beginning"を指定することでファイルの最初から取り込めます。
デフォルトではLogstash起動後に更新された行から取り込み対象にするようです。

filter-csv

CSV形式のファイルを取り込むときに使用します。csvと書いていますが、今回のようにseparatorに任意の文字を指定することで
カラム区切りでない文字列にも対応できます。
columusに区切ったそれぞれの値の項目名を指定します。

date

様々な書式の日付の文字列をパースします。

mutate-convert

指定しないと全て文字列になってしまうので、数値の項目は明示的に変換をかける必要があります。
floatやintegerが指定可能です。
数値にしておかないと、平均や合計、最大値最小値などの値を可視化出来なくなるので注意が必要です。

output-elasticsearch

Elasticsearchにデータを転送するのでelasticsearchを指定します。
hostsにIPアドレスを指定すればOKです。
indexはこの設定で投入されるデータのインデックス名です。
毎日違うインデックス名で投入するようにする設定になっています。

Logstashを起動する

コマンドプロンプトでLogstashをインストールしたフォルダ直下のbinフォルダに移動します。

cd C:\dev\logstash-5.2.1\bin 

以下のように設定ファイルを指定して起動します

logstash -f request.conf

しばらくすればElasticsearchにデータが投入されKibanaで見ることができます。
f:id:mamepika:20170226143546p:plain
テストデータの通り、3時間おきの計8件のデータが投入されました。

インデックス名と@timestampの日付がずれる

時差、タイムゾーンの問題だと思うのですが、
設定ファイルでインデックス名をパラメータ指定していると、午前9時までのデータが前日のindex名で格納されてしまいます。
f:id:mamepika:20170226143859p:plain

indexを日付指定で削除するような運用をしていきたいのですが、
この状態だと日付をまたいでデータが削除されてしまうので困っています。