Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错
2021/9/26 17:10:40
本文主要是介绍Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
kafka的安装与配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120488287
- 首先在kafka中创建一个topic,名称叫
mytesttopic
,进入到kafka
的目录下,运行:
./bin/kafka-topics.sh --create --topic mytesttopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
然后启动生产者:
./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
- 首先配置
pom.xml
:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId><!--这个是kafka的版本--> <version>1.13.2</version><!--这个是flink的版本--> </dependency>
- java代码如下:
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class KafkaStreamWordCount { public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.99.5:9092");// 这里是kafka的主机地址,可以是 域名:端口,也可是 ip:端口 properties.setProperty("group.id", "test");//第1个参数是固定值 group.id,第2个参数是自定义的组ID,这个可以自己指定 DeserializationSchema<String> deserializationSchema = new SimpleStringSchema(); String topic = "mytesttopic";// 哇!这里不要写错啊,这个是作为消费者接收的kafka对应的topic名称 DataStream<String> text = env.addSource(new FlinkKafkaConsumer<String>(topic, deserializationSchema, properties)); text.print(); env.execute("Flink-Kafka demo"); } }
- 修改host文件(如果是本机的flink与kafka是不需要配置的)
比如我的虚拟机主机名是:ubuntu
,ip是:192.168.99.5
,就在host里添加:
192.168.99.5 ubuntu
注意即便properties.setProperty("bootstrap.servers", "192.168.99.5:9092");
这样使用ip:端口
配置也需要添加host!
- 运行java程序,然后在kafka的生产者中输入任何想要输入的内容,就可以在flink里查看了
xq@ubuntu:~/Desktop/software/kafka_2.12-2.7.1$ ./bin/kafka-console-producer.sh --topic myte--broker-list localhost:9092 >hello kafka >hello flink >
在flink中显示:
......... 16:52:44,055 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-test-26, groupId=test] Cluster ID: HDij23gxR_edwXhIDqE9ng 16:52:44,056 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Discovered group coordinator ubuntu:9092 (id: 2147483647 rack: null) 16:52:44,075 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Setting offset for partition mytesttopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ubuntu:9092 (id: 0 rack: null), epoch=0}} 16> hello kafka 16> hello flink
这篇关于Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2022-03-01沐雪多租宝商城源码从.NetCore3.1升级到.Net6的步骤
- 2024-12-06使用Microsoft.Extensions.AI在.NET中生成嵌入向量
- 2024-11-18微软研究:RAG系统的四个层次提升理解与回答能力
- 2024-11-15C#中怎么从PEM格式的证书中提取公钥?-icode9专业技术文章分享
- 2024-11-14云架构设计——如何用diagrams.net绘制专业的AWS架构图?
- 2024-05-08首个适配Visual Studio平台的国产智能编程助手CodeGeeX正式上线!C#程序员必备效率神器!
- 2024-03-30C#设计模式之十六迭代器模式(Iterator Pattern)【行为型】
- 2024-03-29c# datetime tryparse
- 2024-02-21list find index c#
- 2024-01-24convert toint32 c#