Handling field type mismatch errors when SparkSQL inserts into PostgreSQL

2021/7/12 19:06:40

This article explains how to handle the field type mismatch error reported when SparkSQL inserts data into PostgreSQL; hopefully it is a useful reference for anyone troubleshooting the same problem.

1. Key error message

Caused by: org.postgresql.util.PSQLException: ERROR: column "c1" is of type point but expression is of type character
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)

Full exception sample:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, info3, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    ......
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)

2. Solution

Two cases need to be considered here:

1) Case 1: the inserted value is not NULL

This case is essentially a data type mismatch: PostgreSQL cannot perform the type conversion at insert time. For example, if column c1 has type point and the value is (1,2), the generated statement looks like "insert into table (c1) values('(1,2)');". The value of c1 is sent as varchar by default, and PostgreSQL cannot convert it to point. The fix is simple: add the connection URL parameter "stringtype=unspecified", which tells the driver to leave string parameters untyped so PostgreSQL handles the string conversion itself.
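As a rough illustration, here is a minimal sketch of setting that URL parameter in a Spark JDBC write (the SparkSession setup, source table, target table and connection details are placeholders, not from the original post):

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StringTypeUnspecifiedWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("pg-write").getOrCreate();

        // Placeholder source; any DataFrame whose string columns target special
        // PostgreSQL column types (point, inet, jsonb, ...) hits this problem.
        Dataset<Row> df = spark.sql("SELECT * FROM source_table");

        Properties props = new Properties();
        props.setProperty("user", "postgres");      // placeholder credentials
        props.setProperty("password", "postgres");

        // stringtype=unspecified makes the PostgreSQL JDBC driver send string
        // parameters without a declared type, so the server casts them to the
        // actual column type instead of rejecting them as "character".
        String url = "jdbc:postgresql://127.0.0.1:5432/testdb?stringtype=unspecified";

        df.write().mode(SaveMode.Append).jdbc(url, "sink_pg_newtable_10", props);
    }
}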

   

2) Case 2: the inserted value is NULL

In this case the problem lies in the SparkSQL code itself, so let's analyze it. When SparkSQL performs JDBC operations it has to convert data types: the types Spark supports must be mapped to the types the database supports. This mapping is implemented in the per-database dialects, e.g. the PostgreSQL dialect class org.apache.spark.sql.jdbc.PostgresDialect, which SparkSQL registers via JdbcDialects.registerDialect(PostgresDialect) and then uses.
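As a side note, a small hedged sketch of how that registry is consulted (the URL is a placeholder): JdbcDialects.get returns the registered dialect whose canHandle() accepts the JDBC URL, which is how the PostgreSQL dialect ends up driving the type mapping for jdbc:postgresql: URLs.

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class DialectLookup {
    public static void main(String[] args) {
        // Resolves to the PostgreSQL dialect because its canHandle() matches the URL prefix.
        JdbcDialect dialect = JdbcDialects.get("jdbc:postgresql://127.0.0.1:5432/testdb");
        System.out.println(dialect.getClass().getName());
    }
}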

Let's work through an example. Again take column c1, value NULL, type point. The statement executed is "insert into table (c1) values(NULL);". In principle, running this SQL directly in PostgreSQL does not fail, as long as the column has no NOT NULL constraint. So why does it fail here?

Debugging into SparkSQL, at line 661 of org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.savePartition(), field values are bound as PreparedStatement parameters; when a value is NULL, stmt.setNull(i + 1, nullTypes(i)) is called specifically to handle it.

while (i < numFields) {
  if (row.isNullAt(i)) {
    stmt.setNull(i + 1, nullTypes(i)) // note: NULL values are handled specially here
  } else {
    setters(i).apply(stmt, row, i)
  }
  i = i + 1
}

Here is the catch: setNull() takes two parameters, the column index and the SQL type to map the NULL value to, i.e. it tells the database which type to treat the NULL as. So it seems we only need to pass the correct type. But what type is the column in the database? We cannot know that in advance, because Spark does not support every database column type. For instance, PostgreSQL's point type maps to StringType in Spark, but from StringType alone we cannot infer the database type back, because many types end up as StringType: path, jsonb and other types that PostgreSQL supports but generic SQL does not.

Now look at nullTypes(i). nullTypes is an array holding, for each field, the type to use when its value is NULL. Where is it populated? See the code:

val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType) // the NULL mapping type comes from the JdbcType returned by getJdbcType

From the code above we can see that it is obtained through getJdbcType, which delegates to the dialect implementation class; here is the PostgreSQL dialect's getJDBCType implementation:

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case StringType => Some(JdbcType("TEXT", Types.CHAR)) // note: c1 maps here, so its NULL value is bound as CHAR, which naturally fails
  case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
  case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
  case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
  case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
  case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
  case t: DecimalType => Some(
    JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
  case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
    getJDBCType(et).map(_.databaseTypeDefinition)
      .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
      .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
  case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
  case _ => None
}

From case StringType => Some(JdbcType("TEXT", Types.CHAR)) we can see that c1 corresponds to StringType, so its NULL value is bound as CHAR rather than the real column type point, and the insert naturally fails.
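The behaviour can be reproduced outside Spark with plain JDBC. A hedged standalone sketch (connection details and table are made up, assuming something like CREATE TABLE demo_table (c1 point)): binding the NULL as Types.CHAR reproduces the error, while Types.NULL leaves the parameter type unspecified so PostgreSQL resolves it against the real column type.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Types;

public class SetNullDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://127.0.0.1:5432/testdb"; // placeholder connection
        try (Connection conn = DriverManager.getConnection(url, "postgres", "postgres");
             PreparedStatement stmt =
                 conn.prepareStatement("INSERT INTO demo_table (c1) VALUES (?)")) {
            // stmt.setNull(1, Types.CHAR); // what the default dialect does for StringType:
            //                              // fails with "column c1 is of type point but
            //                              // expression is of type character"
            stmt.setNull(1, Types.NULL);    // unspecified type: PostgreSQL infers it from the column
            stmt.executeUpdate();
        }
    }
}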

So how do we handle this in SparkSQL? We can bind an unspecified type and let PostgreSQL decide which type to use for the insert, instead of pinning a type that cannot be cast and then failing. To do this, reimplement the PostgresDialect dialect and override getJDBCType, as follows (Java version):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;
 
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.ArrayType$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.ByteType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType$;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DecimalType$;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.ShortType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.TimestampType$;
 
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
 
public class PostgresDialect extends JdbcDialect {
    private static final long serialVersionUID = -5826284056572945657L;
 
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:postgresql:");
    }
     
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public Option getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
        if (sqlType == Types.REAL) {
            return Option$.MODULE$.apply(FloatType$.MODULE$);
        } else if (sqlType == Types.SMALLINT) {
            return Option$.MODULE$.apply(ShortType$.MODULE$);
        } else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
            return Option$.MODULE$.apply(BinaryType$.MODULE$);
        } else if (sqlType == Types.OTHER) {
            return Option$.MODULE$.apply(StringType$.MODULE$);
        } else if (sqlType == Types.ARRAY) {
            int scale = (int) md.build().getLong("scale");
            return Option$.MODULE$.apply(ArrayType$.MODULE$.apply(toCatalystType(typeName, size, scale).get()));
        } else {
            return None$.MODULE$;
        }
    }
 
    private Option<DataType> toCatalystType(String typeName, int precision, int scale) {
        switch (typeName) {
        case "bool":
            return Option$.MODULE$.apply(BooleanType$.MODULE$);
        case "int2":
            return Option$.MODULE$.apply(ShortType$.MODULE$);
        case "int4":
            return Option$.MODULE$.apply(IntegerType$.MODULE$);
        case "int8":
        case "oid":
            return Option$.MODULE$.apply(LongType$.MODULE$);
        case "float4":
            return Option$.MODULE$.apply(FloatType$.MODULE$);
        case "money":
        case "float8":
            return Option$.MODULE$.apply(DoubleType$.MODULE$);
        case "text":
        case "varchar":
        case "char":
        case "cidr":
        case "inet":
        case "json":
        case "jsonb":
        case "uuid":
            return Option$.MODULE$.apply(StringType$.MODULE$);
        case "bytea":
            return Option$.MODULE$.apply(BinaryType$.MODULE$);
        case "timestamp":
        case "timestamptz":
        case "time":
        case "timetz":
            return Option$.MODULE$.apply(TimestampType$.MODULE$);
        case "date":
            return Option$.MODULE$.apply(DateType$.MODULE$);
        case "numeric":
        case "decimal":
            if (precision > 0) {
                return Option$.MODULE$.apply(new DecimalType(Math.min(precision, DecimalType$.MODULE$.MAX_PRECISION()),
                        Math.min(scale, DecimalType$.MODULE$.MAX_SCALE())));
            } else {
                return Option$.MODULE$.apply(new DecimalType(DecimalType$.MODULE$.MAX_PRECISION(), 18));
            }
        default:
            return Option$.MODULE$.empty(); // unknown type name: return an empty Option rather than null
        }
 
    }
 
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
        Object obj;
        DataType datatype = dt;
        if (StringType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("TEXT", Types.NULL)); // 改成Types.NULL
        } else if (BinaryType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("BYTEA", Types.BINARY));
        } else if (BooleanType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("BOOLEAN", Types.BOOLEAN));
        } else if (FloatType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("FLOAT4", Types.FLOAT));
        } else if (DoubleType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("FLOAT8", Types.DOUBLE));
        } else if (ShortType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("SMALLINT", Types.SMALLINT));
        } else if (DecimalType$.MODULE$.SYSTEM_DEFAULT().equals(datatype)) {
            obj = new Some(new JdbcType("NUMBER(38,18)", Types.NUMERIC));
        } else if (ByteType$.MODULE$.equals(datatype)) {
            throw new IllegalArgumentException("Unsupported type in postgresql:" + dt);
        } else {
            obj = None$.MODULE$;
        }
        return ((Option) (obj));
    }
 
    @Override
    public String getTableExistsQuery(String table) {
        return "SELECT 1 FROM " + table + " LIMIT 1";
    }
 
    @Override
    public String getTruncateQuery(String table, Option<Object> cascade) {
        if (cascade.isDefined() && Boolean.valueOf(cascade.get().toString())) {
            return "TRUNCATE TABLE ONLY " + table + " CASCADE";
        }
        return "TRUNCATE TABLE ONLY " + table;
 
    }
 
    @Override
    public void beforeFetch(Connection connection, Map<String, String> properties) {
        super.beforeFetch(connection, properties);
        java.util.Map<String, String> javaMap = JavaConverters.mapAsJavaMapConverter(properties).asJava();
        String stringOption = javaMap.get(JDBCOptions.JDBC_BATCH_FETCH_SIZE());
        if (stringOption != null && !stringOption.isEmpty() && Integer.valueOf(stringOption) > 0) {
            try {
                connection.setAutoCommit(false);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }
 
}

The key change is here:

if (StringType$.MODULE$.equals(datatype)) {
    obj = new Some(new JdbcType("TEXT", Types.NULL)); // changed to Types.NULL
}

Then unregister the built-in dialect and use only the custom one:

JdbcDialects.unregisterDialect(org.apache.spark.sql.jdbc.PostgresDialect$.MODULE$); // unregister the built-in dialect
JdbcDialects.registerDialect(new PostgresDialect()); // register the custom dialect

// data-processing logic
........
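Putting the pieces together, a hedged end-to-end sketch (source table, target table and connection details are placeholders): the custom dialect must be registered before the write runs so that NULL values are bound with an unspecified type, and it can be combined with the stringtype=unspecified URL parameter from case 1 to also cover non-NULL string values.

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class SinkToPostgres {
    public static void main(String[] args) {
        // Swap the built-in PostgreSQL dialect for the custom one before any JDBC write.
        JdbcDialects.unregisterDialect(org.apache.spark.sql.jdbc.PostgresDialect$.MODULE$);
        JdbcDialects.registerDialect(new PostgresDialect());

        SparkSession spark = SparkSession.builder().appName("sink-to-pg").getOrCreate();
        Dataset<Row> df = spark.sql("SELECT * FROM source_table"); // placeholder source

        Properties props = new Properties();
        props.setProperty("user", "postgres");     // placeholder credentials
        props.setProperty("password", "postgres");

        String url = "jdbc:postgresql://127.0.0.1:5432/testdb?stringtype=unspecified";
        df.write().mode(SaveMode.Append).jdbc(url, "sink_pg_newtable_10", props);
    }
}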

Note: there may be inconsistencies between some of the names defined in this article; they are only illustrative and need not correspond exactly.





Original post: https://blog.csdn.net/u011511210/article/details/118676210