MongoSpark 实现 upsert 操作

Spark 版本: 2.1

通常我们经过各种算得到 RDD,MongoSpark 没有有直接提供 upsert 或 update 的方法,因为保存 RDD使用的是 insertMany的方法。但是 WriteConfig 提供了一个 replaceDocument 的参数,仅对 Dataset 有效,当存在 _id 字段的时候,可以实现 replace 或 upsert 的功能。

但是当我把 RDD 转成 Dataset 后保存 Trenbolone Hexahydrobenzylcarbonate,发现了几个问题

  1. Bean Class 必须有 get_id 和 set_id 的方法,@Id 注解无效。
  2. java.sql.Timestamp 类型保存后变为普通的 object。

https://docs.mongodb.com/spark-connector/master/scala/datasets-and-sql/,文档上说 Mongo 的 Date 对应 java 的 java.sql.Timestamp。原来是时间这样的:

ISODate("2017-10-18T14:36:58.849Z")

现在变成这样了:

 {
 "date" : 18,
 "hours" : 22,
 "minutes" : 36,
 "month" : 9,
 "seconds" : 58,
 "time" : NumberLong(1508337418849),
 "year" : 117
 }

这显然不是我们需要的结果。

import java.sql.Timestamp;
import java.util.Date;
class CustomClass {
 private Timestamp time;
 public void setTime(Timestamp time) {
   this.time = time;
 }
 public Date getTime() {
   return time;
 }
}

通过调试(需要安装 scala sdk)发现,问题在于 Encoders.bean(CustomClass.class) 这个方法。 Timestamp 获得的 schema 为

StructType(
 StructField(date,IntegerType,false), StructField(day,IntegerType,false), 
 StructField(hours,IntegerType,false), StructField(minutes,IntegerType,false), 
 StructField(month,IntegerType,false), StructField(seconds,IntegerType,false), 
 StructField(time,LongType,false), StructField(timezoneOffset,IntegerType,false), 
 StructField(year,IntegerType,false)
)

期望的 schema 应该是 DateType 或 TimestampType,而直接使用 Encoders.bean(Timestamp.class),就没有问题。原因在于 getTime的返回类型为 java.util.Date, 原来用的是 Date, 后来改用 Timestamp 后,忘记修改 getTime 的返回类型。