Kerberos authentication on CentOS 7 with Kafka integration, and a Java Flink job that subscribes to Kerberos-authenticated Kafka messages
2021/4/17 7:27:58
Kerberos authentication and Kafka integration on CentOS 7
Environment and versions:
- (1) CentOS 7.6
- (2) kafka_2.12-0.10.2.2
- (3) Kerberos
- (4) flink-1.11.3-bin-scala_2.11
- (5) JDK 1.8
Note: "b.kuxiao" is my hostname throughout this article.
1. Installing Kafka
1.1. Download Kafka from the official site
Kafka downloads page: http://kafka.apache.org/downloads
Pick the version you need (this article uses 0.10.2.2).
1.2. Unpack the .tgz
tar -zxvf kafka_2.12-0.10.2.2.tgz
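1.3. Edit the configuration files
In config/server.properties, change:
broker.id: Kafka is normally deployed as a cluster, so every broker needs its own broker.id. I am running a single test broker here, so 0 is fine.
log.dirs: the directory where Kafka stores its log data (choose your own).
listeners: uncomment it and set it to "PLAINTEXT://b.kuxiao:9092", where "b.kuxiao" is my hostname (run hostnamectl to see yours).
zookeeper.connect: replace the host with your own hostname.
In config/producer.properties, change:
bootstrap.servers: replace the host with your own hostname.
In config/consumer.properties, change:
zookeeper.connect: replace the host with your own hostname.
Run vim /etc/hosts and add an entry that maps your hostname to your IP (in my case the line is "192.168.72.130 b.kuxiao"; "192.168.72.130" is my internal IP).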
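As a quick sanity check on the edits above, the properties text can be loaded with java.util.Properties and the host in listeners compared with the hostname you expect. This is only a minimal sketch: the class and method names (ServerPropsCheck, parse, listenerHost) are hypothetical, and the sample values are the ones used in this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ServerPropsCheck {

    /** Parses text in the same key=value format as config/server.properties. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the host of the first "listeners" entry, e.g. b.kuxiao. */
    public static String listenerHost(Properties props) {
        String first = props.getProperty("listeners", "").split(",")[0].trim();
        int schemeEnd = first.indexOf("://");
        int portStart = first.lastIndexOf(':');
        // A listener is expected to look like PROTOCOL://host:port.
        if (schemeEnd < 0 || portStart <= schemeEnd + 2) {
            throw new IllegalArgumentException("not PROTOCOL://host:port: " + first);
        }
        return first.substring(schemeEnd + 3, portStart);
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "broker.id=0",
                "listeners=PLAINTEXT://b.kuxiao:9092",
                "zookeeper.connect=b.kuxiao:2181");
        Properties props = parse(sample);
        System.out.println("listener host = " + listenerHost(props));
    }
}
```

The host is extracted by hand rather than with java.net.URI because protocol names such as SASL_PLAINTEXT contain an underscore, which is not a legal URI scheme character.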
1.4. Start ZooKeeper (the one bundled with Kafka)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
or, to run it in the background:
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
1.5. Start Kafka
./bin/kafka-server-start.sh ./config/server.properties
or, to run it in the background:
nohup ./bin/kafka-server-start.sh ./config/server.properties &
1.6. Run ps -ef|grep kafka to confirm that the processes are running
1.7. Create a test topic (test)
./bin/kafka-topics.sh --create --zookeeper b.kuxiao:2181 --replication-factor 1 --partitions 2 --topic test
List the existing topics
./bin/kafka-topics.sh --list --zookeeper b.kuxiao:2181
1.8. Start a consumer
./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning
1.9. Start a producer
./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test
1.10. Test
Type a message into the producer terminal and check that it shows up in the consumer terminal.
2. Installing Kerberos
2.1. Installing the Kerberos server
2.1.1. Install
yum install krb5-server
2.1.2. Edit the configuration files
Edit /var/kerberos/krb5kdc/kdc.conf
vim /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88

[realms]
 KUXIAO.COM = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
 }
Edit /var/kerberos/krb5kdc/kadm5.acl
vim /var/kerberos/krb5kdc/kadm5.acl
*/admin@KUXIAO.COM *
Edit /etc/krb5.conf
vim /etc/krb5.conf
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = KUXIAO.COM
 dns_lookup_kdc = false
 dns_lookup_realm = false
 ticket_lifetime = 86400
# renew_lifetime = 604800
 forwardable = true
 default_tgs_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 default_tkt_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 permitted_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 udp_preference_limit = 1
 kdc_timeout = 60000

[realms]
 KUXIAO.COM = {
# b.kuxiao is my hostname
  kdc = b.kuxiao
  admin_server = b.kuxiao
 }
# EXAMPLE.COM = {
#  kdc = kerberos.example.com
#  admin_server = kerberos.example.com
# }

[domain_realm]
# .example.com = EXAMPLE.COM
# example.com = EXAMPLE.COM
2.1.3. Initialize the KDC database
You will be asked to set the master password of the KDC database. Do not forget it.
kdb5_util create -r KUXIAO.COM -s
Loading random data
Initializing database '/var/kerberos/krb5kdc/principal' for realm 'KUXIAO.COM',
master key name 'K/M@KUXIAO.COM'
You will be prompted for the database Master Password.
It is important that you NOT FORGET this password.
Enter KDC database master key:
Re-enter KDC database master key to verify:
2.1.4. Start the KDC service
systemctl start krb5kdc            # start the service
systemctl status krb5kdc.service   # check its status
systemctl enable krb5kdc           # start it at boot
● krb5kdc.service - Kerberos 5 KDC
   Loaded: loaded (/usr/lib/systemd/system/krb5kdc.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2021-04-16 14:41:38 CST; 7s ago
  Process: 77245 ExecStart=/usr/sbin/krb5kdc -P /var/run/krb5kdc.pid $KRB5KDC_ARGS (code=exited, status=0/SUCCESS)
 Main PID: 77249 (krb5kdc)
    Tasks: 1
   CGroup: /system.slice/krb5kdc.service
           └─77249 /usr/sbin/krb5kdc -P /var/run/krb5kdc.pid
Apr 16 14:41:38 b.kuxiao systemd[1]: Starting Kerberos 5 KDC...
Apr 16 14:41:38 b.kuxiao systemd[1]: Started Kerberos 5 KDC.
2.1.5. Start the Kerberos administration service (kadmin)
systemctl start kadmin    # start the service
systemctl status kadmin   # check its status
systemctl enable kadmin   # start it at boot
● kadmin.service - Kerberos 5 Password-changing and Administration
   Loaded: loaded (/usr/lib/systemd/system/kadmin.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2021-04-16 14:44:12 CST; 22s ago
  Process: 77433 ExecStart=/usr/sbin/_kadmind -P /var/run/kadmind.pid $KADMIND_ARGS (code=exited, status=0/SUCCESS)
 Main PID: 77438 (kadmind)
    Tasks: 1
   CGroup: /system.slice/kadmin.service
           └─77438 /usr/sbin/kadmind -P /var/run/kadmind.pid
Apr 16 14:44:12 b.kuxiao systemd[1]: Starting Kerberos 5 Password-changing and Administration...
Apr 16 14:44:12 b.kuxiao systemd[1]: Started Kerberos 5 Password-changing and Administration.
2.1.6. Create an administrator
/usr/sbin/kadmin.local -q "addprinc admin/admin"
Authenticating as principal root/admin@KUXIAO.COM with password.
WARNING: no policy specified for admin/admin@KUXIAO.COM; defaulting to no policy
Enter password for principal "admin/admin@KUXIAO.COM":
Re-enter password for principal "admin/admin@KUXIAO.COM":
Principal "admin/admin@KUXIAO.COM" created.
2.1.7. Everyday Kerberos operations
Logging in as the administrator: on the KDC host itself you can log in directly with kadmin.local. From any other machine, authenticate with kinit first.
kadmin.local
or
[root@localhost app]# kinit admin/admin
Password for admin/admin@KUXIAO.COM:
[root@localhost app]# kadmin
Authenticating as principal admin/admin@KUXIAO.COM with password.
Password for admin/admin@KUXIAO.COM:
kadmin:
Adding, listing and deleting principals
Create a new principal:
kadmin.local:  addprinc -pw 123456 kafka/b.kuxiao@KUXIAO.COM
WARNING: no policy specified for kafka/b.kuxiao@KUXIAO.COM; defaulting to no policy
Principal "kafka/b.kuxiao@KUXIAO.COM" created.
List all principals:
kadmin.local:  listprincs
K/M@KUXIAO.COM
admin/admin@KUXIAO.COM
kadmin/admin@KUXIAO.COM
kadmin/b.kuxiao@KUXIAO.COM
kadmin/changepw@KUXIAO.COM
kafka/b.kuxiao@KUXIAO.COM
kiprop/b.kuxiao@KUXIAO.COM
krbtgt/KUXIAO.COM@KUXIAO.COM
test@KUXIAO.COM
Delete the test principal:
kadmin.local:  delprinc test
Are you sure you want to delete the principal "test@KUXIAO.COM"? (yes/no): yes
Principal "test@KUXIAO.COM" deleted.
Make sure that you have removed this principal from all ACLs before reusing.
Generating a keytab: use the xst command or the ktadd command (xst is an alias of ktadd). The -norandkey option, which keeps the existing password-derived key, is only available in kadmin.local.
kadmin.local:  ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des3-cbc-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type arcfour-hmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia256-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia128-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-hmac-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-cbc-md5 added to keytab WRFILE:/app/kafka.keytab.
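The principals used above follow the pattern primary/instance@REALM: in kafka/b.kuxiao@KUXIAO.COM the primary is kafka, the instance is the hostname b.kuxiao, and the realm is KUXIAO.COM. The primary matters later, because it is the value given to sasl.kerberos.service.name. The following is a minimal sketch of that split; the PrincipalName class is hypothetical (it is not part of any Kerberos library) and it ignores escaped "/" and "@" characters.

```java
public class PrincipalName {
    public final String primary;
    public final String instance; // empty when the principal has no instance
    public final String realm;    // empty when the default realm is implied

    private PrincipalName(String primary, String instance, String realm) {
        this.primary = primary;
        this.instance = instance;
        this.realm = realm;
    }

    /** Splits "primary/instance@REALM"; instance and realm are optional. */
    public static PrincipalName parse(String principal) {
        String name = principal;
        String realm = "";
        int at = principal.lastIndexOf('@');
        if (at >= 0) {
            name = principal.substring(0, at);
            realm = principal.substring(at + 1);
        }
        String primary = name;
        String instance = "";
        int slash = name.indexOf('/');
        if (slash >= 0) {
            primary = name.substring(0, slash);
            instance = name.substring(slash + 1);
        }
        return new PrincipalName(primary, instance, realm);
    }

    public static void main(String[] args) {
        PrincipalName p = parse("kafka/b.kuxiao@KUXIAO.COM");
        // prints: kafka | b.kuxiao | KUXIAO.COM
        System.out.println(p.primary + " | " + p.instance + " | " + p.realm);
    }
}
```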
2.2. Installing the Kerberos client
2.2.1. Install
yum install krb5-workstation krb5-libs krb5-auth-dialog
2.2.2. Copy the /etc/krb5.conf edited on the server to the client (I only have one machine, so I skipped this step)
2.2.3. User operations
Authenticate as a principal
kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM

Valid starting       Expires              Service principal
2021-04-16T15:25:28  2021-04-17T15:25:28  krbtgt/KUXIAO.COM@KUXIAO.COM
Show the currently authenticated principal
klist
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM

Valid starting       Expires              Service principal
2021-04-16T15:25:28  2021-04-17T15:25:28  krbtgt/KUXIAO.COM@KUXIAO.COM
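The ticket_lifetime = 86400 set in /etc/krb5.conf is a number of seconds, which is why the ticket in the klist output expires exactly 24 hours after it was issued. The arithmetic can be sketched with java.time as follows; the class name TicketLifetime is hypothetical, and the sketch assumes the KDC does not cap the lifetime at a lower value.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class TicketLifetime {

    /** Expiry time = "Valid starting" time + ticket_lifetime in seconds. */
    public static LocalDateTime expiry(LocalDateTime validStarting, long ticketLifetimeSeconds) {
        return validStarting.plus(Duration.ofSeconds(ticketLifetimeSeconds));
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.parse("2021-04-16T15:25:28");
        // prints: 2021-04-17T15:25:28
        System.out.println(expiry(start, 86400));
    }
}
```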
Destroy the current credentials cache
kdestroy
[root@localhost app]# kdestroy
[root@localhost app]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)
3. Configuring Kerberos authentication for Kafka
3.1. Generate the keytab
I reuse the keytab generated in 2.1.7:
kadmin.local:  ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao
3.2. Create the configuration files
Create jaas/client.properties under the Kafka directory with the following content:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
Create jaas/kafka_server_jaas.conf under the Kafka directory with the following content:
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/app/kafka.keytab"
    principal="kafka/b.kuxiao@KUXIAO.COM";
};
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/app/kafka.keytab"
    principal="kafka/b.kuxiao@KUXIAO.COM";
};
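For readers who want to see what the JAAS file describes, the KafkaClient entry can also be built in code with the standard javax.security.auth.login API. The sketch below only constructs the configuration object. It does not contact the KDC or read the keytab, and the KafkaClientJaas class and its forKeytab method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

public class KafkaClientJaas {

    /** Builds a JAAS Configuration with a single "KafkaClient" keytab entry. */
    public static Configuration forKeytab(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", keytab);
        options.put("principal", principal);

        final AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);

        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                // Only the KafkaClient section is defined in this sketch.
                return "KafkaClient".equals(name) ? new AppConfigurationEntry[] {entry} : null;
            }
        };
    }

    public static void main(String[] args) {
        Configuration conf = forKeytab("/app/kafka.keytab", "kafka/b.kuxiao@KUXIAO.COM");
        AppConfigurationEntry e = conf.getAppConfigurationEntry("KafkaClient")[0];
        System.out.println(e.getLoginModuleName() + " " + e.getOptions());
    }
}
```

Such an object could be installed with Configuration.setConfiguration(conf) before a client is created. The file-based setup in this article does the same job through -Djava.security.auth.login.config and is the one actually used here.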
3.3. Edit the configuration files
Add or change the following settings in config/server.properties:
listeners=SASL_PLAINTEXT://b.kuxiao:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Add the Kafka JVM options to the bin/kafka-run-class.sh script. Both options must go into a single KAFKA_OPTS value, because a second export would replace the first one instead of adding to it:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/app/kafka_2.12-0.10.2.2/jaas/kafka_server_jaas.conf"
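Because a later export KAFKA_OPTS=... replaces an earlier one rather than appending to it, it is worth confirming that both -D options actually reach the JVM. The following is a minimal sketch of such a check; the KerberosJvmOptsCheck class is hypothetical, and only the two property names are taken from the options above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KerberosJvmOptsCheck {

    private static final String[] REQUIRED = {
        "java.security.krb5.conf",
        "java.security.auth.login.config"
    };

    /** Returns the required property names that are absent or blank. */
    public static List<String> missing(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Run with the same -D options as the broker to see what is missing.
        System.out.println("missing: " + missing(System.getProperties()));
    }
}
```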
3.4. Restart Kafka and test
Start ZooKeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
Start Kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties &
Consumer with Kerberos authentication
./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning --consumer.config jaas/client.properties
The consumer starts successfully.
Producer with Kerberos authentication
./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test --producer.config jaas/client.properties
Type some messages into the producer.
The consumer receives them.
Kafka is now set up with Kerberos authentication.
4. Subscribing to Kerberos-authenticated Kafka messages from a Flink job in Java
4.1. Installing Flink
4.1.1. Download
Official downloads page: https://flink.apache.org/downloads.html
4.1.2. Unpack
tar -zxvf flink-1.11.3-bin-scala_2.11.tgz
4.1.3. Edit the configuration file
Add the following to conf/flink-conf.yaml (I only have one machine, so I reuse the same keytab):
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /app/kafka.keytab
security.kerberos.login.principal: kafka/b.kuxiao@KUXIAO.COM
security.kerberos.login.contexts: Client,KafkaClient
4.2. Start Flink
./bin/start-cluster.sh
Open the web UI: http://192.168.72.130:8081/
4.3. Java implementation
Dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cn</groupId>
    <artifactId>point</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <!--<flink.version>1.12.0</flink.version>-->
        <flink.version>1.11.3</flink.version>
        <!-- <flink.version>1.7.2</flink.version>-->
        <hadoop.version>2.7.7</hadoop.version>
        <scala.version>2.11.8</scala.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
        <!-- <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.4</version>
            <type>pom</type>
        </dependency> -->
        <!-- alibaba fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <!-- The default version is 3.8.1; use 4.x instead, because 3.x tests are written programmatically while 4.x uses annotations. -->
        </dependency>
        <!-- Use the dependency version that matches the installed Kafka -->
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Flink core API -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-compress</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka integration. Note: in Flink 1.11 the FlinkKafkaConsumer010 class used by the job code is packaged in flink-connector-kafka-0.10_2.11; switch the artifactId if the class cannot be resolved. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jsr305</artifactId>
                    <groupId>com.google.code.findbugs</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-cli</artifactId>
                    <groupId>commons-cli</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-collections</artifactId>
                    <groupId>commons-collections</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-lang</artifactId>
                    <groupId>commons-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>netty</artifactId>
                    <groupId>io.netty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-math3</artifactId>
                    <groupId>org.apache.commons</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-mapper-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Flink job code
package org.track.manager.data.verify.monitor.verify;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DataVerifyApp {

    static final Logger logger = LoggerFactory.getLogger(DataVerifyApp.class);

    // Initial delay and period (in minutes) of the periodic reload task
    private static final long INITIAL_DELAY = 5L;
    private static final long PERIOD = 5L;

    public static void main(String[] args) throws Exception {
        String kafkaBroker = "b.kuxiao:9092";   // Kafka broker
        String consumerGroup = "grouptest";     // consumer group
        String topicName = "test";              // topic
        // Kerberos service name: the primary of the broker principal kafka/b.kuxiao
        String saslKerberosServiceName = "kafka";
        String securityProtocol = "SASL_PLAINTEXT";
        String saslMechanism = "GSSAPI";

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 5000 ms
        streamEnv.enableCheckpointing(5000);
        // Exactly-once state consistency for checkpoints
        streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 30000 ms between the end of one checkpoint and the start of the next
        streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        // A checkpoint must complete within 60000 ms, otherwise it is discarded
        streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
        // Only one checkpoint may be in progress at a time
        streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep the checkpoint when the job is cancelled. Retained checkpoints must be
        // cleaned up manually; by default they are not kept.
        streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaBroker);   // broker IP or hostname
        properties.setProperty("group.id", consumerGroup);          // group.id of the Flink consumer
        properties.setProperty("security.protocol", securityProtocol);
        properties.setProperty("sasl.mechanism", saslMechanism);
        properties.setProperty("sasl.kerberos.service.name", saslKerberosServiceName);

        // Create the Kafka consumer
        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<>(topicName, new SimpleStringSchema(), properties);

        // Define the Kafka stream
        logger.info("Defining the Kafka stream");
        SingleOutputStreamOperator<String> kafkaDataStream = streamEnv.addSource(kafkaConsumer)
                .filter(new RichFilterFunction<String>() {
                    private transient ScheduledExecutorService exec;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        exec = Executors.newSingleThreadScheduledExecutor();
                        exec.scheduleAtFixedRate(() -> reload(), INITIAL_DELAY, PERIOD, TimeUnit.MINUTES);
                    }

                    // Periodic reload hook (empty in this demo)
                    public void reload() {
                    }

                    @Override
                    public void close() throws Exception {
                        // Stop the reload thread when the operator shuts down
                        if (exec != null) {
                            exec.shutdownNow();
                        }
                    }

                    @Override
                    public boolean filter(String s) throws Exception {
                        logger.info("Read from Kafka: data = {}", s);
                        return true;
                    }
                });

        streamEnv.execute("Data-Verify-APPS");
    }
}
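The open() method above starts a scheduled executor for the periodic reload. The same pattern can be sketched in plain java.util.concurrent, with an explicit shutdown so that the reload thread does not outlive its owner; in a Flink rich function the natural place for that shutdown is close(). The PeriodicReloader class is a hypothetical illustration and is not part of the demo project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicReloader implements AutoCloseable {

    private final ScheduledExecutorService exec;

    /** Runs the reload task at a fixed rate on a single daemon thread. */
    public PeriodicReloader(Runnable reload, long initialDelay, long period, TimeUnit unit) {
        exec = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-reload");
            t.setDaemon(true); // do not keep the JVM alive just for reloads
            return t;
        });
        exec.scheduleAtFixedRate(reload, initialDelay, period, unit);
    }

    /** Stops the schedule; call this from the owner's shutdown hook or close(). */
    @Override
    public void close() {
        exec.shutdownNow();
    }

    public boolean isStopped() {
        return exec.isShutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch twice = new CountDownLatch(2);
        try (PeriodicReloader reloader =
                     new PeriodicReloader(twice::countDown, 0, 50, TimeUnit.MILLISECONDS)) {
            // Wait until the task has run twice (or give up after 5 seconds).
            System.out.println("ran twice: " + twice.await(5, TimeUnit.SECONDS));
        }
    }
}
```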
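4.4. Submit the jar to Flink
./bin/flink run -c org.track.manager.data.verify.monitor.verify.DataVerifyApp /app/source/track-manager-data-verify-1.0.jar
The job is submitted and runs successfully.
4.5. Test
Type a message into the producer.
The Flink job reads the message successfully.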
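5. Demo: subscribing to Kerberos-authenticated Kafka messages from Flink in Java
The code in "4.3. Java implementation" is already the complete source. If you cannot get it to build, you can download the project from the address below:
https://download.csdn.net/download/weixin_43857712/16713877
References:
https://blog.csdn.net/qq_41787086/article/details/103434489
https://blog.csdn.net/Mrerlou/article/details/114986255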