博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《从0到1学习Flink》—— Flink 写入数据到 Kafka
阅读量:6870 次
发布时间:2019-06-26

本文共 2560 字,大约阅读时间需要 8 分钟。

前言

之前文章 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用到了 Flink 自带的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一种情况,那么如果我们有多个地方需要这份通过 Flink 转换后的数据,是不是又要我们继续写个 sink 的插件呢?确实,所以 Flink 里面就默认支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么这篇文章我们就讲讲如何将数据写入到 Kafka。

准备

添加依赖

Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有时间可以分析下源码的实现。

这里我们需要安装下 Kafka,请对应添加对应的 Flink Kafka connector 依赖的版本,这里我们使用的是 0.11 版本:

org.apache.flink
flink-connector-kafka-0.11_2.11
${flink.version}

Kafka 安装

这里就不写这块内容了,可以参考我以前的文章 。

这里我们演示把其他 Kafka 集群中 topic 数据原样写入到自己本地起的 Kafka 中去。

配置文件

kafka.brokers=xxx:9092,xxx:9092,xxx:9092kafka.group.id=metrics-group-testkafka.zookeeper.connect=xxx:2181metrics.topic=xxxstream.parallelism=5kafka.sink.brokers=localhost:9092kafka.sink.topic=metric-teststream.checkpoint.interval=1000stream.checkpoint.enable=falsestream.sink.parallelism=5

目前我们先看下本地 Kafka 是否有这个 metric-test topic 呢?需要执行下这个命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

可以看到本地的 Kafka 是没有任何 topic 的,如果等下我们的程序运行起来后,再次执行这个命令出现 metric-test topic,那么证明我的程序确实起作用了,已经将其他集群的 Kafka 数据写入到本地 Kafka 了。

程序代码

Main.java

public class Main {    public static void main(String[] args) throws Exception{        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);        DataStreamSource
data = KafkaConfigUtil.buildSource(env); data.addSink(new FlinkKafkaProducer011
( parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new MetricSchema() )).name("flink-connectors-kafka") .setParallelism(parameterTool.getInt("stream.sink.parallelism")); env.execute("flink learning connectors kafka"); }}

运行结果

启动程序,查看运行结果,不段执行上面命令,查看是否有新的 topic 出来:

执行命令可以查看该 topic 的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

分析

上面代码我们使用 Flink Kafka Producer 只传了三个参数:brokerList、topicId、serializationSchema(序列化)

其实也可以传入多个参数进去,现在有的参数用的是默认参数,因为这个内容比较多,后面可以抽出一篇文章单独来讲。

总结

本篇文章写了 Flink 读取其他 Kafka 集群的数据,然后写入到本地的 Kafka 上。我在 Flink 这层没做什么数据转换,只是原样的将数据转发了下,如果你们有什么其他的需求,是可以在 Flink 这层将数据进行各种转换操作,比如这篇文章中的一些转换:,然后将转换后的数据发到 Kafka 上去。

本文原创地址是: , 未经允许禁止转载。

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

Github 代码仓库

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、

你可能感兴趣的文章
localStorage使用总结
查看>>
计算一年中的第几天
查看>>
iOS 一句话获取日期和星期几
查看>>
【javascript】Lazy Load, 延迟加载图片的 jQuery 插件
查看>>
Percona XtraDB Cluster高可用与状态快照传输(PXC 5.7 )
查看>>
OBJECT_ID 技巧整理
查看>>
Date日期类,Canlendar日历类,Math类,Random随机数学类
查看>>
java中forName()的作用
查看>>
解决oracle_4031错误的方法
查看>>
C# Out,Ref 学习总结
查看>>
CentOS 7.4如何安装Python3
查看>>
activity的四种模式
查看>>
RESTful API
查看>>
linux共享windows资料
查看>>
前端UI框架总结
查看>>
( component 标签元素,及其 :is 属性 )的使用样例(组件切换的一个简单样例,不过,最好使用动画来实现组件的切换)...
查看>>
这7个人生捷径,一定不要走!
查看>>
Koa2+Mysql搭建简易博客
查看>>
Atom 初识
查看>>
Servlet、Filter和Listener
查看>>