pyflink作业提交的踩坑过程,看完少走两个星期弯路

2021/4/30 18:29:10

本文主要是介绍pyflink作业提交的踩坑过程,看完少走两个星期弯路,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

flink在努力地将Python 生态和大数据生态融合,但目前的版本还不够成熟,尤其是在官方对python现有资料有限的情况下,用户想要使用python完成一个flink job并提交到flink平台上,还是有很多雷需要踩的。

以下对pyflink环节问题,python job编写到提交做了总结,可减少不必要的弯路。

一、部署环境

JDK 1.8+  &  Python 3.5+ (3.7.6)  &  apache-flink 1.12  &   pyflink 1.0

二、官方API

flink为用户提供了多层API,对于python用户,主要使用Table API 和 SQL API,个人认为Table API有一点点类似python的Dataframe,故主要使用Table API完成作业开发。用户可以参考对应版本的官方文档和示例代码学习使用。

注:这里建议一定要看官方文档,因为目前pyflink版本之间差别较大,随便搜的资料由于版本差异会造成很多不必要的麻烦。

三、环境理解

在Table API层,flink提供了3类环境和两类 planner,用户需要理解环境之间的区别和属性,以便使用正确的环境和刚好地理解一些代码参数。

简单说:TableEnviroment实现了流批一体,但不支持UDF;StreamTableEnviroment、BatchTableEnviroment分别对应流式和批处理;不过当StreamTableEnviroment设定时间窗口时,其聚合操作可看作一种特殊的批处理;

另外:仅Blink Planner支持Pandas UDAF。

  

四、用户自定义函数:集成 Python 生态( Python 类库)到 Flink 中的手段

        UDF:  自定义标量函数。一行输入一行输出。

        UDAF: 自定义聚合函数。多行输入一行输出。

        UDTF: 自定义表函数。一行输入多行输出或一列输入多列输出。

五、向量化的UDF

        Python 在写 Python API 的时候本质是在调用 Java API, 这个是通过Py4J作为 Java VM 和 Python VM 之间通讯的桥梁解决了两者的通讯问题,在 PythonVM 启动一个 Gateway,并且 Java VM 启动一个 Gateway Server 用于接受 Python 的请求,同时在 Python API 里面提供和 Java API 一样的对象,比如 TableENV, Table,等等。因此在两者做通讯时就会有序列化/反序列化的开销问题。

        向量化Python用户自定义函数,是在执行时,通过在JVM和Python VM之间以Arrow列存格式批量传输数据,来执行的函数。 向量化Python用户自定义函数的性能通常比非向量化Python用户自定义函数要高得多,因为向量化Python用户自定义函数可以大大减少序列化/反序列化的开销和调用开销。 此外,用户可以利用流行的Python库(例如Pandas,Numpy等)来实现向量化Python用户自定义函数的逻辑。这些Python库通常经过高度优化,并提供了高性能的数据结构和功能。 

 【注】向量化UDF是在flink层级的,不要跟UDF里面写的方法混淆,UDF本身则是python层的,故UDF内部自然是可以完成所以python自有的功能。

六、pyflink作业模板

  【实例化flink环境】->【建表source、sink】->【简单功能通过Table API对数据源表做处理】->【复杂功能通过注册、调用UDF实现】->【写出】

  • 建表的所有字段必须指定数据类型、字段排序必须一致
  • UDF选型,主要考虑输入输出情况

七、job 提交

    

      

 

 

 

对 UDF ( User-defined-Funciton)的支持上,需要添加 UDF 注册的 API , register_function,但仅仅是注册还不够,用户在自定义 Python UDF 的时候往往会依赖一些三方库,所以我们还需要增加添加依赖的方法,那就是一系列的 add 方法,比如 add_Python_file()。在编写 Python 作业的同时, Java API 也会同时被调用在提交作业之前,Java 端会构建JobGraph。然后通过 CLI 等多种方式将作业提交到集群进行运行。

udaf (矢量化标量函数)(同样使用所有标量场景)

通过以Arrow列格式在JVM和Python VM之间传输一批元素来执行的函数。

性能好、 可以利用主要的Python库( Pandas,Numpy)

Pandas UDAF不支持部分聚合。

仅Blink Planner支持Pandas UDAF。

Pandas UDAF 组或窗口的所有数据将在执行过程中同时加载到内存中,因此您必须确保组或窗口的数据适合内存。

 

 

 

 

 

watermark

  • 是一个时间戳,标识小于这个时间戳的时间已经都到达

  • watermark水印在源位置发射,并通过拓扑中方的运算符传播

  • 是StreamElement,和普通数据一起在算子之间传递

  • 触发窗口的计算,那么longmax_value值会公诉算子后续没有任何数据了

生成watermark策略

  • Periodic 

  • punctuated

 

#DDL连接属性

(1) format:可以有多种选择,如:JSON,CSV,AVRO,Canal CDC,Debezium CDC等,详细见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/

(2) can.startup.mode: 也可以有多种选择,如:earliest-offset, latest-offset, group-offsets, timestamp and specific-offsets详见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position

TIMESTAMP(3),它表示自大纪元以来的时间戳。



这篇关于pyflink作业提交的踩坑过程,看完少走两个星期弯路的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程