FLINK Notebook 混合编程:PYTHON (一)

2021/5/14 22:55:20

本文主要是介绍FLINK Notebook 混合编程:PYTHON (一),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

本文介绍了 Py4j的使用以及 Flink官方如何使用 Py4j进行混合语言编程,最后会介绍下我们会应用这种技术在我们的 Flink Notebook 服务,来创建一个混合语言编程环境。

Flink Notebook 服务是我司自研的基于Notebook方式的Flink 开发平台,他支持用户通过SQL方式和JAR包方式进行混合编程,并通过一些配置,既可完全的在页面上完成FLINK任务的开发工作,如图:

在这里插入图片描述

​ 通过不同的Notebook Type我们可以加载不同类型的组件,通过table结果集流转方式,承接上下游,以完成相应的功能。目前的插件类型主要是主要分:1.SQL组建:可以自由撰写SQL,2.格式化组建:sink或者source,有具体的格式,标准的前端组建对应,3.JAR包自定义组件,通过用户上传自己开发的jar包完成对应的逻辑。

​ 对于Jar包自定义组件来说,他是为了解决1%的特异性需求的,但问题是其代码不可见,逻辑也相对自由,有违Notebook的初衷,因此,我们想设计一种Notebook的Type,支持可视化的Python编写,可以直接将代码在页面上进行开发。

​ Flink 本身来说,就有PyFlink 和 Python UDF support,因此python和 flink的耦合度应该很高,所以我们要了解Flink是怎么做的,从而研究我们应该如何去做,所以本文会分成以下3个部分来介绍整个混编逻辑:

1. Java与Python 通信:Py4J
2. Py4j in Flink
3. Notebook with Python

Py4j 介绍

Py4j可以使运行于python解释器的python程序动态的访问java虚拟机中的java对象。Java方法可以像java对象就在python解释器里一样被调用, java collection也可以通过标准python collection方法调用。Py4j也可以使java程序回调python对象。

详细说明可以参考官网 https://www.py4j.org/

安装以及基本使用也可以参考官网

Py4j可以在系统中创建一个 java和python 之间通信的socket管道。
在这里插入图片描述
我们可以通过一个例子来看整个Py4j是如何工作的。

我们先创建一个想让python负责具体实现的Java 接口:

public interface TestEnterPoint {
    String gift(HashMap<String,String> a, String b);
}

在java 服务端,我们通过以下代码可以启动一个简单的Py4j监听:

    public static void main(String[] args) {
        ListenerApplication application = new ListenerApplication();
        GatewayServer server = new GatewayServer(application);
        server.start(true);
    }

ListenerApplication 表示一个允许共享给python的类,她可以是任意java类,包括Map,List等复杂结构化数据:

public class ListenerApplication {
    TestEnterPoint enterPoint = new TestEnterPoint();
    public void setListener(TestEnterPoint enterPoint) {
        this.enterPoint = enterPoint;
    }
    public void notifyAllListeners() {
        HashMap<String,String> map = new HashMap<>();
        map.put("a","aaaa");
        Object returnValue = listener.gift(map,"a");
        System.out.println(returnValue);
    }
}

而在Python端,我们可以通过以下代码运行一个python程序:

from py4j.java_gateway import JavaGateway, CallbackServerParameters

class TestEnterPoint(object):
    def gift(self, map, key):
        return map.get(key)
    class Java:
        implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]

if __name__ == "__main__":
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters())
    listener = TestEnterPoint()
    gateway.entry_point.setListener(listener)
    gateway.entry_point.notifyAllListeners()
    gateway.shutdown()

这样我们就通过Python来实现了一个 map.get(key) 的方法

整个过程中,我们看出几点对于python来说比较基本的使用方式,那就是,第一,通过Python 中implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]的使用方式,我们可以实现一个Java的Interface,第二,通过gateway.entry_point的方式,我们可以拿到java中设置的可共享变量,第三个我们在例子中并没有呈现,但也是非常基础的使用,就是通过在python中使用 gateway.jvm.com.xxxx.xxx.test.py.TestServer的方式,允许python使用任何java的class,允许初始化,允许调用方法,但是他们如果想和java端进行数据通信,则必须通过entry_point来实现。

Py4j in Flink

讲完Py4j并且如果把上面的代码自己拿来试下,应该已经对整个python和java互通有一定理解了,那么我们Flink中如何使用Py4J来进行混编,也就顺理成章的很好理解了,在Flink中,有很多地方使用到了这种技术,包括PyFlink,以及Python UDF support,PyFlink 属于Pyton为主,也比较复杂,这边就先就以简单的Python UDF为例,梳理下Flink的执行逻辑。

在Flink Java中如何使用Python UDF

在Flink 中使用Python的UDF相对来说非常简单,创建一个Python代码,比如:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
   return s.replace('hello', 'ni hao')

在Flink Java中,需要配置Python环境变量,首先将Python文件加到环境中去,如果是集群提交,需要加到依赖中去(使用-pyfs 提交Python文件),或者远程的Hdfs文件。其次需要配置Python的程序依赖环境路径:

configuration.setString("python.files", "/Users/yourName/test.py");
configuration.setString("python.client.executable", "python3");
configuration.setString("python.executable", "/usr/bin/python3");

最后,我们在使用过程中,比如通过SQL使用时候,只需要如下SQL语句即可:

create temporary system function func1 as 'test1.func1' language python

其中test1是python的文件名"test1.py"而 func1就是上文中的那个python 的function name,如此既可以在java中使用python实现的UDF

Flink是如何实现这些的

​ 在追踪Flink Sql是如何执行create function过程中,我们发现整个Flink的执行流程大致如图:

在这里插入图片描述

​ Flink会通过语法解析后的通过create function的后缀“ language python”判断是否是Python fuction,如果是,会调用PythonFunctionUtils来获取function,而PythonFunctionUtil最终通过动态加载的PythonFunctionFactory来最终调用Py4j。这里可以看见他的逻辑其实也比较简单,首先就是启动Py4j的Java端server,然后主要就是通过环境变量,以及configture 里的各种参数,最终拼接出python的cmd 执行命令,运行命令并通过entryPoint获取其中的贡献类。最终生成我们在java端可以用的function。

​ 这块如果有兴趣,在Flink源码中搜索 PythonFunctionFactory 可以直接看见相关代码。

Notebook 混编Python

我们平台是类似Zeeplin的可视化Notebook编程页面,对于我们来说,要在页面上支持Python编程,有几种方案:
  • 只支持Python UDF
  • 以PyFlink为基础,配置混合编程方案
  • 以Java版为基础,配置混合编程方案

方案一对于我们来说并不难,可以看到Flink官方既是支持Python UDF的,我们只需要将这个Notebook Part里的内容,生成Python文件并添加到环境中一起提交即可,但这种方案没法解决我们上面提出的一大痛点,用户的1%需要Jar包开发的非标任务,不是单单可以通过UDF来实现的。

方案二对于我们来说,最大的问题是所有的优化,整个程序体系都是建立在Java 基础上的,改动会非常巨大。

如此,只能采取方案三,而方案三的问题是,Flink的原版PyFlink只创建了 PythonFunctionFactory 和一个 心跳2个 entryPoint,这对我们来说比较局限。所以我们会采取模仿 PythonFunctionFactory 的方式,自己创建Py4j进程,来完成Notebook的混编实现

这里的详细设计以及Demo 我们会在下篇文章(二)中放出。谢谢各位。



这篇关于FLINK Notebook 混合编程:PYTHON (一)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程