Design von ELK-Filtern

Design von ELK-Filtern

Rocco Gagliardi
von Rocco Gagliardi
Lesezeit: 12 Minuten

Log Messages werden von einer Vielzahl Systemkomponenten und Applikationen im System selbst generiert. In vielen Fällen gibt es zu den Messages keine Dokumentation oder sie ist nur oberflächlich gehalten. Um etwas Sinn und Struktur in diese Masse an Informationen zu kriegen, können Kalkulationen angewendet werden oder Vorhersagen getroffen werden. Das ist aber enorm komplex. Dieser Artikel fasst Erfahrungen aus dem Design und der Implementation einiger ELK-Systeme bei Klienten zusammen.

Problem

Die Interpretation einer Log File bedeutet, dass Informationen extrahiert werden und alles andere ignoriert wird.

Daten, die über syslog transferiert werden, haben im Feld data kein strikt vorgegebenes Format. Es kann also Daten in jeder Form und in jeder Reihenfolge enthalten. Es gibt rund 1000 verschiedene Log-Formate. Jede Applikation protokolliert spezifische Informationen in proprietären Formaten. Manchmal nutzt eine Applikation gar mehrere Formate, je nach Event.

Es braucht Zeit, um Filter zu erstellen, damit die empfangenen Daten kohärent interpretiert und strukturiert werden. Zudem, mit der steigenden Anzahl der Log-Interpretationen, die ein System vornehmen muss, wird selbiges komplex und fehleranfällig.

Lösung am praktischen Modell

Um die Filter effektiv und effizient zu verwalten, bedarf es einer gesunden Dosis an Flexibilität und Robustheit. Dies ist nicht nur eine Frage der Programmierung. Um ein effektives System zu erhalten, muss akkurat dokumentiert werden, um genau feststellen zu können, wie die Daten fliessen und wie jeder Filter die Log Message modifiziert.

Filterorganisation

Es empfiehlt sich, die Filter auf mehrere Dateien zu verteilen.

Serie Filter Kommentar
10_* Input Definition Hier sind alle Definitionen des Inputs abgelegt. Unter anderem syslog / lumberjack / file.
20_* Input Classification Zu Beginn ist es notwendig, die Log Messages zu klassifizieren. Ein passender Regex, genau spezifizierte Quellen und Programme können unter anderem genutzt werden, um eine Message mit Tags zu kennzeichnen und die Datei für weitere Manipulation vorzubereiten.
30_* Message Interpretation Basierend auf den Tags, die im vorherigen Schritt definiert worden sind, wird die Message durch Filter geschleust und von den Passenden analysiert. Extrahierte Felder werden indiziert und, wenn nötig, anders modifiziert (zum Beispiel Normalisierung der Feldnamen). Auch wenn mehrere Mechanismen genutzt werden können um Informationen aus einer Message zu extrahieren, bleibt grok das nützlichste Tool. Siehe dazu auch Interpreting a Log File with Grok
90_* Data Output Die Analyse ist komplett. Nun wird entschieden, was mit den Daten geschieht. Zum Beispiel: Die Metriken an Carbon senden oder nur einige Messages oder Felder speichern.

Für jeden Analysefilter muss eine Matrix mit extrahierten Feldern und deren Inhalt entwickelt werden, damit die Feldnamen normalisiert sein können. Dies wird während dem Design des Dashboards und/oder den Reports für die Korrelation zwischen Message-Typen nützlich.

Die Filterdetails müssen von der Filterkonfiguration getrennt werden. Der grok-filter (Analyse) selbst sollte eine Zeile mit einem Muster in der Pattern Database abgleichen. Dies wird zu mehr Robustheit führen, da die Konfiguration simpel gehalten wird. Gleichzeitig aber wird die Pattern Database dabei helfen, die Feldnormalisierung aufrecht zu erhalten und die Übersichtlichkeit über die geparsten Messages kann rekursiv mit den implementierten Filtern optimiert werden.

Um eine Pattern Database aufzubauen, empfiehlt es sich, ein Set von _Atom_-Grok-Expressions zu schaffen, dann die Line Expression, die vom Filter gegeben ist.

Während mehr Lineparser hinzugefügt werden, kann einige Redundanz mit der Erhöhung der Komplexität der grok Expression entfernt werden, während stets darauf geachtet wird, dass die Flexibilität in der _Atom_-Expression erhalten bleibt.

Während dem Aufbau der Expressions empfiehlt sich die Nutzung von grokdebug um die Patterns interaktiv zu testen. Regex kann mit Rubular getestet werden.

Mit der Zeit wird es nützlich, Anpassungen vorzunehmen. Kaufen Sie Ihre eigene Umgebung und nutzen Sie grok mit IRB.

Nutzen Sie eine separate Plattform, um die Syntax der Filtersets und die Pattern-db zu testen. Achtung: Wenn sie CentOS verwenden, dann wird das rpm-Package im Testmodus nicht starten. Nutzen Sie die gleiche ELK-Version auf OSX/Windows/Debian, um die Filter zu testen.

Beispiel: Bau eines Fortigate Filters

Das Logstash grok Plug-In markiert automatisch Messages, die nicht mit dem _grokparsefailure_-Tag übereinstimmen. Überwachen Sie dieses Tag um Messages zu finden, die nicht vom Filter aufgespürt werden.

Grundsätzlich lautet das Pattern der Entwicklung wie folgt. Wir werden dies anhand des Beispiels eines Fortigate Filters ansehen:

  1. Sicherstellen, dass alle Messages der Fortigate Firewall korrekt markiert werden
  2. Die Filter korrekt setzen
  3. Erstellen sie einen ersten Message Parser
  4. Wiederholen Sie dies bis die Failure Rate bei etwa null liegt
    1. Fügen Sie Parser für zusätzliche Messages hinzu
    2. Optimieren Sie die Patterns
    3. Überwachen Sie den _grokparsefailure_-Trend mit Kibana

Patterns

Entwickeln Sie ein _Atoms_-Pattern, das einige Expressions enthält, die einen spezifischen Part der Log-Message greppt.

# Fortigate atoms parsers
FWFN_BASE %{SYSLOGTIMESTAMP} %{IPORHOST:logsource}.*devname=%{DATA:dev_name} device_id=%{DATA:dev_id} log_id=%{NUMBER:log_id} type=%{DATA:type} subtype=%{DATA:stype} pri=%{DATA:severity} vd=%{WORD:vdom}
FWFN_SRCDST src=%{IP:src:ip} src_port=%{NUMBER:src_port} src_int=%{QUOTEDSTRING:src_intf} dst=%{IP:dst_ip} dst_port=%{NUMBER:dst_port} dst_int=%{QUOTEDSTRING:dst_intf}
FWFN_SN SN=%{NUMBER:sn} status=%{DATA:status} policyid=%{NUMBER:pol_id}
FWFN_CNTR dst_country=%{QUOTEDSTRING:dst_country} src_country=%{QUOTEDSTRING:src_country}
FWFN_SVC service=%{DATA:service} proto=%{NUMBER:proto} duration=%{NUMBER:duration} sent=%{NUMBER:B_sent} rcvd=%{NUMBER:B_rcvd}
FWFN_MSG msg=%{QUOTEDSTRING:msg}
FWFN_TRAN dir_disp=%{DATA:direction} tran_disp=%{DATA:trans_type} tran_sip=%{IP:trs_ip} tran_sport=%{NUMBER:trs_port}

Konstruieren Sie danach die Line Message Parser:

#
# Fortigate message line matchers (Version 1 - Straight forward)
FWFN_VAR_01 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR} %{FWFN_SVC}
FWFN_VAR_02 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR} %{FWFN_SVC} %{FWFN_MSG}
FWFN_VAR_03 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR} %{FWFN_TRAN} %{FWFN_SVC}
FWFN_VAR_04 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR} %{FWFN_TRAN} %{FWFN_SVC} %{FWFN_MSG}

Sie können die Redundanz schnell im Code bemerken und dann das DRY-Prinzip anwenden damit die grok-Expression rekursiv zusammengefasst wird.

#
# Fortigate message line matchers (Version 2 - Optimization step 1 )
FWFN_BASE_2 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR}
FWFN_VAR_01 %{FWFN_BASE_2} %{FWFN_SVC}
FWFN_VAR_02 %{FWFN_BASE_2} %{FWFN_SVC} %{FWFN_MSG}
FWFN_VAR_03 %{FWFN_BASE_2} %{FWFN_TRAN} %{FWFN_SVC}
FWFN_VAR_04 %{FWFN_BASE_2} %{FWFN_TRAN} %{FWFN_SVC} %{FWFN_MSG}

Runde zwei:

#
  1. Fortigate message line matchers (Version 3 – Optimization step 1 )
    FWFN_BASE_2 %{FWFN_BASE} %{FWFN_SRCDST} %{FWFN_SN} %{FWFN_CNTR}

FWFN_VAR_01 %{FWFN_BASE_2} %{FWFN_SVC} FWFN_VAR_02 %{FWFN_BASE_2} %{FWFN_SVC} %{FWFN_MSG} FWFN_VAR_03 %{FWFN_BASE_2} %{FWFN_TRAN} %{FWFN_SVC} FWFN_VAR_04 %{FWFN_BASE_2} %{FWFN_TRAN} %{FWFN_SVC} %{FWFN_MSG}

Runde drei:

#
# Fortigate Message line matchers (Version 4 - Optimization step 1 )
FWFN_VAR_01 %{FWFN_BASE_2}\s*(?:%{FWFN_TRAN}|)\s*(?:%{FWFN_SVC}|)\s*(?:%{FWFN_MSG}|)

Behalten Sie dabei die _grokparsefailure_-Quote im Auge um einen möglichen Aufwärtstrend zu bemerken.

Finale Filter-Version

Nach der Optimierung der Patterns kann die Match Line im grok-Filter auf eine einzige Zeile reduziert werden. Dies hält die Filterkonfiguration einfach, sauber und hilft dabei, schnell mögliche Syntax-Fehler zu entdecken.

filter {
    if "fortigate" in [tags] {
        mutate {
            add_tag       => [ "firewall" ]
            remove_tag    => [ "fortigate" ]
            replace       => [ "program", "fortigate" ]
            replace       => [ "severity", "" ]
        }
        grok {
                 overwrite   => [ "message" ]                                      
                 match       => [ "message" , "%{FWFN_VAR_01}" ]
        }
    }
}

Nach einigen rekursiven Verbesserungen und einigen Stunden Tuning wird die _grokparsefailure_-Rate gegen null fallen. In dieser Phase wird der Filter für die meisten eintreffenden Messages korrekt funktionieren. Ein wöchentliches Review der Messages mit dem _grokparsefailure_-Tag wird ausreichen, um die ungeparsten Messages aufzufangen.

Warum das alles?

Mid-Data Interpretation kann ein Job sein, für den Ihnen nie gedankt werden wird. Das für die Interpretation notwendige System kann sehr komplex sein, wenn es denn nicht gut gewartet wird, und kann instabil werden. Und wenn dies nicht auf die Software selbst zutrifft, dann kann das auf die Anpassungen zutreffen.

Über den Autor

Rocco Gagliardi

Rocco Gagliardi ist seit den 1980er Jahren im Bereich der Informationstechnologie tätig. In den 1990er Jahren hat er sich ganz der Informationssicherheit verschrieben. Die Schwerpunkte seiner Arbeit liegen im Bereich Security Frameworks, Routing, Firewalling und Log Management.

Links

Sie wollen die Sicherheit Ihrer Firewall prüfen?

Unsere Spezialisten kontaktieren Sie gern!

×
Übergang zu OpenSearch

Übergang zu OpenSearch

Rocco Gagliardi

Graylog v5

Graylog v5

Rocco Gagliardi

auditd

auditd

Rocco Gagliardi

Security Frameworks

Security Frameworks

Rocco Gagliardi

Sie wollen mehr?

Weitere Artikel im Archiv

Sie brauchen Unterstützung bei einem solchen Projekt?

Unsere Spezialisten kontaktieren Sie gern!

Sie wollen mehr?

Weitere Artikel im Archiv