如何在 PyFlink 1.10 中自定义 Python UDF?

  • 时间:
  • 浏览:5
  • 来源:彩神欢乐生肖_神彩欢乐生肖官方

我一直认为在博客中只是我文本描述而只能让读者真正的在当时人的机器上运行起来的博客,需用好博客,什么都接下来当当.我 当当.我 看看按照当当.我 当当.我 下面的操作,有无能在你的机器上也运行起来?:)

愿因你本地环境 python 命令版本是 2.x,太难需用对 Python 版本进行设置,如下:

上面代码利用了 StreamExecutionEnvironment 中现有 socketTextStream 法律方法接收数据,或者将业务订单数据传个5个 多 FlatMapFunction, FlatMapFunction 主要实现将数据类型封装为 Row,完整性代码查阅 Spliter。

共同,当当.我 当当.我 还需用在 Python 封装5个 多 SocketTableSource,详情查阅 socket_table_source.py。

当当.我 当当.我 当当.我 知道有 Beam on Flink 的场景,只是我 Beam 支持多种 Runner,也只是我说 Beam SDK 编写的 Job 不需要 运行在 Flink 之上。如下图所示:

核心的统计逻辑是根据 city 进行分组,或者对 销售数量和销售金额进行求和,如下:

根据案例的需求和数据内部管理分析,当当.我 当当.我 需用对原始字符串进行内部管理化解析,太难需用5个 多 按“,”号分隔的 UDF(split) 和5个 多 不需要 将各个列信息展平的 DUF(get)。共同当当.我 当当.我 需用根据城市进行分组统计。

当当.我 当当.我 需用实现5个 多 Socket Connector,首太难实现5个 多 StreamTableSource, 核心代码是实现 getDataStream,代码如下:

当当.我 当当.我 预期要得到的5个 多 效果是不需要 将结果数据进行图形化展示,简单的思路是将数据写到5个 多 本地的文件,或者在写5个 多 HTML 页面,使其不需要 自动更新结果文件,并展示结果。什么都当当.我 当当.我 还需用自定义5个 多 Sink 来完成该功能,当当.我 当当.我 的需求计算结果是会不断的更新的,也只是我涉及到 Retraction(愿因当当.我 当当.我 不理解你你這個概念,不需要 查阅我如果的博客),目前在 Flink 上面还太难默认支持 Retract 的 Sink,什么都当当.我 当当.我 需用自定义5个 多 RetractSink,比如当当.我 当当.我 实现一下 CsvRetractTableSink。

在进行编译代码如果,当当.我 当当.我 需用你愿因安装了 JDK8 和 Maven3x。

在 Apache Flink 1.10 中当当.我 当当.我 有多种法律方法进行 UDF 的定义,比如:

目前为止,当当.我 当当.我 愿因完成了 Python UDF 的定义,声明和注册了。接下来当当.我 当当.我 还是看5个 多 完整性的示例吧:)

每一笔订单是5个 多 字符串,字段用逗号分隔, 相似 :

直观的判断,PyFlink Python UDF 的功能不需要 能如上图一样不需要 没办法 快从幼苗变成大树,为什么有此判断,请继续往下看…

如上信息证明你当当.我 当当.我 所需的 Python 依赖愿因没大难题了,接下来回过头来在看看怎么进行业务需求的开发。

Beam Portability Framework 是5个 多 性性性早熟 图片 图片 的多语言支持框架,框架层厚抽象了语言之间的通信协议(gRPC),定义了数据的传输格式(Protobuf),或者根据通用流计算框架所需用的组件,抽象个各种服务,比如 DataService,StateService,MetricsService 等。在原先5个 多 性性性早熟 图片 图片 的框架下,PyFlink 不需要 快速的构建当时人的 Python 算子,共同重用 Apache Beam Portability Framework 中现有 SDK harness 组件,不需要 支持多种 Python 运行模式,如:Process,Docker,etc.,这使得 PyFlink 对 Python UDF 的支持变得非常容易,在 Apache Flink 1.10 中的功能也非常的稳定和完整性。太难为什么说是 Apache Flink 和 Apache Beam 共同打造呢,是愿因我发现目前 Apache Beam Portability Framework 的框架也存在什么都优化的空间,什么都我在 Beam 社区进行了优化讨论,或者在 Beam 社区也贡献了 20+ 的优化补丁。

完成自定义的 Source 和 Sink 如果当当.我 当当.我 终于不需要 进行业务逻辑的开发了,确实整个过程自定义 Source 和 Sink 是最麻烦的,核心计算逻辑似乎要简单的多。

如下代码当当.我 当当.我 发现核心实现逻辑非常简单,只需用对数据进行解析和对数据进行集合计算:

上面你你這個行代码定义了监听端口 9999 的数据源,共同内部管理化 Table 只5个 多 名为 line 的列。

再次出现上面信息证明愿因将 PyFlink.demo 模块成功安装。接下来当当.我 当当.我 不需要 运行当当.我 当当.我 的示例了 :)

本篇从架构到 UDF 接口定义,再到具体的实例,向当当.我 当当.我 介绍了在 Apache Flink 1.10 发布如果,怎么利用 PyFlink 进行业务开发,其中 用户自定义 Source 和 Sink每项复杂化化,这也是目前社区需用进行改进的每项(Java/Scala)。真正的核心逻辑每项确实比较简单,为了当当.我 当当.我 按照本篇进行实战操作一点成就感,什么都我增加了自定义 Source/Sink 和图形化每项。但愿因当当.我 当当.我 想复杂化实例的实现不需要 能利用 Kafka 作为 Source 和 Sink,原先就不需要 省去自定义的每项,做起来也会简单一点。

启动 blog_demo,愿因一切顺利,启动如果,控制台会输出5个 多 web 地址,如下所示:

当当.我 当当.我 知道 PyFlink 是在 Apache Flink 1.9 版新增的,太难在 Apache Flink 1.10 中 Python UDF 功能支持的速度有无不需要 满足用户的急切需求呢?

除了 JDK 和 MAVEN 完整性的环境依赖性如下:

当输入第三根订单 苹果手机55 11,80,5499,Beijing,如果,页面变化如下:

随之订单数据的不断输入,统计图不断变化。5个 多 完整性的 GIF 演示如下:

示例的代码在上面下载的源代码上面愿因含晒 了,为了简单,当当.我 当当.我 利用 PyCharm 打开enjoyment.code/myPyFlink。共同在 Terminal 启动5个 多 端口:

或者在使用如果进行注册,如下:

PyFlink 1.10 如果支持 Python 3.6+ 版本。

上面的代码当当.我 当当.我 假设是5个 多 Socket 的 Source,Sink 是5个 多 Chart Sink,太难最终运行效果图,如下:

接下来就不需要 在 Table API/SQL 中进行使用了,如下:

当当.我 当当.我 需用对上面列进行分析,为了演示 Python UDF,当当.我 当当.我 在 SocketTableSource中并太难对数据进行预防止,什么都当当.我 当当.我 利用上面 UDF 定义 一节定义的 UDF,来对原始数据进行预防止。

共同当当.我 当当.我 还需用利用 Python 进行封装,详见 chart_table_sink.py。

为了当当.我 当当.我 方便我把自定义 Source/Sink(Java&Python)的源代码倒入了这里 ,当当.我 当当.我 不需要 进行如下操作:

上面代码中当当.我 当当.我 会发现5个 多 陌生的每项,只是我 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪里来的呢?确实只是我含晒 了上面当当.我 当当.我 介绍的 自定义 Source/Sink(Java&Python)。下面当当.我 当当.我 来介绍怎么增加你你這個 pyflink.demo 模块。

上面这图是 Beam Portability Framework 的架构图,他描述了 Beam 怎么支持多语言,怎么支持多 Runner,单独说 Apache Flink 的如果当当.我 当当.我 就不需要 说是 Beam on Flink,太难为什么解释 Flink on Beam 呢?

CsvRetractTableSink 的核心逻辑是缓冲计算结果,每次更新进行一次全量(这是个纯 demo,只能用于生产环境)文件输出。源代码查阅 CsvRetractTableSink。

太难定义完 UDF 当当.我 当当.我 应该怎么使用呢?Apache Flink 1.10 中提供了 2 种 Decorators,如下:

当当.我 当当.我 尝试将下面的数据,三根,三根的发送给 Source Connector:

在 Apache Flink 1.10 中当当.我 当当.我 所说的 Flink on Beam 更精确的说是 PyFlink on Beam Portability Framework。当当.我 当当.我 看一下简单的架构图,如下:

注册 UDF

5个 多 完成的 PyFlink 的 Job 需用有内部管理数据源的定义,有业务逻辑的定义和最终计算结果输出的定义。也只是我 Source connector, Transformations, Sink connector,接下来当当.我 当当.我 根据你你這個5个 多 每项进行介绍来完成当当.我 当当.我 的需求。

PyFlink 读取数据源非常简单,如下:

当当.我 当当.我 看后基础环境安装比较简单,我这里就不每5个 多 都贴出来了。愿因当当.我 当当.我 有大难题欢迎邮件愿因博客留言。

假设苹果手机55公司要统计该公司产品在双 11 期间各城市的销售数量和销售金额分布状况。

不需要 能查看一下,当当.我 当当.我 核心需用 apache-beam 和 apache-flink,如下命令:

愿因目前 PyFlink 还太难部署到 PyPI 上面,在 Apache Flink 1.10 发布如果,当当.我 当当.我 需用通过构建 Flink 的 master 分支源码来构建运行当当.我 当当.我 Python UDF 的 PyFlink 版本。

当当.我 当当.我 发现上面定义函数除了第5个 多 扩展 ScalaFunction 的法律方法是 PyFlink 特有的,一点法律方法需用 Python 语言本身生活就支持的,也只是我说,在 Apache Flink 1.10 中 PyFlink 允许以任何 Python 语言所支持的法律方法定义 UDF。

概要了解了 Apache Flink 1.10 中 Python UDF 的架构如果,当当.我 当当.我 还是切入的代码每项,看看怎么开发和使用 Python UDF。

在 chart_table_sink.py 当当.我 当当.我 封装了5个 多 http server,原先当当.我 当当.我 不需要 在浏览器中查阅当当.我 当当.我 的统计结果。

作者:孙金城(金竹)

UDF 定义

当当.我 当当.我 打开你你這個页面,刚开始了是5个 多 空白页面,如下:

计算结果写入到当当.我 当当.我 自定义的 Sink 中,如下:

阅读原文可点击:原文链接