最近查阅 Spark SQL 源码时看到了很久之前用过的获取单调递增 id 方法的实现,本文简要记录。之前在离线场景下有给记录生成唯一 id 的需求,当时使用了 Spark SQL 中的 monotonically_increasing_id
方法,其源码位于 functions.scala at v2.4.4:
1 | /** |
追踪源码至 MonotonicallyIncreasingID.scala at v2.4.4:
1 | /** |
该算法实现其实非常简单,生成的 id 占用 8 个字节,即 64 位,分区 id 占用高 31 位,记录 id 占用低 33 位。使用分区 id 左移 33 位然后再加上当前分区该记录的 id 即可得到该 64 位 id 值。该实现与 Twitter 的 Snowflake 算法很相似,都处于分布式的环境中,均未采用一个单独的发号器来实现。
使用该方法时遇到的一个问题就是该算法的起始值为 0,在之前的业务场景中,我们将离线数据生成了 id
并写入至 Hive 数据表后,同时使用 INSERT INTO
往 MySQL 写入了一份。为了保证数据一致,在 INSERT INTO
至 MySQL 时,使用的该算法生成的 id
,最后发现竟然有记录不一致,经过一阵排查,原来是 id
为 0 的数据导致。在 MySQL 中,如果往自增主键写入值 0,此时会被当作写入 NULL 处理,MySQL 会为该字段获取自增值,导致这条 id
为 0 的记录看起来消失了,而多出了一条记录。之前为了处理这个问题,我在代码里将 monotonically_increasing_id
多加了 1 以规避该情况,业务代码如下:
1 | val bizDF = spark.sql(s"SELECT ...") |
当然也可以通过更改 MySQL 的配置实现,配置选项可以参考:MySQL :: MySQL 5.7 Reference Manual :: 5.1.10 Server SQL Modes,但是在文档中已经提到了不建议在自增列中存储值 0,原文如下:
Storing 0 is not a recommended practice, by the way.