ELK, un SIEM à prendre au sérieux

MISC n° 094 | novembre 2017 | Erik Lenoir
Creative Commons
  • Actuellement 4 sur 5 étoiles
4
Merci d'avoir participé !
Vous avez déjà noté cette page, vous ne pouvez la noter qu'une fois !
Votre note a été changée, merci de votre participation !
ELK, acronyme issu des trois composants principaux de la suite (Elasticsearch, Logstash, Kibana) est depuis longtemps maintenant une référence dans la collecte des logs et leur analyse. Les derniers ajouts sur la suite Elastic et notamment sur le Machine Learning posent une question : ELK ne serait-il pas en passe de devenir un SIEM incontournable ?

Dans le numéro 92 de votre magazine préféré, Nicolas Vieux nous présentait les divers besoins et objectifs de mise en place d’un SIEM avant de nous présenter un exemple avec la partie gratuite de Splunk. L’article qui suit va présenter une suite que l’on retrouve de plus en plus chez nos clients : ELK, ou Elastic Stack de son nouveau nom officiel. Nous verrons dans cet article une présentation des composants ainsi que leur mise en place. Enfin, nous terminerons en présentant les derniers ajouts en termes d’analyses que peut fournir cette suite pour enfin avoir la place qu’elle mérite dans le monde des SIEM.

1. Présentation de la suite ELK

ELK, maintenant appelé Elastic Stack, est une suite de logiciels édités par la société Elastic.

Elle permet en outre :

- la collecte des données au travers de Logstash (ou des Beats comme nous le verrons plus loin) ;

- le stockage des données dans le moteur d’indexation Elasticsearch ;

- l’exploitation et l’analyse des données au travers de Kibana.

La mise en place d’un cluster est chose aisée ainsi que le déploiement dans le Cloud.

Plusieurs types d’architectures sont possibles, car nous verrons notamment que la collecte et le déversement des données peut se faire avec plusieurs types de sources ou puits de données.

1.1 Elasticsearch

Elasticsearch est un moteur d’indexation basé sur Lucene. Il dispose d’une API REST et permet de faire du clustering très facilement, l’ajout d’un nouveau nœud étant très facile. En terme de requêtes, il est possible de faire des requêtes très simples comme des recherches sur des termes ou des chaînes de caractères, mais aussi de faire des agrégations ou encore de la percolation.

Les données sont stockées sous forme de documents dans des index Elasticsearch, un index étant lui-même découpé en types de documents. Attention, un type de document doit contenir des documents avec une structure commune.

En terme de découpage de données, nous pourrions avoir un index bibliotheque dans lequel nous retrouverons les types de document livre ou auteur… Concernant les logs, nous retrouvons souvent un découpage temporel ; par exemple, un index est créé pour chaque jour de données (ex : logstash-2017-08-20) dans lequel on retrouve des types de documents correspondant aux applications.

Les requêtes comme les réponses se font au travers de requêtes au format JSON avec la syntaxe Lucene ou en utilisant le Query DSL de l’outil (un futur langage spécifique à Kibana, Kuery, sera bientôt disponible).

Le logiciel est capable d’absorber des milliards de documents, tout est question de dimensionnement du cluster : pas de règle absolue, une étude devra être réalisée pour estimer les volumes et adapter le cluster en conséquence.

Elasticsearch est un outil extrêmement complet, et la description exhaustive de toutes ses fonctionnalités dépasse largement le cadre de cet article.

1.2 Logstash

Historiquement dédié aux logs comme son nom l’indique, Logstash est un collecteur de données. Écrit en JRuby, son fonctionnement est décrit dans un fichier de configuration avec trois phases principales :

1. Input : l’alimentation des données ; Logstash dispose de plugins permettant de se connecter à tout type de source de données (fichiers, base de données, broker…) ;

2. Filter : c’est la partie la plus « intelligente » de Logstash, car c’est là où a lieu la transformation des données lues ; dans le cas où l’outil est utilisé pour écrire dans Elasticsearch, il sera donc nécessaire de transformer ces données en JSON. Très souvent, GROK [GROK] sera utilisé comme outil de parsing d’expressions régulières pour transformer facilement nos champs ;

3. Output : le déversement des données ; une fois lues et transformées, il est nécessaire de déverser les données. Ici encore, Logstash dispose de pléthore de plugins ; le plugin Elasticsearch est natif et il suffit de quelques lignes pour le configurer.

1.3 Kibana

Kibana est l’interface web qui permet d’analyser toutes les données. L’outil s’occupe principalement de générer les requêtes Elasticsearch qui vont bien selon les demandes de l’utilisateur réalisées dans l’interface graphique.

Par défaut pour l’analyse, nous pouvons utiliser les fonctionnalités suivantes :

- Discover : requête permettant de trier/filtrer les données sur un index donné ;

- Visualize : graphique permettant de montrer les données sous forme agrégée (camembert, cartographie, histogramme…) ;

- Dashboard : c’est la partie « métier » souvent présentée à nos clients : ce n’est ni plus ni moins qu’une page qui agrège des résultats de Discover ou encore des graphes réalisés dans la partie Visualize de l’application ;

- DevTools : anciennement Sense dans les précédentes versions de Kibana ; il permet de requêter directement Elasticsearch (avec coloration syntaxique et indentation des requêtes).

Depuis les dernières versions, Kibana tend à être de plus en plus complet puisque nous avons à présent par défaut la présence du Timelion, un outil permettant de comparer facilement des données provenant d’index différents au même moment (par exemple, dessiner des « time series » sur une fenêtre de temps donnée pour expliquer une consommation CPU trop élevée).

1.4 Divers : beats, X-Pack et licences

Comme nous l’avons vu précédemment, Logstash permet de gérer l’alimentation et le déversement des données, mais il est également possible d’utiliser des beats. Ce sont des « shippers » écrits en Go qui permettent de répondre à des problématiques diverses et variées de manière efficace : collecte de métriques (TopBeat), de fichiers (FileBeat), de données réseau (PacketBeat), d’évènements Windows (Winlogbeat) et bien d’autres... [BEATS]

Logstash dispose évidemment d’un connecteur par défaut sur les beats pour s’alimenter de leurs données (transmises via TCP).

Il est maintenant temps de parler un petit peu de licence ; jusqu’à maintenant, tout ce que nous avons vu ne nécessitait pas d’achat particulier.

Cependant, vous n’aurez pas les fonctionnalités suivantes :

- Sécurité : assurée par Security (anciennement Shield), cela permet de filtrer les accès à Kibana, à Elasticsearch et de mettre en place des règles très fines (par exemple un utilisateur ne pourra pas voir un ensemble de champs d’un type de document particulier) ainsi que la communication HTTPS ;

- Alerting : assuré par Watcher, cela permet notamment de pouvoir lancer des alertes en cas d’évènements particuliers que vous pouvez configurer sous forme de règles à appliquer sur vos données ;

- Monitoring : anciennement Marvel, cela permet d’avoir une vue exhaustive sur l’état du cluster et des nœuds Elasticsearch et de pouvoir anticiper les éventuels besoins de capacité (ajout de nœuds) ;

- Reporting : générer et partager des rapports facilement, planifier l’envoi automatique de documents tout en gardant des historiques complets et accessibles ;

- Graph : analyser les liens entre vos données, cette fonctionnalité est très utile pour la détection de fraudes ou pour permettre de mettre en place des recommandations ;

- Machine Learning : grande nouveauté encore en bêta, le Machine Learning va nous permettre d’analyser les données et de détecter des comportements anormaux et surtout d’en extraire les parties prenantes.

Bien qu’il soit possible de développer des outils maison (authentification avec Kibana en utilisant un reverse proxy...) ou d’utiliser d’autres outils/plugins open source (SearchGuard pour assurer la sécurité), vous n’aurez pas la flexibilité et la robustesse des outils suscités.

Cet ensemble d’outils forme ce qu’on appelle le X-Pack, et l’utiliser est payant. Il vous donnera automatiquement accès au support quelque soit l'offre choisie (Gold, Platinum, Enterprise).

2. Cas pratique : exploitation de logs

Pour illustrer le fonctionnement de la suite, nous avons choisi d’installer Cowrie sur un serveur afin de récupérer les logs relatifs à des connexions SSH et Telnet. Également, nous allons récupérer des logs d’un poste Windows ainsi que ceux d’un serveur Tomcat hébergeant une application Java.

2.1 Installation des outils

Note

Des images Docker officielles existent et peuvent être utilisées pour installer ELK et Filebeat.

Il est nécessaire d’avoir une JVM 8 d’installée pour faire tourner Logstash et Elasticsearch. Concernant Kibana, pas de dépendance particulière puisque le logiciel embarque son propre serveur web.

Les livrables peuvent être téléchargés sous forme d’archive directement depuis le site Elastic, assurez-vous juste de prendre la même version pour les trois outils afin de garantir la compatibilité entre eux.

Exemple des trois commandes wget permettant de télécharger les outils :

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz

wget https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz

Après avoir décompressé les archives dans leurs dossiers respectifs, vous pouvez lancer :

- Elasticsearch avec la commande elasticsearch du dossier bin/ ;

- Kibana avec la commande kibana du dossier bin/.

Pour installer Cowrie, nous allons utiliser l’image Docker officielle du produit afin de s’abstraire de l’installation des dépendances (plus d’infos sur [COWRIE DOCKER]).

Pour initialiser les volumes Docker afin de partager les fichiers entre l’host et le conteneur :

mkdir -p/home/cowrie/volumes/etc /home/cowrie/volumes/var/log/cowrie /home/cowrie/volumes/var/run

Nous pouvons à présent stocker notre fichier de configuration dans /home/cowrie/volumes/etc et récupérer les fichiers de logs dans /home/cowrie/volumes/var/log/cowrie.

La commande suivante permet de lancer notre conteneur :

docker run -d -p 2222:2222 -p 2223:2223 -v /home/cowrie/volumes/etc:/cowrie/cowrie-git/etc -v /home/cowrie/volumes/var:/cowrie/cowrie-git/var cowrie/cowrie

Cowrie permet de générer les journaux au format JSON, cette option est sélectionnée par défaut avec l’image Docker.

Si jamais vous n’optez pas pour l’utilisation de Docker, vous pouvez décommenter les variables suivantes dans le fichier cowrie.cfg :

[output_jsonlog]
logfile = log/cowrie.json

Également, pensez à activer l’option pour les connexions Telnet :

[telnet]

enabled = yes

Note

Cowrie dispose d’un plugin (pas encore stable) pour directement écrire dans Elasticsearch.

2.2 Récupération des logs de Cowrie

Voici un exemple de log avec deux évènements de connexion Telnet puis SSH :

{

   "eventid": "cowrie.direct-tcpip.request",

   "timestamp": "2017-05-23T22:00:53.589252Z",

   "session": "e2c3b09b",

   "src_port": 5556,

   "message": "direct-tcp connection request to 185.86.151.225:80 from localhost:5556",

   "system": "SSHService 'ssh-connection' on HoneyPotSSHTransport,6305,91.230.47.82",

   "isError": 0,

   "src_ip": "91.230.47.82",

   "dst_port": 80,

   "dst_ip": "185.86.151.225",

   "sensor": "little-finger"

}

{

   "eventid": "cowrie.direct-tcpip.data",

   "timestamp": "2017-05-23T22:00:53.704199Z",

   "sensor": "little-finger",

   "system": "SSHChannel None (11) on SSHService 'ssh-connection' on HoneyPotSSHTransport,6305,91.230.47.82",

   "isError": 0,

   "src_ip": "91.230.47.82",

   "session": "e2c3b09b",

   "dst_port": 80,

   "dst_ip": "185.86.151.225",

   "data": "'GET /getip.php HTTP/1.1\\r\\nUser-Agent: Mozilla/5.0\\r\\nHost: s1.ipinfo.pw\\r\\nAccept: */*\\r\\n\\r\\n'",

   "message": "direct-tcp forward to 185.86.151.225:80 with data 'GET /getip.php HTTP/1.1\\r\\nUser-Agent: Mozilla/5.0\\r\\nHost: s1.ipinfo.pw\\r\\nAccept: */*\\r\\n\\r\\n'"

}

Nous pouvons voir que nous obtenons quelques informations sur les tentatives effectuées par les attaquants (adresse IP, heure…).

2.2.1 Utilisation directe de Logstash

Afin de les utiliser comme sources de Logstash, nous allons créer le fichier 01-cowrie-es.conf :

input {
   file {
      path => ["/home/cowrie/volumes/var/log/cowrie/cowrie*.json"]
      codec => json
      type => "cowrie"
      start_position => "beginning"
   }
}
filter {
   date {
      match => [ "timestamp", "ISO8601" ]
   }
   if [src_ip] {
      mutate {
         add_field => { "src_host" => "%{src_ip}" }
      }
      dns {
         reverse => [ "src_host" ]
         nameserver => [ "8.8.8.8", "8.8.4.4" ]
         action => "replace"
      }
      geoip {
         source => "src_ip"
         target => "geoip"
      }
   }
}
output {
   elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      document_type => "%{type}"

   }
}

Note

Dans le cas où Elasticsearch est protégé par un couple identifiant/mot de passe, vous pouvez ajouter dans l’output les attributs user et password.

Nous retrouvons bien nos trois phases input/filter/output.

La partie input est relativement simple puisqu’il s’agit de s’alimenter à partir d’un pattern de nom de fichiers auquel nous associons un type (qui sera le fameux type de document stocké dans les index Elasticsearch).

Idem pour la partie output, nous spécifions simplement l’host de notre Elasticsearch ainsi que le nom de l’index journalier (notez l’utilisation du format de date) dans lequel stocker les données et le type de document.

Comme exposé dans l’introduction de cet article, c’est la partie filter qui va nous permettre d’enrichir un peu le traitement des fichiers de Cowrie. Ici nul besoin d’utiliser GROK puisque nous avons déjà les données au format JSON.

Suivons donc le séquencement de la partie filter :

date {
   match => [ "timestamp", "ISO8601" ]
}

Le plugin date nous permet de convertir le champ timestamp du message JSON en champ typé timestamp pour Elasticsearch au format ISO8601.

if [src_ip] {
   mutate {
      add_field => { "src_host" => "%{src_ip}" }
   }
   dns {
      reverse => [ "src_host" ]
      nameserver => [ "8.8.8.8", "8.8.4.4" ]
      action => "replace"
   }
   geoip {
      source => "src_ip"
      target => "geoip"
   }
}

Ici, nous allons agrémenter notre JSON dans le cas où le champ src_ip existe dans le message. L’objectif est d’ajouter un champ src_host (utilisation du bloc de mutation) et de faire une recherche DNS inversée à partir de l’adresse IP (utilisation du bloc dns). Enfin, pour pouvoir effectuer de jolis graphes cartographiques, nous utilisons le plugin geoip qui nous permettra d’ajouter les bonnes informations géographiques compréhensibles par Elasticsearch.

Nous pouvons alors lancer Logstash avec la commande suivante :

/home/user/logstash-5.5.2/bin/logstash -f /home/user/logstash/conf/01-cowrie-es.conf

Voici à quoi ressemble un JSON de sortie pour le message SSH (pour les voir, il suffit d’ajouter le plugin file à la partie output) :

{

   "eventid": "cowrie.direct-tcpip.request",

   "geoip": {

      "ip": "91.230.47.82",

      "latitude": 55.7386,

      "country_name": "Russia",

      "country_code2": "RU",

      "continent_code": "EU",

      "country_code3": "RU",

      "location": {

         "lon": 37.6068,

         "lat": 55.7386

      },

      "longitude": 37.6068

   },

   "session": "e2c3b09b",

   "src_host": "91.230.47.82",

   "message": "direct-tcp connection request to 185.86.151.225:80 from localhost:5556",

   "type": "cowrie",

   "dst_ip": "185.86.151.225",

   "src_port": 5556,

   "src_ip": "91.230.47.82",

   "path": "/home/cowrie/cowrie/log/cowrie.json",

   "system": "SSHService 'ssh-connection' on HoneyPotSSHTransport,6305,91.230.47.82",

   "isError": 0,

   "@timestamp": "2017-05-23T22:00:53.589Z",

   "dst_port": 80,

   "@version": "1",

   "host": "HP00000334",

   "sensor": "little-finger",

   "timestamp": "2017-05-23T22:00:53.589252Z"

}

Nous retrouvons bien les informations précédentes ainsi que les deux champs significatifs de Logstash :

1) @timestamp (horodatage de la prise en compte du message) ;

2) @version (version du parser utilisé pour traiter l’évènement).

2.2.2 Utilisation d’un Filebeat

L’objectif est d’installer un agent de type Filebeat sur le serveur sur lequel se trouvent les logs afin de les envoyer directement à Logstash.

L’installation du Filebeat et son utilisation sont simples : il est d’abord nécessaire de télécharger l’archive que nous souhaitons utiliser :

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.5.2-linux-x86_64.tar.gz

Une fois décompressée, il suffit d’éditer les propriétés suivantes dans le fichier filebeat.yml :

paths:

- /home/cowrie/volumes/var/log/cowrie/cowrie*.json

 

output.logstash:

# The Logstash hosts

hosts: ["logstashost:5044"]

Concernant Logstash, seule la partie input va changer puisque nous n’allons plus directement récupérer les fichiers, mais attendre que Filebeat les envoie.

Ainsi, seule la partie input est à modifier dans le fichier 01-cowrie-es.conf :

input {
   beats {
      port => 5044
   }
}

Voilà, il ne reste plus qu’à redémarrer Logstash (même commande que précédemment) et à démarrer Filebeat afin de lancer l’alimentation des données :

./filebeat

Note

Il est possible de configurer Logstash afin de pouvoir effectuer des redémarrages à chaud avec l’option –config.reload.automatic.

2.3 Récupération des logs Windows

L’opération n’est pas plus compliquée puisqu’un beat officiel existe : Winlogbeat. Celui-ci permet de transférer les logs des évènements de la machine Windows sur laquelle il est installé ; il est possible d’affiner et de paramétrer les types d’évènements à envoyer (application, sécurité, système…).

Comme pour Filebeat, vous pouvez télécharger l’archive depuis le dépôt officiel : https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-5.5.2-windows-x86_64.zip.

Une fois décompressée, il suffit d’éditer les propriétés suivantes dans le fichier winlogbeat.yml :

output.logstash:

# The Logstash hosts

hosts: ["logstashost:5045"]

Concernant Logstash, nous allons créer un fichier de configuration dédié (02-windows-es.conf)afin d’éviter de tout mélanger dans un seul fichier :

input {

 beats {

  port => 5045
 }
}

output {
 elasticsearch {
 hosts => ["localhost:9200"]
 index => "logstash-%{+YYYY.MM.dd}"
 }
}

Note

Il est évidemment possible de configurer Winlogbeat afin qu’il écrive directement dans Elasticsearch sans passer par Logstash en utilisant la propriété output.elasticsearch.

2.4 Récupération des logs de Tomcat

Nous supposerons que Tomcat écrit ses logs dans un dossier accessible en lecture par Logstash.

Voici un exemple de fichier de logs :

0:0:0:0:0:0:0:1 - - [01/May/2017:18:40:23 +0200] "GET / HTTP/1.1" 200 1644 125

0:0:0:0:0:0:0:1 - - [01/May/2017:18:40:29 +0200] "POST /rest HTTP/1.1" 200 – 15

Ce sont des access logs « classiques », le pattern utilisé est : %h %l %u %t \"%r\" %s %b %D.

Nous avons donc entre autres les informations sur l’IP émettrice de la requête, l’horodatage, la ressource accédée, le code retour HTTP, la taille de la réponse et la durée de la requête (plus d’infos sur [TOMCAT ACCESS LOG]).

Voici le fichier Logstash (04-tomcat-es.conf) permettant de traiter ces logs:

input {
   file {
      path => "/home/tomcat/logs/access*.log"
      sincedb_path => "/home/tomcat/access.log.db"
      type => "access"
      start_position => "beginning"
   }
}
filter {
   if [type] == "access" {
      grok {
         match => [ "message", "%{COMMONAPACHELOG} %{NUMBER:durationMs}" ]
      }
      date {
         match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
         locale => "en-US"
      }
      mutate {
         convert => {
            "bytes" => "integer"
            "durationMs" => "integer"
            "response" => "integer"
         }
      }
   }
}
output {
   elasticsearch {
      hosts => "localhost"
      index => "logstash-%{+YYYY.MM.dd}"
      document_type => "%{type}"

   }
}

Dans la partie input, notons l’utilisation de sincedb qui permet à Logstash de stocker sa tête de lecture sur les différents fichiers afin d’éviter qu’il ne relise des fichiers déjà lus notamment lors du redémarrage (il stocke dans le fichier le numéro d’inode du fichier lu ainsi que le numéro de la dernière ligne lue, le numéro d’inode garantissant le bon fonctionnement du mécanisme lors de la rotation des fichiers de logs). Le lecteur aura pour exercice de transposer la gestion de sincedb dans les logs de Cowrie (lorsque Filebeat n’est pas utilisé puisque celui-ci a également sa gestion des doublons).

La partie filter nous fait découvrir l’utilisation du parsing GROK avec l’utilisation de l’expression "%{COMMONAPACHELOG} %{NUMBER:durationMs}" pour gérer les logs Tomcat. Nous voyons là toute la puissance de l’outil puisqu’il existe de nombreux patterns prédéfinis pour gérer les formats de logs standards.

Dans notre exemple, COMMONAPACHELOG correspond à :

COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)

Ainsi, nous constatons bien que c’est juste une composition d’expressions régulières ; pensez à bien chercher dans les patterns existants quand vous souhaitez utiliser GROK, car il est très rare que le pattern voulu n’existe pas déjà.

La suite de cette partie consiste à mapper correctement le champ date (déjà vu précédemment) et à utiliser les mutations pour effectuer des conversions de type.

Enfin, rien de nouveau pour la partie output.

Voici un exemple de JSON correspondant à une ligne dans le fichier de logs :

{

   "request": "/",

   "auth": "-",

   "ident": "-",

   "verb": "GET",

   "message": "0:0:0:0:0:0:0:1 - - [01/May/2017:18:40:23 +0200] \"GET / HTTP/1.1\" 200 1644 125\r",

   "type": "access",

   "path": "/home/tomcat/logs/access_log.2017-05-01.log",

   "@timestamp": "2017-05-01T16:40:23.000Z",

   "response": 200,

   "bytes": 1644,

   "clientip": "0:0:0:0:0:0:0:1",

   "@version": "1",

   "host": "magichost",

   "httpversion": "1.1",

   "durationMs": 125,

   "timestamp": "01/May/2017:18:40:23 +0200"

}

Le JSON correspond bien aux champs vus dans le pattern GROK.

À présent, il ne reste plus qu’à lancer Logstash en lui donnant le répertoire dans lequel se trouvent nos fichiers de configuration :

/home/user/logstash-5.5.2/bin/logstash -f /home/user/logstash/conf/

2.5 Exploration avec Kibana

2.5.1 Requêtes

Le menu Discover permet, après avoir sélectionné un index, d’afficher et les données sous forme tabulaire. Il est ensuite possible d’ajouter des colonnes, de filtrer et même d’indiquer une requête au format Lucene dans la barre de recherche de Kibana.

2.5.2 Visualisations

Le menu Visualize permet de faire toutes sortes de graphes (camembert, histogramme, cartographie, nuage de mots…) en partant :

- soit d’une requête précédemment sauvegardée ;

- soit d’une nouvelle recherche.

La figure 1 illustre un exemple de nuage de mots-clés sur les messages capturés par Cowrie.

Figure 1

Et la figure 2 montre un exemple de cartographie pour identifier l’origine des attaques.

Figure 2

2.5.3 Tableaux de bord

La partie « Dashboard » permet de rassembler toutes les requêtes et visualisations dans une seule page ; il est ensuite possible de partager un lien à des utilisateurs pour qu’ils y accèdent directement ou bien générer un PDF.

La figure 3 présente un exemple de tableau de bord dans Kibana.

Figure 3

Ce tableau de bord est composé de trois entités :

1. une requête qui affiche les identifiants capturés par Cowrie ;

2. une visualisation de type camembert affichant la répartition des requêtes par continent ;

3. une visualisation de type « nuage de mots ».

Il est important de remarquer que Kibana utilise Elasticsearch pour stocker ses données (par défaut dans l’index .kibana).

3. Graph & Machine Learning : le SIEM en puissance ?

Une option déjà accessible dans Kibana est le Timelion ; il permet de pouvoir mettre en parallèle des données indépendantes les unes des autres et cela sous une même fenêtre temporelle. Pour le manipuler, il est nécessaire d’apprendre le langage spécifique au Timelion qui est composé de plus d’une vingtaine de fonctions.

Voici un petit exemple illustrant les possibilités de l’outil :

.es(index=logstash-*, metric=count, q=type:wineventlog).label("Nombre d'évènements Winlogbeat").color("#0000ff"),

.es(index=logstash-*, metric='sum:duration', q=type:cowrie).derivative().label("Variation durée des sessions Cowrie").color("#00ff00")

Dans cet exemple, deux courbes seront dessinées (le titre de chacune d’elles décrit clairement l’objectif).

Une fois le langage appris, il est possible de réaliser des graphes très utiles qui permettent de mettre en avant des comportements et parfois même d’expliquer des phénomènes sur le SI (une consommation de RAM qui explose sur une machine dû à un grand nombre de connexions sur cette machine par exemple).

Le X-Pack va encore plus loin dans ce genre d’analyse avec deux outils phares : Graph et Machine Learning.

3.1 Graph

Pour découvrir des relations parfois insoupçonnées et surtout pouvoir mettre en évidence des comportements ou relations, Elastic dispose d’un outil : Graph. Celui-ci se décompose à la fois en API directement accessible, mais aussi sous forme d’interface graphique dans Kibana (vous pouvez apercevoir le menu complet de Kibana sur la capture d’écran du tableau de bord).

Pour le faire fonctionner, il faut comme d’habitude sélectionner l’index que l’on souhaite exploiter et ensuite les composants de ce graphe (ce sont simplement les champs que l’on souhaite relier, ils apparaîtront sous forme de sommet dans le graphe obtenu).

Voici un exemple de graphe généré à partir de trois axes :

1. Le capteur Cowrie qui a récupéré les informations ;

2. Le pays émetteur de la requête ;

3. Le service du honeypot attaqué (SSH, Telnet).

Figure 4

Graph nous fournit donc un graphe mettant en évidence les relations entre ces trois axes.

Nous pouvons donc facilement voir les cas d’utilisation que nous pourrions obtenir dans le cadre d’un SIEM notamment pour détecter et identifier des comportements suspects, mais également pouvoir appuyer des analyses forensiques.

3.2 Machine Learning

Enfin, la dernière nouveauté est l’arrivée du Machine Learning dans la suite Elastic.

Cette option permet de pouvoir détecter dans les données des anomalies (comportement déviant du comportement habituellement observé dans le temps) et aussi de pouvoir alerter en cas de détection de cette anomalie.

Une fonctionnalité vraiment intéressante est de pouvoir identifier les « Influencers » qui ne sont ni plus ni moins que les responsables des anomalies.

Trois options sont possibles pour créer un job :

1. Job avec une seule métrique : un seul indicateur sera observé ;

2. Job avec plusieurs métriques : plusieurs indicateurs seront observés ;

3. Job avancé : toutes les options sont finement paramétrables, cette configuration étant réservée à des utilisateurs déjà expérimentés avec les deux autres catégories de job.

Typiquement, la configuration minimale pour un job est de choisir la fonction d’analyse à utiliser (Count, Sum, Min, Max, Mean…), le champ à analyser et une fenêtre temporelle afin de découper les données en « bucket ». Le travail du job consiste donc à appliquer la fonction mathématique sur chacun de ces buckets et d’observer les anomalies.

Ainsi, selon le job et les configurations souhaitées, plus ou moins d’options sont accessibles.

Le résultat consiste donc à nous présenter une chronologie des anomalies détectées, ainsi que les responsables ; libre à l’utilisateur de cliquer ensuite sur ces anomalies pour en savoir plus sur les évènements associés ainsi que l’explication mathématique qui a amené la déviation.

Le Machine Learning offre tout un tas de cas d’utilisation dans le cadre d’un SIEM : recherche d’anomalies sur des serveurs (consommation excessive de CPU/RAM, nombre de connexions anormales), analyse de comportements dans le temps...

Comme nous avons pu le voir dans cette partie, la triplette Timelion/Graph/Machine Learning, une fois maîtrisée, offre vraiment un panel de possibilités d’analyses important.

Conclusion

Elasticsearch est un outil très puissant et dans cet article, nous n’avons vu que très peu de ses possibilités puisque nous n’avons pas évoqué les modalités de clustering, de dump/restauration ainsi que les configurations et recherches avancées.

Je ne peux que vous inviter à aller voir la documentation officielle qui est complète et vous donnera des pistes à explorer pour mieux approfondir vos connaissances ainsi que de nombreux exemples [ELASTIC DOC].

En utilisant Elastic Stack, nous avons vu qu’il était aisé de se constituer une chaîne de collecte de fichiers, d’indexation et d’analyses. Beaucoup d’architectures différentes sont possibles (utilisation de broker comme Apache Kafka, Redis…) et très faciles à mettre en œuvre avec tous les plugins par défaut.

Les derniers ajouts comme Graph et Machine Learning dans le X-Pack nous permettent d’enfin automatiser et découvrir un tas de liens entre nos données ; les options possibles dans la configuration des jobs de Machine Learning sont déjà très complètes et la suite ne pourra être que meilleure (encore plus de fonctions mathématiques et de stabilité notamment).

Il est clair qu’il ne faut pas négliger cette suite lorsque l’on souhaite mettre en place un SIEM ; ne pas oublier également que pour bénéficier pleinement de toute la puissance de la suite, il faudra souscrire au X-Pack ou encore utiliser Elastic Cloud, un « Elasticsearch as a service » hébergé sur Amazon ou Google.

Remerciements

Merci à Claire et Hervé de m’avoir permis de rédiger cet article. Merci aussi à Yoann, Emmanuelle et Morane pour leur relecture attentive.

Références

[BEATS] Liste des beats : https://www.elastic.co/fr/products/beats

[GROK] Documentation sur GROK : https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

[COWRIE DOCKER] Informations sur l’image Docker de Cowrie : https://github.com/micheloosterhof/docker-cowrie

[TOMCAT ACCESS LOG] Informations sur les access log de Tomcat : https://tomcat.apache.org/tomcat-8.0-doc/config/valve.html#Access_Log_Valve

[ELASTIC DOC] Documentation officielle Elastic : https://www.elastic.co/guide/index.html