Spark: Une brève histoire du temps

Même si j’apprécie à titre personnel des films sur le paradoxe temporel, j’avoue qu’à titre professionnel, le traitement du temps et des timezones m’a toujours laissé un peu de glace. Et il faut dire que dans la majorité des cas, ce n’est pas vraiment un problème.

Oui mais voilà, quand on travaille sur un projet international avec des données en streaming, une bonne gestion du temps devient primordiale.

Au commencement, des problèmes inexpliqués avec les timestamp

Depuis plusieurs semaines, je travaille sur un projet avec spark structured streaming. Mon code est développé et testé en local puis il est de nouveau testé sur Gitlab, une fois le code poussé.

Or ce matin, j’ai reçu un mail de Gitlab pour me signifier une erreur dans mes tests ! Très bizarre, d’autant plus que je n’ai pas fait de changement depuis plusieurs jours. Je teste en local pour me rassurer mais là aussi tous mes tests sont en erreurs !!!

En détaillant certaines erreurs, je tombe sur ceci:

[info] - should have good results without watermark *** FAILED ***
[info]   ListBuffer("1970-01-01 01:00:06.0 / node2 / hostname1 -> 150", "1970-01-01 01:00:06.0 / node1 / hostname1 -> 190", "1970-01-01 01:00:06.0 / node2 / hostname2 -> 75", "1970-01-01 01:00:06.0 / node1 / hostname1 -> 960", "1970-01-01 01:00:06.0 / node1 / hostname2 -> 90") did not contain "1970-01-01 00:00:06.0 / node1 / hostname1 -> 960"

Je constate que mon résultat est décalé d’une heure. Tiens, se pourrait-il que cela soit dû au changement d’heure de la semaine dernière ? Ce projet est pourtant assez particulier: les timestamps reçus ne contiennent pas de timezone et sont traités en heure UTC. Non, ça ne doit pas être ça. Puis je me suis rappelé que j’ai travaillé cette semaine sur une nouvelle version de mon image SBT et que j’ai profité de cette évolution pour mettre à jour la commande de lancement docker du container avec la timezone Europe/Paris. Je tenais une bonne piste.

Quelques recherches sur Internet plus tard, j’étais sûr d’avoir identifié la bonne piste.

Gestion des timestamps sour Spark sans timezone

Pour comprendre la gestion des timestamps, j’ai donc fait quelques tests sous Spark. J’utilise pour cela une image Docker de ma composition (swalu/spark-master). Je décide de démarrer mon cluster comme d’habitude sans la timezone.

❯ docker run -d --rm --net sparkCluster -p 4040:4040 -p 8080:8080 -p 8081:8081 -v $PWD:/root --name spark-master -h spark-master swal4u/spark-master:v2.3.0.3
0b373e5570cb97c77509b7756625ffd4216d8ca58fdc3ee96690ba8c7b68965e

~
❯ spark-shell
20/11/08 17:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://spark-master:4040
Spark context available as 'sc' (master = spark://spark-master:7077, app id = app-20201108174058-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

En parallèle, je me connecte au container pour visualiser l’heure.

❯ docker exec -it spark-master bash
root@spark-master:~# date
Sun Nov  8 17:43:43 UTC 2020

On est bien en heure UTC avec un décalage d’une heure avec mon heure locale (18h43).

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> val now = 1576800000000L
now: Long = 1576800000000

scala> val ds = Seq(new Timestamp(now)).toDS().withColumn("localTime",from_utc_timestamp(col("value"), "UTC"))
ds: org.apache.spark.sql.DataFrame = [value: timestamp, localTime: timestamp]

scala> ds.show
+-------------------+-------------------+
|              value|          localTime|
+-------------------+-------------------+
|2019-12-20 00:00:00|2019-12-20 00:00:00|
+-------------------+-------------------+

Dans cet exemple, je crée un timestamp à partir d’un long puis je le convertis en heure locale, ici UTC. Comme prévu, la valeur initiale et la valeur convertie sont identiques. Que se passe t’il maintenant si notre système est paramétré avec une timezone différente ?

Gestion des timestamps sour Spark avec timezone

On relance donc le cluster Spark en rajoutant l’instruction pour la time zone -e TZ=Europe/Paris. On vérifie que le paramètre a bien été pris en compte.

❯ docker exec -it spark-master bash
root@spark-master:~# date
Sun Nov  8 18:58:48 CET 2020

Puis on relance le test précédent.

scala> val now = 1576800000000L
now: Long = 1576800000000

scala> val ds = Seq(new Timestamp(now)).toDS().withColumn("localTime",from_utc_timestamp(col("value"), "UTC"))
ds: org.apache.spark.sql.DataFrame = [value: timestamp, localTime: timestamp]

scala> ds.show
+-------------------+-------------------+
|              value|          localTime|
+-------------------+-------------------+
|2019-12-20 01:00:00|2019-12-20 01:00:00|
+-------------------+-------------------+

Ce résultat fut une vraie surprise pour moi. Je m’attendais à trouver les mêmes valeurs que tout à l’heure. Mais ici, les valeurs sont décalées d’une heure. Il y a donc une conversion implicite des timestamps en heure locale démontrée par la première colonne. La deuxième colonne est logique puisque si on considère que la première colonne est en UTC, le résultat doit être identique.

Regardons maintenant ce qui se passe si on demande l’heure de Paris.

scala> val ds = Seq(new Timestamp(now)).toDS().withColumn("localTime",from_utc_timestamp(col("value"), "Europe/Paris"))
ds: org.apache.spark.sql.DataFrame = [value: timestamp, localTime: timestamp]

scala> ds.show
+-------------------+-------------------+
|              value|          localTime|
+-------------------+-------------------+
|2019-12-20 01:00:00|2019-12-20 02:00:00|
+-------------------+-------------------+

On retrouve une valeur décalée d’une heure car nous sommes à l’heure d’hiver. Que se passe t’il si on est en été ?

scala> val now = 1576800000000L + 15552000000L
now: Long = 1592352000000

scala> val ds = Seq(new Timestamp(now)).toDS().withColumn("localTime",from_utc_timestamp(col("value"), "Europe/Paris"))
ds: org.apache.spark.sql.DataFrame = [value: timestamp, localTime: timestamp]

scala> ds.show
+-------------------+-------------------+
|              value|          localTime|
+-------------------+-------------------+
|2020-06-17 02:00:00|2020-06-17 04:00:00|
+-------------------+-------------------+

On a bien les 2h de décalage, compte tenu du passage à l’heure d’été.

Recommandations

Que faut-il faire ? Etant donné que les environnements de production sont configurés avec des timezones correctes, nous sommes obligés d’en tenir compte dans nos développements sous peine d’avoir des résultats inattendus en production.

La meilleure manière d’opérer est de lancer vos containers avec la bonne timezone paramétrée. Concernant les tests, vous pouvez procéder de la même manière, en exprimant les timestamps dans la timezone locale. Mais cela risque de poser un problème lors de vos sous Gitlab car vous ne connaissez pas la timezone des serveurs sui seront utilisés. Je vous conseille donc pour vos tests de fixer la timezone sur l’UTC.

Pour se faire, il suffit de rajouter la ligne suivante avant le début de vos tests.

TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

Références

Deux liens qui m’ont permis d’y voir plus clair sur ce sujet: