CérénIT

Le blog tech de Nicolas Steinmetz (Time Series, IoT, Web, Ops, Data)

IoT - Qualité de l'air avec un esp32 (TTGo T-Display), le service ThingSpeak, du MQTT, Warp 10 et Discovery

iotmqttarduinowarp10thingspeakco2esp32discoverydashboard

Le projet Nous Aérons propose de réaliser ses propres détecteurs de CO2 avec un ESP32 avec un écran comme le Lilygo TTGo T-Display et un capteur Senseair S8-LP.

L'idée est donc de déployer plusieurs capteurs, faire remonter les valeurs via ThingSpeak et ensuite les ingérer puis analyser avec Warp 10 et faire un dashboard avec Discovery.

schema du projet

Montage

Pour le montage, je vous invite à consuler principalement :

ThingSpeak

L'exemple de code fourni utilise le service ThingSpeak pour la remontée des valeurs. Comme il s'agit de mon premier projet Arduino et que cela fonctionne, j'ai cherché à rester dans les clous du code proposé et tester par la même occasion ce service. J'aurais pu directement poster les valeurs sur mon instance Warp 10 mais c'est aussi l'occasion de tester la récupération d'informations via le client MQTT de Warp 10.

Il vous faut :

  • Créer un compte
  • Créer un channel avec le champ 1 qui accueillera vos données
  • Récupérer la clé d'API en écriture
  • Noter votre Channel ID

Code Arduino

Disclaimer : c'est mon premier projet Arduino.

En repartant du code fourni sur le site Capteur de CO2, j'ai fait quelques ajustements :

  • Remplacer l'appel HTTP par la librairie ThingSpeak
  • La librairie retourne des status code : 200 si c'est OK, 40x si incorrect et -XXX si erreur ; j'ai un amélioré le message de debug pour savoir si l'insertion était OK ou KO.
  • Ajustement de positionnement du "CO2" et du "ppm" sur l'écran pour une meilleure lisibilité.

Il vous faut modifier :

  • la configuration wifi : ssid1, password1 et éventuellement ssid2, password2
  • la configuration ThingSpeak : channelID, writeAPIKey

Compiler le tout et uploader le code sur votre ESP32.

/************************************************
 * 
 *   Capteur de CO2 par Grégoire Rinolfi
 *   https://co2.rinolfi.ch
 * 
 ***********************************************/
#include <TFT_eSPI.h>
#include <SPI.h>
#include <Wire.h>
#include <WiFiMulti.h>
#include <ThingSpeak.h>

WiFiMulti wifiMulti;
TFT_eSPI tft = TFT_eSPI(135, 240);

/************************************************
 * 
 *   Paramètres utilisateur
 * 
 ***********************************************/
 
#define TXD2 21         // série capteur TX
#define RXD2 22         // série capteur RX
#define BOUTON_CAL 35
#define DEBOUNCE_TIME 1000

const char* ssid1     = "wifi1";
const char* password1 = "XXXXXXXXXXXXXXXX";
const char* ssid2     = "wifi2";
const char* password2 = "XXXXXXXXXXXXXXXX";

unsigned long channelID = XXXXXXXXXXXXXXXX;
char* readAPIKey = "XXXXXXXXXXXXXXXX";
char* writeAPIKey = "XXXXXXXXXXXXXXXX";
unsigned int dataFieldOne = 1;                       // Field to write temperature data
const unsigned long postingInterval = 12L * 1000L;   // 12s
unsigned long lastTime = 0;

// gestion de l'horloge pour la validation des certificats HTTPS
void setClock() {
  configTime(0, 0, "pool.ntp.org", "time.nist.gov");

  Serial.print(F("Waiting for NTP time sync: "));
  time_t nowSecs = time(nullptr);
  while (nowSecs < 8 * 3600 * 2) {
    delay(500);
    Serial.print(F("."));
    yield();
    nowSecs = time(nullptr);
  }

  Serial.println();
  struct tm timeinfo;
  gmtime_r(&nowSecs, &timeinfo);
  Serial.print(F("Current time: "));
  Serial.print(asctime(&timeinfo));
}

/************************************************
 * 
 * Thinkgspeak functions
 * https://fr.mathworks.com/help/thingspeak/read-and-post-temperature-data.html
 * 
 ***********************************************/

float readTSData( long TSChannel,unsigned int TSField ){
    
  float data =  ThingSpeak.readFloatField( TSChannel, TSField, readAPIKey );
  Serial.println( " Data read from ThingSpeak: " + String( data, 9 ) );
  return data;

}

// Use this function if you want to write a single field.
int writeTSData( long TSChannel, unsigned int TSField, float data ){
  int  writeSuccess = ThingSpeak.writeField( TSChannel, TSField, data, writeAPIKey ); // Write the data to the channel
  if(writeSuccess == 200){
    Serial.println("Channel updated successfully!");
  }
  else{
    Serial.println("Problem updating channel. HTTP error code " + String(writeSuccess));
  }
  return writeSuccess;
}

// Use this function if you want to write multiple fields simultaneously.
int write2TSData( long TSChannel, unsigned int TSField1, long field1Data, unsigned int TSField2, long field2Data ){

  ThingSpeak.setField( TSField1, field1Data );
  ThingSpeak.setField( TSField2, field2Data );
    
  int writeSuccess = ThingSpeak.writeFields( TSChannel, writeAPIKey );
  if(writeSuccess == 200){
    Serial.println("Channel updated successfully!");
  }
  else{
    Serial.println("Problem updating channel. HTTP error code " + String(writeSuccess));
  }
  return writeSuccess;
}

/************************************************
 * 
 *   Code de gestion du capteur CO2 via ModBus
 *   inspiré de : https://github.com/SFeli/ESP32_S8
 * 
 ***********************************************/
volatile uint32_t DebounceTimer = 0;

byte CO2req[] = {0xFE, 0X04, 0X00, 0X03, 0X00, 0X01, 0XD5, 0XC5};
byte ABCreq[] = {0xFE, 0X03, 0X00, 0X1F, 0X00, 0X01, 0XA1, 0XC3}; 
byte disableABC[] = {0xFE, 0X06, 0X00, 0X1F, 0X00, 0X00, 0XAC, 0X03};  // écrit la période 0 dans le registre HR32 à adresse 0x001f
byte enableABC[] = {0xFE, 0X06, 0X00, 0X1F, 0X00, 0XB4, 0XAC, 0X74}; // écrit la période 180
byte clearHR1[] = {0xFE, 0X06, 0X00, 0X00, 0X00, 0X00, 0X9D, 0XC5}; // ecrit 0 dans le registe HR1 adresse 0x00
byte HR1req[] = {0xFE, 0X03, 0X00, 0X00, 0X00, 0X01, 0X90, 0X05}; // lit le registre HR1 (vérifier bit 5 = 1 )
byte calReq[] = {0xFE, 0X06, 0X00, 0X01, 0X7C, 0X06, 0X6C, 0XC7}; // commence la calibration background
byte Response[20];
uint16_t crc_02;
int ASCII_WERT;
int int01, int02, int03;
unsigned long ReadCRC;      // CRC Control Return Code 

void send_Request (byte * Request, int Re_len)
{
  while (!Serial1.available())
  {
    Serial1.write(Request, Re_len);   // Send request to S8-Sensor
    delay(50);
  }

  Serial.print("Requete : ");
  for (int02 = 0; int02 < Re_len; int02++)    // Empfangsbytes
  {
    Serial.print(Request[int02],HEX);
    Serial.print(" ");
  }
  Serial.println();
  
}

void read_Response (int RS_len)
{
  int01 = 0;
  while (Serial1.available() < 7 ) 
  {
    int01++;
    if (int01 > 10)
    {
      while (Serial1.available())
        Serial1.read();
      break;
    }
    delay(50);
  }

  Serial.print("Reponse : ");
  for (int02 = 0; int02 < RS_len; int02++)    // Empfangsbytes
  {
    Response[int02] = Serial1.read();
    
    Serial.print(Response[int02],HEX);
    Serial.print(" ");
  }
  Serial.println();
}

unsigned short int ModBus_CRC(unsigned char * buf, int len)
{
  unsigned short int crc = 0xFFFF;
  for (int pos = 0; pos < len; pos++) {
    crc ^= (unsigned short int)buf[pos];   // XOR byte into least sig. byte of crc
    for (int i = 8; i != 0; i--) {         // Loop over each bit
      if ((crc & 0x0001) != 0) {           // If the LSB is set
        crc >>= 1;                         // Shift right and XOR 0xA001
        crc ^= 0xA001;
      }
      else                            // else LSB is not set
        crc >>= 1;                    // Just shift right
    }
  }  // Note, this number has low and high bytes swapped, so use it accordingly (or swap bytes)
  return crc;  
}

unsigned long get_Value(int RS_len)
{

// Check the CRC //
  ReadCRC = (uint16_t)Response[RS_len-1] * 256 + (uint16_t)Response[RS_len-2];
  if (ModBus_CRC(Response, RS_len-2) == ReadCRC) {
    // Read the Value //
    unsigned long val = (uint16_t)Response[3] * 256 + (uint16_t)Response[4];
    return val * 1;       // S8 = 1. K-30 3% = 3, K-33 ICB = 10
  }
  else {
    Serial.print("CRC Error");
    return 99;
  }
}

// interruption pour lire le bouton d'étalonnage
bool demandeEtalonnage = false;
void IRAM_ATTR etalonnage() {
  if ( millis() - DEBOUNCE_TIME  >= DebounceTimer ) {
    DebounceTimer = millis();
    
    Serial.println("Etalonnage manuel !!");

    tft.fillScreen(TFT_BLACK);
    tft.setTextSize(3);
    tft.setTextColor(TFT_WHITE);
    tft.drawString("Etalonnage", tft.width() / 2, tft.height()/2);

    demandeEtalonnage = true;
  } 
}

// nettoie l'écran et affiche les infos utiles
void prepareEcran() {
  tft.fillScreen(TFT_BLACK);
  // texte co2 à gauche
  tft.setTextSize(4);
  tft.setTextColor(TFT_WHITE);
  tft.drawString("CO",25, 120);
  tft.setTextSize(3);
  tft.drawString("2",60, 125);

  // texte PPM à droite ppm
  tft.drawString("ppm",215, 120);

  // écriture du chiffre
  tft.setTextColor(TFT_GREEN,TFT_BLACK);
  tft.setTextSize(8);
}

void setup() {
  // bouton de calibration
  pinMode(BOUTON_CAL, INPUT);

  // ports série de debug et de communication capteur
  Serial.begin(115200);
  Serial1.begin(9600, SERIAL_8N1, RXD2, TXD2);

  // initialise l'écran
  tft.init();
  delay(20);
  tft.setRotation(1);
  tft.fillScreen(TFT_BLACK);
  tft.setTextDatum(MC_DATUM); // imprime la string middle centre
  
  // vérifie l'état de l'ABC
  send_Request(ABCreq, 8);
  read_Response(7);
  Serial.print("Période ABC : ");
  Serial.printf("%02ld", get_Value(7));
  Serial.println();
  int abc = get_Value(7);

  // active ou désactive l'ABC au démarrage
  if(digitalRead(BOUTON_CAL) == LOW){
    if(abc == 0){
      send_Request(enableABC, 8);
    }else{
      send_Request(disableABC, 8);
    }
    read_Response(7);
    get_Value(7);
  }
  
  tft.setTextSize(2);
  tft.setTextColor(TFT_BLUE,TFT_BLACK);
  tft.drawString("Autocalibration", tft.width() / 2, 10);
  if( abc != 0 ){
    tft.drawString(String(abc)+"h", tft.width() / 2, 40);
  }else{
    tft.drawString("OFF", tft.width() / 2, 40);
  }

  // gestion du wifi
  wifiMulti.addAP(ssid1, password1);
  wifiMulti.addAP(ssid2, password2);

  Serial.print("Connexion au wifi");
  tft.setTextSize(2);
  tft.setTextColor(TFT_WHITE,TFT_BLACK);
  tft.drawString("Recherche wifi", tft.width() / 2, tft.height() / 2);
  
  int i = 0;
  while(wifiMulti.run() != WL_CONNECTED && i < 3){
    Serial.print(".");
    delay(500);
    i++;
  }
  if(wifiMulti.run() == WL_CONNECTED){
    tft.setTextColor(TFT_GREEN,TFT_BLACK);
    Serial.println("Connecté au wifi");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
    tft.drawString("Wifi OK", tft.width() / 2, 100);
    setClock();
  }else{
    tft.setTextColor(TFT_RED,TFT_BLACK);
    Serial.println("Echec de la connexion wifi");
    tft.drawString("Pas de wifi", tft.width() / 2, 100);
  }
  delay(3000); // laisse un temps pour lire les infos

  // préparation de l'écran
  prepareEcran();

  //interruption de lecture du bouton
  attachInterrupt(BOUTON_CAL, etalonnage, FALLING);
}  

unsigned long ancienCO2 = 0;
int seuil = 0;

void loop() {

  // effectue l'étalonnage si on a appuyé sur le bouton
  if( demandeEtalonnage ){
    demandeEtalonnage = false;
    // nettoye le registre de verification
    send_Request(clearHR1, 8); 
    read_Response(8); 
    delay(100);
    // demande la calibration
    send_Request(calReq, 8); 
    read_Response(8); 
    delay(4500); // attend selon le cycle de la lampe

    // lit le registre de verification
    send_Request(HR1req, 8); 
    read_Response(7); 
    int verif = get_Value(7);
    Serial.println("resultat calibration "+String(verif));
    if(verif == 32){
      tft.setTextColor(TFT_GREEN);
      tft.drawString("OK", tft.width() / 2, tft.height()/2+30);
    }else{
      tft.setTextColor(TFT_RED);
      tft.drawString("Erreur", tft.width() / 2, tft.height()/2+20);
    }
    delay(3000);
    prepareEcran();
    seuil = 0;
  }

  // lecture du capteur
  send_Request(CO2req, 8);
  read_Response(7);
  unsigned long CO2 = get_Value(7);
  
  String CO2s = "CO2: " + String(CO2);
  Serial.println(CO2s);

  // efface le chiffre du texte
  if(CO2 != ancienCO2){
    tft.fillRect(0,0, tft.width(), 60, TFT_BLACK);
  }

  if( CO2 < 800 ){
    tft.setTextColor(TFT_GREEN,TFT_BLACK);
    if( seuil != 1 ){
      tft.setTextSize(2);
      tft.fillRect(0,61, tft.width(), 25, TFT_BLACK);
      tft.drawString("Air Excellent", tft.width() / 2, tft.height() / 2 + 10);
    }
    seuil = 1;
  }else if( CO2 >= 800 && CO2 < 1000){
    tft.setTextColor(TFT_ORANGE,TFT_BLACK);
    if( seuil != 2 ){
      tft.setTextSize(2);
      tft.fillRect(0,61, tft.width(), 25, TFT_BLACK);
      tft.drawString("Air Moyen", tft.width() / 2, tft.height() / 2 + 10);
    }
    seuil = 2;
  }else if (CO2 >= 1000 && CO2 < 1500){
    tft.setTextColor(TFT_RED,TFT_BLACK);
    if( seuil != 3 ){
      tft.setTextSize(2);
      tft.fillRect(0,61, tft.width(), 25, TFT_BLACK);
      tft.drawString("Air Mediocre", tft.width() / 2, tft.height() / 2 + 10);
    }
    seuil = 3;
  }else{
    tft.setTextColor(TFT_RED,TFT_BLACK);
    if( seuil != 4 ){
      tft.setTextSize(2);
      tft.fillRect(0,61, tft.width(), 25, TFT_BLACK);
      tft.drawString("Air Vicie", tft.width() / 2, tft.height() / 2 + 10);
    }
    seuil = 4;
  }

  tft.setTextSize(8);
  tft.drawString(String(CO2), tft.width() / 2, tft.height() / 2 - 30);


  // envoi de la valeur sur le cloud
  if((millis() - lastTime) >= postingInterval) {
    if((wifiMulti.run() == WL_CONNECTED)) {
      WiFiClient client;
      ThingSpeak.begin( client );
      
      writeTSData( channelID , dataFieldOne , CO2 );  
      lastTime = millis();
    }
  }

  ancienCO2 = CO2;
  delay(10000); // attend 10 secondes avant la prochaine mesure
}

MQTT

Sur ThingSpeak, aller dans Devices > MQTT et compléter si besoin avec la lecture de la documentation MQTT Basics:

  • Créer un device,
  • Ajouter la/les channel(s) voulu(s),
  • Limiter les droits à la partie "subscribe" ; notre client en effet n'est pas prévu pour publier des données sur ThingSpeak,
  • Conserver précautionneusement vos identifiants MQTT.

Sur l'instance Warp 10, déployer le plugin MQTT :

Avec le script /path/to/warp10/mqtt/test.mc2 :

// subscribe to the topics, attach a WarpScript™ macro callback to each message
// the macro reads ThingSpeak message to extract the first byte of payload,
// the server timestamp, the channel id and the value

'Loading MQTT ThingSpeak Air Quality Warpscript™' STDOUT
{
  'host' 'mqtt3.thingspeak.com'
  'port' 1883
  'user' 'XXXXXXXXXX'
  'password' 'XXXXXXXXXX'
  'clientid' 'XXXXXXXXXX'
  'topics' [
    'channels/channelID 1/subscribe'
    'channels/channelID 2/subscribe'
    'channels/channelID 3/subscribe'
  ]
  'timeout' 20000
  'parallelism' 1
  'autoack' true

  'macro'
  <%
    //in case of timeout, the macro is called to flush buffers, if any, with NULL on the stack.
    'message' STORE
    <% $message ISNULL ! %>
    <%
      // message structure :
      // {elevation=null, latitude=null, created_at=2022-01-11T10:02:27Z, field1=412.00000, field7=null, field6=null, field8=null, field3=null, channel_id=1630275, entry_id=417, field2=null, field5=null, field4=null, longitude=null, status=null}
      $message MQTTPAYLOAD 'ascii' BYTES-> JSON-> 'TSmessage' STORE
      $TSmessage 'created_at' GET TOTIMESTAMP 'ts' STORE
      $TSmessage 'channel_id' GET 'channelId' STORE
      $TSmessage 'field1' GET 'sensorValue' STORE

      $message MQTTTOPIC ' ' +
      $ts ISO8601 + ' ' +
      $channelId TOSTRING + ' ' +
      $sensorValue +
      STDOUT // print to warp10.log
    %> IFT
  %>
}

Vous devriez avoir dans /path/to/warp10/log/warp10.log :

Loading MQTT ThingSpeak Air Quality Warpscript™
channels/<channelID 1>/subscribe 2022-01-11T10:30:51.000000Z <channelID 1> 820.00000
channels/<channelID 2>/subscribe 2022-01-11T10:30:53.000000Z <channelID 2> 715.00000
channels/<channelID 3>/subscribe 2022-01-11T10:30:54.000000Z <channelID 3> 410.00000

Maintenant que l'intégration MQTT est validée, supprimez ce fichier et passons à la gestion de la persistence des données dans Warp 10.

Avec le script suivant :

// subscribe to the topics, attach a WarpScript™ macro callback to each message
// the macro reads ThingSpeak message to extract the first byte of payload,
// the server timestamp, the channel id and the value.

{
  'host' 'mqtt3.thingspeak.com'
  'port' 1883
  'user' 'XXXXXXXXXX'
  'password' 'XXXXXXXXXX'
  'clientid' 'XXXXXXXXXX'
  'topics' [
    'channels/channelID 1/subscribe'
    'channels/channelID 2/subscribe'
    'channels/channelID 3/subscribe'
  ]
  'timeout' 20000
  'parallelism' 1
  'autoack' true

  'macro'
  <%
    //in case of timeout, the macro is called to flush buffers, if any, with NULL on the stack.
    'message' STORE
    <% $message ISNULL ! %>
    <%
      // message structure :
      // {elevation=null, latitude=null, created_at=2022-01-11T10:02:27Z, field1=412.00000, field7=null, field6=null, field8=null, field3=null, channel_id=1630275, entry_id=417, field2=null, field5=null, field4=null, longitude=null, status=null}
      $message MQTTPAYLOAD 'ascii' BYTES-> JSON-> 'TSmessage' STORE
      $TSmessage 'created_at' GET TOTIMESTAMP 'ts' STORE
      $TSmessage 'channel_id' GET 'channelId' STORE
      $TSmessage 'field1' GET 'sensorValue' STORE

      // Tableau de correspondance entre mes channel IDs et mes devices en vue de définir des labels pour les GTS
      {
        <channelID 1> 'air1'
        <channelID 2> 'air2'
        <channelID 3> 'air3'
      } 'deviceMap' STORE
      // Récupération du nom du device dans la variable senssorId
      $deviceMap $channelId GET 'sensorId' STORE

      // Création d'une GTS air.quality.home
      // Le label "device" aura pour valeur le nom du device, via la variable sensorId
      // On crée une entrée qui correspond à la valeur que nous venons de récupérer
      // sensorValue est une string, il faut la repasser sur un format numérique
      // Une fois la GTS reconstituée avec son entrée, on la periste en base via UPDATE
      '<writeToken>' 'writeToken' STORE
      NEWGTS 'air.quality.home' RENAME
      { 'device' $sensorId } RELABEL
      $ts NaN NaN NaN $sensorValue TODOUBLE TOLONG ADDVALUE
      $writeToken UPDATE
    %> IFT
  %>
}

Depuis le WarpStudio, vérifiez la disposnibilité de vos données :

'<readToken>' 'readToken'  STORE
[ $readToken 'air.quality.home' {} NOW -1000 ] FETCH

warp10 - validation de l'ingestion des données IoT

Ensuite, il nous reste plus qu'à faire une petite macro et un dashboard pour présenter les données.

Pour la macro :

  • On lui passe un nom de device en paramètre qui servira à filtrer sur le label
  • Elle retourne une GTS avec l'ensemble des valeurs disponibles
<%
  {
    'name' 'cerenit/iot/co2'
    'desc' 'Provide CO2 levels per device'
    'sig' [ [ [ [  'device:STRING' ] ]  [ 'result:GTS' ] ] ]
    'params' {
        'device' 'String'
        'result' 'GTS'
    }
    'examples' [
    <'
air1 @cerenit/iot/co2
    '>
    ]
  } INFO

  // Actual code
  SAVE 'context' STORE

  'device' STORE // Save parameter as year

  '<readToken>' 'readToken' STORE
  [ $readToken 'air.quality.home' { 'device' $device } MAXLONG MINLONG ] FETCH
  0 GET

  $context RESTORE
%>
'macro' STORE
$macro

Et pour le dashboard Discovery :

  • Chaque tile utilise la macro que l'on vient de réaliser en lui passant le device en paramètre,
  • Chaque tile affiche un système de seuils avec des couleurs associées.
<%
{
    'title' 'Home CO2 Analysis'
    'description' 'esp32 + Senseair S8 sensors at home'
    'options' {
      'scheme' 'CHARTANA'
    }
    'tiles' [
        {
            'title' 'Informations'
            'type' 'display'
            'w' 6 'h' 1 'x' 0 'y' 0
            'data' {
                'data' 'D&eacute;tails et informations compl&eacute;mentaires :  <a href="https://www.cerenit.fr/blog/air-quality-iot-esp32-senseair-thingspeak-mqtt-warp10-discovery/">IoT - Qualit&eacute; de l air avec un esp32 (TTGo T-Display), le service ThingSpeak, du MQTT, Warp 10 et Discovery</a>'
            }
        }
        {
            'title' 'Device AIR1'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 1
            'macro' <% 'air1' @cerenit/macros/co2 %>
            'options' {
                'thresholds' [
                    { 'value' 400 'color' '#008000' }
                    { 'value' 600 'color' '#329932' }
                    { 'value' 800 'color' '#66b266' }
                    { 'value' 960 'color' '#ffdb99' }
                    { 'value' 1210 'color' '#ffa500' }
                    { 'value' 1760 'color' '#ff0000' }
	        ]
	    }
        }
        {
            'title' 'Device AIR2'
            'type' 'line'
            'w' 6 'h' 2 'x' 6 'y' 1
            'macro' <% 'air2' @cerenit/macros/co2 %>
            'options' {
                'thresholds' [
                    { 'value' 400 'color' '#008000' }
                    { 'value' 600 'color' '#329932' }
                    { 'value' 800 'color' '#66b266' }
                    { 'value' 960 'color' '#ffdb99' }
                    { 'value' 1210 'color' '#ffa500' }
                    { 'value' 1760 'color' '#ff0000' }
	        ]
	    }
        }
        {
            'title' 'Device AIR3'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 3
            'macro' <% 'air3' @cerenit/macros/co2 %>
            'options' {
                'thresholds' [
                    { 'value' 400 'color' '#008000' }
                    { 'value' 600 'color' '#329932' }
                    { 'value' 800 'color' '#66b266' }
                    { 'value' 960 'color' '#ffdb99' }
                    { 'value' 1210 'color' '#ffa500' }
                    { 'value' 1760 'color' '#ff0000' }
                ]
           }
        }
    ]
}
{ 'url' 'https://w.ts.cerenit.fr/api/v0/exec'  }
@senx/discovery2/render
%>

Le résultat est alors :

warp10 - dashboard IoT CO2

Bilan de ce que nous avons vu :

  • Comment monter son capteur de CO2 en profitant des ressources mises à disposition par le projet "Nous Aérons",
  • Comment envoyer les données du capteur vers le service ThingSpeak,
  • Comment récupérer les données du service ThingSpeak via le protoole MQTT et les stocker dans Warp 10,
  • Comment créer un dashboard Discovery avec une macro permettant de récupérer les données et mettre en place un système de seuils.

L'ensemble des fichiers peuvent être récupérés depuis cerenit/iot-air-quality.

Web, Ops, Data et Time Series - Novembre 2021

postgresqltimeseriestimecalewarp10warpstudioinfluxdb

Containers & Orchestration

  • Announcing General Availability of HashiCorp Nomad 1.2 : Arrivée des "system batchs jobs" prévu pour gérer des jobs à destination du cluster nomad en lui même (purge, backup, etc) et non des clients. Cette version apporte également des améliorations au niveau de l'interface, ainsi que les "nomad pack", format de distribution de vos applications à destination de nomad.

IoT

Monitoring & Observabilité

Time Series

Annonces & Produits :

  • Timescale 2.5.0 : support de Postgresql 14, continuous aggregates for distributed hypertables (la fonction fonctionne donc maintenant en multi-nodes) et support des timezone pour la fonction time_bucket_ng
  • Warp Studio 2.0.6 : version mineure du studio pour la gesion de CORS-RFC1918 ; c'est pour utiliser le studio avec vos instances locales depuis Chrome 92 (et bientôt les autres navigateurs) du fait des restrictrions d'accès mises en place dans les navigateurs.
  • Release Announcement: InfluxDB OSS 2.1.0 | InfluxData : Arrivée des annotations et des notebooks, le client influx n'est plus distribué avec le serveur (sauf dans l'image Docker), améliorations de flux, amélioration de l'API et de la CLI et mise à jour de l'extension VSCode.
  • Announcing PyCaret’s New Time Series Module :la librairie "low code" de machine learning PyCaret se dote d'un module de gestion de séries temporelles comprenant 30+ modèles (ARIMA, SARIMA, FBProphet, etc) et fonctions.

Articles :

Ma comptabilité, une série temporelle comme les autres - partie 6 - Les FEC et le compte de résultat

warp10timeseriescomptabilitérésultatfecdashboarddiscovery

Suite de notre épopée :

Dans ce sixième et dernier billet pour cette série, nous continuons avec les Fichier d'Ecritures Comptables (FEC) pour produire le compte de résultat et déterminer ainsi le bénéfice de l'exercice en cours. Il faut donc prendre toutes les opérations en classe 6 (charges) et 7 (produits). Pour chaque classe de compte, il peut y avoir des crédits ou des débits (ex pour un compte de classe 7 : un avoir sur une facture émise). C'est donc un chouilla plus compliqué que le compte de trésorerie.

Depuis le dernier billet, j'ai légèrement fait évoluer le modèle de données :

  • Initialement, j'avais : <société>.<bilan ou resultat>.<classe de compte>.<type d'opération: credit ou debit>
  • Cela a évolué vers : <société>.<bilan ou resultat>.<classe de compte> ; le type d'opération est maintenant un label

Pour un crédit de 100€ avec une référence de pièce à 1234 pour le compte 706, on passe donc de :

<Timestamp de l'écriture comptable>// cerenit.resultat.706.credit{PieceRef=1234} 100

à :

<Timestamp de l'écriture comptable>// cerenit.resultat.706{PieceRef=1234, operation=credit} 100

Compte de résultat

"<readToken>" "readToken"  STORE

// Récupération de toutes les opérations de crédit pour les comptes charges (classe 6xx)
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable
[ $readToken '~comptabilite.resultat.6.*' { "operation" "credit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'charges_credit' RENAME
'charges_credit' STORE


// Récupération de toutes les opérations de débit pour les comptes charges (classe 6xx)
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable
[ $readToken '~comptabilite.resultat.6.*' { "operation" "debit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'charges_debit' RENAME
'charges_debit' STORE

// Fusion des deux listes de séries en une liste qui va avoir l'ensemble des opérations
// Les opérations de débit sont mis en valeur négative du calcul du solde
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable qui contient l'ensemble des opérations
[
  $charges_debit -1 *
  $charges_credit
] MERGE
SORT
'charges_flux' RENAME
'charges_flux' STORE

// Même opération pour les comptes de produit (7xx)
[ $readToken '~comptabilite.resultat.7.*' { "operation" "credit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'produits_credit' RENAME
'produits_credit' STORE

[ $readToken '~comptabilite.resultat.7.*' { "operation" "debit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'produits_debit' RENAME
'produits_debit' STORE

[
  $produits_debit -1 *
  $produits_credit 
] MERGE
SORT
'produits_flux' RENAME
'produits_flux' STORE

// Fusion des 2 flux d'opérations (charges et produits) pour avoir une vision temporelle de ces opérations
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Renommage de la série en "compte_resultat" qu'elle va permettre de batir
// Somme cumulée de l'ensemble des opérations pour avoir un solde à date
// Stockage sous la forme d'une variable
// Affichage de la variable
[
  $produits_flux
  $charges_flux
] MERGE
SORT
'compte_resultat' RENAME
[ SWAP mapper.sum MAXLONG 0 0 ] MAP
'compte_resultat' STORE
$compte_resultat

Ce qui nous donne dans le Studio :

warp10 - compte de résultat

Du précédent billet et ce celui-ci, nous avons donc :

  • Un compte de résultat annuel
  • Un compte de trésorerie annuel

Tout ce qu'il faut donc pour faire un dashboard avec Discovery. Il faut dire que le billet Covid Tracker built with Warp 10 and Discovery et dans une moindre mesure Server monitoring with Warp 10 and Telegraf donnent accès à plein d'options pour réaliser ses dashboards.

Macros

Je pourrais mettre le code de mes requêtes directement dans les dashboards mais j'aime pas trop quand des tokens se balladent dans les pages web. Du coup, je vais déporter le code dans des macros. J'ai églément rendu les macro dynamiques dans le sens où elles prennent une année en paramètre pour afficher les données de l'année en question.

On a déjà vu le fonctionnement des macros précédemment, je ne reviendrais donc pas dessus.

La macro du compte de résultat à titre d'exemple :

<%
    {
        'name' 'cerenit/accountancy/compte-resultat'
        'desc' 'Function to calculate the cumulative benefit (or loss) of the company'
        'sig' [ [ [ [  'year:LONG' ] ]  [ 'result:GTS' ] ] ]
        'params' {
            'year' 'Year, YYYY'
            'result' 'GTS'
        }
        'examples' [
    <'
2020 @cerenit/accountancy/compte-resultat
    '>
    ]
    } INFO

    // Actual code
    SAVE 'context' STORE

    TOLONG // When called from dashboard, it's a string - so convert paramter to LONG first
    'year' STORE // Save parameter as year
    
    // Compute 1st Jan of given year
    [ $year 01 01 ] TSELEMENTS-> ISO8601 
    'start' STORE
    
    // Compute 31 Dec of given year
    [ $year 12 31 23 59 59 ] TSELEMENTS-> ISO8601 
    'end' STORE
    
    "<readToken>" "readToken"  STORE

    [ $readToken '~comptabilite.resultat.6.*' { "operation" "credit" } $start $end ] FETCH
    MERGE
    SORT
    'charges_credit' RENAME
    'charges_credit' STORE

    [ $readToken '~comptabilite.resultat.6.*' { "operation" "debit" } $start $end ] FETCH
    MERGE
    SORT
    'charges_debit' RENAME
    'charges_debit' STORE

    [
    $charges_debit -1 *
    $charges_credit
    ] MERGE
    SORT
    { NULL NULL } RELABEL
    'charges_flux' RENAME
    'charges_flux' STORE

    [ $readToken '~comptabilite.resultat.7.*' { "operation" "credit" } $start $end ] FETCH
    MERGE
    SORT
    'produits_credit' RENAME
    'produits_credit' STORE

    [ $readToken '~comptabilite.resultat.7.*' { "operation" "debit" } $start $end ] FETCH
    MERGE
    SORT
    'produits_debit' RENAME
    'produits_debit' STORE

    [
    $produits_debit -1 *
    $produits_credit 
    ] MERGE
    SORT
    { NULL NULL } RELABEL
    'produits_flux' RENAME
    'produits_flux' STORE

    [
    $produits_flux
    $charges_flux
    ] MERGE
    SORT
    'compte_resultat' RENAME
    [ SWAP mapper.sum MAXLONG 0 0 ] MAP
    'compte_resultat' STORE
    $compte_resultat

    $context RESTORE
%>
'macro' STORE
$macro

Comme le décrit l'exemple, si on veut le compte de résultat de l'année 2020, on utilisera le code suivant :

2020 @cerenit/accountancy/compte-resultat

J'ai profité de ce billet pour utiliser Warpfleet Synchronizer & Warpfleet Resolver pour simplifier le déploiement des macros ; cela explique que les signatures pour appeler les macros changent par la suite dans le dashboard.

Dashboards

Ci-après le code du dashboard :

<%
{
    'title' 'Comptabilité CérénIT'
    'description' 'Trésorerie et compte de résultat'
    'vars' {
        'myYear' 2020
    }    
    'tiles' [
        {
            'title' 'Informations'
            'type' 'display'
            'w' 11 'h' 1 'x' 0 'y' 0
            'data' {
                'data' 'R&eacute;sultat de la s&eacute;rie <a href="https://www.cerenit.fr/blog/premiers-pas-avec-warp10-comptabilite-et-previsions/">Ma comptabilit&eacute;, une s&eacute;rie temporelle comme les autres</a> et de l&apos;ingestion des Fichiers d&apos;&eacute;critures comptables.'
                'globalParams' { 'timeMode' 'custom' }
            }
        }
        {
            'title' 'Année'
            'type' 'input:list'
            'w' 1 'h' 1 'x' 11 'y' 0
            'data' {
                'data' [ '2017' '2018' '2019' '2020' ]
                'events' [ { 'type' 'variable' 'tags' 'year' 'selector' 'myYear' } ]
                'globalParams' { 'input' { 'value' '2020' } }   
            }
        }
        {
            'title' 'Trésorerie (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 1
            'macro' <% $myYear @cerenit/macros/treso %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }
        {
            'title' 'Compte de résultat (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 6 'y' 1
            'macro' <% $myYear @cerenit/macros/compteresultat %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }
        {
            'title' 'Trésorerie (pluri-annuelle)'
            'type' 'line'
            'w' 12 'h' 2 'x' 0 'y' 3
            'macro' <% [ 2017 $myYear ] @cerenit/macros/treso_multi %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }  
    ]
}
{ 'url' 'https://w.ts.cerenit.fr/api/v0/exec'  } 
@senx/discovery2/render
%>

et son rendu :

warp10 - dashboard

Dans le bloc global du dashboard, on définir une variable myYear, initialisée à 2020. Cette variable est mise à jour dynamiquement lorsque l'on choisit une valeur dans la liste déroulante du bloc "Année".

<%
{
    'title' 'Comptabilité CérénIT'
    'description' 'Trésorerie et compte de résultat'
    'vars' {
        'myYear' 2020
    }    
    ...

Le bloc Année justement :

        {
            'title' 'Année'
            'type' 'input:list'
            'w' 1 'h' 1 'x' 11 'y' 0
            'data' {
                'data' [ '2017' '2018' '2019' '2020' ]
                'events' [ { 'type' 'variable' 'tags' 'year' 'selector' 'myYear' } ]
                'globalParams' { 'input' { 'value' '2020' } }   
            }
        }

C'est une liste déroulante (type: input:list) avec pour valeurs les années 2017 à 2020. Par défaut, elle est initialisée à 2020. Via le mécanisme des "events", lorsqu'une valeur est choisie, celle-ci est émise sous la forme d'une variable, nommée myYear et ayant pour tag la valeur year.

Ainsi, si je sélectionne 2017 dans la liste, la variable myYear prendra cette valeur. Maintenant que la valeur est définie suite à mon choix et émise vers le reste du dashboard, il faut que les autres tiles récupèrent l'information.

Regardons le tile Trésorerie :

        {
            'title' 'Trésorerie (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 1
            'macro' <% $myYear @cerenit/macros/treso %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }

La récupération de la variable se fait via la proriété options et la récupération de l'eventHandler associé et défini précédemment.

Une fois récupérée, la variable myYear peut être utilisée dans le bloc macro et le tile est mis à jour dynamiquement.

En conséquence :

  • Les deux premiers tiles afficheront le solde de trésorerie et le compte de résultat de l'année sélectionnée
  • Le dernier tile affichera la trésorerie depuis début 2017 jusqu'à la fin d'année sélectionnée. Donc au minimum 2017 et au maximum 2017 > 2020.

Ainsi s'achève cette série sur les données comptable et les séries temporelles. Des analyses complémentaires pourraient être menées (analyse de stocks, réparition d'activité, etc) mais mes données comptables sont insuffisantes pour en valoir l'intérêt. J'espère néanmoins que cela aura sucité votre intérêt et ouvert des horizons.

Cette série fut aussi l'occasion de faire un tour de la solution Warp 10 et de voir :

  • l'ingestion de données,
  • la manipulation et l'analyse des données,
  • la mise en place de dashboards,
  • la projection de données avec les algorythmes de machine learning.

Si vous souhaitez poursuivre l'aventure et le sujet, n'hésitez pas à me contacter.

Web, Ops, Data et Time Series - Octobre 2021

postgresqltimeseriesbidatataskdbtmetabasesingertimescaleinfluxdbquasardbvectornomadclever-cloudyieldpivotwarp10flowsvscodekapacitorchronograftelegrafclickhouse

BI

Code

  • vscode.dev : l'ère de l'IDE dans le navigateur continue après gitpod ou githuab codspaces, c'est au tour de vscode.dev qui permet d'avoir une IDE dans son navigateur. Affaire à suivre...

Observabilité et monitoring

Orchestration & conteneurs

  • damon, un dashboard pour nomad en ligne de commande.
  • Announcing HashiCorp Nomad 1.2 Beta : ajout des "System Batch" qui sont des (petits) jobs globaux au cluster, des améliorations de l'interface et l'ajout des Nomad Pack, une sorte de catalogue d'applications prêtes à être déployées dans votre cluster.

SQL

Sécurité

Time Series

Annonces & Produits :

Articles & Vidéos :

Pour le retour sur les InfluxDays North America qui ont lieu cette semaine, ce sera pour un prochain billet ou édition du Time Series France Meetup

n8n & Warp 10 - Automatisez vos manipulations de séries temporelles

n8nautomationwarp10timeseriesworkflow

Il y a quelques temps et sachant que j'utilisais n8n pour automatiser la génération des brèves du BigData Hebdo, Mathias m'a demandé s'il était possible de faire la même chose entre n8n et Warp 10 qu'avec node-red et Warp 10.

La réponse est oui mais voyons comment faire cela.

n8n/Warp 10 - workflow

Pour ceux qui ne connaissent pas n8n, c'est un clone open source (sous licence fair-code) à des services comme Zapier ou IFTTT. Il permet d'automatiser des processus via la création de workflows. Ces workflows sont composés d'étapes et d'actions. n8n dispose d'un grand nombre de connecteurs vers les différents services existants, des opérateurs génériques (faire un appel http, appliquer une fonction), des opérateurs logiques (si, etc), des opérateurs de transformation de données, etc. Chacun de ces éléments est implémenté via une node. A chaque étape du workflow, une node est instanciée puis paramétrée. Les nodes peuvent être reliées entre-elles et la sortie d'une node peut alimenter la suivante.

Le workflow se veut basique et va être le suivant :

  • Récupération d'une entrée de monitoring CPU dans Warp 10
  • Si la valeur est supérieure ou égale à 90%, alors création d'une entrée dans une série dédiée à cet effet.

Ce n'est pas le workflow le plus passionnant du monde, mais cela permet de faire deux appels à l'API HTTP de Warp 10 :

  • Le premier permet de tester l'exécution de code WarpScript via l'API /api/v0/exec ; vu le code, j'aurais pu passer par /api/V0/fetch mais cela me permet de tester l'exécution de code WarpScript.
  • Le second utilisera l'API /api/v0/update pour insérer une donnée dans une série. Cela permet de tester le passage du token d'authentification via un header.

Pour commencer le workflow, la donnée de départ est la valeur en pourcentage du métrique "CPU Idle" d'un de mes serveurs.

En WarpScript, cela donne:

'<readToken>' 'readToken'  STORE

[ $readToken 'crnt-ovh.cpu.usage_idle' { "host" "crnt-d10-gitlab" "cpu" "cpu-total" }  NOW -1 ] FETCH

Et la réponse :

[
	[{
		"c": "crnt-ovh.cpu.usage_idle",
		"l": {
			"host": "crnt-d10-gitlab",
			"cpu": "cpu-total",
			"source": "telegraf",
			".app": "io.warp10.bootstrap"
		},
		"a": {},
		"la": 0,
		"v": [
			[1634505650000000, 91.675025]
		]
	}]
]

n8n dispose d'une node HTTP Request, qui comme son nom l'indique permet de faire des requêtes HTTP vers un serveur distant. Toutefois, il n'est pas possible de passer notre code WarpScript directement dans l'appel HTTP. Il faut créer un objet avec le code WarpScript et passer ensuite l'objet créé et le nom de la propriété contenant le code WarpScript à la node HTTP Request.

Pour stocker le code WarpScript dans un objet, il faut utiliser la node Set. Une fois la node Set ajoutée dans le workflow, aller dans Parameters > Add Value > Type: String

Saisir:

  • Name: warpscript
  • Value: le code WarpScript ci-dessus

En cliquant sur "Execute Node", on peut valider la variable (la partie grisée étant mon token) :

n8n/Warp 10 - set node

On peut maintenant ajouter une node HTTP Request dans le workflow et la relier à la node Set nouvellement créée. Ainsi, la node HTTP Request aura directement accès au résultat de la node Set.

Pour les ajustements à faire :

  • Parameters :
    • Request Method : POST
    • URL : http://url.de.votre.instance.warp.10/api/v0/exec
    • Activer la case JSON/RAW Parameters
  • Options :
    • Add Option > Mime Type : text/plain
    • Add Option > Body Content Type : RAW/Custom
    • Body Parameters > Add Expression > Current Node > Input Data > JSON > warpscript (les colonnes de droites doivent se remplir avec la clé en haut et la valeur en dessous ; cliquer sur la croix pour revenir à l'écran précédent)

En cliquant sur "Execute Node", le résultat de la requête est visible (la partie grisée étant un bout de mon token) :

n8n/Warp 10 - http request node

On retrouve notre objet JSON mais il est imbriqué dans des Array Javascript, on va applanir tout ça et extraire le timestamp et la valeur du cpu via l'ajout de deux nodes Function que l'on relie à la node HTTP Request. La node Function permet d'exécuter du code javascript sur les données et de réaliser des transformations que l'on ne peut pas forcément faire avec les autres nodes. Cela n'étant pas le coeur du sujet, cela ne sera pas détaillé.

A l'issue des deux exécutions, les données sont réduites à ce qui suit :

[{
	"ts": 1634503660000000,
	"cpu": 93.219488
}]

La node IF ne sera pas détaillée non plus ; elle sert juste à introduire un semblant de logique dans le workflow. En l'occurence, si la valeur de "cpu" >= 90, alors le test est considéré comme vrai et faux sinon. Dans le cas où c'est faux, une node noOp a été ajoutée pour matérialiser la fin du workflow.

Dans le cas où le test est vrai (valeur de "cpu" >= 90), on veut alors insérer le timestamp et la valeur dans une autre série sur une instance Warp 10. Comme précédemment, cela va se faire en deux fois:

  • Préparation de la donnée au format GTS Input Format et mise à disposition sous la forme d'une propriété
  • Exécution de l'appel HTTP.

On ajoute une node Set, ensuite dans Parameters > Add Value > Type: String

Saisir:

  • Name: gtsinput
  • Value > Add Expression et dans la partie expression, on met:
{{$json["ts"]}}// n8n{} {{$json["cpu"]}}

Ce qui nous donne l'écran suivant :

n8n/Warp 10 - set node 2 - expression

On revient à l'écran précédent en cliquant sur la croix à droite et en exécutant la node, on obtient :

n8n/Warp 10 - set node 2 - result

Ensuite, il faut ajouter une nouvelle node HTTP Request avec le paramétrage suivant :

  • Parameters :
    • Authentication > Header Auth
    • Request Method : POST
    • URL : http://url.de.votre.instance.warp.10/api/v0/update
    • Activer la case JSON/RAW Parameters
  • Options :
    • Add Option > Mime Type : text/plain
    • Add Option > Body Content Type : RAW/Custom
    • Add Option > Full Response et activer là pour voir la réponse complète de votre instance Warp 10
    • Body Parameters > Add Expression > Current Node > Input Data > JSON > gtsinput (les colonnes de droites doivent se remplir avec la clé en haut et la valeur en dessous ; cliquer sur la croix pour revenir à l'écran précédent)

En haut du menu de gauche, une section "Credentials" est apparue ; dans la liste déroulante, cliquer sur "Create new" et remplissez le formulaire de la façon suivante:

  • Name: X-Warp10-Token
  • Value : votre token Warp 10 avec des droits d'écriture

n8n/Warp 10 - auth token

Revener ensuite dans votre node HTTP Request dont on peut lancer l'exécution et on obtient :

n8n/Warp 10 - auth token

Si je vais ensuite voir le contenu de ma série n8n :

'<readToken>' 'readToken'  STORE

[ $readToken 'n8n' {}  NOW -100 ] FETCH

J'obtiens comme réponse :

[
	[{
		"c": "n8n",
		"l": {
			".app": "io.warp10.bootstrap"
		},
		"a": {},
		"la": 0,
		"v": [
			[1634503660000000, 93.219488],
			[1634502790000000, 94.808468],
			[1634501690000000, 93.7751],
			[1634501550000000, 91.741742],
			[1634478300000000, 92.774711]
		]
	}]
]

Avec une entrée pour chaque exécution du workflow sous réserve d'avoir un "CPU idle" >= 90%.

En conclusion, nous pouvons retenir que :

  • Il est facile d'intégrer Warp 10 dans un workflow n8n grâce à l'API HTTP de Warp 10 et la node HTTP Request de n8n
  • Pour interagir avec Warp 10, il faut d'abord créer un objet portant le code WarpScript ou les donées au format GTS Input pour l'envoyer ensuite à Warp 10 via la node HTTP Request
  • Même si cela n'a pas été détaillé, il est possible de manipuler les données issues de Warp 10 ou de préparer des données à destination de Warp 10.

Le workflow était très basique pour permettre de montrer rapidement cette intégration. Des workflows plus complexes et riches sont laissés à votre imagination :

  • sur la base d'un événement avec la node Webhook : insertion de données ou lancement d'une analyse suite à un événement, etc.
  • sur la base d'une tache planifiée avec la node Cron : analyse de données, etc
  • ou depuis Warp 10, on peut appeler n8n en utilisant HTTP, URLFETCH ou WEBCALL pour lancer l'exécution d'un workflow ou récupérer le résultat d'un workflow.

Web, Ops, Data et Time Series - Septembre 2021

warp10automltelegrafdiscoveryanomaliepythondockerpodmannpmnodejsjvmadoptopenjdkquestdbcloudflareawss3dockerwarp10discoverytinygocircuitpythonnrtsearchelasticsearchinfluxdb

Cloud

Container et Orchestration

  • Docker is Updating and Extending Our Product Subscriptions : TL;DR: Docker Desktop requiert un abonnement Pro/Team/Business si vous êtes une organisation de plus de 250 employés et 10 Millions de Chiffre d'affaires. L'abonnement commence à 5$/mois/utilisateur. Ce changement démarre au 31/08/2021 avec une période de grâce jusqu'au 31/01/2022. Si certains crient au scandale, il faut bien voir tout ce que Docker Desktop fourni et le travail d'intégration que cela représente. Il faut bien que la société Docker vive pour maintenir ses produits. Tout cela se retrouve dans The Magic Behind the Scenes of Docker Desktop.
  • Podman Release v3.3.0 : cette version apporte "podman machine" qui devrait notamment permettre un meilleur support de podman sous OSX avec une couche de virtualisation intermédiaire dans la même veine que Docker Desktop dans le but de proposer une intégration native. Cela ne semble pas fonctionner sur un Apple M1 à cause de l'incompatibilité actuelle de Virtual Box avec ces puces. Si Podman peut certes être une alternative à Docker (Desktop), cela montre aussi le travail d'intégration réalisé par Docker Inc notamment pour le support des Apple M1.
  • Podman on Macs Update : statut sur le support de Podman dans un context MacOS/Intel, Windows/Intel et le reste à faire pour MacOS/M1. En attendant, podman machine est supporté nativement sur Linux et MacOS/Intel et en remote client sur Windows/Intel.
  • How Docker broke in half : restrospective sur Docker de ses origines à aujourd'hui et quelques pistes pour le futur...
  • Docker Compose V2.0.0 : L'outil a été réécrit en go plutôt qu'en python et se veut accessible via la docker cli en tant que sous système (ie docker compose xxx). Pour Windows & OSX, il est fourni avec Docker Desktop.
  • Accelerating New Features in Docker Desktop où l'on parle de l'arrivée prochaine d'un Docker Desktop For Linux !!
  • No, we don’t use Kubernetes : un billet rafraichissant qui rappelle que Kubernetes n'est pas l'alpha et l'omega de l'infrasatructure.

IoT

JVM

Recherche

Sécurité

Time Series

Web, Ops, Data et Time Series - Septembre 2021

warp10automltelegrafdiscoveryanomaliepythondockerpodmannpmnodejsjvmadoptopenjdkquestdbcloudflareawss3dockerwarp10discoverytinygocircuitpythonnrtsearchelasticsearchinfluxdb

Cloud

Container et Orchestration

  • Docker is Updating and Extending Our Product Subscriptions : TL;DR: Docker Desktop requiert un abonnement Pro/Team/Business si vous êtes une organisation de plus de 250 employés et 10 Millions de Chiffre d'affaires. L'abonnement commence à 5$/mois/utilisateur. Ce changement démarre au 31/08/2021 avec une période de grâce jusqu'au 31/01/2022. Si certains crient au scandale, il faut bien voir tout ce que Docker Desktop fourni et le travail d'intégration que cela représente. Il faut bien que la société Docker vive pour maintenir ses produits. Tout cela se retrouve dans The Magic Behind the Scenes of Docker Desktop.
  • Podman Release v3.3.0 : cette version apporte "podman machine" qui devrait notamment permettre un meilleur support de podman sous OSX avec une couche de virtualisation intermédiaire dans la même veine que Docker Desktop dans le but de proposer une intégration native. Cela ne semble pas fonctionner sur un Apple M1 à cause de l'incompatibilité actuelle de Virtual Box avec ces puces. Si Podman peut certes être une alternative à Docker (Desktop), cela montre aussi le travail d'intégration réalisé par Docker Inc notamment pour le support des Apple M1.
  • Podman on Macs Update : statut sur le support de Podman dans un context MacOS/Intel, Windows/Intel et le reste à faire pour MacOS/M1. En attendant, podman machine est supporté nativement sur Linux et MacOS/Intel et en remote client sur Windows/Intel.
  • How Docker broke in half : restrospective sur Docker de ses origines à aujourd'hui et quelques pistes pour le futur...
  • Docker Compose V2.0.0 : L'outil a été réécrit en go plutôt qu'en python et se veut accessible via la docker cli en tant que sous système (ie docker compose xxx). Pour Windows & OSX, il est fourni avec Docker Desktop.
  • Accelerating New Features in Docker Desktop où l'on parle de l'arrivée prochaine d'un Docker Desktop For Linux !!
  • No, we don’t use Kubernetes : un billet rafraichissant qui rappelle que Kubernetes n'est pas l'alpha et l'omega de l'infrasatructure.

IoT

JVM

Recherche

Sécurité

Time Series

Web, Ops, Data et Time Series - Juin 2021

grafanapostgresqlterraformvectorwarp10quasardbinfluxdbk6telegrafwarpstudioconsulchronograftraefiklens

Automatisation

Conteneurs et orchestration

  • Lens 5 Released - Release Notes : le "Kubernetes IDE" passe en version 5 avec diverses améliorations dont notamment du collaboratif avec du partage de contexte kubernetes.
  • Traefik 2.5, quoi de neuf ? : actuellement en RC2, la version 2.5.0 de Traefik devrait apporter un support expérimental d'HTTP/3, le support des plugins privés, la mise à jour des CRD Kubernetes et les métriques par routeur (désactivé par défaut)

Monitoring & Observabilité

Postgresql

  • PostgreSQL as a Microservice : on pense souvent qu'une base de données permet la persistence des données. Ce n'est pas le principal enjeu d'une base de données mais la gestion de la concurrence.

Time Series

Ma comptabilité, une série temporelle comme les autres - partie 5 - Les FEC et le compte 512

warp10timeseriescomptabilitétrésoreriebanquefec

Suite de notre épopée :

Dans ce cinquième billet, nous allons parler de Fichier d'Ecritures Comptables (FEC) et d'un compte simple à analyser : le compte 512 qui correspond à votre compte en banque.

Le Fichier des Ecritures Comptables (FEC)

Le Fichier des Ecritures Comptables (FEC) est un format de fichier normalisé. Sa spécification est disponible et grosso modo, ce qu'il faut en savoir à ce stade :

  • C'est un fichier TSV (un CSV avec les champs séparés par des tabulations)
  • Il contient 18 champs permettant de décrire les différentes écritures comptables :
    • JournalCode
    • JournalLib
    • EcritureNum
    • EcritureDate
    • CompteNum
    • CompteLib
    • CompAuxNum
    • CompAuxLib
    • PieceRef
    • PieceDate
    • EcritureLib
    • Debit
    • Credit
    • EcritureLet
    • DateLet
    • ValidDate
    • Montantdevise
    • Idevise

En partant de ces informations et après quelques précisions fournies par mon expert-comptable Fabrice Heuvrard sur le fichier, nous avons convenu de commencer par l'analyse du compte 512 correspondant aux opérations bancaires. Facile à calculer (somme des crédits - somme des débits) et facile à vérifier, il me suffit de regarder mon compte en banque et/ou mon bilan en fin d'année.

Continuant à utiliser Warp 10 pour y stocker mes séries temporelles, j'ai réalisé un script en Go qui prend le fichier FEC en entrée et envoie les données dans Warp 10 avec le formalisme suivant : <société>.<bilan ou resultat>.<classe de compte>.<type d'opération: credit ou debit> :

  • <société> est juste le début de l'arborescence
  • <bilan ou résultat> : le Plan Comptable Général Francais défini que si les comptes de classe 1 à 5 sont des classes de bilan et les classes 6 et 7 sont des classes de compte de résultat. Je suis donc le même principe de séparation des comptes et défiinr la valeur bilan et resultat. Le compte 512 que nous allons étudier commençant par 5, c'est un compte de bilan. Il sera donc dans la série cerenit.bilan.*
  • <classe de compte> : le plan comptable général est normalisé sur ces trois premiers chiffres. Les trois suivants sont à la discrétion du comptable. Du coup, pour ne pas avoir une série par code comptable, je retrouve par classe du plan de compte. Ainsi, toutes les opérations ayant le code 512xxx se retrouvera dans la série cerenit.bilan.512.*
  • <type d'opération: crédit ou débit> : suivant si l'opération est un débit ou crédit, cela prend la valeur adéquat. Ainsi, toutes les opérations ayant le code 512xxx se retrouvera dans la série cerenit.bilan.512.credit ou ``cerenit.bilan.512.debit`
  • Le montant de l'écriture comptable sera la valeur associée à mon point dans la série.
  • Les autres informations seront mises sous la forme de labels (ie meta données de mon point) pour d'éventuelles analyses ultérieures. Il s'agit de couple clé/valeurs.

Ainsi, un crédit de 100€ avec une référence de pièce à 1234 sera représenté sous la forme :

<Timestamp de l'écriture comptable>// cerenit.bilan.512.credit{PieceRef=1234} 100

La modélisation est peut être un peu naive à ce stade, il sera toujours temps de la faire évoluer dans un second temps mais a priori :

  • J'ai ma séparation bilan / compte de résultat
  • J'ai ma séparation par classe de compte
  • J"ai ma séparation débit / crédit
  • au pire via les labels, j'ai des axes complémentaires de recherche / sélection.

Contrôle des données

Avant de commencer la moindre analyse, j'ai voulu vérifier l'intégrité de mes données.

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
// Fusion de l'ensemble des séries temporelles en une seule série
MERGE
// Calcul de la somme de l'ensemle des valeurs de la séries -
// MAXLONG permet de tout récupérer sans calculer la taille exacte de la liste (pour peu que votre liste soit plus petite que la valeur de MAXLONG)
// 1 permet de ne sortir qu'une valeur en sortie
[ SWAP mapper.sum MAXLONG MAXLONG 1 ] MAP
// C'est une liste avec une liste à 1 élément, on "applatit" tout ça
MERGE
VALUES
0 GET
// On stocke la valeur finale dans totalCredit
'totalCredit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
[ SWAP mapper.sum MAXLONG MAXLONG 1 ] MAP
MERGE
VALUES
0 GET
'totalDebit' STORE

// Calcul du solde
$totalCredit $totalDebit -

Cela me donne : 27746.830000000075

Exploration de la trésorerie

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
// Fusion de l'ensemble des séries temporelles en une seule série
MERGE
// Tri des points par date
SORT
// Renommage de la série
'credit' RENAME
// Suppression des labels
{ NULL NULL } RELABEL
// Stockage dans une variable
'credit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
'debit' STORE

// Affichage des deux séries
$credit
$debit

// Création de la série de mouvements
$credit $debit -
'mouvements' RENAME

Cela nous donne ces courbes:

warp10 - exploration treso

Mais on voit bien à fin décembre qu'il y a des opérations de débit qui ne sont pas prises en compte dans le solde (la ligne orange s'arrête avant la verte).

En cherchant un peu, je me dis qu'il faudrait que je calcule une nouvelle série avec tous les éléments de crédit et débit et faire l'addition de tout cela. Je vois également que FLATTEN (doc)permet de fusionner plusieurs listes en une seule. Mais finalement, seul MERGE sera nécessaire.

Cela me donne la piste suivante :

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'credit' RENAME
{ NULL NULL } RELABEL
'credit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
// Je multiplie les debits par -1 pour pouvoir faire l'opération de solde ensuite
[ SWAP -1 mapper.mul 0 0 0 ] MAP
'debit' STORE

// Je fusionne les deux séries avec MERGE
[
  $credit
  $debit
] MERGE
// Je trie les éléments par date
SORT
'mouvements' RENAME

Cette fois-ci, mon solde prend bien en compte toutes les opérations de l'année.

warp10 - mouvements treso

Pour la version consolidée avec le solde du compte :

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'credit' RENAME
{ NULL NULL } RELABEL
'credit' STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
-1 *
'debit' STORE

// Fusion des débits/crédits comme vu précédemment
[
  $credit
  $debit
] MERGE
SORT
'mouvements' RENAME

// On applique mapper.sum sur l'ensemble des points précédents le point qui est considéré
// Le premier point ne va donc prendre que lui même
// Le 2nd point va prendre sa valeur et ajouter celle du précédédent
// Le 3ème point va prendre sa valeur et la somme des points précédents
// Et ainsi de quiste
[ SWAP mapper.sum MAXLONG 0 0 ] MAP

Et le résultat en images :

warp10 - mouvements treso

Et voilà !

Il ne me reste plus qu'à :

  • ingérer les FEC des autres années pour envisager des comparaisons entre les différents exercices comptables, voire du prédictif pour l'exercice en cours et à venir,
  • étendre ces analyses à d'autres comptes maintenant,
  • et à créer les dashboards adéquats avec Discovery.

Web, Ops, Data et Time Series - Mai 2021

hashicorpnomadovhtimeleap secondgitlab-cipythondbtmetabasedatataskwarp10monitoringwasmsécuritéspectretimescalesqlclireadmebootstrapinfluxdatakapacitorchronograf

CI

Cloud

Conteneur et orchestration

  • Announcing General Availability of HashiCorp Nomad 1.1 : 10 nouvelles fonctionnalités au programme (7 en OSS, 3 en entreprise) : surallocation de mémoire (soft et hard limit), les CPU peuvent être réservés en tant que tel (et non plus uniquement via une fraction), amélioration d'UI, amélioration coté support CSI, distinction entre les "readyness checks" et "liveness checks" au niveau des health checks, exécution distante sur AWS Lambda et AWS ECS (tech preview). Pour la version entreprise : supper des namespaces consul, chargement automatique des licences lors du déploiement de nouveaux noeuds, amélioration de l'autoscaling.

Data

Docs

  • readme.so (via MACI #42) : Vous ne savez pas quoi mettre dans votre README ? Ce site est fait pour vous et peut aussi vous aider à réorganiser vos fichiers.

Europe

  • Souveraineté et cloud, quel rapport ? : remise en perspective du cloud souverain et implications des décisions européenes. La remise en cause du Privacy Shield et les clauses contractuelles font qu'au final : "tout transfert de données personnelles sous juridiction américaine est illégal.". La reglementation européene, centré sur le respect des droits des personnes permettrait de fiare un protectionnisme reglementaire dans l'idée de développer un écosystème numérique européen et conforme aux valeurs européennes. A lire et méditer !

License

Système

  • negative leap second news! : une seconde est intercallée de temps à autre pour se resynchroniser avec la rotation terrestre. En général, on ajoutait une seconde. Là, on va retirer une seconde - c'est apparemment la première fois que cela se passe.

Sécurité

Time Series

  • $40 million to help developers measure everything that matters : Timescale annonce une levée en série B de 40 Millions de dollars - environ 2 millions d'instances actives et une dizaine de sorties produits pour le mois de Mai.
  • How we made DISTINCT queries up to 8000x faster on PostgreSQL : dans le cadre de la sortie de TimescaleDB 2.2.1, l'arrivée de "Skip Scan" permet d'accélérer les SELECT DISTINCT entre 28x et 8000x. Cela est valable tant pour les données Timescale que les données natives Postgres. Une contribution upstream est prévue.
  • TimescaleDB 2.3: Improving columnar compression for time-series on PostgreSQL : Après le rajout des ALTER/RENAME des colonnes compressées en 2.1 - le rajout des INSERT avec une compression en deux temps (compression de l'insert en lui même puis recompaction des données au niveau du chunk)
  • QuestDB 6.0 : implémentation de la gestion du Out Of Order, amélioration sur le InfluxDB Inline Protocol ainsi que sur l'UI et la couche SQL.
  • How we achieved write speeds of 1.4 million rows per second : retour plus détaillé sur la gestion du Out Of Order dans QuestDB.
  • InfluxDB OSS and Enterprise Roadmap Update from InfluxDays EMEA : InfluxData juge qu'à partir de la version 2.0.6, la mise à jour depuis une version 1.8 est stable. La version 1.8 sera donc maintenue jusqu'à la fin d'année. Au-delà de cette date, les correctifs ajoutés seront dans la branche master mais il n'y aura plus de packaging de la version 1.8 OSS. Seule la version 1.8 Entreprise aura de nouveaux binaires. Abandon des binaires en 32 bits pour InfluxDB 2.x. Concernant la version Entreprise, InfluxDB 1.9 va apporter des améliorations notamment concernant le support de Flux. Par ailleurs Chronograf 1.9 et Kapacitor 1.6 vont sortir en juin avec diverses améliorations. Ces deux produits seront compatibles avec InfluxDB 2.x pour aider à la montée de version vers InfluxDB 2.x. Enfin, InfluxDB 0SS 2.1 va sortir aussi en juin avec notamment l'ajout des notebooks, les annotations sur les dashboards et des améliorations de Flux.
  • Release Announcement: InfluxDB OSS and InfluxDB Enterprise 1.8.6 : version de maintenance avec une faille de sécurité pour la version Entreprise.
  • Monitorer son infra avec Warp 10 - Partie 1, Partie 2, Partie 3 : Mise en oeuvre des outils de la plateforme Warp 10 pour monitorer son infrastructure. Cela couvre l'installation, la collecte des métriques, l'exploration des données et calcul des premiers métriques, et pour finir la création des dashboards.
  • Mon Linky dans Warp 10 avec un joli dashboard : Ingestion des données issues du Linky dans Warp 10 et présentation de ces données dans un Dashboard Discovery.
  • May 2021: Warp 10 releases 2.8.0 and 2.8.1 - SenX : En résumé (liste non exhaustive, va falloir qqs billets plus détaillés pour comprendre toutes les nouveautés) : Gestion plus fine des "capabilities" au niveau des tokens, Utilisation de FLoWS simplifié, Intégration avec la blockchain Ethereum, Des fonctions de crypto / signature / ..., Des améliorations sur la manipulation de JSON, Une fonction HTTP pour permettre des appels distants, Ajout de mapper.geo.fence pour voir si un point est dans/en dehors d'une zone, Des choses autours des MACRO et plein d'autres améliorations/corrections.
  • Working with GEOSHAPEs: code contest results : le corrigé du concours lancé par SenX autour des GEOSHAPEs dans Warp 10. Concours que j'ai remporté et voici mes réponses : partie 1 & partie 2
  • Wikipedia / Warp 10 : Warp 10 dispose de sa page Wikipedia
  • « Le bateau qui vole » : l’analytique en temps réel au service d’un skipper : de l'utilité des séries temporelles dans le monde de la course au large pour une meilleure appréhension du fonctionnement du bateau et de ses performances. Ce retour d'expérience sera le thème d'une prochaine édition du Time Series France !

Web

  • Bootstrap 5 : nouvelle version majeure du framework Boostrap avec la suppression de la dépendance à JQuery et la fin de support de plein de vieux navigateurs notamment.
← Précédent 1 / 4