spark代码 读、写、删除 postgresql数据库
2021/11/26 19:10:28
本文主要是介绍spark代码 读、写、删除 postgresql数据库,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package TEST import java.sql.{Connection, DriverManager, PreparedStatement} import java.util import java.util.Properties import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * description: spark 读、写、删除 postgressql数据库 代码测试 * author: 徐国胜 * since: 2021/11/26 15:37 * version: 1.0 */ object PostgresqlTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("PostgresqlTest") .config("spark.ui.showConsoleProgress", "true") .config("spark.shuffle.reduceLocality.enabled", "false") .enableHiveSupport() .getOrCreate() //造数据 val dataList = new util.ArrayList[Row]() dataList.add(Row("1", "徐国胜1", "22")) dataList.add(Row("2", "邱良东2", "22")) val schema = StructType(List(StructField("id", StringType, true), StructField("name", StringType, true), StructField("age", StringType, true))) val DF: DataFrame = spark.createDataFrame(dataList, schema) //DataFrame写入PG库 WriteDfToPG(DF, "testdb", "bigdata.student") /** * 查询PG库中某个表的数据 * 如果需要查询某个的字段,直接在后面接select就行 */ val dataDF: DataFrame = QueryPG(spark, "testdb", "bigdata.student") //按条件筛选,删除数据库中的数据;如果需要删除全表,可以把删选条件写成 1=1 DeleteFromPG("testdb", "bigdata.student", "name='邱良东'") } def QueryPG(spark: SparkSession, databaseName: String, tableName: String): DataFrame = { val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName" val prop: Properties = new Properties() prop.put("user", "postgres") prop.put("password", "bm@123") prop.put("driver", "org.postgresql.Driver") spark.read.jdbc(url, tableName, prop) } def WriteDfToPG(df: DataFrame, databaseName: String, tableName: String): Unit = { val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName" val prop: Properties = new Properties() prop.put("user", "postgres") prop.put("password", "bm@123") prop.put("driver", "org.postgresql.Driver") df.write.mode("Append").jdbc(url, tableName, prop) } def DeleteFromPG(databaseName: String, tableName: String, condition: String): Unit = { Class.forName("org.postgresql.Driver") val url: String = s"jdbc:postgresql://172.16.221.208:15432/$databaseName" val prop: Properties = new Properties() prop.put("user", "postgres") prop.put("password", "bm@123") prop.put("driver", "org.postgresql.Driver") val connection: Connection = DriverManager.getConnection(url, prop) val delSql = s"delete from $tableName where $condition" val delPS: PreparedStatement = connection.prepareStatement(delSql) delPS.execute() delPS.close() println("delete query :" + delSql) } }
这篇关于spark代码 读、写、删除 postgresql数据库的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-01-05快速清空 PostgreSQL 数据库中的所有表格,让你的数据库重新焕然一新!
- 2024-01-04在PostgreSQL中创建角色:判断角色是否存在并创建
- 2023-05-16PostgreSQL一站式插件推荐 -- pg_enterprise_views
- 2022-11-22PostgreSQL 实时位置跟踪
- 2022-11-22如何将PostgreSQL插件移植到openGauss
- 2022-11-11PostgreSQL:修改数据库用户的密码
- 2022-11-06Windows 环境搭建 PostgreSQL 物理复制高可用架构数据库服务
- 2022-10-27Windows 环境搭建 PostgreSQL 逻辑复制高可用架构数据库服务
- 2022-10-11PostgreSql安装(Windows10版本)
- 2022-09-13PostgreSQL-Network Address类型操作和函数