Partie 3 : Capture des Données


Structure des données

La TéléInformation Client envoie en continu les données du compteur Linky. Ces données sont structurées sous forme de trames. Chaque trame est délimitée par un caractère de début STX (b'\x02') et un caractère de fin ETX (b'\x03') . Au sein de chaque trame, des étiquettes vont s’afficher avec, en correspondance, les valeurs attribuées à ces étiquettes. La liste des étiquettes varie selon que l’on est en mode historique ou standard, selon que l’on est en monophasé ou en triphasé, selon que l’on bénéficie ou non d’un tarif heures pleines / heures creuses. La liste des étiquettes est disponible chez Enedis.

Les étiquettes qui vont nous intéresser sont :

  • DATE qui affiche la date et l’heure courante saisies par le Linky
  • LTARF qui précise si on est en heures creuses ou en heures pleines
  • EAST qui est l’énergie soutirée totale depuis la remise à zéro du Linky, en Wh
  • EASF01 qui correspond aux heures creuses en Wh
  • EASF02 qui correspond aux heures pleines en Wh
  • EASD01, EASD02, EASD03, EASD04 qui sont les énregies actives soutirées par le Distributeur
  • SINSTS, SINSTS1, SINSTS2 et SNSIST3 qui sont les puissances apparentes soutirées en totalité, pour la phase1, pour la phase2, pour la phase3

Installer pyserial

  • pyserial est un module python qui permet de récupérer les données émises par un optocoupleur
  • Se mettre en environnement virtuel
  • Installer pyserial en tapant : $ pip install pyserial

Script de capture des données TIC

Script de test

  • Ce script de test n’est pas utile pour le projet en soi
  • Il a un but didactique : comprendre le code de capture des données du TIC
  • Créer un dossier script : $ mkdir ~/djangoTIC/scripts
  • Ouvrir en écriture le script readTIC_test.py : $ `nano ~/djangoTIC/scripts/readTIC_test.py
  • Insérer le code suivant :
djangoTIC/scripts/readTIC_test.py
import serial
import time

# Initialisation du port série (le port s'ouvre automatiquement) avec les paramètres ad hoc
ser = serial.Serial(
    port="/dev/ttyAMA0",            
    baudrate=9600,
    bytesize=serial.SEVENBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=1
)

# la liste des étiquettes dont on souhaite récupérer les valeurs
listItems = ["DATE", "LTARF", "EAST", "EASF01", "EASF02", "EASD01", "EASD02", "EASD03", "EASD04","SINSTS", "SINSTS1", "SINSTS2", "SINSTS3"]

def checkIfDictFull(dict, listItems):
    """Vérifie si un dictionnaire contient toutes les clés qui sont dans la liste listItems
    Lié au fait que le flux du TIC ne vas pas nécessairement fournir toutes les données pour un même jeu de données"""
    setKeys = set(dict.keys())
    if set(listItems).issubset(setKeys):
        return True
    else:
        return False


def capture_trame():
    while True:
        if ser.read(1) == b'\x02':      # Début de trame
            trame = b'\x02'             # On crée une chaîne de caractères
            while True:
                byte = ser.read(1)      # On lit le caractère
                trame += byte           # On ajoute le caractère à la chaîne
                if byte == b'\x03':     # Fin de trame
                    return trame        

def parse_trame(trame):
    data = trame[1:-1].decode('ascii', errors='ignore')                 # décodage du binaire en ascii
    print(f'type de data = {type(data)}')
    lines = data.split('\r\n')                                          # transformer la chaîne de caractères en une succession de lignes
    print(f'type de lines = {type(lines)}')
    dico = {}
    for line in lines:                                                  # traiter les lignes une par une ; les éléments de lignes sont séparés par des tabulations
        key = line.split('\t')[0]                                       # capter l'étiquette de la ligne
        value = line.split('\t')[1] if '\t' in line else ''             # capter la valeur de l'étiquette
        dico[key] = value                                               # ajouter l'étiquette et sa valeur dans le dictionnaire dico
        # print(f'Clé: {key}, Valeur: {value}')
    subDico = {key: dico[key] for key in listItems if key in dico}      # sélectionner les étiquettes et les valeurs qui sont dans la liste listitems
        
    if checkIfDictFull(dico, listItems):
        print('Dictionnaire complet capturé')
        # return subDico
    else:
        print('Dictionnaire incomplet')

try:
    while True:                                                         # boucle d'impression des données
        trame = capture_trame()
        donnees = parse_trame(trame)
        print(donnees)
        time.sleep(1)                                                   # Pause entre les captures
except KeyboardInterrupt:
    print("\nArrêt du script par l'utilisateur.")                       # Ctrl+C pour interrompre le script
except Exception as e:
    print(f"Erreur : {e}")

Lancer le script de test

  • En environnement virtuel
  • Lancer la commande : $ python ~/djangoTIC/scripts/readTIC_test.py
  • Les trames défilent, en indiquant soit que le dictionnaire est incomplet, soit qu’il est complet.
  • Interrompre avec Ctrl-C

Conserver le script ?

  • Une fois testé, le dossier et le script peuvent être gardés ou supprimés
  • Pour les supprimer : $ rm -R ~/djangoTIC/scripts