Logstash Filtering

1. Logstash Introduction


Logstash is a free and open server-side data processing pipeline that ingests data from multiple sources, transforms it, and then sends it to your favorite "stash".
Logstash scales horizontally, and it is the component with the most plugins in the entire ELK stack.
Logstash is developed in Ruby (it runs on the JVM via JRuby).


Logstash main functions:

  • Input: collects logs; common plugins: Stdin, File, Kafka, Redis, Filebeat, Http
  • Filter: filters and transforms logs; common plugins: grok, date, geoip, mutate, useragent
  • Output: ships the filtered and transformed logs; common plugins: File, Stdout, Elasticsearch, MySQL, Redis, Kafka
https://www.elastic.co/cn/logstash/
https://www.elastic.co/cn/downloads/logstash
https://www.elastic.co/cn/downloads/past-releases#logstash
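The three stages above combine into a single pipeline definition. As a minimal sketch (the sample line format, grok pattern, and field names below are illustrative assumptions, not taken from this article), a pipeline that parses an access-log style line with grok, normalizes its timestamp with date, and converts a field type with mutate could look like:

```
input {
   stdin {}
}
filter {
   grok {
       # parse lines like: 192.168.1.108 [04/Jan/2023:01:35:42 +0000] GET /index.html 200
       match => { "message" => "%{IP:clientip} \[%{HTTPDATE:timestamp}\] %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status}" }
   }
   date {
       # use the parsed timestamp as the event's @timestamp
       match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
   }
   mutate {
       # convert the status field from string to integer
       convert => { "status" => "integer" }
   }
}
output {
   stdout {
       codec => rubydebug
   }
}
```

Each of the three stages is covered in more detail in the sections that follow.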

2. Logstash Installation

2.1. Prepare the Environment: Install Java

root@logstash01:~# apt install -y openjdk-11-jdk

2.2. Install Logstash

Note: the Logstash version must match the Elasticsearch version, otherwise errors may occur.

https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x
root@logstash01:~# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/7.x/apt/pool/main/l/logstash/logstash-7.17.8-amd64.deb

root@logstash01:~# dpkg -i logstash-7.17.8-amd64.deb

2.3. Modify the Logstash Configuration

root@logstash01:~# vim /etc/logstash/logstash.yml
node.name: logstash-node01
pipeline.workers: 2
pipeline.batch.size: 1000    # number of events written per batch; tune according to ES performance
pipeline.batch.delay: 5      # batch write delay; tune according to ES performance
path.data: /var/lib/logstash # default value
path.logs: /var/log/logstash # default value

# Memory tuning
root@logstash01:~# vim /etc/logstash/jvm.options
-Xms1g
-Xmx1g

# Logstash runs as the logstash user by default; if it needs to collect local log files, there may be permission issues, so you can change it to root
root@logstash01:~# vim /etc/systemd/system/logstash.service
[Service]
User=root
Group=root

root@logstash01:~# systemctl daemon-reload
root@logstash01:~# systemctl enable --now logstash.service

3. Logstash Commands

# Configure environment variables
root@logstash01:~# vim /etc/profile.d/logstash.sh
export PATH=/usr/share/logstash/bin/:$PATH

root@logstash01:~# . /etc/profile.d/logstash.sh
https://www.elastic.co/guide/en/logstash/current/first-event.html
# Plugin references
https://www.elastic.co/guide/en/logstash/7.6/input-plugins.html
https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html
https://www.elastic.co/guide/en/logstash/7.6/output-plugins.html
# Common options
-e  specify the configuration as a string
-f  specify a configuration file
-t  check the configuration syntax
-r  automatically reload the configuration when the file changes
# Plugin documentation
https://www.elastic.co/guide/en/logstash/current/index.html

Example: list all plugins

root@logstash01:~# /usr/share/logstash/bin/logstash-plugin list

GitHub Logstash plugins link

https://github.com/logstash-plugins

4. Logstash Input Plugins

https://www.elastic.co/guide/en/logstash/7.6/input-plugins.html

4.1. Standard Input

Example: interactive usage

# standard input and output; codec => rubydebug specifies the output format, which is the default and can be omitted
root@logstash01:~# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug }}'

Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2023-01-04 01:35:13.009 [main] runner - Starting Logstash {"logstash.version"=>"7.17.8", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.17+8 on 11.0.17+8 +indy +jit [linux-x86_64]"}
[INFO ] 2023-01-04 01:35:13.022 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[INFO ] 2023-01-04 01:35:13.113 [main] settings - Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
[INFO ] 2023-01-04 01:35:13.138 [main] settings - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
[WARN ] 2023-01-04 01:35:13.471 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2023-01-04 01:35:13.515 [LogStash::Runner] agent - No persistent UUID file found. Generating new UUID {:uuid=>"bb1b73b3-3186-4151-a0c9-e0ebe8f06773", :path=>"/usr/share/logstash/data/uuid"}
[INFO ] 2023-01-04 01:35:15.157 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ] 2023-01-04 01:35:16.295 [Converge PipelineAction::Create<main>] Reflections - Reflections took 99 ms to scan 1 urls, producing 119 keys and 419 values
[WARN ] 2023-01-04 01:35:17.063 [Converge PipelineAction::Create<main>] line - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2023-01-04 01:35:17.097 [Converge PipelineAction::Create<main>] stdin - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[INFO ] 2023-01-04 01:35:17.493 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x731e7f0a run>"}
[INFO ] 2023-01-04 01:35:18.617 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>1.12}
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO ] 2023-01-04 01:35:18.715 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[INFO ] 2023-01-04 01:35:18.818 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
hello world
{
    "@timestamp" => 2023-01-04T01:35:42.475Z,  #当前事件的发生时间
          "host" => "logstash01.test.com",     #标记事件发生在哪里
       "message" => "hello world",             #消息的具体内容
      "@version" => "1"          #事件版本号,一个事件就是一个ruby对象
}

# You can keep entering more input; press Ctrl+C to exit
# Specify that the input is in JSON format
root@logstash01:~# /usr/share/logstash/bin/logstash -e 'input { stdin{ codec => json } } output { stdout{ codec => rubydebug }}'
......
 { "name":"ljc","age": "18"}
{
          "name" => "ljc",
           "age" => "18",
    "@timestamp" => 2023-01-04T01:38:59.918Z,
      "@version" => "1",
          "host" => "logstash01.test.com"
}

Example: using a configuration file

root@logstash01:~# vim /etc/logstash/conf.d/stdin_to_stdout.conf
input {
   stdin {
       type => "stdin_type"      #自定义事件类型,可用于后续判断
       tags => "stdin_tag"       #自定义事件tag,可用于后续判断
       codec => "json"           #指定Json 格式

   }
}
output {
   stdout {
       codec => "rubydebug" #输出格式,此为默认值,可省略
   }
}

# Syntax check
root@logstash01:~# logstash -f /etc/logstash/conf.d/stdin_to_stdout.conf -t
......
Configuration OK
[INFO ] 2023-01-04 01:44:38.104 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

# Run logstash
root@logstash01:~# logstash -f /etc/logstash/conf.d/stdin_to_stdout.conf
......
hello,lijiacheng
[WARN ] 2023-01-04 01:45:33.465 [[main]<stdin] jsonlines - JSON parse error, original data now in message field {:message=>"Unrecognized token 'hello': was expecting ('true', 'false' or 'null')\n at [Source: (String)\"hello,lijiacheng\"; line: 1, column: 6]", :exception=>LogStash::Json::ParserError, :data=>"hello,lijiacheng"}
{
          "type" => "stdin_type",
       "message" => "hello,lijiacheng",
          "host" => "logstash01.test.com",
    "@timestamp" => 2023-01-04T01:45:33.489Z,
      "@version" => "1",
          "tags" => [
        [0] "_jsonparsefailure",
        [1] "stdin_tag"
    ]
}

hello,logstash
[WARN ] 2023-01-04 01:46:03.199 [[main]<stdin] jsonlines - JSON parse error, original data now in message field {:message=>"Unrecognized token 'hello': was expecting ('true', 'false' or 'null')\n at [Source: (String)\"hello,logstash\"; line: 1, column: 6]", :exception=>LogStash::Json::ParserError, :data=>"hello,logstash"}
{
          "type" => "stdin_type",
       "message" => "hello,logstash",
          "host" => "logstash01.test.com",
    "@timestamp" => 2023-01-04T01:46:03.200Z,
      "@version" => "1",
          "tags" => [
        [0] "_jsonparsefailure",
        [1] "stdin_tag"
    ]
}

 { "name":"ljc","age": "18"}
{
           "age" => "18",
          "type" => "stdin_type",
          "name" => "ljc",
          "host" => "logstash01.test.com",
    "@timestamp" => 2023-01-04T01:46:46.683Z,
      "@version" => "1",
          "tags" => [
        [0] "stdin_tag"
    ]
}

4.2. Input from Files

Logstash records the read position of each file and automatically resumes from that position on the next run.
Each file's read position is recorded in the matching /var/lib/logstash/plugins/inputs/file/.sincedb_xxxx file.
This file stores the file's inode number, bytes read, and other information.

Example:

root@logstash01:~# vim /etc/logstash/conf.d/file_to_stdout.conf
input {
   file {
       path => "/tmp/test.*"
       type => "testlog"             #add a custom type field, usable in conditionals
       exclude => "*.txt"            #exclude files you don't want to collect; glob wildcard syntax
       start_position => "beginning" #read from the start of the file on the first run; valid values: beginning and end
       stat_interval => "3"          #how often to check files for updates; default 1s
       codec => json                 #needed to parse JSON-formatted files; adding it for non-JSON files does not affect the result
   }
   file {
       path => "/var/log/syslog"
       type => "syslog"
       start_position => "beginning"
       stat_interval => "3"
   }
}
output {
   stdout {
       codec => rubydebug
   }
}

root@logstash01:~# logstash -f /etc/logstash/conf.d/file_to_stdout.conf -t

root@logstash01:~# echo line1 >> /tmp/test.log

root@logstash01:~# logstash -f /etc/logstash/conf.d/file_to_stdout.conf
{
      "@version" => "1",
          "path" => "/tmp/test.log",
          "type" => "testlog",
       "message" => "line1",
          "tags" => [
        [0] "_jsonparsefailure"
    ],
    "@timestamp" => 2023-01-04T01:54:15.144Z,
          "host" => "logstash01.test.com"
}

root@logstash01:~# echo line2 >> /tmp/test.log

{
      "@version" => "1",
          "path" => "/tmp/test.log",
          "type" => "testlog",
       "message" => "line2",
          "tags" => [
        [0] "_jsonparsefailure"
    ],
    "@timestamp" => 2023-01-04T01:54:27.151Z,
          "host" => "logstash01.test.com"
}

Logstash uses the sincedb file to record information about each collected file, such as the read position, so that collection can resume from that position next time.

root@logstash01:~# cat /var/lib/logstash/plugins/inputs/file/.sincedb_4188f728070d4ab63807bb3525fa05c4
143 0 64768 24 1672797265.2522361 /tmp/test.log
# records the collected file's inode, size, and other information
root@logstash01:~# ll -li /tmp/test.log
143 -rw-r--r-- 1 root root 24 Jan  4 01:54 /tmp/test.log
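The sincedb record above can be decoded field by field: inode, major device number, minor device number, bytes read, last-activity timestamp, and path. A minimal shell sketch using the sample line from the output above:

```shell
# A sincedb record: inode, major dev, minor dev, bytes read, last activity, path
line='143 0 64768 24 1672797265.2522361 /tmp/test.log'

# Split the record into its fields with awk
inode=$(echo "$line" | awk '{print $1}')
pos=$(echo "$line"   | awk '{print $4}')
path=$(echo "$line"  | awk '{print $6}')

echo "inode=$inode bytes_read=$pos path=$path"
```

Here bytes_read equals the file size (24 bytes), meaning the file has been fully consumed.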

4.3. Input from HTTP Requests

root@logstash01:~# vim /etc/logstash/conf.d/http_to_stdout.conf
input {
   http {
       port =>6666
       codec => json
   }
}
output {
   stdout {
       codec => rubydebug
   }
}

root@logstash01:~# logstash -f /etc/logstash/conf.d/http_to_stdout.conf -t

root@logstash01:~# logstash -f /etc/logstash/conf.d/http_to_stdout.conf -r
{
      "@version" => "1",
    "@timestamp" => 2023-01-04T02:53:32.321Z,
          "host" => "192.168.1.105",
       "message" => "",
       "headers" => {
           "request_path" => "/",
            "http_accept" => "*/*",
              "http_host" => "192.168.1.108:6666",
         "content_length" => "0",
         "request_method" => "GET",
        "http_user_agent" => "curl/7.68.0",
           "http_version" => "HTTP/1.1"
    },
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
{
      "@version" => "1",
    "@timestamp" => 2023-01-04T02:54:12.139Z,
          "host" => "192.168.1.105",
       "message" => "test log message",
       "headers" => {
           "request_path" => "/",
            "http_accept" => "*/*",
              "http_host" => "192.168.1.108:6666",
         "content_length" => "16",
         "request_method" => "POST",
           "content_type" => "application/x-www-form-urlencoded",
        "http_user_agent" => "curl/7.68.0",
           "http_version" => "HTTP/1.1"
    },
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

# Running the requests below produces the output above
root@web01:~# curl http://192.168.1.108:6666
ok
root@web01:~# curl -XPOST -d'test log message' http://192.168.1.108:6666
ok
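Note that both events above were tagged _grokparsefailure because the bodies were not JSON, even though the http input declares `codec => json`. A hedged sketch of sending a valid JSON body instead (the field names in the payload are illustrative; the address assumes the same pipeline is running):

```shell
# Build a JSON body; with `codec => json` on the http input, its keys become
# top-level event fields instead of landing in `message` with a failure tag
payload='{"service":"web01","level":"info","message":"test log message"}'
echo "$payload"

# Assumes the http_to_stdout pipeline above is listening on 192.168.1.108:6666:
# curl -XPOST -H 'Content-Type: application/json' -d "$payload" http://192.168.1.108:6666
```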

4.4. Input from Filebeat

root@web01:~# vim /etc/filebeat/filebeat.yml

output.logstash:
  # The Logstash hosts
  hosts: ["192.168.1.108:5044"]

root@web01:~# systemctl restart filebeat.service

root@logstash01:~# vim /etc/logstash/conf.d/filebeat_to_stdout.conf
input {
   beats {
       port => 5044
   }
}
output {
   stdout {
       codec => rubydebug
   }
}

root@logstash01:~# logstash -f /etc/logstash/conf.d/filebeat_to_stdout.conf -t

root@logstash01:~# logstash -f /etc/logstash/conf.d/filebeat_to_stdout.conf
{
                "ecs" => {
        "version" => "1.12.0"
    },
       "responsetime" => 0,
            "tcp_xff" => "-",
               "tags" => [
        [0] "nginx-access",
        [1] "beats_input_raw_event"
    ],
             "domain" => "192.168.1.105",
           "clientip" => "192.168.1.108",
               "size" => 12,
               "host" => {
                   "id" => "9eaba3e85c9a4e16b66cda021bc16221",
                   "os" => {
             "version" => "20.04.5 LTS (Focal Fossa)",
            "codename" => "focal",
                "type" => "linux",
            "platform" => "ubuntu",
                "name" => "Ubuntu",
              "family" => "debian",
              "kernel" => "5.4.0-131-generic"
        },
             "hostname" => "web01.test.com",
                 "name" => "web01.test.com",
        "containerized" => false,
                   "ip" => [
            [0] "192.168.1.105",
            [1] "fe80::20c:29ff:fe6c:ce9f"
        ],
         "architecture" => "x86_64",
                  "mac" => [
            [0] "00:0c:29:6c:ce:9f"
        ]
    },
             "status" => "200",
              "agent" => {
                  "id" => "2fe34205-12d8-4f37-a3f5-39e0d42f73e6",
             "version" => "7.17.8",
                "type" => "filebeat",
                "name" => "web01.test.com",
            "hostname" => "web01.test.com",
        "ephemeral_id" => "71304619-c65a-447f-b9dc-c0cd4b67c0b8"
    },
       "upstreamtime" => "-",
            "referer" => "-",
                "uri" => "/index.html",
       "upstreamhost" => "-",
    "http_user_agent" => "curl/7.68.0",
                "log" => {
        "offset" => 426307,
          "file" => {
            "path" => "/var/log/nginx/access_json.log"
        }
    },
           "@version" => "1",
              "input" => {
        "type" => "log"
    },
          "http_host" => "192.168.1.105",
         "@timestamp" => 2023-01-04T02:17:30.000Z,
                "xff" => "-"
}

# Running the request below produces the output above
root@logstash01:~# curl http://192.168.1.105

4.5. Input from Redis

Multiple Logstash instances can read logs from the same Redis, which improves performance.
After Logstash collects the data from Redis, it removes the corresponding entries from the list key.

input {
  redis {
    host => 'Redis_IP'
    port => "6379"
    password => "123456"
    db => "0"
    data_type => 'list'
    key => "nginx-accesslog"
  }
}
output {
   stdout {
       codec => rubydebug
   }
}
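For the redis input above to have anything to read, a shipper must push events onto the same list key. As a hedged sketch (assuming Filebeat is the shipper and using its Redis output; the host, password, db, and key values mirror the input above):

```
output.redis:
  hosts: ["Redis_IP:6379"]
  password: "123456"
  db: 0
  key: "nginx-accesslog"
  datatype: "list"
```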

4.6. Input from Kafka

input {
   kafka {
       bootstrap_servers => "192.168.1.87:9092"
       group_id => "logstash"
       topics => ["nginx-accesslog","nginx-errorlog"]
       codec => "json"
       consumer_threads => 8
   }
}
output {
   stdout {
       codec => rubydebug
   }
}
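As with Redis, something must produce into these topics. A hedged sketch of the matching Filebeat side, using its Kafka output (the broker address mirrors the input above; only one of the two topics is shown for brevity):

```
output.kafka:
  hosts: ["192.168.1.87:9092"]
  topic: "nginx-accesslog"
```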
