TP11 : Centralisation des Logs avec Deux Applications Flask et la Stack Elastic (ELK)
L’objectif de ce TP sera d’envoyer les logs de deux instances d’une application Python Flask vers Logstash, qui les transmettra à Elasticsearch pour stockage, et enfin les visualiser dans Kibana. Nous mettrons en évidence l’importance de la synchronisation des horloges et de la gestion des timestamps.
Arborescence des Fichiers à Créer
elk-flask-exercice/
├── docker-compose.yml
├── logstash/
│ └── pipeline/
│ └── logstash.conf
└── app/
├── Dockerfile
├── requirements.txt
└── app.py
Schéma de la Stack et Communications
+----------------------+ HTTP +-----------------+ Python Logging +-----------------+ Parsed Data +-----------------+
| Votre Navigateur | <--------------> | Flask App 1 | -----------------> | | --------------> | |
| (http://localhost:5001)| | (Container) | | | | |
+----------------------+ +-----------------+ | | | |
(Port 5000 TCP) | Logstash | | Elasticsearch |
+----------------------+ HTTP +-----------------+ | (Container) | (HTTP/9200) | (Container) |
| Votre Navigateur | <--------------> | Flask App 2 | -----------------> | | | |
| (http://localhost:5002)| | (Container) | | | | |
+----------------------+ +-----------------+ (Port 5000 TCP) +-----------------+ +--------+--------+
^
| (Requêtes Kibana)
|
+---------+---------+
| Kibana |
| (Container) |
| (UI: localhost:5601)|
+-------------------+
La Stack Elastic (ELK) pour les Logs
- Elasticsearch : C’est une base de données NoSQL distribuée et un moteur de recherche très puissant. Il stocke les logs (généralement au format JSON) et permet des recherches rapides et complexes.
- Logstash : C’est un pipeline de traitement de données côté serveur. Il peut ingérer des données de multiples sources (dont nos logs applicatifs), les transformer (parser, enrichir, filtrer) et les envoyer vers une ou plusieurs destinations (appelées “stash”), comme Elasticsearch.
- Kibana : C’est une plateforme de visualisation et d’exploration de données. Elle se connecte à Elasticsearch pour permettre de créer des graphiques, des tableaux de bord (dashboards) et d’explorer interactivement les logs.
Dans notre cas, les applications Flask enverront leurs logs à Logstash, qui les préparera et les enverra à Elasticsearch. Kibana nous permettra de les voir.
Partie 1 : Mise en Place de l’Environnement
- Créez le répertoire principal elk-flask-exercice.
-
À l’intérieur, créez
docker-compose.yml
:services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9 # Utilisez une version compatible container_name: elasticsearch environment: - discovery.type=single-node - ES_JAVA_OPTS=-Xms256m -Xmx256m - xpack.security.enabled=false # Désactiver la sécurité pour la simplicité de l'exercice ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - es_data:/usr/share/elasticsearch/data ports: - "9200:9200" # Port API Elasticsearch networks: - elk_net logstash: image: docker.elastic.co/logstash/logstash:7.17.9 container_name: logstash volumes: - ./logstash/pipeline:/usr/share/logstash/pipeline:ro # Monte le pipeline de Logstash ports: - "5000:5000/tcp" # Port pour recevoir les logs de Flask depends_on: - elasticsearch networks: - elk_net environment: LS_JAVA_OPTS: "-Xms128m -Xmx128m" # Allouer de la mémoire kibana: image: docker.elastic.co/kibana/kibana:7.17.9 container_name: kibana ports: - "5601:5601" # Port UI Kibana depends_on: - elasticsearch networks: - elk_net environment: ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]' # Dit à Kibana où trouver Elasticsearch flask_app_1: # Anciennement flask_app build: ./app container_name: flask_app_1 ports: - "5001:8000" # Application Flask 1 accessible sur http://localhost:5001 environment: - LOGSTASH_HOST=logstash - LOGSTASH_PORT=5000 - APP_INSTANCE_ID=app1 # Identifiant pour cette instance - APP_TIMESTAMP_OFFSET_SECONDS=0 # Pas de décalage pour la première app depends_on: - logstash networks: - elk_net flask_app_2: # Nouvelle instance de Flask build: ./app # Utilise la même image container_name: flask_app_2 ports: - "5002:8000" # Application Flask 2 accessible sur http://localhost:5002 environment: - LOGSTASH_HOST=logstash - LOGSTASH_PORT=5000 - APP_INSTANCE_ID=app2 # Identifiant pour cette instance - APP_TIMESTAMP_OFFSET_SECONDS=-3600 # Simule un décalage d'une heure en arrière depends_on: - logstash networks: - elk_net volumes: es_data: # Docker gère ce volume pour la persistance des données d'Elasticsearch networks: elk_net: driver: bridge
-
Créez le répertoire
logstash/pipeline/
et à l’intérieur, le fichierlogstash.conf
:# logstash/pipeline/logstash.conf input { tcp { port => 5000 codec => json_lines # On s'attend à recevoir des lignes JSON de Flask } } # La section filter est optionnelle ici, mais c'est là qu'on pourrait # parser des dates, ajouter/supprimer des champs, utiliser grok pour du texte non structuré, etc. # Si on voulait s'assurer que event_timestamp devienne le @timestamp principal: # filter { # if [extra][event_timestamp] { # date { # match => [ "[extra][event_timestamp]", "ISO8601" ] # target => "@timestamp" # } # } # } output { elasticsearch { hosts => ["http://elasticsearch:9200"] # Adresse du service Elasticsearch index => "flask-logs-%{+YYYY.MM.dd}" # Nom de l'index dans Elasticsearch (quotidien) } # Pour le débogage, on peut aussi afficher les logs dans la console de Logstash # stdout { codec => rubydebug } }
- Créez le répertoire
app/
et à l’intérieur :-
requirements.txt
:Flask python-logstash-async # Pour envoyer les logs à Logstash
-
Dockerfile
:FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . EXPOSE 8000 CMD ["python", "app.py"]
-
app.py
:import logging import os import time import random from datetime import datetime, timezone, timedelta from flask import Flask from logstash_async.handler import AsynchronousLogstashHandler app = Flask(__name__) # Configuration du logger pour envoyer à Logstash logstash_host = os.environ.get('LOGSTASH_HOST', 'localhost') logstash_port = int(os.environ.get('LOGSTASH_PORT', 5000)) app_instance_id = os.environ.get('APP_INSTANCE_ID', 'unknown_flask_app') # Lire le décalage en secondes, par défaut 0 timestamp_offset_seconds = int(os.environ.get('APP_TIMESTAMP_OFFSET_SECONDS', 0)) # Créer un logger standard Python logger = logging.getLogger(f'flask-logstash-logger-{app_instance_id}') logger.setLevel(logging.INFO) # Capturer les logs à partir du niveau INFO # Ajouter le handler Logstash logstash_handler = AsynchronousLogstashHandler( host=logstash_host, port=logstash_port, database_path=None # Désactiver la mise en cache sur disque pour la simplicité ) logger.addHandler(logstash_handler) def log_with_custom_fields(level, message, extra_fields=None): # Simuler une latence réseau avant l'envoi du log time.sleep(random.uniform(0, 2)) # Délai aléatoire entre 0 et 2 secondes # Générer le timestamp de l'événement par l'application # Appliquer le décalage simulé event_time_utc = datetime.now(timezone.utc) + timedelta(seconds=timestamp_offset_seconds) base_log_data = { 'source_app': app_instance_id, 'event_timestamp': event_time_utc.isoformat() # Format ISO8601 avec Z pour UTC } if extra_fields: base_log_data.update(extra_fields) if level == logging.INFO: logger.info(message, extra=base_log_data) elif level == logging.WARNING: logger.warning(message, extra=base_log_data) elif level == logging.ERROR: # exc_info=True ne peut pas être passé directement via 'extra', donc on le gère ici # Pour logger.error avec exc_info, il faut appeler logger.error directement. # Cette fonction helper est simplifiée pour l'exemple. # Pour une solution complète, il faudrait gérer exc_info séparément. logger.error(message, extra=base_log_data) # ... ajouter d'autres niveaux si nécessaire @app.route('/') def index(): log_with_custom_fields( logging.INFO, "Page d'accueil visitée.", extra_fields={'client_ip': '127.0.0.1', 'endpoint': '/'} ) return f"Hello from {app_instance_id}! Log INFO envoyé." @app.route('/action') def action(): log_with_custom_fields( logging.WARNING, "Une action sensible a été déclenchée.", extra_fields={'user_id': 'dev_user', 'action_name': 'trigger_event'} ) return f"Action effectuée par {app_instance_id}. Log WARNING envoyé." @app.route('/error') def error_route(): try: x = 10 / 0 except ZeroDivisionError as e: # Pour les erreurs avec traceback, il est préférable d'appeler logger.error directement # car exc_info doit être un argument de la méthode de logging. time.sleep(random.uniform(0, 2)) # Latence event_time_utc = datetime.now(timezone.utc) + timedelta(seconds=timestamp_offset_seconds) log_data = { 'source_app': app_instance_id, 'event_timestamp': event_time_utc.isoformat(), 'error_code': 'DIV_BY_ZERO', 'context': 'calculation_module' } logger.error( f"Une erreur de division par zéro s'est produite: {str(e)}", exc_info=True, # Ajoute automatiquement les informations de traceback extra=log_data ) return f"Erreur intentionnelle générée et loguée par {app_instance_id}.", 500 if __name__ == '__main__': # Le log de démarrage n'utilisera pas la latence simulée ni le timestamp d'événement pour la simplicité startup_message = f"Application Flask {app_instance_id} (avec logs vers ELK) démarrée. Timestamp offset: {timestamp_offset_seconds}s." logger.info(startup_message, extra={'source_app': app_instance_id}) print(startup_message) # Aussi sur la console pour le debug de docker-compose logs app.run(host='0.0.0.0', port=8000, debug=False)
-
-
Démarrez les services :
Vérifiez que vous avez suffisament de ressources sur votre host (par exemble avec ‘htop’) et coupez les services que vous n’utilisez pas présentement.
Ca peut être utile de vous créer une swap, car vous n’en avez pas sur votre VPS et votre ram est très limité:
sudo dd if=/dev/zero of=/swapfile bs=1M count=2048 status=progress sudo chmod 600 /swapfile sudo mkswap /swapfile sudo swapon /swapfile echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
Puis, depuis la racine de
elk-flask-exercice
, exécutez :docker-compose up -d --build
Le
--build
est important siapp.py
a changé. Attendez quelques minutes que tous les services démarrent. Vous pouvez suivre les logs avecdocker-compose logs -f elasticsearch kibana logstash flask_app_1 flask_app_2
.
Logs Structurés, Timestamps et Sources Multiples
Elasticsearch fonctionne mieux avec des données structurées. Chaque “champ” dans le JSON (ex:
@timestamp
,level
,message
,extra.source_app
,extra.event_timestamp
) peut être indexé. Cela permet :
- Recherches ciblées :
extra.source_app : "app2" and extra.user_id : "dev_user"
- Agrégations : Compter le nombre d’erreurs par
extra.error_code
.- Visualisations riches dans Kibana.
Notre
app.py
ajouteextra.source_app
pour identifier l’origine du log etextra.event_timestamp
pour enregistrer le moment où l’événement s’est produit du point de vue de l’application (potentiellement affecté par la désynchronisation de l’horloge et avant la latence d’envoi simulée). Logstash ajoute@timestamp
qui correspond au moment où il a reçu et traité le log.
Partie 2 : Génération et Visualisation des Logs
- Générez des logs : Ouvrez votre navigateur et visitez les URLs des deux applications Flask :
- Application 1 (pas de décalage d’horloge simulé) :
http://localhost:5001/
http://localhost:5001/action
http://localhost:5001/error
- Application 2 (avec décalage d’horloge simulé d’une heure en arrière) :
http://localhost:5002/
http://localhost:5002/action
http://localhost:5002/error
Visitez chaque URL plusieurs fois pour générer suffisamment de logs.
- Application 1 (pas de décalage d’horloge simulé) :
-
Accédez à Kibana : Ouvrez
http://localhost:5601
dans votre navigateur. - Configurez un “Index Pattern” dans Kibana (première fois seulement) :
- Cliquez sur le menu “hamburger” (☰) en haut à gauche, puis allez dans Management > Stack Management.
- Sous Kibana, cliquez sur Index Patterns.
- Cliquez sur Create index pattern.
- Dans le champ “Index pattern name”, tapez
flask-logs-*
(cela correspond à l’index que nous avons défini danslogstash.conf
). - Pour “Time field”, choisissez
@timestamp
(Logstash ajoute ce champ automatiquement et c’est généralement le plus fiable pour le tri chronologique général car il est centralisé). - Cliquez sur Create index pattern.
- Visualisez les logs :
- Cliquez à nouveau sur le menu “hamburger” (☰).
- Sous Analytics, cliquez sur Discover.
- Vous devriez voir vos logs apparaître ! Si ce n’est pas le cas, vérifiez le sélecteur de plage de temps en haut à droite (ex: “Last 15 minutes”).
- Ajoutez les colonnes
extra.source_app
etextra.event_timestamp
à la vue pour faciliter la comparaison. Cliquez sur un nom de champ dans la liste “Available fields” à gauche, puis sur l’icône “+” qui apparaît.
- a) Dans Kibana Discover, trouvez un log généré par la visite de la page
/action
deflask_app_1
. Quels sont les valeurs des champslevel
,message
,extra.source_app
et du champ personnaliséextra.user_id
?- b) Cliquez sur un des documents de log pour l’étendre. En plus des champs de la question précédente, citez deux autres champs (standard ou personnalisés que vous avez ajoutés via
extra=
) présents dans ce log, avec leurs valeurs. Notez en particulier la valeur de@timestamp
etextra.event_timestamp
.
Question sur la Synchronisation des Horloges et Timestamps :
- a) Générez quelques logs depuis
flask_app_1
(pas de décalage) puis immédiatement après, quelques logs depuisflask_app_2
(décalage d’une heure en arrière).- b) Dans Kibana Discover, triez les logs par
@timestamp
(le plus récent en premier). Observez l’ordre des logs des deux applications.- c) Maintenant, examinez attentivement les valeurs de
@timestamp
(heure de réception par Logstash) etextra.event_timestamp
(heure de l’événement selon l’application) pour les logs deflask_app_2
. Quelle différence majeure observez-vous entre ces deux timestamps pourflask_app_2
?- d) Si vous deviez reconstituer la séquence exacte des événements tels qu’ils se sont produits chronologiquement dans le monde réel, et que les horloges de vos serveurs applicatifs n’étaient pas parfaitement synchronisées (comme simulé ici), lequel de ces timestamps (
@timestamp
ouextra.event_timestamp
) serait le plus trompeur si utilisé seul ? Pourquoi ?- e) Expliquez brièvement pourquoi il est crucial que tous les serveurs d’un système distribué aient leurs horloges synchronisées (par exemple, en utilisant NTP - Network Time Protocol). Quel timestamp devient alors le plus fiable pour l’ordre des événements ?
Partie 3 : Filtrage et Recherche dans Kibana
Kibana utilise KQL (Kibana Query Language) dans sa barre de recherche (en haut de la page Discover).
- Filtrer par niveau :
level : "ERROR"
. - Filtrer par champ personnalisé :
extra.error_code : "DIV_BY_ZERO"
(les champs deextra
sont imbriqués). - Filtrer par source applicative :
extra.source_app : "app2"
- Filtrer par texte dans le message :
message : "Page d'accueil"
(recherche de phrase exacte) oumessage : Page d'accueil
(recherche des mots). - Combiner :
extra.source_app : "app1" and level : "ERROR" and extra.context : "calculation_module"
Quelle requête KQL utiliseriez-vous dans Kibana pour afficher uniquement les messages de niveau WARNING de
flask_app_2
qui contiennent la chaîne de caractèressensible
dans le champmessage
?
Rôle de Logstash dans le Pipeline
Même si dans cet exercice notre
logstash.conf
est simple, Logstash est très puissant. Dans la sectionfilter
(que nous avons commentée), on pourrait :
- Parser des logs non structurés : Si Flask écrivait du texte simple, Logstash pourrait utiliser des patterns pour extraire des champs.
- Enrichir les logs : Ajouter des informations basées sur l’IP source (géolocalisation), modifier des champs, en supprimer.
- Normaliser les données : S’assurer que les timestamps sont dans le bon format, que les niveaux de sévérité sont cohérents entre différentes sources de logs. Par exemple, on pourrait configurer Logstash pour utiliser
extra.event_timestamp
comme@timestamp
principal si on le jugeait plus pertinent (voir exemple commenté danslogstash.conf
).Logstash agit comme un ETL (Extract, Transform, Load) pour vos logs avant qu’ils n’atteignent Elasticsearch.
[Optionnel] Si votre application Flask (
app1
) générait des logs avec des dates dans un format non standard (ex:DD/MM/YYYY HH:MM:SS
) dans un champextra.custom_event_time
, quelle section delogstash.conf
modifieriez-vous et quel type de plugin Logstash (juste le nom du type, ex: json, mutate, date) pourriez-vous utiliser pour convertir ce champ en un vrai timestamp ISO8601 et potentiellement l’assigner à@timestamp
?
Quel est le principal avantage de stocker les logs dans Elasticsearch (par rapport à des fichiers texte sur chaque serveur) lorsque vous devez rechercher des erreurs spécifiques qui se sont produites sur plusieurs jours et sur plusieurs applications différentes (comme
flask_app_1
etflask_app_2
) ?
Fin de l’exercice
Vous avez maintenant deux applications Flask qui envoient leurs logs à une stack Elastic complète, et vous savez comment les visualiser et les interroger basiquement dans Kibana. Vous avez également exploré l’importance des timestamps et de la synchronisation des horloges.
Pour arrêter et supprimer les conteneurs, les volumes et les réseaux :
docker-compose down -v