Building a Smart Log Pipeline: Syslog Parsing, Data Enrichment, and Analytics with Logstash, Elasticsearch, and Ruby

Published at: 11/19/2024
Categories: ruby, infrastructureascode, elasticsearch
Author: daviducolo

Introduction

In modern IT environments, effectively managing and analyzing log data is essential for monitoring user access, ensuring security, and maintaining system integrity. Logs serve as the backbone of any monitoring system, providing a wealth of information about system behavior, user actions, and potential security threats. However, the sheer volume of logs and the diversity of log formats can make it challenging to extract meaningful insights.

To address these challenges, we leverage Logstash, a robust data processing pipeline, to parse unstructured syslog messages and transform them into a structured format. This allows us to extract critical information such as user access details, timestamps, and IP addresses. But parsing alone isn't enough. By integrating Logstash with a production database, we can enrich the log data with additional context, such as user roles, email addresses, or organizational departments.

Once enriched, this data is sent to Elasticsearch, a powerful search and analytics engine that enables rapid querying and visualization of logs. Finally, we use Ruby to craft sophisticated queries and perform targeted analyses, empowering teams to gain actionable insights quickly.

Why This Pipeline Matters

  • Enhanced Security: Logs enriched with user details help detect unauthorized access and monitor suspicious activities.
  • Operational Efficiency: Structured and searchable logs make troubleshooting faster and more accurate.
  • Data-Driven Decisions: Insights derived from log data enable proactive decision-making, minimizing downtime and optimizing system performance.
  • Scalability: This pipeline handles large volumes of log data, making it suitable for enterprise-scale applications.

This article provides a step-by-step guide to building this advanced logging pipeline. By the end, you'll have a scalable solution capable of transforming raw logs into actionable intelligence.


Table of Contents

  1. Prerequisites
  2. Logstash Configuration
  3. Sending Data to Elasticsearch
  4. Querying Elasticsearch with Ruby
  5. References
  6. Conclusion

Prerequisites

Before proceeding, ensure you have the following components installed and properly configured:

  • Logstash: Installed on the server that will process the syslog data.
  • Elasticsearch: Running and accessible for storing the parsed logs.
  • Ruby: Installed on your system to execute Ruby scripts for querying.
  • Production Database: Accessible from the Logstash server for data enrichment (e.g., MySQL, PostgreSQL).

Additionally, install the necessary Logstash plugins and Ruby gems:

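# Install the Logstash JDBC integration plugin if not already installed
# (it bundles the jdbc input and the jdbc_streaming / jdbc_static lookup filters)
bin/logstash-plugin install logstash-integration-jdbc

# Install Ruby gems
gem install elasticsearch
gem install dotenv   # Optional: loads settings from a .env file
gem install mysql2   # Only for scripts that query the database directly; replace with the gem for your DB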

Logstash Configuration

Logstash uses a configuration file to define the data pipeline, consisting of input, filter, and output stages. Below is a sample configuration tailored to parse syslog messages, enrich them with user data from a production database, and send the results to Elasticsearch.

Input Configuration

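Configure Logstash to listen for syslog messages over UDP. Port 514 is the standard syslog port, but ports below 1024 are privileged on most Unix-like systems, so Logstash must either run with sufficient privileges or listen on a higher port that syslog traffic is forwarded to.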

input {
  udp {
    port => 514
    type => "syslog"
    codec => "plain"  # Assumes syslog messages are plain text
  }
}
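
To exercise the listener without waiting for real traffic, a test line can be pushed at it from Ruby. This is a minimal sketch using only the standard library; the helper name, the default host and port, and the sample line (made-up values shaped like the access messages this article parses) are assumptions for a local test setup.

```ruby
require 'socket'

# Sample access message with made-up values, used only for testing.
SAMPLE_LINE = 'Nov 19 14:03:22 web01 myapp[1234]: User john_doe accessed /admin/reports from 192.168.1.10'

# Send one syslog-style line over UDP and return the number of bytes sent.
# host and port are assumptions; point them at your Logstash udp input.
def send_syslog_line(line, host: '127.0.0.1', port: 514)
  socket = UDPSocket.new
  socket.send(line, 0, host, port)
ensure
  socket&.close
end
```

Calling send_syslog_line(SAMPLE_LINE) should make one event appear in the stdout output of a running pipeline. UDP is fire-and-forget, so a successful send does not prove that Logstash received the message.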

Filter Configuration

Grok Filter for Syslog Parsing

Use the Grok filter to parse the incoming syslog messages and extract relevant fields such as timestamp, hostname, program, and user access details.

filter {
  if [type] == "syslog" {
    grok {
      match => { 
        "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}\[%{POSINT:pid}\]: User %{WORD:user} accessed %{URIPATH:resource} from %{IP:ip_address}"
      }
      overwrite => ["message"]
    }

    date {
      match => [ "timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]
      timezone => "UTC"
    }

    # Remove the raw timestamp now that @timestamp is set.
    # Keep "type": the enrichment filter still tests [type] == "syslog".
    mutate {
      remove_field => ["timestamp"]
    }
  }
}

Explanation:

  • grok: Parses the syslog message to extract fields.
  • date: Converts the extracted timestamp to Logstash's @timestamp field.
  • mutate: Cleans up by removing redundant fields.
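
The grok and date steps can be approximated in plain Ruby to check a pattern against sample lines before deploying it. This is a sketch only: the regular expression uses simplified stand-ins for SYSLOGTIMESTAMP, SYSLOGHOST, URIPATH, and IP rather than the exact grok definitions, and the function name and the year parameter are assumptions.

```ruby
require 'time'

# Simplified Ruby counterpart of the grok pattern (not the exact grok definitions).
SYSLOG_ACCESS = /
  \A(?<timestamp>[A-Z][a-z]{2}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s
  (?<hostname>[\w.-]+)\s
  (?<program>[^\[\s]+)\[(?<pid>\d+)\]:\s
  User\s(?<user>\w+)\saccessed\s(?<resource>\/\S*)\sfrom\s
  (?<ip_address>\d{1,3}(?:\.\d{1,3}){3})\z
/x

# Return a hash of extracted fields, or nil when the line does not match
# (the case in which grok would tag the event with _grokparsefailure).
def parse_access_line(line, year: Time.now.utc.year)
  match = SYSLOG_ACCESS.match(line)
  return nil unless match

  fields = match.named_captures
  # Classic syslog timestamps carry no year, so one has to be assumed.
  raw = fields.delete('timestamp')
  stamp = Time.strptime("#{year} #{raw} UTC", '%Y %b %d %H:%M:%S %Z')
  fields.merge('@timestamp' => stamp.utc.iso8601)
end
```

Like the mutate step, the sketch drops the raw timestamp once the normalized @timestamp value has been derived from it.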

Data Enrichment from Production Database

To enrich the log data with additional user information from a production database, use the jdbc_streaming filter, which runs a SQL lookup for each event. This example assumes a MySQL database containing user details.

filter {
  if [type] == "syslog" {
    jdbc_streaming {
      jdbc_connection_string => "jdbc:mysql://db_host:3306/production_db"
      jdbc_user => "db_user"
      jdbc_password => "db_password"
      jdbc_driver_library => "/path/to/mysql-connector-java.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"  # Connector/J 8+; 5.x drivers use com.mysql.jdbc.Driver
      statement => "SELECT email, department FROM users WHERE username = :user"
      parameters => { "user" => "user" }  # Binds :user to the event's user field
      target => "user_info"
    }

    # The lookup result is an array of rows; copy the first row into the event
    if [user_info][0][email] {
      mutate {
        add_field => {
          "email" => "%{[user_info][0][email]}"
          "department" => "%{[user_info][0][department]}"
        }
      }
    }

    mutate {
      remove_field => ["user_info"]
    }
  }
}

Explanation:

  • jdbc_streaming: Connects to the production database to retrieve additional user information based on the username extracted from the syslog.
  • parameters: Maps the :user placeholder in the SQL statement to the user field of the log event.
  • mutate: Copies the retrieved email and department fields into the main log event when a matching row exists, then removes the temporary user_info field.

Note: Ensure the JDBC driver for your database is available at the specified path.
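
The merge step is easy to get subtly wrong when a user has no matching row, so it can help to reason about it in plain Ruby first. The sketch below assumes the lookup returns a list of row hashes (possibly empty); the function name and the _user_lookup_miss tag are hypothetical and not part of Logstash.

```ruby
# Merge selected columns of the first lookup row into an event hash.
# rows stands in for the database result: an array of hashes, possibly empty.
def enrich_event(event, rows, fields: %w[email department])
  row = rows.first
  if row.nil?
    # Hypothetical tag so events without a match remain easy to find later.
    return event.merge('tags' => Array(event['tags']) + ['_user_lookup_miss'])
  end

  event.merge(row.slice(*fields))
end
```

The function returns a new hash and leaves the original event untouched, and only the listed columns are copied, so unrelated database columns never reach the index.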

Output Configuration

Send the enriched log data to Elasticsearch for storage and analysis.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
    user => "elastic_user"       # If Elasticsearch security is enabled
    password => "elastic_pass"   # Replace with actual credentials
  }

  # Optional: Output to stdout for debugging
  stdout { codec => rubydebug }
}
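
The index setting produces one index per day, named from the event's @timestamp in UTC rather than from the local clock of the Logstash host. A small Ruby sketch of the same naming rule can help when deciding which index a given event should land in; the function name and the prefix parameter are assumptions.

```ruby
require 'time'

# Compute the daily index name for an ISO 8601 timestamp,
# mirroring the "syslog-%{+YYYY.MM.dd}" setting (dates are taken in UTC).
def daily_index(timestamp, prefix: 'syslog')
  utc = Time.iso8601(timestamp).getutc
  "#{prefix}-#{utc.strftime('%Y.%m.%d')}"
end
```

An event logged late in the evening in a timezone behind UTC therefore belongs to the next day's index, which matters when querying a single daily index instead of the syslog-* pattern.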

Sending Data to Elasticsearch

With the above configuration, Logstash will parse incoming syslog messages, enrich them with data from the production database, and index them into Elasticsearch. Ensure that Elasticsearch is running and accessible from the Logstash server. You can verify the ingestion by querying Elasticsearch or using Kibana's Discover feature.

# Example curl command to verify data ingestion
curl -X GET "localhost:9200/syslog-*/_search?pretty"

Querying Elasticsearch with Ruby

Ruby can be used to perform advanced queries on the indexed log data in Elasticsearch. Below is a sample Ruby script that connects to Elasticsearch, retrieves logs for a specific user, and displays relevant information.

Sample Ruby Script

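# query_syslog.rb
require 'elasticsearch'

# Optional: load variables from a .env file when the dotenv gem is installed
begin
  require 'dotenv/load'
rescue LoadError
  # dotenv is not available; use the process environment as-is
end

# Initialize the Elasticsearch client.
# The ES_* variable names are illustrative; the fallbacks are the placeholder values used in this article.
client = Elasticsearch::Client.new(
  hosts: [{
    host: ENV.fetch('ES_HOST', 'localhost'),
    port: ENV.fetch('ES_PORT', '9200').to_i,
    scheme: 'http',
    user: ENV.fetch('ES_USER', 'elastic_user'),
    password: ENV.fetch('ES_PASSWORD', 'elastic_pass')
  }],
  log: true  # Logs each request and response; turn off outside debugging
)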

# Define the index pattern
index_pattern = 'syslog-*'

# Define the search query
search_query = {
  query: {
    bool: {
      must: [
        { match: { user: 'john_doe' } }
      ],
      filter: [
        { range: { "@timestamp" => { gte: "now-7d/d", lte: "now/d" } } }
      ]
    }
  },
  sort: [
    { "@timestamp" => { order: "desc" } }
  ],
  size: 50
}

begin
  # Execute the search
  response = client.search(index: index_pattern, body: search_query)

  # Process and display the results
  response['hits']['hits'].each do |hit|
    source = hit['_source']
    puts "Timestamp: #{source['@timestamp']}"
    puts "User: #{source['user']}"
    puts "Email: #{source['email']}"
    puts "Department: #{source['department']}"
    puts "Resource Accessed: #{source['resource']}"
    puts "IP Address: #{source['ip_address']}"
    puts "-" * 40
  end
rescue => e
  puts "An error occurred: #{e.message}"
end

Running the Script

Save the script to a file, for example, query_syslog.rb, and execute it using Ruby:

ruby query_syslog.rb

Ensure that the Elasticsearch credentials and host details match your setup.
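
For repeated analyses it may be convenient to build the search body from parameters instead of editing the script each time. The sketch below wraps the query from the sample script in a helper and adds a small summary over the returned hits; both function names are hypothetical, and the hit structure assumes the fields produced by the pipeline in this article.

```ruby
# Build the search body for one user over a look-back window of whole days.
def user_activity_query(user, days: 7, size: 50)
  {
    query: {
      bool: {
        must: [{ match: { user: user } }],
        filter: [{ range: { '@timestamp' => { gte: "now-#{days}d/d", lte: 'now/d' } } }]
      }
    },
    sort: [{ '@timestamp' => { order: 'desc' } }],
    size: size
  }
end

# Count accesses per resource from the hits array of a search response.
def accesses_by_resource(hits)
  hits.map { |hit| hit.dig('_source', 'resource') }.compact.tally
end
```

With the client from the sample script, usage would look like client.search(index: 'syslog-*', body: user_activity_query('john_doe', days: 30)), followed by accesses_by_resource(response['hits']['hits']). The summary only covers the hits returned, so it is limited by the size parameter.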

References

  1. Logstash Documentation
  2. Elasticsearch Ruby Client
  3. Groking Logs with Logstash
  4. JDBC Input Plugin

Conclusion

Configuring Logstash to parse syslog messages, enrich them with data from a production database, and send the results to Elasticsearch provides a powerful solution for monitoring user access and enhancing security insights. By leveraging Ruby for querying, you can perform sophisticated analyses and generate reports tailored to your organizational needs. This setup not only centralizes log management but also facilitates real-time data enrichment and comprehensive querying capabilities, thereby enhancing your ability to maintain and secure your IT infrastructure effectively.
