TP11 : Centralisation des Logs avec Deux Applications Flask et la Stack Elastic (ELK)

L’objectif de ce TP sera d’envoyer les logs de deux instances d’une application Python Flask vers Logstash, qui les transmettra à Elasticsearch pour stockage, et enfin les visualiser dans Kibana. Nous mettrons en évidence l’importance de la synchronisation des horloges et de la gestion des timestamps.

Arborescence des Fichiers à Créer

elk-flask-exercice/
├── docker-compose.yml
├── logstash/
│   └── pipeline/
│       └── logstash.conf
└── app/
    ├── Dockerfile
    ├── requirements.txt
    └── app.py

Schéma de la Stack et Communications


+----------------------+     HTTP         +-----------------+   Python Logging   +-----------------+   Parsed Data   +-----------------+
| Votre Navigateur     | <--------------> | Flask App 1     | -----------------> |                 | --------------> |                 |
| (http://localhost:5001)|                | (Container)     |                    |                 |                 |                 |
+----------------------+                  +-----------------+                    |                 |                 |                 |
                                               (Port 5000 TCP)                   |    Logstash     |                 |  Elasticsearch  |
+----------------------+     HTTP         +-----------------+                    |   (Container)   | (HTTP/9200)   |   (Container)   |
| Votre Navigateur     | <--------------> | Flask App 2     | -----------------> |                 |                 |                 |
| (http://localhost:5002)|                | (Container)     |                    |                 |                 |                 |
+----------------------+                  +-----------------+ (Port 5000 TCP)    +-----------------+                 +--------+--------+
                                                                                                                                ^
                                                                                                                                | (Requêtes Kibana)
                                                                                                                                |
                                                                                                                      +---------+---------+
                                                                                                                      |      Kibana       |
                                                                                                                      |    (Container)    |
                                                                                                                      | (UI: localhost:5601)|
                                                                                                                      +-------------------+

La Stack Elastic (ELK) pour les Logs

  • Elasticsearch : C’est une base de données NoSQL distribuée et un moteur de recherche très puissant. Il stocke les logs (généralement au format JSON) et permet des recherches rapides et complexes.
  • Logstash : C’est un pipeline de traitement de données côté serveur. Il peut ingérer des données de multiples sources (dont nos logs applicatifs), les transformer (parser, enrichir, filtrer) et les envoyer vers une ou plusieurs destinations (appelées “stash”), comme Elasticsearch.
  • Kibana : C’est une plateforme de visualisation et d’exploration de données. Elle se connecte à Elasticsearch pour permettre de créer des graphiques, des tableaux de bord (dashboards) et d’explorer interactivement les logs.

Dans notre cas, les applications Flask enverront leurs logs à Logstash, qui les préparera et les enverra à Elasticsearch. Kibana nous permettra de les voir.


Partie 1 : Mise en Place de l’Environnement

  1. Créez le répertoire principal elk-flask-exercice.
  2. À l’intérieur, créez docker-compose.yml:

    services:
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9 # Utilisez une version compatible
        container_name: elasticsearch
        environment:
          - discovery.type=single-node
          - ES_JAVA_OPTS=-Xms256m -Xmx256m 
          - xpack.security.enabled=false # Désactiver la sécurité pour la simplicité de l'exercice
        ulimits:
          memlock:
            soft: -1
            hard: -1
          nofile:
            soft: 65536
            hard: 65536
        volumes:
          - es_data:/usr/share/elasticsearch/data
        ports:
          - "9200:9200" # Port API Elasticsearch
        networks:
          - elk_net
    
      logstash:
        image: docker.elastic.co/logstash/logstash:7.17.9
        container_name: logstash
        volumes:
          - ./logstash/pipeline:/usr/share/logstash/pipeline:ro # Monte le pipeline de Logstash
        ports:
          - "5000:5000/tcp" # Port pour recevoir les logs de Flask
        depends_on:
          - elasticsearch
        networks:
          - elk_net
        environment:
            LS_JAVA_OPTS: "-Xms128m -Xmx128m" # Allouer de la mémoire
    
      kibana:
        image: docker.elastic.co/kibana/kibana:7.17.9
        container_name: kibana
        ports:
          - "5601:5601" # Port UI Kibana
        depends_on:
          - elasticsearch
        networks:
          - elk_net
        environment:
          ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]' # Dit à Kibana où trouver Elasticsearch
    
      flask_app_1: # Anciennement flask_app
        build: ./app
        container_name: flask_app_1
        ports:
          - "5001:8000" # Application Flask 1 accessible sur http://localhost:5001
        environment:
          - LOGSTASH_HOST=logstash
          - LOGSTASH_PORT=5000
          - APP_INSTANCE_ID=app1 # Identifiant pour cette instance
          - APP_TIMESTAMP_OFFSET_SECONDS=0 # Pas de décalage pour la première app
        depends_on:
          - logstash
        networks:
          - elk_net
    
      flask_app_2: # Nouvelle instance de Flask
        build: ./app # Utilise la même image
        container_name: flask_app_2
        ports:
          - "5002:8000" # Application Flask 2 accessible sur http://localhost:5002
        environment:
          - LOGSTASH_HOST=logstash
          - LOGSTASH_PORT=5000
          - APP_INSTANCE_ID=app2 # Identifiant pour cette instance
          - APP_TIMESTAMP_OFFSET_SECONDS=-3600 # Simule un décalage d'une heure en arrière
        depends_on:
          - logstash
        networks:
          - elk_net
    
    volumes:
      es_data: # Docker gère ce volume pour la persistance des données d'Elasticsearch
    
    networks:
      elk_net:
        driver: bridge
    
  3. Créez le répertoire logstash/pipeline/ et à l’intérieur, le fichier logstash.conf:

    # logstash/pipeline/logstash.conf
    
    input {
      tcp {
        port => 5000
        codec => json_lines # On s'attend à recevoir des lignes JSON de Flask
      }
    }
    
    # La section filter est optionnelle ici, mais c'est là qu'on pourrait
    # parser des dates, ajouter/supprimer des champs, utiliser grok pour du texte non structuré, etc.
    # Si on voulait s'assurer que event_timestamp devienne le @timestamp principal:
    # filter {
    #   if [extra][event_timestamp] {
    #     date {
    #       match => [ "[extra][event_timestamp]", "ISO8601" ]
    #       target => "@timestamp"
    #     }
    #   }
    # }
    
    output {
      elasticsearch {
        hosts => ["http://elasticsearch:9200"] # Adresse du service Elasticsearch
        index => "flask-logs-%{+YYYY.MM.dd}"  # Nom de l'index dans Elasticsearch (quotidien)
      }
      # Pour le débogage, on peut aussi afficher les logs dans la console de Logstash
      # stdout { codec => rubydebug }
    }
    
  4. Créez le répertoire app/ et à l’intérieur :
    • requirements.txt:

      Flask
      python-logstash-async # Pour envoyer les logs à Logstash
      
    • Dockerfile:

      FROM python:3.9-slim
      WORKDIR /app
      COPY requirements.txt .
      RUN pip install --no-cache-dir -r requirements.txt
      COPY . .
      EXPOSE 8000
      CMD ["python", "app.py"]
      
    • app.py:

      import logging
      import os
      import time
      import random
      from datetime import datetime, timezone, timedelta
      from flask import Flask
      from logstash_async.handler import AsynchronousLogstashHandler
      
      app = Flask(__name__)
      
      # Configuration du logger pour envoyer à Logstash
      logstash_host = os.environ.get('LOGSTASH_HOST', 'localhost')
      logstash_port = int(os.environ.get('LOGSTASH_PORT', 5000))
      app_instance_id = os.environ.get('APP_INSTANCE_ID', 'unknown_flask_app')
      # Lire le décalage en secondes, par défaut 0
      timestamp_offset_seconds = int(os.environ.get('APP_TIMESTAMP_OFFSET_SECONDS', 0))
      
      
      # Créer un logger standard Python
      logger = logging.getLogger(f'flask-logstash-logger-{app_instance_id}')
      logger.setLevel(logging.INFO) # Capturer les logs à partir du niveau INFO
      
      # Ajouter le handler Logstash
      logstash_handler = AsynchronousLogstashHandler(
          host=logstash_host,
          port=logstash_port,
          database_path=None # Désactiver la mise en cache sur disque pour la simplicité
      )
      logger.addHandler(logstash_handler)
      
      def log_with_custom_fields(level, message, extra_fields=None):
          # Simuler une latence réseau avant l'envoi du log
          time.sleep(random.uniform(0, 2)) # Délai aléatoire entre 0 et 2 secondes
      
          # Générer le timestamp de l'événement par l'application
          # Appliquer le décalage simulé
          event_time_utc = datetime.now(timezone.utc) + timedelta(seconds=timestamp_offset_seconds)
                  
          base_log_data = {
              'source_app': app_instance_id,
              'event_timestamp': event_time_utc.isoformat() # Format ISO8601 avec Z pour UTC
          }
          if extra_fields:
              base_log_data.update(extra_fields)
                  
          if level == logging.INFO:
              logger.info(message, extra=base_log_data)
          elif level == logging.WARNING:
              logger.warning(message, extra=base_log_data)
          elif level == logging.ERROR:
              # exc_info=True ne peut pas être passé directement via 'extra', donc on le gère ici
              # Pour logger.error avec exc_info, il faut appeler logger.error directement.
              # Cette fonction helper est simplifiée pour l'exemple.
              # Pour une solution complète, il faudrait gérer exc_info séparément.
              logger.error(message, extra=base_log_data)
          # ... ajouter d'autres niveaux si nécessaire
      
      @app.route('/')
      def index():
          log_with_custom_fields(
              logging.INFO,
              "Page d'accueil visitée.",
              extra_fields={'client_ip': '127.0.0.1', 'endpoint': '/'}
          )
          return f"Hello from {app_instance_id}! Log INFO envoyé."
      
      @app.route('/action')
      def action():
          log_with_custom_fields(
              logging.WARNING,
              "Une action sensible a été déclenchée.",
              extra_fields={'user_id': 'dev_user', 'action_name': 'trigger_event'}
          )
          return f"Action effectuée par {app_instance_id}. Log WARNING envoyé."
      
      @app.route('/error')
      def error_route():
          try:
              x = 10 / 0
          except ZeroDivisionError as e:
              # Pour les erreurs avec traceback, il est préférable d'appeler logger.error directement
              # car exc_info doit être un argument de la méthode de logging.
              time.sleep(random.uniform(0, 2)) # Latence
              event_time_utc = datetime.now(timezone.utc) + timedelta(seconds=timestamp_offset_seconds)
              log_data = {
                  'source_app': app_instance_id,
                  'event_timestamp': event_time_utc.isoformat(),
                  'error_code': 'DIV_BY_ZERO',
                  'context': 'calculation_module'
              }
              logger.error(
                  f"Une erreur de division par zéro s'est produite: {str(e)}",
                  exc_info=True, # Ajoute automatiquement les informations de traceback
                  extra=log_data
              )
          return f"Erreur intentionnelle générée et loguée par {app_instance_id}.", 500
      
      if __name__ == '__main__':
          # Le log de démarrage n'utilisera pas la latence simulée ni le timestamp d'événement pour la simplicité
          startup_message = f"Application Flask {app_instance_id} (avec logs vers ELK) démarrée. Timestamp offset: {timestamp_offset_seconds}s."
          logger.info(startup_message, extra={'source_app': app_instance_id})
          print(startup_message) # Aussi sur la console pour le debug de docker-compose logs
          app.run(host='0.0.0.0', port=8000, debug=False)
      
      
  5. Démarrez les services :

    Vérifiez que vous avez suffisament de ressources sur votre host (par exemble avec ‘htop’) et coupez les services que vous n’utilisez pas présentement.

    Ca peut être utile de vous créer une swap, car vous n’en avez pas sur votre VPS et votre ram est très limité:

    sudo dd if=/dev/zero of=/swapfile bs=1M count=2048 status=progress 
    sudo chmod 600 /swapfile 
    sudo mkswap /swapfile 
    sudo swapon /swapfile 
    echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
    

    Puis, depuis la racine de elk-flask-exercice, exécutez :

    docker-compose up -d --build
    

    Le --build est important si app.py a changé. Attendez quelques minutes que tous les services démarrent. Vous pouvez suivre les logs avec docker-compose logs -f elasticsearch kibana logstash flask_app_1 flask_app_2.


Logs Structurés, Timestamps et Sources Multiples

Elasticsearch fonctionne mieux avec des données structurées. Chaque “champ” dans le JSON (ex: @timestamp, level, message, extra.source_app, extra.event_timestamp) peut être indexé. Cela permet :

  • Recherches ciblées : extra.source_app : "app2" and extra.user_id : "dev_user"
  • Agrégations : Compter le nombre d’erreurs par extra.error_code.
  • Visualisations riches dans Kibana.

Notre app.py ajoute extra.source_app pour identifier l’origine du log et extra.event_timestamp pour enregistrer le moment où l’événement s’est produit du point de vue de l’application (potentiellement affecté par la désynchronisation de l’horloge et avant la latence d’envoi simulée). Logstash ajoute @timestamp qui correspond au moment où il a reçu et traité le log.


Partie 2 : Génération et Visualisation des Logs

  1. Générez des logs : Ouvrez votre navigateur et visitez les URLs des deux applications Flask :
    • Application 1 (pas de décalage d’horloge simulé) :
      • http://localhost:5001/
      • http://localhost:5001/action
      • http://localhost:5001/error
    • Application 2 (avec décalage d’horloge simulé d’une heure en arrière) :
      • http://localhost:5002/
      • http://localhost:5002/action
      • http://localhost:5002/error Visitez chaque URL plusieurs fois pour générer suffisamment de logs.
  2. Accédez à Kibana : Ouvrez http://localhost:5601 dans votre navigateur.

  3. Configurez un “Index Pattern” dans Kibana (première fois seulement) :
    • Cliquez sur le menu “hamburger” (☰) en haut à gauche, puis allez dans Management > Stack Management.
    • Sous Kibana, cliquez sur Index Patterns.
    • Cliquez sur Create index pattern.
    • Dans le champ “Index pattern name”, tapez flask-logs-* (cela correspond à l’index que nous avons défini dans logstash.conf).
    • Pour “Time field”, choisissez @timestamp (Logstash ajoute ce champ automatiquement et c’est généralement le plus fiable pour le tri chronologique général car il est centralisé).
    • Cliquez sur Create index pattern.
  4. Visualisez les logs :
    • Cliquez à nouveau sur le menu “hamburger” (☰).
    • Sous Analytics, cliquez sur Discover.
    • Vous devriez voir vos logs apparaître ! Si ce n’est pas le cas, vérifiez le sélecteur de plage de temps en haut à droite (ex: “Last 15 minutes”).
    • Ajoutez les colonnes extra.source_app et extra.event_timestamp à la vue pour faciliter la comparaison. Cliquez sur un nom de champ dans la liste “Available fields” à gauche, puis sur l’icône “+” qui apparaît.

  • a) Dans Kibana Discover, trouvez un log généré par la visite de la page /action de flask_app_1. Quels sont les valeurs des champs level, message, extra.source_app et du champ personnalisé extra.user_id ?
  • b) Cliquez sur un des documents de log pour l’étendre. En plus des champs de la question précédente, citez deux autres champs (standard ou personnalisés que vous avez ajoutés via extra=) présents dans ce log, avec leurs valeurs. Notez en particulier la valeur de @timestamp et extra.event_timestamp.

Question sur la Synchronisation des Horloges et Timestamps :

  • a) Générez quelques logs depuis flask_app_1 (pas de décalage) puis immédiatement après, quelques logs depuis flask_app_2 (décalage d’une heure en arrière).
  • b) Dans Kibana Discover, triez les logs par @timestamp (le plus récent en premier). Observez l’ordre des logs des deux applications.
  • c) Maintenant, examinez attentivement les valeurs de @timestamp (heure de réception par Logstash) et extra.event_timestamp (heure de l’événement selon l’application) pour les logs de flask_app_2. Quelle différence majeure observez-vous entre ces deux timestamps pour flask_app_2 ?
  • d) Si vous deviez reconstituer la séquence exacte des événements tels qu’ils se sont produits chronologiquement dans le monde réel, et que les horloges de vos serveurs applicatifs n’étaient pas parfaitement synchronisées (comme simulé ici), lequel de ces timestamps (@timestamp ou extra.event_timestamp) serait le plus trompeur si utilisé seul ? Pourquoi ?
  • e) Expliquez brièvement pourquoi il est crucial que tous les serveurs d’un système distribué aient leurs horloges synchronisées (par exemple, en utilisant NTP - Network Time Protocol). Quel timestamp devient alors le plus fiable pour l’ordre des événements ?

Partie 3 : Filtrage et Recherche dans Kibana

Kibana utilise KQL (Kibana Query Language) dans sa barre de recherche (en haut de la page Discover).

  1. Filtrer par niveau : level : "ERROR" .
  2. Filtrer par champ personnalisé : extra.error_code : "DIV_BY_ZERO" (les champs de extra sont imbriqués).
  3. Filtrer par source applicative : extra.source_app : "app2"
  4. Filtrer par texte dans le message : message : "Page d'accueil" (recherche de phrase exacte) ou message : Page d'accueil (recherche des mots).
  5. Combiner : extra.source_app : "app1" and level : "ERROR" and extra.context : "calculation_module"

Quelle requête KQL utiliseriez-vous dans Kibana pour afficher uniquement les messages de niveau WARNING de flask_app_2 qui contiennent la chaîne de caractères sensible dans le champ message ?


Rôle de Logstash dans le Pipeline

Même si dans cet exercice notre logstash.conf est simple, Logstash est très puissant. Dans la section filter (que nous avons commentée), on pourrait :

  • Parser des logs non structurés : Si Flask écrivait du texte simple, Logstash pourrait utiliser des patterns pour extraire des champs.
  • Enrichir les logs : Ajouter des informations basées sur l’IP source (géolocalisation), modifier des champs, en supprimer.
  • Normaliser les données : S’assurer que les timestamps sont dans le bon format, que les niveaux de sévérité sont cohérents entre différentes sources de logs. Par exemple, on pourrait configurer Logstash pour utiliser extra.event_timestamp comme @timestamp principal si on le jugeait plus pertinent (voir exemple commenté dans logstash.conf).

Logstash agit comme un ETL (Extract, Transform, Load) pour vos logs avant qu’ils n’atteignent Elasticsearch.


[Optionnel] Si votre application Flask (app1) générait des logs avec des dates dans un format non standard (ex: DD/MM/YYYY HH:MM:SS) dans un champ extra.custom_event_time, quelle section de logstash.conf modifieriez-vous et quel type de plugin Logstash (juste le nom du type, ex: json, mutate, date) pourriez-vous utiliser pour convertir ce champ en un vrai timestamp ISO8601 et potentiellement l’assigner à @timestamp ?


Quel est le principal avantage de stocker les logs dans Elasticsearch (par rapport à des fichiers texte sur chaque serveur) lorsque vous devez rechercher des erreurs spécifiques qui se sont produites sur plusieurs jours et sur plusieurs applications différentes (comme flask_app_1 et flask_app_2) ?


Fin de l’exercice

Vous avez maintenant deux applications Flask qui envoient leurs logs à une stack Elastic complète, et vous savez comment les visualiser et les interroger basiquement dans Kibana. Vous avez également exploré l’importance des timestamps et de la synchronisation des horloges.

Pour arrêter et supprimer les conteneurs, les volumes et les réseaux :

docker-compose down -v