flink之SQL入门

2021/7/19 2:04:59

本文主要是介绍flink之SQL入门,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

SQL部分学习

Table API的特点Table API和SQL都是Apache Flink中高等级的分析API,SQL所具备的特点Table API也都具有,如下:

声明式 - 用户只关心做什么,不用关心怎么做;
高性能 - 支持查询优化,可以获取最好的执行性能;
流批统一 - 相同的统计逻辑,既可以流模式运行,也可以批模式运行;
标准稳定 - 语义遵循SQL标准,语法语义明确,不易变动。

当然除了SQL的特性,因为Table API是在Flink中专门设计的,所以Table API还具有自身的特点:

表达方式的扩展性 - 在Flink中可以为Table API开发很多便捷性功能,如:Row.flatten(), map/flatMap 等
功能的扩展性 - 在Flink中可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
编译检查 - Table API支持java和scala语言开发,支持IDE中进行编译检查。

Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

```java
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.6.1</version>
</dependency>

3.2. Table API和SQL程序的结构

Table API一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,再用类似于filter, join, 或者 select关系型转化操作来转化为一个新的Table对象。最后将一个Table对象转回一个DataSet或DataStream。从内部实现上来说,所有应用于Table的转化操作都变成一棵逻辑表操作树,在Table对象被转化回DataSet或者DataStream之后,转化器会将逻辑表操作树转化为对等的DataSet或者DataStream操作符。

Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;所以我们只需要使用一种来演示即可要想执行flink的SQL语句,首先需要获取SQL的执行环境:两种方式(batch和streaming):

批处理:

```java
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

流处理:

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

在内部目录中注册一个表
注册外部目录
执行SQL查询
注册用户定义的(标量,表格或聚合)函数
转换DataStream或DataSet成Table
持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统输入表可以从各种来源注册:

现有Table对象,通常是表API或SQL查询的结果。
TableSource,它访问外部数据,例如文件,数据库或消息传递系统。
DataStream或DataSet来自DataStream或DataSet程序。

输出表可以使用注册TableSink。

注册一个表

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table “projectedX”
tableEnv.registerTable(“projectedTable”, projTable)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("projectedTable ").select(…)

注册一个TableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],…)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,…)

// get a TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
// create a TableSource
val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")...
// register the TableSource as table "CsvTable" 
tableEnv.registerTableSource("CsvTable", csvSource)
注册一个TableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],…)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
Table和DataStream和DataSet的集成
package com.ccj.pxj.sql
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sinks.CsvTableSink
object DataSet_DataStreamToTable {
  case class Order1(id:Long,proudct:String,amount:Int)
  def main(args: Array[String]): Unit = {
      //1.  获取流处理环境
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      //2.  获取TableEnvironment
      val tableEnv= TableEnvironment.getTableEnvironment(env)
    //3.  加载本地集合
    val dataStream: DataStream[Order1] = env.fromCollection(List(
      Order1(1, "beer", 3),
      Order1(2, "diaper", 4)
      , Order1(3, "ruber", 2)
    ))
      //4.  根据数据注册表
    tableEnv.registerDataStream("s",dataStream)
      //5.  执行SQL
      val table = tableEnv.sqlQuery("select * from s")
      //6.  写入CSV文件中
    table.printSchema()
    table.writeToSink(new CsvTableSink("./data/score_sql.csv",",",1,FileSystem.WriteMode.OVERWRITE))
      //7.  执行任务
    env.execute()
  }
}
将Table转换为DataStream或DataSet

Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。

当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:

Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问   
POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问   
Case Class:字段通过位置映射,不支持空值,类型安全访问   
Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问   
Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。

将Table转换为DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。

有两种模式可以将 Table转换为DataStream:

1:Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发送的结果不会被更新。

2:Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。

package com.ccj.pxj.sql
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
object TableTODataStream {
  def main(args: Array[String]): Unit = {
   //1. 获取流处理环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   //  2. 设置并行度
    env.setParallelism(1)
    //  3. 获取Table运行环境
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //  4. 加载本地集合
    val dataStream: DataStream[(Long, Int, String)] = env.fromCollection(List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World")
    )
    )
    //  5. 转换DataStream为Table
    val table: Table = tableEnv.fromDataStream(dataStream)
    //  6. 将table转换为DataStream----将一个表附加到流上Append Mode
    val appendDataStream: DataStream[(Long, Int, String)] = tableEnv.toAppendStream[(Long, Int, String)](table)
    //7. 将table转换为DataStream----Retract Mode true代表添加消息,false代表撤销消息
    val retractDataStream: DataStream[(Boolean, (Long, Int, String))] = tableEnv.toRetractStream[(Long, Int, String)](table)
   //8. 打印输出
    appendDataStream.print()
    println("----------------------")
    retractDataStream.print()
   //  9. 执行任务
    env.execute("pxj")
  }
}
将Table转换为DataSet
package com.ccj.pxj.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
object TableTODataSet {
  def main(args: Array[String]): Unit = {
      //1. 获取批处理环境
      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
      //2. 设置并行度
    env.setParallelism(1)
    //3. 获取Table运行环境
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 加载本地集合
    val dataSet: DataSet[(Long, Int, String)] = env.fromCollection(List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 3, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World")
    ))
    //5. DataSet转换为Table
    val table: Table = tableEnv.fromDataSet(dataSet)
    //6. table转换为dataSet
    val result: DataSet[(Long, Int, String)] = tableEnv.toDataSet[(Long, Int, String)](table)
      //7. 打印输出
    result.print()
    println("--------------------------------")
    println(table)
    println("---------------")
    table.printSchema()
    //env.execute()
  }
}
批处理案例1:
package com.ccj.pxj.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
object BatchFlinkSqlDemo {
  //创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额)
  case class  Order(id:Int, userName:String, createTime:String, money:Double)
  def main(args: Array[String]): Unit = {
   //1. 获取一个批处理运行环境
   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //  2. 获取一个Table运行环境
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //  3. 创建一个样例类`Order`用来映射数据(订单名、用户名、订单日期、订单金额)
    //4. 基于本地`Order`集合创建一个DataSet source
    val dataSet: DataSet[Order] = env.fromCollection(List(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    ))
   //5. 使用Table运行环境将DataSet注册为一张表
    tableEnv.registerDataSet("t",dataSet)
    //  6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    val table: Table = tableEnv.sqlQuery("select userName,sum(money)" +
      ",max(money)" +
      ",min(money),count(1) from t group by userName"
    )
    //7. 使用TableEnv.toDataSet将Table转换为DataSet
    val resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table)
   //8. 打印测试
    table.printSchema()
    println("-----")
    resultDataSet.print()
   // env.execute("pxj")
  }
}
批处理案例2
package com.ccj.pxj.sql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
object BatchTableDemo {
  def main(args: Array[String]): Unit = {
      //1. 获取批处理运行环境
      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //2. 获取Table运行环境
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
      //3. 加载外部CSV文件
      val csvTableSource: CsvTableSource = CsvTableSource.builder()
        .path("data/score.csv") //加载文件路径
        .field("id", Types.INT) // 列名,类型定义
        .field("name", Types.STRING)
        .field("subjectId", Types.INT)
        .field("score", Types.DOUBLE)
        .fieldDelimiter(",") // 属性间分隔符
        .lineDelimiter("\n") // 换行符
        //      .ignoreFirstLine()              // 忽略第一行内容
        .ignoreParseErrors() // 忽略解析错误
        .build()
      //4. 将外部数据构建成表
    tableEnv.registerTableSource("t",csvTableSource)
      //5. 使用table方式查询数据
      val table: Table = tableEnv.scan("t")
        .select("id,name,subjectId,score")
        .filter("name='张三'")
      //6. 打印表结构
    table.printSchema()
      //7. 将数据落地到新的CSV文件中
    table.writeToSink(new CsvTableSink("./data/score_table.csv",",",1,
      FileSystem.WriteMode.OVERWRITE))
    // 8. 执行任务
    env.execute()
  }
}

流数据处理案例

import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.types.Row

import scala.util.Random

object StreamFlinkSqlDemo {

  //  4. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
  case class Order(id: String, userId: Int, money: Int, createTime: Long)

  def main(args: Array[String]): Unit = {
    //  1. 获取流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    2. 获取Table运行环境
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //    3. 设置处理时间为`EventTime`
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //  5. 创建一个自定义数据源
    val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
      override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
        //    - 使用for循环生成1000个订单
        for (i <- 0 until 1000) {
          //  - 随机生成订单ID(UUID)
          val id = UUID.randomUUID().toString
          //  - 随机生成用户ID(0-2)
          val userId = Random.nextInt(3)
          //  - 随机生成订单金额(0-100)
          val money = Random.nextInt(101)
          //  - 时间戳为当前系统时间
          val timestamp = System.currentTimeMillis()
          // 收集数据
          ctx.collect(Order(id, userId, money, timestamp))
          //  - 每隔1秒生成一个订单
          TimeUnit.SECONDS.sleep(1)
        }

      }

      override def cancel(): Unit = {

      }
    })

    //  6. 添加水印,允许延迟2秒
    val waterDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order]() {

      var currentTimeStamp = 0L

      // 获取水印
      override def getCurrentWatermark: Watermark = {
        new Watermark(currentTimeStamp - 2000)
      }

      // 获取当前时间
      override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
        currentTimeStamp = Math.max(element.createTime, previousElementTimestamp)
        currentTimeStamp
      }
    })

    //  7. 导入`import org.apache.flink.table.api.scala._`隐式参数
    import org.apache.flink.table.api.scala._

    //    8. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段
    tableEnv.registerDataStream("t_order",waterDataStream,'id, 'userId, 'money, 'createTime.rowtime)
    //  9. 编写SQL语句统计用户订单总数、最大金额、最小金额
    //  - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口
    val sql =
    """
      |select
      | userId,
      | count(1) as totalCount,
      | max(money) as maxMoney,
      | min(money) as minMoney
      | from
      | t_order
      | group by
      | userId,
      | tumble(createTime, interval '5' second)
      """.stripMargin
    //  10. 使用`tableEnv.sqlQuery`执行sql语句
    val table: Table = tableEnv.sqlQuery(sql)
    //    11. 将SQL的执行结果转换成DataStream再打印出来
    tableEnv.toAppendStream[Row](table).print()
    //    12. 启动流处理程序
    env.execute("pxj")
  }

}

作者:pxj
日期:2021-07-18



这篇关于flink之SQL入门的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程