CérénIT

Le blog tech de Nicolas Steinmetz (Time Series, IoT, Web, Ops, Data)

Vers de nouveaux horizons...

floveacérénittimeseriesbigdatahebdoiotctociodsiiiot

Je l'évoquais dans le billet "Bilan 2021 et perspectives 2022", je peux en parler maintenant officiellement : j'ai été contacté par Flovea pour piloter le projet Flowbox Interactive et mettre en place l'équipe projet associée.

Après trois mois environ de mission permettant de faire connaissance, d'auditer la solution existante, de définir une roadmap et de mettre en place l'équipe projet, mon recrutement en tant que DSI/CIO de Flovéa est acté depuis début avril. J'ai le plaisir de rejoindre une belle équipe pour réaliser un beau projet tant d'un point de vue technique que d'un point de vue du sens et de son utilité. La seule ombre au tableau étant le contexte de pénurie de composants qui illustre bien la dimension "hard" d'un projet "hardware".

L'activité de CérénIT va donc ralentir puis se mettre en mode minimal ; le temps pour moi de finir quelques activités de support pour un client et ne conserver ensuite que l'infogérance de Compta-Online et un autre projet avec Fabrice Heuvrard à destination des experts comptables.

L'animation du meetup Time Series France sera moins régulière et surement de façon plus opportuniste que précédemment. Je continue à contribuer à BigData Hebdo même si mes contributions au podcast sont minimes depuis le début d'année.

Je remercie tous les clients et les personnes que j'ai pu accompagner pendant ces 6 ans ; j'ai appris énormément de choses grâce à eux et j'ai pu travailler sur des sujets et dans des entreprises sur/pour lesquel(le)s je n'aurais jamais pensé travailler. Je remercie plus particulièrement :

  • Frédéric Rocci, j'aurai du rejoindre Compta Online début 2017, cela ne s'est pas fait mais cela m'a permis de devenir indépendant
  • Vincent Heuschling : il était mon prestataire lorsque j'étais encore chez JCDecaux, il devient mon premier client en 2017 pour lancer les premières fondations de DataTask. On a remis ça en 2020/2021 mais les conditions sanitaires et économiques font que je ne peux pas rester sur le projet fin 2021. C'est grace à cette rencontre que je découvre le podcast BigData Hebdo puis rejoint l'équipe en 2019.
  • Thomas Bosviel, prestataire également chez JCDecaux en 2016/17 et qui me met en contact en 2019 avec Frédéric Mefiant de la SAFT et pouvoir ainsi commencer mon activité "Time Series".
  • Denis Rampnoux pour la mission chez LesFurets.com et Youen Chéné pour la mission chez Saagie.

Ces années ont été très riches et passionnantes mais j'aspirai à aller vers d'autres choses ; le projet et la rencontre avec Flovéa semblent être la réponse que j'attendais. Il est donc temps de tourner la page et de découvrir ces nouveaux horizons.

Bilan 2021 et perspectives 2022

bilanperspectivecérénittimeseriesbigdatahebdoinfluxaceiot

Routine habituelle de début d'année pour la clôture de ce 5ème exercice (déjà !).

Bilan 2021

Au global, une année mitigée qui se termine un peu sur le fil du rasoir au niveau comptable. Pour la partie positive, j'ai l'impression que cette année a été "l'année des possibles" où les efforts commencés les années précédentes commencent à payer. Des premiers projets en Go, des missions Time Series intéressantes et ambitieuses par certains aspects et un projet annexe en Python/Django sur la fin d'année qui consolide différents éléments permettant de gagner en confiance et de réduire un peu ce cher syndrome de l'imposteur avec lequel j'apprends à composer et à dépasser parfois.

CérénIT

D'un point de vue comptable, cela donne :

20212020201920182017Variation n/n-1
Chiffre d'affaires~130 K€~138 K€~150 K€~132 K€~100 K€-6%
Résultat après impôts~1K€~10 K€~13.5 K€~10 K€~20 K€-90% 😱
Jours facturés164175197178160-6%
TJM793€789€761€742€625€+0.5%

Les années précédentes, les missions annexes (missions courtes d'audit/expertise principalement) venaient apporter le bénéfice annuel et la mission principale couvrait les charges. Cette année, avec une réduction de mes missions principales en fin d'année, les missions annexes ont permis de couvrir les charges mais sans permettre de dégager de bénéfices. Un TJM plus élevé sur ces missions annexes permet d'amortir et de compenser la baisse d'activité.

202120202019Variation n/n-1
Chiffre d'affaires Total~130 K€~138 K€~150 K€-6%
Chiffre d'affaires Time Series~25K€~5 K€~2 K€+400% 💪

Une des grandes satisfactions de l'année est incontestablement l'essor de l'activité Time Series. Etre référencé comme InfluxAce m'a apporté une mission chez Axens sur les Shard Duration & Retention Policies et quelques contacts/prospects qui n'ont pas pu aboutir durant cette année. C'est aussi la poursuite de mon accompagnement des équipes de la SAFT pour la 3ème année consécutive et la mise à jour d'InfluxDB OSS v1.x vers v2.x avec un sujet de migration des alertes Kapacitor vers des Tasks en Flux.

Autres activités

Pour le reste, j'ai eu le plaisir de :

Pas de contribution de ma société à un quelconque projet comme en 2020 avec une contribution financière au projet Makair suite à la question du rôle social d'une entreprise dans notre société en temps de COVID. Petite déception à ce niveau-là.

Perspectives 2022

2021 s'est terminée avec une certaine forme de lassitude autour de l'activité de freelance (et de la solitude du freelance, même si j'ai toujours cherché à gommer cet aspect dans mes missions) et des activités autour de l'automatisation/industrialisation comme une fin en soit. J'éprouvais le besoin de créer un commun avec des personnes et je voulais accélérer la bascule vers l'IoT industriel mais ne sachant pas par quel bout attaquer le sujet. Une mission "DevOps AWS pour déployer un outil de CI/CD" sans autre finalité ne m'intéressait pas/plus (🤢).

Et là, de nulle part surgit une prise de contact sur LinkedIn par une recruteuse pour rejoindre une entreprise dans l'IoT industriel, avec une dimension environnementale qui cherchait un profil de responsable technique DevOps/IoT. Les astres ne pouvant pas être plus alignés, les cases du job idéal / de la mission idéale étant toutes cochées, je ne pouvais pas laisser passer cette opportunité. Depuis mi-janvier, j'ai donc commencé une mission pour cette entreprise pour qu'on fasse connaissance et pour avancer rapidement sur le sujet le temps que d'autres points soient traités de leur côté. De jolies choses semblent se dessiner et devraient aboutir prochainement... 🤩

Affaire à suivre comme on dit, 2022 semble pleine de promesses, de défis et de changements ! 😎

Web, Ops, Data et Time Series - Novembre 2021

postgresqltimeseriestimecalewarp10warpstudioinfluxdb

Containers & Orchestration

  • Announcing General Availability of HashiCorp Nomad 1.2 : Arrivée des "system batchs jobs" prévu pour gérer des jobs à destination du cluster nomad en lui même (purge, backup, etc) et non des clients. Cette version apporte également des améliorations au niveau de l'interface, ainsi que les "nomad pack", format de distribution de vos applications à destination de nomad.

IoT

Monitoring & Observabilité

Time Series

Annonces & Produits :

  • Timescale 2.5.0 : support de Postgresql 14, continuous aggregates for distributed hypertables (la fonction fonctionne donc maintenant en multi-nodes) et support des timezone pour la fonction time_bucket_ng
  • Warp Studio 2.0.6 : version mineure du studio pour la gesion de CORS-RFC1918 ; c'est pour utiliser le studio avec vos instances locales depuis Chrome 92 (et bientôt les autres navigateurs) du fait des restrictrions d'accès mises en place dans les navigateurs.
  • Release Announcement: InfluxDB OSS 2.1.0 | InfluxData : Arrivée des annotations et des notebooks, le client influx n'est plus distribué avec le serveur (sauf dans l'image Docker), améliorations de flux, amélioration de l'API et de la CLI et mise à jour de l'extension VSCode.
  • Announcing PyCaret’s New Time Series Module :la librairie "low code" de machine learning PyCaret se dote d'un module de gestion de séries temporelles comprenant 30+ modèles (ARIMA, SARIMA, FBProphet, etc) et fonctions.

Articles :

Ma comptabilité, une série temporelle comme les autres - partie 6 - Les FEC et le compte de résultat

warp10timeseriescomptabilitérésultatfecdashboarddiscovery

Suite de notre épopée :

Dans ce sixième et dernier billet pour cette série, nous continuons avec les Fichier d'Ecritures Comptables (FEC) pour produire le compte de résultat et déterminer ainsi le bénéfice de l'exercice en cours. Il faut donc prendre toutes les opérations en classe 6 (charges) et 7 (produits). Pour chaque classe de compte, il peut y avoir des crédits ou des débits (ex pour un compte de classe 7 : un avoir sur une facture émise). C'est donc un chouilla plus compliqué que le compte de trésorerie.

Depuis le dernier billet, j'ai légèrement fait évoluer le modèle de données :

  • Initialement, j'avais : <société>.<bilan ou resultat>.<classe de compte>.<type d'opération: credit ou debit>
  • Cela a évolué vers : <société>.<bilan ou resultat>.<classe de compte> ; le type d'opération est maintenant un label

Pour un crédit de 100€ avec une référence de pièce à 1234 pour le compte 706, on passe donc de :

<Timestamp de l'écriture comptable>// cerenit.resultat.706.credit{PieceRef=1234} 100

à :

<Timestamp de l'écriture comptable>// cerenit.resultat.706{PieceRef=1234, operation=credit} 100

Compte de résultat

"<readToken>" "readToken"  STORE

// Récupération de toutes les opérations de crédit pour les comptes charges (classe 6xx)
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable
[ $readToken '~comptabilite.resultat.6.*' { "operation" "credit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'charges_credit' RENAME
'charges_credit' STORE


// Récupération de toutes les opérations de débit pour les comptes charges (classe 6xx)
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable
[ $readToken '~comptabilite.resultat.6.*' { "operation" "debit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'charges_debit' RENAME
'charges_debit' STORE

// Fusion des deux listes de séries en une liste qui va avoir l'ensemble des opérations
// Les opérations de débit sont mis en valeur négative du calcul du solde
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Stockage du résultat dans une variable qui contient l'ensemble des opérations
[
  $charges_debit -1 *
  $charges_credit
] MERGE
SORT
'charges_flux' RENAME
'charges_flux' STORE

// Même opération pour les comptes de produit (7xx)
[ $readToken '~comptabilite.resultat.7.*' { "operation" "credit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'produits_credit' RENAME
'produits_credit' STORE

[ $readToken '~comptabilite.resultat.7.*' { "operation" "debit" } '2020-01-01T00:00:00Z' '2020-12-31T23:59:59Z' ] FETCH
MERGE
SORT
'produits_debit' RENAME
'produits_debit' STORE

[
  $produits_debit -1 *
  $produits_credit 
] MERGE
SORT
'produits_flux' RENAME
'produits_flux' STORE

// Fusion des 2 flux d'opérations (charges et produits) pour avoir une vision temporelle de ces opérations
// Le SORT permet d'être sur d'avoir toutes les opérations triées par date
// Renommage de la série en "compte_resultat" qu'elle va permettre de batir
// Somme cumulée de l'ensemble des opérations pour avoir un solde à date
// Stockage sous la forme d'une variable
// Affichage de la variable
[
  $produits_flux
  $charges_flux
] MERGE
SORT
'compte_resultat' RENAME
[ SWAP mapper.sum MAXLONG 0 0 ] MAP
'compte_resultat' STORE
$compte_resultat

Ce qui nous donne dans le Studio :

warp10 - compte de résultat

Du précédent billet et ce celui-ci, nous avons donc :

  • Un compte de résultat annuel
  • Un compte de trésorerie annuel

Tout ce qu'il faut donc pour faire un dashboard avec Discovery. Il faut dire que le billet Covid Tracker built with Warp 10 and Discovery et dans une moindre mesure Server monitoring with Warp 10 and Telegraf donnent accès à plein d'options pour réaliser ses dashboards.

Macros

Je pourrais mettre le code de mes requêtes directement dans les dashboards mais j'aime pas trop quand des tokens se balladent dans les pages web. Du coup, je vais déporter le code dans des macros. J'ai églément rendu les macro dynamiques dans le sens où elles prennent une année en paramètre pour afficher les données de l'année en question.

On a déjà vu le fonctionnement des macros précédemment, je ne reviendrais donc pas dessus.

La macro du compte de résultat à titre d'exemple :

<%
    {
        'name' 'cerenit/accountancy/compte-resultat'
        'desc' 'Function to calculate the cumulative benefit (or loss) of the company'
        'sig' [ [ [ [  'year:LONG' ] ]  [ 'result:GTS' ] ] ]
        'params' {
            'year' 'Year, YYYY'
            'result' 'GTS'
        }
        'examples' [
    <'
2020 @cerenit/accountancy/compte-resultat
    '>
    ]
    } INFO

    // Actual code
    SAVE 'context' STORE

    TOLONG // When called from dashboard, it's a string - so convert paramter to LONG first
    'year' STORE // Save parameter as year
    
    // Compute 1st Jan of given year
    [ $year 01 01 ] TSELEMENTS-> ISO8601 
    'start' STORE
    
    // Compute 31 Dec of given year
    [ $year 12 31 23 59 59 ] TSELEMENTS-> ISO8601 
    'end' STORE
    
    "<readToken>" "readToken"  STORE

    [ $readToken '~comptabilite.resultat.6.*' { "operation" "credit" } $start $end ] FETCH
    MERGE
    SORT
    'charges_credit' RENAME
    'charges_credit' STORE

    [ $readToken '~comptabilite.resultat.6.*' { "operation" "debit" } $start $end ] FETCH
    MERGE
    SORT
    'charges_debit' RENAME
    'charges_debit' STORE

    [
    $charges_debit -1 *
    $charges_credit
    ] MERGE
    SORT
    { NULL NULL } RELABEL
    'charges_flux' RENAME
    'charges_flux' STORE

    [ $readToken '~comptabilite.resultat.7.*' { "operation" "credit" } $start $end ] FETCH
    MERGE
    SORT
    'produits_credit' RENAME
    'produits_credit' STORE

    [ $readToken '~comptabilite.resultat.7.*' { "operation" "debit" } $start $end ] FETCH
    MERGE
    SORT
    'produits_debit' RENAME
    'produits_debit' STORE

    [
    $produits_debit -1 *
    $produits_credit 
    ] MERGE
    SORT
    { NULL NULL } RELABEL
    'produits_flux' RENAME
    'produits_flux' STORE

    [
    $produits_flux
    $charges_flux
    ] MERGE
    SORT
    'compte_resultat' RENAME
    [ SWAP mapper.sum MAXLONG 0 0 ] MAP
    'compte_resultat' STORE
    $compte_resultat

    $context RESTORE
%>
'macro' STORE
$macro

Comme le décrit l'exemple, si on veut le compte de résultat de l'année 2020, on utilisera le code suivant :

2020 @cerenit/accountancy/compte-resultat

J'ai profité de ce billet pour utiliser Warpfleet Synchronizer & Warpfleet Resolver pour simplifier le déploiement des macros ; cela explique que les signatures pour appeler les macros changent par la suite dans le dashboard.

Dashboards

Ci-après le code du dashboard :

<%
{
    'title' 'Comptabilité CérénIT'
    'description' 'Trésorerie et compte de résultat'
    'vars' {
        'myYear' 2020
    }    
    'tiles' [
        {
            'title' 'Informations'
            'type' 'display'
            'w' 11 'h' 1 'x' 0 'y' 0
            'data' {
                'data' 'R&eacute;sultat de la s&eacute;rie <a href="https://www.cerenit.fr/blog/premiers-pas-avec-warp10-comptabilite-et-previsions/">Ma comptabilit&eacute;, une s&eacute;rie temporelle comme les autres</a> et de l&apos;ingestion des Fichiers d&apos;&eacute;critures comptables.'
                'globalParams' { 'timeMode' 'custom' }
            }
        }
        {
            'title' 'Année'
            'type' 'input:list'
            'w' 1 'h' 1 'x' 11 'y' 0
            'data' {
                'data' [ '2017' '2018' '2019' '2020' ]
                'events' [ { 'type' 'variable' 'tags' 'year' 'selector' 'myYear' } ]
                'globalParams' { 'input' { 'value' '2020' } }   
            }
        }
        {
            'title' 'Trésorerie (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 1
            'macro' <% $myYear @cerenit/macros/treso %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }
        {
            'title' 'Compte de résultat (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 6 'y' 1
            'macro' <% $myYear @cerenit/macros/compteresultat %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }
        {
            'title' 'Trésorerie (pluri-annuelle)'
            'type' 'line'
            'w' 12 'h' 2 'x' 0 'y' 3
            'macro' <% [ 2017 $myYear ] @cerenit/macros/treso_multi %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }  
    ]
}
{ 'url' 'https://w.ts.cerenit.fr/api/v0/exec'  } 
@senx/discovery2/render
%>

et son rendu :

warp10 - dashboard

Dans le bloc global du dashboard, on définir une variable myYear, initialisée à 2020. Cette variable est mise à jour dynamiquement lorsque l'on choisit une valeur dans la liste déroulante du bloc "Année".

<%
{
    'title' 'Comptabilité CérénIT'
    'description' 'Trésorerie et compte de résultat'
    'vars' {
        'myYear' 2020
    }    
    ...

Le bloc Année justement :

        {
            'title' 'Année'
            'type' 'input:list'
            'w' 1 'h' 1 'x' 11 'y' 0
            'data' {
                'data' [ '2017' '2018' '2019' '2020' ]
                'events' [ { 'type' 'variable' 'tags' 'year' 'selector' 'myYear' } ]
                'globalParams' { 'input' { 'value' '2020' } }   
            }
        }

C'est une liste déroulante (type: input:list) avec pour valeurs les années 2017 à 2020. Par défaut, elle est initialisée à 2020. Via le mécanisme des "events", lorsqu'une valeur est choisie, celle-ci est émise sous la forme d'une variable, nommée myYear et ayant pour tag la valeur year.

Ainsi, si je sélectionne 2017 dans la liste, la variable myYear prendra cette valeur. Maintenant que la valeur est définie suite à mon choix et émise vers le reste du dashboard, il faut que les autres tiles récupèrent l'information.

Regardons le tile Trésorerie :

        {
            'title' 'Trésorerie (annuel)'
            'type' 'line'
            'w' 6 'h' 2 'x' 0 'y' 1
            'macro' <% $myYear @cerenit/macros/treso %>
            'options' { 'eventHandler' 'type=(variable),tag=year' }
        }

La récupération de la variable se fait via la proriété options et la récupération de l'eventHandler associé et défini précédemment.

Une fois récupérée, la variable myYear peut être utilisée dans le bloc macro et le tile est mis à jour dynamiquement.

En conséquence :

  • Les deux premiers tiles afficheront le solde de trésorerie et le compte de résultat de l'année sélectionnée
  • Le dernier tile affichera la trésorerie depuis début 2017 jusqu'à la fin d'année sélectionnée. Donc au minimum 2017 et au maximum 2017 > 2020.

Ainsi s'achève cette série sur les données comptable et les séries temporelles. Des analyses complémentaires pourraient être menées (analyse de stocks, réparition d'activité, etc) mais mes données comptables sont insuffisantes pour en valoir l'intérêt. J'espère néanmoins que cela aura sucité votre intérêt et ouvert des horizons.

Cette série fut aussi l'occasion de faire un tour de la solution Warp 10 et de voir :

  • l'ingestion de données,
  • la manipulation et l'analyse des données,
  • la mise en place de dashboards,
  • la projection de données avec les algorythmes de machine learning.

Si vous souhaitez poursuivre l'aventure et le sujet, n'hésitez pas à me contacter.

Web, Ops, Data et Time Series - Octobre 2021

postgresqltimeseriesbidatataskdbtmetabasesingertimescaleinfluxdbquasardbvectornomadclever-cloudyieldpivotwarp10flowsvscodekapacitorchronograftelegrafclickhouse

BI

Code

  • vscode.dev : l'ère de l'IDE dans le navigateur continue après gitpod ou githuab codspaces, c'est au tour de vscode.dev qui permet d'avoir une IDE dans son navigateur. Affaire à suivre...

Observabilité et monitoring

Orchestration & conteneurs

  • damon, un dashboard pour nomad en ligne de commande.
  • Announcing HashiCorp Nomad 1.2 Beta : ajout des "System Batch" qui sont des (petits) jobs globaux au cluster, des améliorations de l'interface et l'ajout des Nomad Pack, une sorte de catalogue d'applications prêtes à être déployées dans votre cluster.

SQL

Sécurité

Time Series

Annonces & Produits :

Articles & Vidéos :

Pour le retour sur les InfluxDays North America qui ont lieu cette semaine, ce sera pour un prochain billet ou édition du Time Series France Meetup

n8n & Warp 10 - Automatisez vos manipulations de séries temporelles

n8nautomationwarp10timeseriesworkflow

Il y a quelques temps et sachant que j'utilisais n8n pour automatiser la génération des brèves du BigData Hebdo, Mathias m'a demandé s'il était possible de faire la même chose entre n8n et Warp 10 qu'avec node-red et Warp 10.

La réponse est oui mais voyons comment faire cela.

n8n/Warp 10 - workflow

Pour ceux qui ne connaissent pas n8n, c'est un clone open source (sous licence fair-code) à des services comme Zapier ou IFTTT. Il permet d'automatiser des processus via la création de workflows. Ces workflows sont composés d'étapes et d'actions. n8n dispose d'un grand nombre de connecteurs vers les différents services existants, des opérateurs génériques (faire un appel http, appliquer une fonction), des opérateurs logiques (si, etc), des opérateurs de transformation de données, etc. Chacun de ces éléments est implémenté via une node. A chaque étape du workflow, une node est instanciée puis paramétrée. Les nodes peuvent être reliées entre-elles et la sortie d'une node peut alimenter la suivante.

Le workflow se veut basique et va être le suivant :

  • Récupération d'une entrée de monitoring CPU dans Warp 10
  • Si la valeur est supérieure ou égale à 90%, alors création d'une entrée dans une série dédiée à cet effet.

Ce n'est pas le workflow le plus passionnant du monde, mais cela permet de faire deux appels à l'API HTTP de Warp 10 :

  • Le premier permet de tester l'exécution de code WarpScript via l'API /api/v0/exec ; vu le code, j'aurais pu passer par /api/V0/fetch mais cela me permet de tester l'exécution de code WarpScript.
  • Le second utilisera l'API /api/v0/update pour insérer une donnée dans une série. Cela permet de tester le passage du token d'authentification via un header.

Pour commencer le workflow, la donnée de départ est la valeur en pourcentage du métrique "CPU Idle" d'un de mes serveurs.

En WarpScript, cela donne:

'<readToken>' 'readToken'  STORE

[ $readToken 'crnt-ovh.cpu.usage_idle' { "host" "crnt-d10-gitlab" "cpu" "cpu-total" }  NOW -1 ] FETCH

Et la réponse :

[
	[{
		"c": "crnt-ovh.cpu.usage_idle",
		"l": {
			"host": "crnt-d10-gitlab",
			"cpu": "cpu-total",
			"source": "telegraf",
			".app": "io.warp10.bootstrap"
		},
		"a": {},
		"la": 0,
		"v": [
			[1634505650000000, 91.675025]
		]
	}]
]

n8n dispose d'une node HTTP Request, qui comme son nom l'indique permet de faire des requêtes HTTP vers un serveur distant. Toutefois, il n'est pas possible de passer notre code WarpScript directement dans l'appel HTTP. Il faut créer un objet avec le code WarpScript et passer ensuite l'objet créé et le nom de la propriété contenant le code WarpScript à la node HTTP Request.

Pour stocker le code WarpScript dans un objet, il faut utiliser la node Set. Une fois la node Set ajoutée dans le workflow, aller dans Parameters > Add Value > Type: String

Saisir:

  • Name: warpscript
  • Value: le code WarpScript ci-dessus

En cliquant sur "Execute Node", on peut valider la variable (la partie grisée étant mon token) :

n8n/Warp 10 - set node

On peut maintenant ajouter une node HTTP Request dans le workflow et la relier à la node Set nouvellement créée. Ainsi, la node HTTP Request aura directement accès au résultat de la node Set.

Pour les ajustements à faire :

  • Parameters :
    • Request Method : POST
    • URL : http://url.de.votre.instance.warp.10/api/v0/exec
    • Activer la case JSON/RAW Parameters
  • Options :
    • Add Option > Mime Type : text/plain
    • Add Option > Body Content Type : RAW/Custom
    • Body Parameters > Add Expression > Current Node > Input Data > JSON > warpscript (les colonnes de droites doivent se remplir avec la clé en haut et la valeur en dessous ; cliquer sur la croix pour revenir à l'écran précédent)

En cliquant sur "Execute Node", le résultat de la requête est visible (la partie grisée étant un bout de mon token) :

n8n/Warp 10 - http request node

On retrouve notre objet JSON mais il est imbriqué dans des Array Javascript, on va applanir tout ça et extraire le timestamp et la valeur du cpu via l'ajout de deux nodes Function que l'on relie à la node HTTP Request. La node Function permet d'exécuter du code javascript sur les données et de réaliser des transformations que l'on ne peut pas forcément faire avec les autres nodes. Cela n'étant pas le coeur du sujet, cela ne sera pas détaillé.

A l'issue des deux exécutions, les données sont réduites à ce qui suit :

[{
	"ts": 1634503660000000,
	"cpu": 93.219488
}]

La node IF ne sera pas détaillée non plus ; elle sert juste à introduire un semblant de logique dans le workflow. En l'occurence, si la valeur de "cpu" >= 90, alors le test est considéré comme vrai et faux sinon. Dans le cas où c'est faux, une node noOp a été ajoutée pour matérialiser la fin du workflow.

Dans le cas où le test est vrai (valeur de "cpu" >= 90), on veut alors insérer le timestamp et la valeur dans une autre série sur une instance Warp 10. Comme précédemment, cela va se faire en deux fois:

  • Préparation de la donnée au format GTS Input Format et mise à disposition sous la forme d'une propriété
  • Exécution de l'appel HTTP.

On ajoute une node Set, ensuite dans Parameters > Add Value > Type: String

Saisir:

  • Name: gtsinput
  • Value > Add Expression et dans la partie expression, on met:
{{$json["ts"]}}// n8n{} {{$json["cpu"]}}

Ce qui nous donne l'écran suivant :

n8n/Warp 10 - set node 2 - expression

On revient à l'écran précédent en cliquant sur la croix à droite et en exécutant la node, on obtient :

n8n/Warp 10 - set node 2 - result

Ensuite, il faut ajouter une nouvelle node HTTP Request avec le paramétrage suivant :

  • Parameters :
    • Authentication > Header Auth
    • Request Method : POST
    • URL : http://url.de.votre.instance.warp.10/api/v0/update
    • Activer la case JSON/RAW Parameters
  • Options :
    • Add Option > Mime Type : text/plain
    • Add Option > Body Content Type : RAW/Custom
    • Add Option > Full Response et activer là pour voir la réponse complète de votre instance Warp 10
    • Body Parameters > Add Expression > Current Node > Input Data > JSON > gtsinput (les colonnes de droites doivent se remplir avec la clé en haut et la valeur en dessous ; cliquer sur la croix pour revenir à l'écran précédent)

En haut du menu de gauche, une section "Credentials" est apparue ; dans la liste déroulante, cliquer sur "Create new" et remplissez le formulaire de la façon suivante:

  • Name: X-Warp10-Token
  • Value : votre token Warp 10 avec des droits d'écriture

n8n/Warp 10 - auth token

Revener ensuite dans votre node HTTP Request dont on peut lancer l'exécution et on obtient :

n8n/Warp 10 - auth token

Si je vais ensuite voir le contenu de ma série n8n :

'<readToken>' 'readToken'  STORE

[ $readToken 'n8n' {}  NOW -100 ] FETCH

J'obtiens comme réponse :

[
	[{
		"c": "n8n",
		"l": {
			".app": "io.warp10.bootstrap"
		},
		"a": {},
		"la": 0,
		"v": [
			[1634503660000000, 93.219488],
			[1634502790000000, 94.808468],
			[1634501690000000, 93.7751],
			[1634501550000000, 91.741742],
			[1634478300000000, 92.774711]
		]
	}]
]

Avec une entrée pour chaque exécution du workflow sous réserve d'avoir un "CPU idle" >= 90%.

En conclusion, nous pouvons retenir que :

  • Il est facile d'intégrer Warp 10 dans un workflow n8n grâce à l'API HTTP de Warp 10 et la node HTTP Request de n8n
  • Pour interagir avec Warp 10, il faut d'abord créer un objet portant le code WarpScript ou les donées au format GTS Input pour l'envoyer ensuite à Warp 10 via la node HTTP Request
  • Même si cela n'a pas été détaillé, il est possible de manipuler les données issues de Warp 10 ou de préparer des données à destination de Warp 10.

Le workflow était très basique pour permettre de montrer rapidement cette intégration. Des workflows plus complexes et riches sont laissés à votre imagination :

  • sur la base d'un événement avec la node Webhook : insertion de données ou lancement d'une analyse suite à un événement, etc.
  • sur la base d'une tache planifiée avec la node Cron : analyse de données, etc
  • ou depuis Warp 10, on peut appeler n8n en utilisant HTTP, URLFETCH ou WEBCALL pour lancer l'exécution d'un workflow ou récupérer le résultat d'un workflow.

InfluxDB et les alertes : Tasks, Checks et Notifications

influxdbtimeseriesinfluxdatataskfluxchecknotificationskapacitoralertes

CérénIT vient de finaliser la migration pour un de ses clients d'un socle InfluxDB/Chronograf/Kapacitor vers InfluxDB2. Ce billet est l'occasion de revenir sur la partie alerting et de la migration de Kapacitor vers des alertes dans InfluxDB2.

Dans le cadre du socle InfluxDB/Chronograf/Kapacitor, le fonctionnement était le suivant :

  • Les utilisateurs créent une alerte via l'application métier en définissant un à plusieurs critères d'alertes ; ex: est-ce que l'unité est opérationnelle et est-ce que l'humidité est supérieure à tel taux ou la température supérieure à telle valeur.
  • L'application métier traduisait l'alerte en TickScript et enregistrait l'alerte auprès de Kapacitor via son API HTTP
  • Kapacitor, en mode streaming, évalue si l'alerte doit être levée ou pas au fur et à mesure de l'arrivée des données
  • En cas de seuil franchi, Kapacitor envoie un message à l'application métier via l'API HTTP de cette dernière.
  • L'application métier envoie ensuite un mail et/ou un SMS à l'auteur de l'alerte.

Avant d'envisager la migration InfluxDB2, un point de vocabulaire :

  • une alerte est globalement composée d'un "check", d'un endpoint de notiifcation et d'une règle de notification.
  • un check est une task simplifiée. Elle permet de définir une requête mono critère, les niveaux de seuils associés (ok, crit, warn, etc) et sa fréquence d'exécution.
  • une task est codée flux
  • un endpoint de notification : service vers lequel sera envoyé l'alerte: slack, http, etc.
  • une règle de notification : les conditions de notifications (ex je passe à un état critique), le check associé, la fréquence d'exécution, le message de notification et le endpoint de notification à utiliser.

Avec la migration InfluxDB2, nous avons voulu maintenir le même mécanisme. Toutefois :

  • Les tasks en Flux ne fonctionnent pas en mode streaming, mais uniquement en mode batch et avec une certaine fréquence
  • Les checks sont mono-critères et pas multi-critères

Heureusement, la documentation mentionne la possibilité de faire des "custom checks" et un billet très détaillé intitulé "InfluxDB’s Checks and Notifications System" permet de mieux comprendre ce qu'il est possible de faire et donne quelques exemples de code.

Dès lors, il s'agit de :

  • développer une tâche "tout en un", contenant l'ensemble de la logique de l'alerte,
  • de conserver un historique des alertes pour permettre d'assurer un suivi des alertes pour l'équipe en charge du projet depuis InfluxDB
  • d'être en mesure de notifier l'application métier via son API HTTP

Pour se faire, nous allons nous appuyer sur les mécanismes mis à disposition par Influxdata, à savoir les fonctions monitor.check(), monitor.from() et monitor.notify() et les mécanismes induits.

C'est ce que nous allons voir maintenant :

InfluxDB - task / check / notification

Le cycle de vie d'une alerte est le suivant :

  • La task contient une requête en flux plus ou moins complexe en fonction de votre besoin ; ex: quelle est la valeur de la temperature du boitier X depuis la dernière exécution ?
  • On appelle monitor.check() en définissant les informations d'identification du check, le type de check que l'on utilise (threshold, deadman, custom), les différents seuils dont on a besoin, le message à envoyer au endpoint, les données issues de la requête flux.
  • monitor.check() va alors stocker l'ensemble de ces données dans un measurement statuses dans le bucket _monitoring et il s'arrête là.
  • monitor.from() prend le relais, regarde s'il y a de nouveaux status depuis sa dernière exécution et en fonction des règles de notifications qui ont été définies, il va passer le relais monitor.notify().
  • monitor.notify() enverra une notification si la règle est validée et il insérera une entrée dans le measurement notifications du bucket _monitoring

Une première version des alertes ont été implémentées sur cette logique. Des dashboards ont été réalisés pour suivre les status et les notifications. Cela fonctionne, pas de soucis ou presque.

Il se peut qu'il y ait un délai entre le moment où l'insertion issue du monitor.check() se fait et le moment où le monitor.from() s'exécute. Si monitor.from() fait sa requête avant l'insertion de données, alors l'alerte ne sera pas immédiatement levée. Elle sera levée à la prochaine exécution de la task, ce qui peut être problématique dans certains cas. Pour une tâche qui s'exécute toutes les minutes, cela ne se voit pas ou presque. Pour une tâche toutes les 5 minutes, ça commence à se voir.

Une version intermédiare de la task est alors née : une fois le monitor.check() exécuté, nous faisons appel à monitor.notify() pour envoyer le message vers le endpoint.

InfluxDB - task / check / notification v2

Avantage :

  • la notification se déclenche sans délais

Inconvénients :

  • cela ne remplit pas le measurement notifications de la même façon que précédemment (d'où les pointillés) vu que les données insérées dans le measurement statuses n'existent pas encore. On perd la visibilité sur les notifications envoyées (mais on a toujours le suivi des statuts ; nous supposons que si on a le statut, alors on sait si la notification a été envoyée)
  • cela aboutit à un peu de duplication de code sur la gestion des seuils et des messages.

Une variante non essayée à ce stade : elle consiste à faire cette notification au plus tôt mais de conserver le mécanisme de monitor.from() + monitor.notify() pour avoir le measurement notifications correctement mise à jour. A voir si les alertes ne sont pas perturbées par ce double appel à monitor.notify(). Dans le cas présent, c'est l'application métier qui envoie les alertes après que la task InfluxDB ait appelé son API HTTP. Si chaque monitor.notify() en vient à lever une alerte, cela est sans impact pour l'utilisateur. En effet, une fois qu'une alerte est levée, elle est considérée comme levée tant qu'elle n'est pas acquittée. Donc même si la task provoque 2 appels, seul le premier lévera l'alerte et la seconde ne fera rien de plus.

InfluxDB - task / check / notification v3

Enfin dernière variante (testée) : s'affranchir complètement de monitor.notify() pour faire directement appel à http.endpoint() et http.post() et faire complètement l'impasse sur le suivi dans notifications.

InfluxDB - task / check / notification v4

Tout est une histoire de compromis.

En conclusion, nous pouvons retenir que :

  • Une alerte est composée d'un check, d'un endpoint de notification et d'une règle de notification
  • En 2.0, le principe est que les alertes sont des séries temporelles via le bucket _monitoring et les measurements statuses et notifications.
  • Toute personne s'intéressant au sujet doit lire au préalable InfluxDB’s Checks and Notifications System pour bien comprendre les concepts et les rouages.
  • Via la UI, les alertes (checks) sont assez basiques (requête monocritère)
  • Il est possible de faire des "custom checks" via des tasks en flux
  • Les fonctions du package monitor permettent de gérer des alertes
  • Les exécutions dans la même task (ou dans des tasks concomittentes) de monitor.check() et monitor.from() peuvent conduire à des décalages de levées d'alertes

InfluxDB, shard, shard duration et retention policies

influxdbtimeseriesinfluxdatashardshard durationretention policyshard group

CérénIT a été contacté pour mener l'audit d'une instance InfluxDB 1.8 OSS utilisée dans un projet IoT lié à l'énergie. L'audit avait plusieurs objectifs :

  • Comprendre la consommation mémoire de l'instance (48Go / 64Go de la VM)
  • Faire un état de santé de la plateforme et estimer sa capacité à stocker et procésser des données supplémentaires dans le cadre de l'ouverture d'une application métier
  • Expliquer la raison des problèmes observés par le passé et évaluer les solutions apportées
  • Etablir des recommendations et éventuellement les implémenter.

De l'audit, on notera que :

  • L'instance contient ~35.000 shards / ~36.000 tsm files pour environ 200 bases permanentes et des dizaines de bases éphémères permettant de calculer des indicateurs ou de recalculer des historiques de données suite à des changements de paramètres de l'application métier (plusieurs dizaines de milliers de bases temporaires par semaine, avec des profondeurs de données variables)
  • Les recommendations pour InfluxDB Enterprise sont d'avoir 30/40 bases par data nodes et 1.000 shards par data node

Avant d'aller plus loin, précisons un peu cette notion de shard et les notions liées pour bien appréhender le sujet :

  • Une instance InfluxDB peut contenir 1 à n bases de données (database),
  • A chaque base de données InfluxDB, on peut définir une "retention policy" qui est la période maximale de conservation des données. Avec une retention policy de 7 jours par ex, seules les données des 7 derniers jours sont conservées. Les données les plus vieilles seront alors supprimées au fur et à mesure que les nouvelles données arriveront via un mécanisme de compaction.
  • Les données d'une base de donneés InfluxDB sont réparties au sein de shards au niveau stockage ; chaque shard comprend les données sur une période de temps donnée. Si une base de données a une retention policy d'une semaine, alors chaque shard contiendra 1 jour de données. Nous aurons donc 7 shards pour cette base de données. Ce délai est appelé shard duration.
  • Au sein de chaque shard, nous allons retrouver les données sous la forme d'un ou plusieurs fichiers TSM, le fichier d'index pour le shard, ainsi que le fichier de WAL et de cache.

Nous pouvons représenter la logique instance > database > shard(s) > tsm files de la façon suivante :

InfluxDB - database / shard / tsm files

Par défaut, InfluxDB applique les shard duration suivantes en fonction des retention policy :

Retention policyDefault shard duration
<= 2 days1 hour
<= 6 months1 day
> 6 months7 days

Source : InfluxData - Shard group duration

Dès lors, une base de données avec une retention policy infinie aura une shard duration de 7 jours. Ainsi, si cette base contient 10 ans d'hisorique (soit 10 * 52 semaines = 520 semaines), elle contiendra 520 shards.

Du coup, InfluxData recommande les valeurs suivantes (au moins en 1.x ; on peut supposer que cela reste valable en 2.x):

Retention policyDefault shard duration
<= 1 day6 hour
<= 7 days1 day
<= 3 months7 days
> 3 months30 days
infinite>= 52 weeks

Source : Shard group duration recommendations

Selon cette perspective, la base de données avec 10 ans d'historique ne contiendra plus 520 shards mais 10 shards en prenant une shard duration de 52 semaines. L'écart entre la valeur par défaut et la valeur recommandée est plus que significatif.

Pour bien dimensionner vos shard duration, InfluxData recommande :

  • La durée doit être égale à 2 fois la période d'analyse la plus longue et la plus fréquente ; si vos analyses les plus fréquentes portent sur 6 mois de données maximum, alors votre shard duration est d'un an
  • Chaque shard doit contenir au moins 100.000 points
  • Chaque shard doit contenir au moins 1.000 points par série (combinaison de measurement (~table) + combinaison des tags + clés des fields)

Pourquoi nous en arrivons là ? C'est assez simple :

  • InfluxDB au lancement va découvrir l'ensemble de ses shards et les périodes qu'ils recouvrent
  • InfluxDB cherche à mettre un maximum de données en mémoire par souci d'efficience et de performance
  • Une requête sur des périodes longues va nécessiter de monter en mémoire tous les shards correspondants à la période

Dès lors, un nombre important de shards va augmenter d'autant plus la consommation mémoire et le nombre de fichiers ouverts pour manipuler les données associées.

Si on recoupe ces données avec les recommendations pour InfluxDB Entreprise, à savoir 30/40 bases par data nodes et 1.000 shards par node, le bon réglage des retention policy et des shard durations n'est pas à négliger pour la bonne santé de votre instance.

En outre, s'il est possible de mettre à jour la retention policy et la shard duration en 1.x, cela ne s'appliquera que pour les nouveaux shards. Les anciens shards ne seront pas "restructurés" en fonction des nouvelles valeurs.

Pour mettre à jour les shards existants, il faut :

  • Arrêter les mécanismes d'ingestion de données
  • Exporter les données sous la forme de points au format InfluxDB Line Protocol via influxd inspect export
  • Supprimer les measurements (voir la base de données si vous voulez aller plus vite)
  • Modifier la retention policy et la shard duration de la base de données (ou créer une nouvelle base de données avec les bonnes valeurs pour la retention policy et la shard duration)
  • Importer les données par batch de 5000 à 10.000 points.
  • Valider le bon fonctionnement de l'instance et l'intégrité des données
  • Relancer les mécanismes d'ingestion et gérer le rattrapage.

Ultime question, la version 2.x OSS change-t-elle la donne sur le sujet :

En conclusion, ce qu'il faut retenir :

  • La retention policy et la shard duration de vos bases InfluxDB ne sont pas à négliger et les valeurs par défaut ne sont surement pas adaptées à votre cas d'usage
  • Il est possible de mettre à jour ses valeurs mais elles ne s'appliqueront qu'aux nouveaux shards - pour les anciens shards, il faut exporter les données sous la forme de points et les réimporter
  • Il faut trouver la bonne taille de shard duration adaptée à votre cas d'usage ; trop de shards ou des shards trop gros ont chacun leur limites et auront des effets différents sur la consommation CPU/RAM/IOPS
  • Pour InfluxDB (Entreprise), il est recommandé d'avoir maximum 1.000 shards et 30/40 bases par node.
  • La version 2.x OSS n'apporte pas grand chose de plus sur le sujet - la version 2.1 permet d'être à peu près au niveau de la 1.x.

Mise à jour : le client reporte les gains suivants post restructuration des bases pour le serveur de recette et production :

  • Les dashboards simples sont entre 15% et 30% plus rapides pour leur affichage
  • Les dashboards complexes sont entre 3 et 10 fois plus rapides pour leur affichage
  • La consommation mémoire reste stable autour de 6/8 Go
  • Des changements sur la shard duration de certaines bases ont permis de descendre jusqu'à 600 shards environ.

Ma comptabilité, une série temporelle comme les autres - partie 5 - Les FEC et le compte 512

warp10timeseriescomptabilitétrésoreriebanquefec

Suite de notre épopée :

Dans ce cinquième billet, nous allons parler de Fichier d'Ecritures Comptables (FEC) et d'un compte simple à analyser : le compte 512 qui correspond à votre compte en banque.

Le Fichier des Ecritures Comptables (FEC)

Le Fichier des Ecritures Comptables (FEC) est un format de fichier normalisé. Sa spécification est disponible et grosso modo, ce qu'il faut en savoir à ce stade :

  • C'est un fichier TSV (un CSV avec les champs séparés par des tabulations)
  • Il contient 18 champs permettant de décrire les différentes écritures comptables :
    • JournalCode
    • JournalLib
    • EcritureNum
    • EcritureDate
    • CompteNum
    • CompteLib
    • CompAuxNum
    • CompAuxLib
    • PieceRef
    • PieceDate
    • EcritureLib
    • Debit
    • Credit
    • EcritureLet
    • DateLet
    • ValidDate
    • Montantdevise
    • Idevise

En partant de ces informations et après quelques précisions fournies par mon expert-comptable Fabrice Heuvrard sur le fichier, nous avons convenu de commencer par l'analyse du compte 512 correspondant aux opérations bancaires. Facile à calculer (somme des crédits - somme des débits) et facile à vérifier, il me suffit de regarder mon compte en banque et/ou mon bilan en fin d'année.

Continuant à utiliser Warp 10 pour y stocker mes séries temporelles, j'ai réalisé un script en Go qui prend le fichier FEC en entrée et envoie les données dans Warp 10 avec le formalisme suivant : <société>.<bilan ou resultat>.<classe de compte>.<type d'opération: credit ou debit> :

  • <société> est juste le début de l'arborescence
  • <bilan ou résultat> : le Plan Comptable Général Francais défini que si les comptes de classe 1 à 5 sont des classes de bilan et les classes 6 et 7 sont des classes de compte de résultat. Je suis donc le même principe de séparation des comptes et défiinr la valeur bilan et resultat. Le compte 512 que nous allons étudier commençant par 5, c'est un compte de bilan. Il sera donc dans la série cerenit.bilan.*
  • <classe de compte> : le plan comptable général est normalisé sur ces trois premiers chiffres. Les trois suivants sont à la discrétion du comptable. Du coup, pour ne pas avoir une série par code comptable, je retrouve par classe du plan de compte. Ainsi, toutes les opérations ayant le code 512xxx se retrouvera dans la série cerenit.bilan.512.*
  • <type d'opération: crédit ou débit> : suivant si l'opération est un débit ou crédit, cela prend la valeur adéquat. Ainsi, toutes les opérations ayant le code 512xxx se retrouvera dans la série cerenit.bilan.512.credit ou ``cerenit.bilan.512.debit`
  • Le montant de l'écriture comptable sera la valeur associée à mon point dans la série.
  • Les autres informations seront mises sous la forme de labels (ie meta données de mon point) pour d'éventuelles analyses ultérieures. Il s'agit de couple clé/valeurs.

Ainsi, un crédit de 100€ avec une référence de pièce à 1234 sera représenté sous la forme :

<Timestamp de l'écriture comptable>// cerenit.bilan.512.credit{PieceRef=1234} 100

La modélisation est peut être un peu naive à ce stade, il sera toujours temps de la faire évoluer dans un second temps mais a priori :

  • J'ai ma séparation bilan / compte de résultat
  • J'ai ma séparation par classe de compte
  • J"ai ma séparation débit / crédit
  • au pire via les labels, j'ai des axes complémentaires de recherche / sélection.

Contrôle des données

Avant de commencer la moindre analyse, j'ai voulu vérifier l'intégrité de mes données.

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
// Fusion de l'ensemble des séries temporelles en une seule série
MERGE
// Calcul de la somme de l'ensemle des valeurs de la séries -
// MAXLONG permet de tout récupérer sans calculer la taille exacte de la liste (pour peu que votre liste soit plus petite que la valeur de MAXLONG)
// 1 permet de ne sortir qu'une valeur en sortie
[ SWAP mapper.sum MAXLONG MAXLONG 1 ] MAP
// C'est une liste avec une liste à 1 élément, on "applatit" tout ça
MERGE
VALUES
0 GET
// On stocke la valeur finale dans totalCredit
'totalCredit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
[ SWAP mapper.sum MAXLONG MAXLONG 1 ] MAP
MERGE
VALUES
0 GET
'totalDebit' STORE

// Calcul du solde
$totalCredit $totalDebit -

Cela me donne : 27746.830000000075

Exploration de la trésorerie

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
// Fusion de l'ensemble des séries temporelles en une seule série
MERGE
// Tri des points par date
SORT
// Renommage de la série
'credit' RENAME
// Suppression des labels
{ NULL NULL } RELABEL
// Stockage dans une variable
'credit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
'debit' STORE

// Affichage des deux séries
$credit
$debit

// Création de la série de mouvements
$credit $debit -
'mouvements' RENAME

Cela nous donne ces courbes:

warp10 - exploration treso

Mais on voit bien à fin décembre qu'il y a des opérations de débit qui ne sont pas prises en compte dans le solde (la ligne orange s'arrête avant la verte).

En cherchant un peu, je me dis qu'il faudrait que je calcule une nouvelle série avec tous les éléments de crédit et débit et faire l'addition de tout cela. Je vois également que FLATTEN (doc)permet de fusionner plusieurs listes en une seule. Mais finalement, seul MERGE sera nécessaire.

Cela me donne la piste suivante :

"<readToken>" "readToken"  STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'credit' RENAME
{ NULL NULL } RELABEL
'credit' STORE

// Même opération sur les débits
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
// Je multiplie les debits par -1 pour pouvoir faire l'opération de solde ensuite
[ SWAP -1 mapper.mul 0 0 0 ] MAP
'debit' STORE

// Je fusionne les deux séries avec MERGE
[
  $credit
  $debit
] MERGE
// Je trie les éléments par date
SORT
'mouvements' RENAME

Cette fois-ci, mon solde prend bien en compte toutes les opérations de l'année.

warp10 - mouvements treso

Pour la version consolidée avec le solde du compte :

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.credit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'credit' RENAME
{ NULL NULL } RELABEL
'credit' STORE

// Récupération des données de 2020 pour le compte 512
[ $readToken 'cerenit.bilan.512.debit' {} '2020-01-01T00:00:00Z' '2021-01-01T00:00:00Z' ] FETCH
MERGE
SORT
'debit' RENAME
{ NULL NULL } RELABEL
-1 *
'debit' STORE

// Fusion des débits/crédits comme vu précédemment
[
  $credit
  $debit
] MERGE
SORT
'mouvements' RENAME

// On applique mapper.sum sur l'ensemble des points précédents le point qui est considéré
// Le premier point ne va donc prendre que lui même
// Le 2nd point va prendre sa valeur et ajouter celle du précédédent
// Le 3ème point va prendre sa valeur et la somme des points précédents
// Et ainsi de quiste
[ SWAP mapper.sum MAXLONG 0 0 ] MAP

Et le résultat en images :

warp10 - mouvements treso

Et voilà !

Il ne me reste plus qu'à :

  • ingérer les FEC des autres années pour envisager des comparaisons entre les différents exercices comptables, voire du prédictif pour l'exercice en cours et à venir,
  • étendre ces analyses à d'autres comptes maintenant,
  • et à créer les dashboards adéquats avec Discovery.

Ma solution pour le Warp 10 Code Contest - partie 2

timeserieswarp10geospatialchallenge

Suite et fin de ma réponse au code contest après la première partie. Dans ce billet, nous allons voir comment calculer les émissions de CO2 pour la partie de trajet sur la route 66.

// Define points from the car journey on the US66 road
[
  // Here is the gts of the car datalogger
  @senx/dataset/route66_vehicle_gts

  // Here is the route 66 geoshape (+/- 20meters)
  @senx/dataset/route66_geoshape
  mapper.geo.within 0 0 0
] MAP
"onTheRoad" STORE

$onTheRoad
{
 'timesplit' 60 s
}
MOTIONSPLIT
0 GET
'sectionOnTheRoad' STORE

// Compute speed - result in m/s
[ $sectionOnTheRoad mapper.hspeed 1 0 0 ] MAP
// Convert in km/h so x3600 /1000 = 3.6 - mapper.mul expects a constant
[ SWAP 3.6 mapper.mul 0 0 0 ] MAP
'speedFrames' STORE

// Get distance between each points in km (first in meters, then in km)
[ $sectionOnTheRoad mapper.hdist 0 1 0 ] MAP
[ SWAP 0.001 mapper.mul 0 0 0 ] MAP
'distFrames' STORE

// fuel consumption approximation is  (8 liters/100km) × (speed (km/h) / 80) +1
// So it's Speed * 8 / 80 / 100 + 1 = V/10 + 1
// F = False => does not return the index
$speedFrames
<%
  0.1 *
  1.0 +
%> F LMAP
'hundredKmFuelConsumption' STORE

[ ] 'instantConsumption' STORE

<%
  'i' STORE // store index

  // Get each list and compute one by another
  // So we compute consumption for 100 km at given speed (computed previously)
  // with related distance
  // then we divide by 100 as first value is for 100 km
  $distFrames $i GET
  $hundredKmFuelConsumption $i GET
  *
  100 /
  'r' STORE
  $instantConsumption $r +!
%>
'C' STORE
0 7 $C FOR
CLEAR

// For each GTS, compute fuel consumption as 1 point
[
  $instantConsumption
  mapper.sum
  MAXLONG
  MAXLONG
  1
] MAP
// Sum all points to get total consumption
0 SWAP <% VALUES 0 GET + %> FOREACH
// 1L = 2392g CO2
2392 *
// Enjoy !

Le premier et le second bloc sont les mêmes que dans la premièr partie. Je vous y renvoie donc si besoin.

A ce stade, nous avons une liste de 8 séries correspondant à chaque section passée sur la route 66. Chaque série comporte un liste de timestamps et de points géospatiaux (lattitude, longitude, élévation).

Concernant le troisième bloc :

  • il s'agit de calculer la vitesse en m/s entre chaque point de la série (ce qui explique le 1 0 0 pour prendre le point précédent, aucun point suivant et appliquer cette opération sur l'ensemble de la liste - voir la tips 3 de 12 tips to apply sliding window algorithms like an expert). Pour cela, on utilise mapper.hspeed (doc) qui consomme une série et calcule la vitesse en m/s en tenant compte de la longitude/lattitude/élévation.
  • Ce résultat, on le convertit en km/h dans la foulée en utilisant mapper.mul (doc) en notant au passage qu'il lui faut une constante (on ne peut pas mettre 3600 * 1000 / mais 3.6)
  • On a donc une liste de 8 séries temporelles avec chacune un timestamp, les données géospatiales et une vitesse entre chaque point. C'est stocké dans la variable speedFrames.

Concernant le 4ème bloc :

  • Sur le même modèle que pour la vitesse, on calcul la distance entre chaque point des 8 séries via mapper.hdist que l'on a vu dans le premier billet. Cette fois-ci, plutôt que de calculer la distance totale, on la distance entre le point et le point suivant et on le fait pout tout les points de la liste, d'où le 0 1 0
  • La distance étant en mètres, on la divisie par 1000 pour avoir des kilomètres. Mais comme il n'y a pas de mapper de division, alors on utilise mapper.mulet la valeur 0.001
  • On a donc une liste de 8 séries temporelles avec chacune un timestamp, les données géospatiales et une distance entre chaque point. C'est stocké dans la variable distanceFrames.

Concernant le 5ème bloc :

  • j'ai voulu calculer la consommation d'essence sur la base de la formule:

(8 liters/100km) × (speed (km/h) / 80) +1

  • Cela se simplifie en Speed/10 + 1.
  • Si on multiplie ce coefficient par les vitesses entre deux points obtenues précédemment (dans speedFrames), on obtient une consommation pour 100km avec chaque vitesse. Il faudra dans un second temps le pondérer par la distance parcourue entre deux points (distanceFrames) pour avoir un instantané de consommation pour la vitesse et la distance parcourue.
  • Pour faire cette consommation au 100km non pondérée, on utilise LMAP (doc)pour appliquer une MACRO à chaque élément de la liste. Cette macro contient le coefficient de consommation d'essence. LMAP retourne normalement l'index et la valeur associée. Or l'index ne nous sert à rien, on met donc l'argument concernant l'index à False (abrégé F) pour qu'il ne soit pas retourné.
  • On stocke le résultat dans hundredKmFuelConsumption et on a donc une liste de 8 series avec la consommation pour 100km à la vitesse donnée. Il nous faut maintenant pondérée cette liste par la distance pour avoir un instantané de consommation.

Concernant le 6ème bloc :

  • On commence par créer une liste vide appelée instantConsumption.
  • On sait que l'on a une liste de 8 éléments, donc on peut faire un boucle FOR (doc) dessus avec un indice allant de 0 à 7. FOR prend comme dernier argument une MACRO que j'ai nommé C
  • Dans la MACRO définie au dessus, je commence par stocker l'index de la boucle. Mes deux listes de 8 séries temporelles sont identiques en terme de points, avec l'une contenant les consommations pour 100km hundredKmFuelConsumptionet la seconde les distances entre chaque point distFrames. L'idée est donc de multiplier chaque série de hundredKmFuelConsumption par la série équivalente dans distFrames et de diviser par 100 pour finir notre proportionnalité.
  • On stocke cet consommation instantanée dans la variable r.
  • On ajoute ce résultat r dans la liste instantConsumption, ce qui permet de reconstituer notre liste de 8 séries mais ayant pour valeur cette fois ci les instantanés de consommation entre chaque point de chaque série.

Un petit interlude visuel avant le dernier bloc :

warp10 - instantané de consommation

Concernant le 7ème bloc :

  • Le but est de faire la somme de chaque instantanné de consommation pour avoir la consommation totale.
  • Comme dans la première partie, on utilise cette fois-ci mapper.sum (doc) en prenant l'ensemble des données des listes capturées via MAXLONG et on récupère 1 seule valeur qui s'avère être le total. On a donc la consommation totale de chaque série
  • Comme vu aussi en fin de première partie, on fait alors la somme de chaque liste pour avoir la consommation totale (9.823366576601234)
  • On sait que 1L = 2392g CO2, il nous reste donc à faire cette multiplication.
  • On obtient alors : 23497.492851230152 ou 23,497 kg de CO2.

J'espère avoir été clair dans ces explications - si ce n'est pas le cas - dites le moi (via Twitter, Mail, LinkedIn, etc) et je préciserai les choses.

Bilan personnel de ce code contest :

  • Opportunité de découvrir une partie des fonctionnalités géospatiales de Warp 10 que je n'avais pas encore utilisé
  • Améliorer mon usage autour de MAP, les mapper, les MACRO et LMAP et plein de petites choses ici ou là.
  • MAP s'applique sur des GTS mais aussi des listes de GTS sans rien avoir à faire. Pas besoin de se rajouter des boucles supplémentaires !
  • MAXLONG utilisé dans les MAP permet de ne pas avoir à se soucier de la taille de l'élément sur laquelle on applique MAP. Cela ne fait pas non plus une erreur du style index out of range.
  • en bonus, obtenir quelques lots sympathiques 😎

warp10 - goodies recto

warp10 - goodies verso

J'espère néanmoins apprendre des choses du corrigé officiel : Working with GEOSHAPEs: code contest results.

← Précédent 1 / 4