MongoSpark 实现 upsert 操作

  • lovebing 
  • 未分类

Spark 版本: 2.1

通常我们经过各种算得到 RDD,MongoSpark 没有有直接提供 upsert 或 update 的方法,因为保存 RDD使用的是 insertMany的方法。但是 WriteConfig 提供了一个 replaceDocument 的参数,仅对 Dataset 有效,当存在 _id 字段的时候,可以实现 replace 或 upsert 的功能。

但是当我把 RDD 转成 Dataset 后保存 Trenbolone Hexahydrobenzylcarbonate,发现了几个问题

  1. Bean Class 必须有 get_id 和 set_id 的方法,@Id 注解无效。
  2. java.sql.Timestamp 类型保存后变为普通的 object。

https://docs.mongodb.com/spark-connector/master/scala/datasets-and-sql/
文档上说 Mongo 的 Date 对应 java 的 java.sql.Timestamp。
原来是时间这样的:
ISODate("2017-10-18T14:36:58.849Z")

现在变成这样了:
{
"date" : 18,
"hours" : 22,
"minutes" : 36,
"month" : 9,
"seconds" : 58,
"time" : NumberLong(1508337418849),
"year" : 117
}

这显示不是我们需要的结果。

import java.sql.Timestamp;
import java.util.Date;
class CustomClass {
private Timestamp time;
public void setTime(Timestamp time) {
this.time = time;
}
public Date getTime() {
return time;
}
}

通过调试(需要安装 scala sdk)发现,问题在于 Encoders.bean(CustomClass.class) 这个方法。 Timestamp 获得的 schema 为

StructType(
StructField(date,IntegerType,false), StructField(day,IntegerType,false), StructField(hours,IntegerType,false), StructField(minutes,IntegerType,false), StructField(month,IntegerType,false), StructField(seconds,IntegerType,false), StructField(time,LongType,false), StructField(timezoneOffset,IntegerType,false), StructField(year,IntegerType,false)
)
而期望的 schema 应该是 DateType 或 TimestampType。
而直接使用 Encoders.bean(Timestamp.class),就没有问题。
原因在于 getTime的返回类型为 java.util.Date, 原来用的是 Date, 后来改用 Timestamp 后,忘记修改 getTime 的返回类型。

发表评论

电子邮件地址不会被公开。 必填项已用*标注