Handling field type mismatch errors when Spark SQL inserts into PostgreSQL
2021/7/12 19:06:40
This article explains how to handle the field type mismatch error raised when Spark SQL inserts into PostgreSQL. It should be a useful reference for anyone facing the same problem, so let's work through it together.
1. Key error information
Caused by: org.postgresql.util.PSQLException: ERROR: column "c1" is of type point but expression is of type character
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)
A full sample of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, info3, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862) at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901) at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) ...... at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862) at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901) at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)
2. Solution
There are two cases to consider here:
1) Case 1: the inserted field value is not NULL
This case is a plain data type mismatch: PostgreSQL cannot perform the needed type conversion at insert time. For example, if column c1 has type point and the value is (1,2), the generated statement looks like "insert into table (c1) values('(1,2)');". The value for c1 is bound as a varchar by default, and PostgreSQL will not convert it to point on its own. The fix is simple: add the connection URL parameter stringtype=unspecified, which declares that strings are sent with an unspecified type and lets PostgreSQL handle the string conversion itself:
url=jdbc:postgresql://192.168.1.84:5432/postgres?stringtype=unspecified
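For reference, here is a minimal sketch (not from the original article) of where this URL goes when writing with the DataFrame API; the class name, table name, host and credentials below are placeholders:

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PgSinkExample {
    // Write df to PostgreSQL; stringtype=unspecified makes the driver send strings
    // with an unspecified type, so the server casts them to the target column type
    // (e.g. point, inet, jsonb) instead of rejecting them as "character".
    static void writeToPostgres(Dataset<Row> df) {
        Properties props = new Properties();
        props.setProperty("user", "postgres");       // placeholder credentials
        props.setProperty("password", "postgres");
        props.setProperty("driver", "org.postgresql.Driver");

        df.write()
          .mode(SaveMode.Append)
          .jdbc("jdbc:postgresql://192.168.1.84:5432/postgres?stringtype=unspecified",
                "sink_pg_newtable_10", props);
    }
}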
2) Case 2: the inserted field value is NULL
This case comes from the Spark SQL side, so let's analyze it. When Spark SQL performs JDBC operations it has to convert data types: the types Spark supports must be mapped to the types the database supports, and that mapping is implemented in per-database dialects, e.g. the PostgreSQL dialect class org.apache.spark.sql.jdbc.PostgresDialect. Spark SQL registers a dialect via JdbcDialects.registerDialect(PostgresDialect) and then uses it for JDBC reads and writes.
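As a quick illustration of this lookup mechanism (not code from the article; the class name is made up), the dialect actually used for a given connection can be resolved from the JDBC URL:

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class DialectLookupExample {
    public static void main(String[] args) {
        // JdbcDialects.get(url) returns the registered dialect whose canHandle(url)
        // accepts this JDBC URL; for jdbc:postgresql: that is the built-in PostgresDialect.
        JdbcDialect dialect = JdbcDialects.get("jdbc:postgresql://192.168.1.84:5432/postgres");
        System.out.println(dialect.getClass().getName());
    }
}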
Let's analyze the problem with an example. Take the same column c1, with type point and value NULL, and the statement "insert into table (c1) values(NULL);". Run directly against PostgreSQL this statement should succeed, provided the column has no NOT NULL constraint. So why does it fail here?
Debugging into Spark SQL, in org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.savePartition(), around line 661, field values are bound to the statement through a PreparedStatement; when a value is NULL, the code calls stmt.setNull(i + 1, nullTypes(i)) specifically to handle it:
while (i < numFields) {
  if (row.isNullAt(i)) {
    stmt.setNull(i + 1, nullTypes(i)) // note: NULL values are handled specially here
  } else {
    setters(i).apply(stmt, row, i)
  }
  i = i + 1
}
Here is the problem: setNull() takes two arguments, the parameter index and the type to map the NULL to, i.e. the type the database should assume for this value when it is NULL. So it seems we only need to pass the correct type. But what is the correct type of this column in the database? We cannot know it in advance, because Spark does not support every database type. PostgreSQL's point, for example, maps to Spark's StringType, and from StringType alone the original database type cannot be recovered, since many types collapse into StringType, such as path, jsonb and other types PostgreSQL supports but generic SQL does not.
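To make the setNull() behaviour concrete, here is a small plain-JDBC sketch (not from the article; it assumes a hypothetical table t with a point column c1):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;

public class SetNullExample {
    // Insert a NULL into a point column. Binding the NULL as Types.CHAR (what Spark
    // effectively does for StringType) triggers "column ... is of type point but
    // expression is of type character"; binding it as Types.NULL leaves the type
    // unspecified so the server resolves it against the column's real type.
    static void insertNull(String url, String user, String pass) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, pass);
             PreparedStatement ps = conn.prepareStatement("INSERT INTO t (c1) VALUES (?)")) {
            // ps.setNull(1, Types.CHAR); // fails against a point column
            ps.setNull(1, Types.NULL);    // succeeds: type is left for the server to decide
            ps.executeUpdate();
        }
    }
}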
Now look at nullTypes(i). nullTypes is an array holding, for each field, the type to map to when that field's value is NULL. Where is it populated? Here:
val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType) // the null mapping type comes from the JdbcType returned by getJdbcType
As the code above shows, it comes from the dialect's getJDBCType method. Here is that method's implementation in the PostgreSQL dialect:
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case StringType => Some(JdbcType("TEXT", Types.CHAR)) // note: c1 maps to this entry, so its NULL is bound as CHAR, which causes the error
  case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
  case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
  case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
  case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
  case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
  case t: DecimalType => Some(
    JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
  case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
    getJDBCType(et).map(_.databaseTypeDefinition)
      .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
      .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
  case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
  case _ => None
}
From case StringType => Some(JdbcType("TEXT", Types.CHAR)) we can see that c1, as a StringType, has its NULL bound as CHAR rather than as the column's real type, point, so the insert naturally fails.
So how do we handle it? We can pass an unspecified type and let PostgreSQL decide which type to use for the insert, instead of pinning a type that cannot be cast and failing. To do that, rewrite the PostgresDialect, overriding getJDBCType as follows (Java version):
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.ArrayType$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.ByteType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType$;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DecimalType$;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.ShortType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.TimestampType$;

import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;

public class PostgresDialect extends JdbcDialect {

    private static final long serialVersionUID = -5826284056572945657L;

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:postgresql:");
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public Option getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
        if (sqlType == Types.REAL) {
            return Option$.MODULE$.apply(FloatType$.MODULE$);
        } else if (sqlType == Types.SMALLINT) {
            return Option$.MODULE$.apply(ShortType$.MODULE$);
        } else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
            return Option$.MODULE$.apply(BinaryType$.MODULE$);
        } else if (sqlType == Types.OTHER) {
            return Option$.MODULE$.apply(StringType$.MODULE$);
        } else if (sqlType == Types.ARRAY) {
            int scale = (int) md.build().getLong("scale");
            return Option$.MODULE$.apply(ArrayType$.MODULE$.apply(toCatalystType(typeName, size, scale).get()));
        } else {
            return None$.MODULE$;
        }
    }

    private Option<DataType> toCatalystType(String typeName, int precision, int scale) {
        switch (typeName) {
        case "bool":
            return Option$.MODULE$.apply(BooleanType$.MODULE$); // bool maps to BooleanType
        case "int2":
            return Option$.MODULE$.apply(ShortType$.MODULE$);
        case "int4":
            return Option$.MODULE$.apply(IntegerType$.MODULE$);
        case "int8":
        case "oid":
            return Option$.MODULE$.apply(LongType$.MODULE$);
        case "float4":
            return Option$.MODULE$.apply(FloatType$.MODULE$);
        case "money":
        case "float8":
            return Option$.MODULE$.apply(DoubleType$.MODULE$);
        case "text":
        case "varchar":
        case "char":
        case "cidr":
        case "inet":
        case "json":
        case "jsonb":
        case "uuid":
            return Option$.MODULE$.apply(StringType$.MODULE$);
        case "bytea":
            return Option$.MODULE$.apply(BinaryType$.MODULE$);
        case "timestamp":
        case "timestamptz":
        case "time":
        case "timetz":
            return Option$.MODULE$.apply(TimestampType$.MODULE$);
        case "date":
            return Option$.MODULE$.apply(DateType$.MODULE$);
        case "numeric":
        case "decimal":
            if (precision > 0) {
                return Option$.MODULE$.apply(new DecimalType(
                        Math.min(precision, DecimalType$.MODULE$.MAX_PRECISION()),
                        Math.min(scale, DecimalType$.MODULE$.MAX_SCALE())));
            } else {
                return Option$.MODULE$.apply(new DecimalType(DecimalType$.MODULE$.MAX_PRECISION(), 18));
            }
        default:
            return null;
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public Option<JdbcType> getJDBCType(DataType dt) {
        Object obj;
        DataType datatype = dt;
        if (StringType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("TEXT", Types.NULL)); // changed to Types.NULL
        } else if (BinaryType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("BYTEA", Types.BINARY));
        } else if (BooleanType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("BOOLEAN", Types.BOOLEAN));
        } else if (FloatType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("FLOAT4", Types.FLOAT));
        } else if (DoubleType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("FLOAT8", Types.DOUBLE));
        } else if (ShortType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("SMALLINT", Types.SMALLINT));
        } else if (DecimalType$.MODULE$.SYSTEM_DEFAULT().equals(datatype)) {
            obj = new Some(new JdbcType("NUMBER(38,18)", Types.NUMERIC));
        } else if (ByteType$.MODULE$.equals(datatype)) {
            throw new IllegalArgumentException("Unsupported type in postgresql: " + dt);
        } else {
            obj = None$.MODULE$;
        }
        return ((Option) (obj));
    }

    @Override
    public String getTableExistsQuery(String table) {
        return "SELECT 1 FROM " + table + " LIMIT 1";
    }

    @Override
    public String getTruncateQuery(String table, Option<Object> cascade) {
        // guard against cascade being None before reading it
        if (cascade.isDefined() && Boolean.valueOf(cascade.get().toString())) {
            return "TRUNCATE TABLE ONLY " + table + " CASCADE";
        }
        return "TRUNCATE TABLE ONLY " + table;
    }

    @Override
    public void beforeFetch(Connection connection, Map<String, String> properties) {
        super.beforeFetch(connection, properties);
        java.util.Map<String, String> javaMap = JavaConverters.mapAsJavaMapConverter(properties).asJava();
        String stringOption = javaMap.get(JDBCOptions.JDBC_BATCH_FETCH_SIZE());
        // guard against the fetch-size option being absent
        if (stringOption != null && !stringOption.isEmpty() && Integer.valueOf(stringOption) > 0) {
            try {
                connection.setAutoCommit(false);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
The key change is here:
if (StringType$.MODULE$.equals(datatype)) {
    obj = new Some(new JdbcType("TEXT", Types.NULL)); // changed from Types.CHAR to Types.NULL
}
Then unregister the original dialect and use only the custom one:
JdbcDialects.unregisterDialect(org.apache.spark.sql.jdbc.PostgresDialect$.MODULE$); // unregister the built-in dialect
JdbcDialects.registerDialect(new PostgresDialect()); // register the custom dialect
// data read/write logic
........
Note: please don't mind small naming inconsistencies in the text; the snippets are illustrative and not meant to correspond exactly.
That concludes this article on handling field type mismatch errors when Spark SQL inserts into PostgreSQL. We hope it helps, and thank you for your continued support.