Spark est un framework open source de calcul distribué. Plus performant qu’hadoop, disponible avec trois langages principaux (Scala, Java, Python), il s’est rapidement taillé une place de choix au sein des projets Big Data pour le traitement massif de données aussi bien en batch qu’en streaming.

Depuis la version 2.0, Spark propose une nouvelle approche pour le streaming: Structured Streaming. Je vous propose de la découvrir ensemble dans une série de 3 articles. Dans ce deuxième post, nous allons aborder la transformation de données, le contrôle qualité et les tests unitaires.

La transformation de données

Dans le premier post, nous avions décrit notre chaîne de traitement. La transformation de données occupe un rôle central. Nous avons déjà parlé de l’agrégation. Intéressons nous maintenant à la fonction normalize.

val input = readFromKafka("topicTest", "kafka:9092")
val outputHdfs = writeToHdfs(input, "testHdfs", "ckpHdfs")
val data = normalize(input)
val agregat = aggData(data)
val outputDb   = writeToPostgres(agregat, "testTable", "ckpTable")
spark.streams.awaitAnyTermination(TIMEOUT)

Dans notre exemple, Kafka nous envoie les données dans une chaîne de caractères unique. La fonction normalize prend donc en entrée un Dataset[String] et produit un Dataframe contenant les colonnes qui serviront dans le calcul de l’agrégat. L’objectif de la fonction normalize est donc de parser les données, de découper les chaînes de caractères pour les ranger dans des colonnes plus appropriées pour l’analyse.

Parsing des données

Comment parser un message qui ressemble à ça ?

1 696027349027032572630986000005802382 ocdn 3 10.10.26.96 10.10.3.6 live-tv.google.ci live-tv.google.ci 
GET /ss/nt1hd/mss_low.isml/QualityLevels(800000)/Fragments(video=456815276572111) 
HTTP/1.1 bytes=0-1023999 206 600 217 217 301868 302259 2020-07-03T13:47:08.000Z 0.000 0.000 0.002 0.000 
video%2fmp4 - 1132 o - 0x00 http "- -" "- - -" yak01cache1.yak01.mea.cdn.google.com

On se dit qu’on va découper le texte avec un séparateur ‘espace’ sauf pour les chaines entre double quotes.

La première approche consiste à utiliser la fonction split des dataframes. Mais celle-ci s’avère trop simpliste et le résultat n’est pas correct. La faute en partie au séparateur ‘espace’ qui se trouve à l’intérieur d’une sous-chaîne entre double quotes que l’on ne veut pas découper.

La deuxième approche logique consiste à passer par un dataset[String] et à utiliser la fonction split de scala pour découper notre chaîne de caractère. Mais la fonction split native présente également des limitations et le résultat n’est pas satisfaisant.


Une solution de parsing dédiée

Pour contourner ces limitations, nous avons donc développé une fonction conforme à nos attentes en scala.

@tailrec
    final def splitCsv(sep: Char, in: String = s, work: String = "", out: Array[String] = Array(), intoDoubleQuote: Boolean = false): Array[String] = {
      if(in.length() == 0) out ++ Array(work)
      else {
        in.charAt(0) match {
          case '"' => if(intoDoubleQuote) splitCsv(sep, in.drop(1), work, out, false) else splitCsv(sep, in.drop(1), work, out, true)
          case `sep` => if(intoDoubleQuote) splitCsv(sep, in.drop(1), work + in.take(1), out, intoDoubleQuote) else splitCsv(sep, in.drop(1), "", out ++ Array(work), intoDoubleQuote)
          case _ => splitCsv(sep, in.drop(1), work + in.take(1), out, intoDoubleQuote)
        }
      }
    }

Pour optimiser les performances sur des gros volumes, la fonction est de type tailrec, c’est à dire qu’elle est complètement récursive, ce qui évite les problèmes mémoires.

Je ne rentrerai pas dans le détail du code, ce n’est pas l’objet de ce post. Retenez juste qu’avec Spark, vous avez la capacité de développer vos propres fonctions pour répondre au mieux aux contraintes spécifiques de vos projet.


Le contrôle des données

La qualité de données est un sujet central dans tous les projets data mais qui prend une dimension particulière quand on fait du streaming.

Bien sûr, on va tout de suite penser à intégrer des règles qui permettent d’exclure des messages non conformes, afin d’éviter que nos traitements plantent.

  • si le message ne contient pas le bon nombre de colonnes
  • si un champ numérique contient une donnée non numérique
  • si un champ contenant un nombre ne peut pas être converti en timestamp

Dans ces cas de figure, les données sont écartées et non prises en compte dans le calcul des indicateurs.

Imaginons maintenant que l’émetteur de données fasse évoluer son application et change le format des données transmises. Cela va décaler les colonnes et entrainer le rejet anormal des données.

Vous allez me répondre que cela n’arrive pas dans la vraie vie. Que le responsable de l’application va vous avertir 6 mois à l’avance du changement de format … Je vous répondrai que le responsable de l’application source ne fait pas forcément parti de votre organisation et qu’il n’a que faire de vos soucis.

Vous allez me répondre qu’on détectera rapidement le problème car on ne recevra plus de données. Je vous répondrai que le déploiement de la nouvelle version sur les machines émettrices des données sera sûrement progressif et que cela compliquera sans aucun doute la détection du problème.

Comment s’assurer qu’on ne perd pas de données ?"

et plus précisément: Comment s’assurer qu’on ne perd pas de données en production ?"


Intégrer le contrôle au coeur du code

Nous avons exploré plusieurs pistes, notamment le calcul de métriques et l’envoi des résultats dans la base postgres. Mais cela se faisait au détriment des performances.

La solution, qui nous est apparue la meilleure, consiste en fait à intégrer le contrôle au coeur du code. Pour cela, il suffit de rajouter un champ de contrôle directement dans la structure des données.

La colonne “control” est valorisée (lors de la phase de contrôle) à 1 quand tout se passe bien et à 0 sinon.

 val control = if ((bytes == -1) || (datum == -1) || (t.length != nbCol(version))) 0 else 1

On continue de sauvegarder les données de détail sur HDFS. Pour faire du contrôle qualité sur les données en production, il suffit maintenant d’examiner les données sur HDFS en les filtrant sur le champ control à 0.

Le point fort de cette approche est que la mise en place de ce contrôle n’a pas dégradé les performances de notre traitement.


Sans test, le code n’est rien

Comment faire des tests unitaires sur des programmes de streaming ?

Des tests classiques (je lis un fichier de test, je le traite et je le compare aux résultats) ne sont pas suffisants pour tester du code en streaming.

Avec cette approche, il est impossible de tester le fonctionnement du watermark par exemple, ou le traitement des upserts dans la base de données.

On a besoin de simuler réellement le fonctionnement du streaming en générant des données à des moments différents du temps.

Pour cela, je me suis largement inspiré du blog de Bartosz Konieczny.

Génération des données à des moments différents

new Thread(new Runnable() {
          override def run(): Unit = {
        
            inputStream.addData(
              ("1","node1","hostname1","s1", new Timestamp(30000L),100, 60, 10, 15, 1),
              ("2","node1","hostname1","s1", new Timestamp(40000L),80, 40, 20, 10, 1),
              ("3","node2","hostname1","s2", new Timestamp(70000L),101, 60, 10, 15, 1),
              ("4","node1","hostname1","s2", new Timestamp(100000L),100, 65, 10, 15, 1))

            while (!query.isActive) {}
            // to be efficient, sleep must be > trigger processing (60s when window("datum","60 seconds","60 seconds")
            Thread.sleep(20000)

            inputStream.addData(
              ("5","node1","hostname1","s1", new Timestamp(58500L),100, 60, 10, 15, 1),    // ko: time < limit time
              ("6","node2","hostname1","s1", new Timestamp(59500L),100, 60, 10, 15, 1),    // ko: time < limit time
              ("7","node1","hostname1","s1", new Timestamp(95500L),100, 120, 40, 15, 1),    // ok: time > limit time
              ("4","node1","hostname1","s2", new Timestamp(100000L),100, 65, 10, 15, 1),    // ko: duplicate
              ("8","node2","hostname1","s1", new Timestamp(190000L),100, 90, 10, 15, 1))   // ok: time > limit time
            Thread.sleep(20000)
          }
        }).start()

Stockage des résultats dans un objet en mémoire

Les données générées sont ajoutées à un objet en mémoire de type MemoryStream.

      val testKey = "aggAccess should have good results withwatermark"
      val inputStream = new MemoryStream[(String, String, String, String, 
            Timestamp, Long, Long, Float, Float, Long)](1, spark.sqlContext)
      val inputDf = inputStream.toDS().toDF("request_id", "cp","node", "status",
            "datum", "bytes", "cache_hit_bytes", "request_time", "response_start_time", "total")
      val aggregatedStream = aggAccess(inputDf, "5 seconds")
      val query = aggregatedStream.writeStream.trigger(Trigger.ProcessingTime(10000)).outputMode("update")
            .foreach(
                  new InMemoryStoreWriter[Row](testKey, row =>
                        s"${row.getAs[Long]("datum")} / " +
                        s"${row.getAs[String]("cp")} / " +
                        s"${row.getAs[String]("node")} / " +
                        s"${row.getAs[String]("status")} -> " +
                        s"${row.getAs[Long]("bytes")} -> " +
                        s"${row.getAs[Long]("cachehitbytes")} -> " +
                        s"${row.getAs[Float]("requesttime")} -> " +
                        s"${row.getAs[Float]("responsestarttime")} -> " +
                        s"${row.getAs[Long]("total")}"))
          .start()

Le stockage en mémoire évite les problèmes d’écriture disque et les soucis de latence éventuels. En paramètre, on passe une fonction qui permet de formater comme on le souhaite les données en mémoire.

Comparaison des résultats

Il suffit enfin de comparer ce que nous avons stocké en mémoire avec une liste de résultat attendu.

val readValues = InMemoryKeyedStore.getValues(testKey)
        readValues.foreach(println)

        assert(readValues.size == 6)
        val list = List(
          "1970-01-01 00:01:00.0 / node2 / hostname1 / s2 -> 101 -> 60 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:00:00.0 / node1 / hostname1 / s1 -> 180 -> 100 -> 30.0 -> 25.0 -> 2",
          "1970-01-01 00:01:00.0 / node1 / hostname1 / s2 -> 100 -> 65 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:01:00.0 / node1 / hostname1 / s1 -> 100 -> 120 -> 40.0 -> 15.0 -> 1",
          "1970-01-01 00:03:00.0 / node2 / hostname1 / s1 -> 100 -> 90 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:03:00.0 / node2 / hostname1 / s1 -> 277 -> 233 -> 33.0 -> 52.0 -> 3"
        )
        assert(list.filter(readValues.contains(_)) == list)

Cela termine ce deuxième article. Dans le prochain et dernier post, nous aborderons les tests de performances.