from django.core.management.base import BaseCommand
from ticapp.models import Data
from django.utils import timezone
import serial
import time
class Command(BaseCommand):
help = 'Capture et insère les données TIC en continu depuis le compteur Linky'
def __init__(self):
super().__init__()
# Initialisation du port série
self.ser = serial.Serial(
port="/dev/ttyAMA0",
baudrate=9600,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1,
xonxoff=False,
rtscts=False
)
self.listItems = ["DATE", "LTARF", "EAST", "EASF01", "EASF02", "EASD01", "EASD02", "EASD03", "EASD04", "SINSTS", "SINSTS1", "SINSTS2", "SINSTS3"]
def checkIfDictFull(self, dict, listItems):
"""Vérifie si un dictionnaire contient toutes les clés qui sont dans la liste listItems"""
setKeys = set(dict.keys())
return set(listItems).issubset(setKeys)
def capture_trame(self):
"""Capture une trame TIC complète"""
while True:
if self.ser.read(1) == b'\x02': # Début de trame
trame = b'\x02'
while True:
byte = self.ser.read(1)
trame += byte
if byte == b'\x03': # Fin de trame
return trame
def parse_trame(self, trame):
"""Parse la trame et retourne un dictionnaire avec les données"""
try:
data = trame[1:-1].decode('ascii', errors='ignore')
lines = data.split('\r\n')
dico = {}
for line in lines:
if '\t' in line:
parts = line.split('\t')
key = parts[0]
value = parts[1] if len(parts) > 1 else ''
dico[key] = value
# Ne retourner que si toutes les clés nécessaires sont présentes
if self.checkIfDictFull(dico, self.listItems):
subDico = {key: dico[key] for key in self.listItems if key in dico}
return subDico
else:
return None
except Exception as e:
self.stderr.write(self.style.ERROR(f'Erreur de parsing: {e}'))
return None
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('=== Démarrage de la capture TIC ==='))
self.stdout.write(f'Port série: {self.ser.port}')
self.stdout.write(f'Baudrate: {self.ser.baudrate}')
compteur = 0
erreurs = 0
try:
while True:
# Capture de la trame
trame = self.capture_trame()
# Parse de la trame
donnees = self.parse_trame(trame)
if donnees:
try:
# Insertion dans la base de données
Data.objects.create(
date=donnees["DATE"][1:], # Retire le premier caractère
larf=donnees["LTARF"],
east=donnees["EAST"],
easf01=donnees["EASF01"],
easf02=donnees["EASF02"],
easd01=donnees["EASD01"],
easd02=donnees["EASD02"],
easd03=donnees["EASD03"],
easd04=donnees["EASD04"],
sinsts=donnees["SINSTS"],
sinsts1=donnees["SINSTS1"],
sinsts2=donnees["SINSTS2"],
sinsts3=donnees["SINSTS3"]
)
compteur += 1
erreurs = 0 # Reset du compteur d'erreurs
# Log périodique
if compteur % 100 == 0:
self.stdout.write(
self.style.SUCCESS(
f'✓ {compteur} enregistrements insérés - '
f'Dernier: {timezone.now().strftime("%H:%M:%S")}'
)
)
except Exception as e:
erreurs += 1
self.stderr.write(
self.style.ERROR(f'Erreur insertion BD: {e}')
)
if erreurs > 10:
self.stderr.write(
self.style.ERROR('Trop d\'erreurs consécutives, arrêt')
)
break
else:
# Trame incomplète, on continue sans erreur
pass
time.sleep(0.5) # Petite pause entre les captures
except KeyboardInterrupt:
self.stdout.write(
self.style.WARNING(
f'\n=== Arrêt demandé par l\'utilisateur ===\n'
f'Total enregistrements: {compteur}'
)
)
except Exception as e:
self.stderr.write(self.style.ERROR(f'Erreur fatale: {e}'))
finally:
# Fermeture propre du port série
if self.ser.is_open:
self.ser.close()
self.stdout.write('Port série fermé')