日志搜集处理框架Logstash-安装配置

Catalogue
  1. 1. 介绍
  2. 2. 下载和安装
    1. 2.1. 下载地址
  3. 3. 配置
    1. 3.1. 创建配置目录
    2. 3.2. Nginx 日志格式定义
  4. 4. Logstash启动和停止
    1. 4.1. 测试命令
      1. 4.1.1. 测试logstash
      2. 4.1.2. 测试配置文件是否 正确
    2. 4.2. 启动
      1. 4.2.1. 加载*.conf
      2. 4.2.2. 后台运行
    3. 4.3. 停止
      1. 4.3.1. 查找进程 ID
  5. 5. 高级配置
  6. 6. 插件安装
  7. 7. 插件
    1. 7.1. grok,useragent,urldecode和kv插件功能
    2. 7.2. 使用方式
  8. 8. 参考资料

介绍

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
http://kibana.logstash.es/content/logstash/

下载和安装

下载地址

https://www.elastic.co/downloads/logstash

1.方式一:TAR

wget https://download.elastic.co/logstash/logstash/logstash-2.4.0.tar.gz
tar -zxvf logstash-2.4.0.tar.gz

2.方式二:deb

curl -L -O https://download.elastic.co/logstash/logstash/packages/debian/logstash-2.4.0_all.deb
sudo dpkg -i logstash-2.4.0_all.deb

3.方式三:rpm

curl -L -O https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.0.noarch.rpm
sudo rpm -vi logstash-2.4.0.noarch.rpm

配置

创建配置目录

先进入 Logstash 根目录
mkdir -p etc
vim etc/www.lanmps.com.conf

etc/test.conf 文件内容

input {
  file {
    type => "nginx-access"
    path => ["/www/wwwLogs/www.lanmps.com/*.log"]
    start_position => "beginning"
  }
}

filter {
    grok {
        "message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
    #匹配模式 message是每段读进来的日志,IP、HTTPDATE、WORD、NOTSPACE、NUMBER都是patterns/grok-patterns中定义好的正则格式名称,对照上面的日志进行编写,冒号,(?:%{USER:ident}|-)这种形式是条件判断,相当于程序里面的二目运算。如果有双引号""或者[]号,需要在前面加\进行转义。
    }
    kv {
                source => "request"
                field_split => "&?"
                value_split => "="
        }
  #再单独将取得的URL、request字段取出来进行key-value值匹配,需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
    urldecode {
        all_fields => true
    }
  #把所有字段进行urldecode(显示中文)
}

output {
  elasticsearch {
        hosts => ["10.1.5.66:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
  }
}

配置说明
http://kibana.logstash.es/content/logstash/plugins/input/file.html

Nginx 日志格式定义

    log_format access '$remote_addr - $remote_user [$time_local] "$request" $http_host $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" $upstream_addr $upstream_status $upstream_cache_status "$upstream_http_content_type" $upstream_response_time > $request_time';

Logstash启动和停止

测试命令

测试logstash

bin/logstash -e 'input { stdin { } } output { stdout {codec=>rubydebug} }'

然后你会发现终端在等待你的输入。没问题,敲入 Hello World,回车,然后看看会返回什么结果!

测试配置文件是否 正确

bin/logstash -t -f etc/

启动

加载*.conf

加载etc文件夹下所有 *.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件

bin/logstash -f etc/

后台运行

nohup bin/logstash -f etc/ &

停止

查找进程 ID

ps -ef |grep logstash
kill -9 id

高级配置

http://kibana.logstash.es/content/logstash/get_start/full_config.html
http://kibana.logstash.es/content/logstash/plugins/output/elasticsearch.html

插件安装

http://www.jianshu.com/p/4fe495639a9a

插件

grok,useragent,urldecode和kv插件功能

./bin/logstash-plugin install kv
./bin/logstash-plugin install urldecode
./bin/logstash-plugin install grok
./bin/logstash-plugin install useragent

使用方式

input {
    file {
        path => "/home/vovo/access.log"  #指定日志目录或文件,也可以使用通配符*.log输入目录中的log文件。
        start_position => "beginning"
    }
}
filter {
    grok {
        match => {
        "message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
        }
    }
    kv {
                source => "request"
                field_split => "&?"
                value_split => "="
        }
  #再单独将取得的URL、request字段取出来进行key-value值匹配,需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
    urldecode {
        all_fields => true
    }
  #把所有字段进行urldecode(显示中文)
}

参考资料

-Logstash 日志搜集处理框架 安装配置