Kerberos authentication on CentOS 7 integrated with Kafka, and a Java Flink job that subscribes to Kerberos-authenticated Kafka messages

2021/4/17 7:27:58


Kerberos authentication on CentOS 7 with Kafka integration

Environment and versions:
(1) centos7.6
(2) kafka_2.12-0.10.2.2
(3) kerberos
(4) flink-1.11.3-bin-scala_2.11
(5) jdk1.8
Note: "b.kuxiao" is my hostname.

I. Installing Kafka

1.1. Download Kafka from the official site

Kafka downloads page: http://kafka.apache.org/downloads
Choose the version you need (this article uses 0.10.2.2).

1.2. Extract the .tgz

tar -zxvf kafka_2.12-0.10.2.2.tgz

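1.3. Edit the configuration files

Edit config/server.properties:
broker.id: Kafka is usually deployed as a cluster, so every broker needs its own broker.id. I am only running a single node for testing, so 0 is fine.
log.dirs: set this to the directory where Kafka should store its log data (the partition data).
listeners: uncomment it and change it to "PLAINTEXT://b.kuxiao:9092", where "b.kuxiao" is my hostname (run hostnamectl to see yours).
zookeeper.connect: change the host to your own hostname.
Edit config/producer.properties:
bootstrap.servers: change the host to your own hostname.
Edit config/consumer.properties:
zookeeper.connect: change the host to your own hostname.
Run vim /etc/hosts and add an entry for your hostname, for example "192.168.72.130 b.kuxiao", where "192.168.72.130" is my internal IP.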
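Since every later step depends on the hostname resolving, it can help to confirm that the JVM sees the /etc/hosts entry. The following is a minimal sketch using only the JDK; the class name HostCheck is hypothetical and not part of the original setup.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks whether a hostname resolves on this machine
// (through /etc/hosts or DNS), which is what Kafka and Kerberos rely on.
final class HostCheck {

    // Returns the resolved IP address, or null if the name cannot be resolved.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // "b.kuxiao" is the hostname used in this article; pass your own as the first argument.
        String host = args.length > 0 ? args[0] : "b.kuxiao";
        String ip = resolve(host);
        System.out.println(ip == null
                ? host + " does not resolve; check /etc/hosts"
                : host + " -> " + ip);
    }
}
```

If the name does not resolve, fix /etc/hosts before continuing, because the listeners, the KDC address and the service principal all use this hostname.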

1.4. Start ZooKeeper (the one bundled with Kafka)

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
or, to run it in the background:
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

1.5. Start Kafka

./bin/kafka-server-start.sh ./config/server.properties
or, to run it in the background:
nohup ./bin/kafka-server-start.sh ./config/server.properties &

1.6. Run ps -ef|grep kafka to check that the processes started successfully

1.7. Create a test topic (test)

./bin/kafka-topics.sh --create --zookeeper b.kuxiao:2181 --replication-factor 1 --partitions 2 --topic test

List the existing topics:

./bin/kafka-topics.sh --list --zookeeper b.kuxiao:2181

1.8. Start a consumer

./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning

1.9. Start a producer

./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test

1.10. Test

Type a few messages into the producer terminal; they should appear in the consumer terminal.

II. Installing Kerberos

2.1. Installing the Kerberos server

2.1.1. Install

yum install krb5-server

2.1.2. Edit the configuration files

Edit /var/kerberos/krb5kdc/kdc.conf:

vim /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88

[realms]
 KUXIAO.COM = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
 }

Edit /var/kerberos/krb5kdc/kadm5.acl:

vim /var/kerberos/krb5kdc/kadm5.acl
*/admin@KUXIAO.COM      *
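
The ACL line above grants all privileges (the trailing *) to every principal whose second component is admin in the realm KUXIAO.COM. The following sketch models that wildcard rule in plain Java. It is a simplified illustration under the assumption that * matches exactly one name component; the class AclPattern is hypothetical and is not kadmind's actual matching code.

```java
// Hypothetical, simplified model of matching a principal against a
// kadm5.acl principal pattern. Assumption: "*" matches one whole component.
final class AclPattern {

    static boolean matches(String pattern, String principal) {
        String[] p = splitRealm(pattern);
        String[] q = splitRealm(principal);
        if (!componentMatches(p[1], q[1])) {
            return false; // realms differ
        }
        String[] patternParts = p[0].split("/");
        String[] nameParts = q[0].split("/");
        if (patternParts.length != nameParts.length) {
            return false; // e.g. "test" has one component, the pattern has two
        }
        for (int i = 0; i < patternParts.length; i++) {
            if (!componentMatches(patternParts[i], nameParts[i])) {
                return false;
            }
        }
        return true;
    }

    // Splits "name@REALM" into { name, REALM }.
    private static String[] splitRealm(String name) {
        int at = name.lastIndexOf('@');
        if (at < 0) {
            throw new IllegalArgumentException("missing realm: " + name);
        }
        return new String[] { name.substring(0, at), name.substring(at + 1) };
    }

    private static boolean componentMatches(String pattern, String value) {
        return pattern.equals("*") || pattern.equals(value);
    }

    public static void main(String[] args) {
        String acl = "*/admin@KUXIAO.COM";
        System.out.println(matches(acl, "admin/admin@KUXIAO.COM"));
        System.out.println(matches(acl, "kafka/b.kuxiao@KUXIAO.COM"));
    }
}
```

Under this model admin/admin@KUXIAO.COM is covered by the ACL, while kafka/b.kuxiao@KUXIAO.COM is not.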

Edit /etc/krb5.conf:

vim /etc/krb5.conf
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = KUXIAO.COM
 dns_lookup_kdc = false
 dns_lookup_realm = false
 ticket_lifetime = 86400
# renew_lifetime = 604800
 forwardable = true
 default_tgs_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 default_tkt_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 permitted_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
 udp_preference_limit = 1
 kdc_timeout = 60000

[realms]
KUXIAO.COM = {
  # b.kuxiao is my hostname
  kdc = b.kuxiao
  admin_server = b.kuxiao
}

# EXAMPLE.COM = {
#  kdc = kerberos.example.com
#  admin_server = kerberos.example.com
# }

[domain_realm]
# .example.com = EXAMPLE.COM
# example.com = EXAMPLE.COM

2.1.3. Initialize the KDC database

You will be asked to set the master password for the KDC database. Do not forget it.

kdb5_util create -r KUXIAO.COM -s
Loading random data
Initializing database '/var/kerberos/krb5kdc/principal' for realm 'KUXIAO.COM',
master key name 'K/M@KUXIAO.COM'
You will be prompted for the database Master Password.
It is important that you NOT FORGET this password.
Enter KDC database master key: 
Re-enter KDC database master key to verify: 

2.1.4. Start the KDC service

systemctl start krb5kdc           # start the service
systemctl status krb5kdc.service  # check its status
systemctl enable krb5kdc          # start on boot
● krb5kdc.service - Kerberos 5 KDC
   Loaded: loaded (/usr/lib/systemd/system/krb5kdc.service; disabled; vendor preset: disabled)
   Active: active (running) since 五 2021-04-16 14:41:38 CST; 7s ago
  Process: 77245 ExecStart=/usr/sbin/krb5kdc -P /var/run/krb5kdc.pid $KRB5KDC_ARGS (code=exited, status=0/SUCCESS)
 Main PID: 77249 (krb5kdc)
    Tasks: 1
   CGroup: /system.slice/krb5kdc.service
           └─77249 /usr/sbin/krb5kdc -P /var/run/krb5kdc.pid

4月 16 14:41:38 b.kuxiao systemd[1]: Starting Kerberos 5 KDC...
4月 16 14:41:38 b.kuxiao systemd[1]: Started Kerberos 5 KDC.

2.1.5. Start the Kerberos administration service (kadmin)

systemctl start kadmin   # start the service
systemctl status kadmin  # check its status
systemctl enable kadmin  # start on boot
● kadmin.service - Kerberos 5 Password-changing and Administration
   Loaded: loaded (/usr/lib/systemd/system/kadmin.service; disabled; vendor preset: disabled)
   Active: active (running) since 五 2021-04-16 14:44:12 CST; 22s ago
  Process: 77433 ExecStart=/usr/sbin/_kadmind -P /var/run/kadmind.pid $KADMIND_ARGS (code=exited, status=0/SUCCESS)
 Main PID: 77438 (kadmind)
    Tasks: 1
   CGroup: /system.slice/kadmin.service
           └─77438 /usr/sbin/kadmind -P /var/run/kadmind.pid

4月 16 14:44:12 b.kuxiao systemd[1]: Starting Kerberos 5 Password-changing and Administration...
4月 16 14:44:12 b.kuxiao systemd[1]: Started Kerberos 5 Password-changing and Administration.

2.1.6. Create an administrator principal

/usr/sbin/kadmin.local -q "addprinc admin/admin"
Authenticating as principal root/admin@KUXIAO.COM with password.
WARNING: no policy specified for admin/admin@KUXIAO.COM; defaulting to no policy
Enter password for principal "admin/admin@KUXIAO.COM": 
Re-enter password for principal "admin/admin@KUXIAO.COM": 
Principal "admin/admin@KUXIAO.COM" created.

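2.1.7. Everyday Kerberos operations

Logging in as the administrator: on the KDC host itself you can log in directly with kadmin.local. From any other machine, authenticate with kinit first and then run kadmin.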

kadmin.local

[root@localhost app]# kinit admin/admin
Password for admin/admin@KUXIAO.COM: 
[root@localhost app]# kadmin
Authenticating as principal admin/admin@KUXIAO.COM with password.
Password for admin/admin@KUXIAO.COM: 
kadmin:  

Adding, listing and deleting principals

Create a new principal:

kadmin.local:  addprinc -pw 123456 kafka/b.kuxiao@KUXIAO.COM
WARNING: no policy specified for kafka/b.kuxiao@KUXIAO.COM; defaulting to no policy
Principal "kafka/b.kuxiao@KUXIAO.COM" created.

List all principals:

kadmin.local:  listprincs
K/M@KUXIAO.COM
admin/admin@KUXIAO.COM
kadmin/admin@KUXIAO.COM
kadmin/b.kuxiao@KUXIAO.COM
kadmin/changepw@KUXIAO.COM
kafka/b.kuxiao@KUXIAO.COM
kiprop/b.kuxiao@KUXIAO.COM
krbtgt/KUXIAO.COM@KUXIAO.COM
test@KUXIAO.COM

Delete the test principal:

kadmin.local:  delprinc test
Are you sure you want to delete the principal "test@KUXIAO.COM"? (yes/no): yes
Principal "test@KUXIAO.COM" deleted.
Make sure that you have removed this principal from all ACLs before reusing.
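
A principal such as kafka/b.kuxiao@KUXIAO.COM has three parts: the primary (kafka), the instance (b.kuxiao, the broker hostname) and the realm. The primary matters later, because it has to agree with the sasl.kerberos.service.name value used in the Kafka configuration. The sketch below splits a principal into these parts; the class PrincipalParts is a hypothetical helper, not a JDK or Kerberos API.

```java
// Hypothetical helper that splits "primary/instance@REALM" into its parts.
final class PrincipalParts {
    final String primary;
    final String instance; // empty string when the principal has no instance
    final String realm;

    private PrincipalParts(String primary, String instance, String realm) {
        this.primary = primary;
        this.instance = instance;
        this.realm = realm;
    }

    static PrincipalParts parse(String principal) {
        int at = principal.lastIndexOf('@');
        if (at < 0) {
            throw new IllegalArgumentException("missing realm: " + principal);
        }
        String name = principal.substring(0, at);
        String realm = principal.substring(at + 1);
        int slash = name.indexOf('/');
        String primary = slash < 0 ? name : name.substring(0, slash);
        String instance = slash < 0 ? "" : name.substring(slash + 1);
        return new PrincipalParts(primary, instance, realm);
    }

    public static void main(String[] args) {
        PrincipalParts p = parse("kafka/b.kuxiao@KUXIAO.COM");
        System.out.println("primary=" + p.primary
                + " instance=" + p.instance
                + " realm=" + p.realm);
    }
}
```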

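Generating a keytab: use the xst command or the ktadd command (xst is an alias for ktadd). The -norandkey option, which is only available in kadmin.local, exports the existing keys instead of generating new random ones.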

kadmin.local:  ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des3-cbc-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type arcfour-hmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia256-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia128-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-hmac-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-cbc-md5 added to keytab WRFILE:/app/kafka.keytab.

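2.2. Installing the Kerberos client

2.2.1. Install

yum install krb5-workstation krb5-libs krb5-auth-dialog

2.2.2. Copy the /etc/krb5.conf edited on the server to the client (I only have one server, so I skip this step)

2.2.3. User operations

Authenticate a principal: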

kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM

Valid starting       Expires              Service principal
2021-04-16T15:25:28  2021-04-17T15:25:28  krbtgt/KUXIAO.COM@KUXIAO.COM

Show the currently authenticated principal:

klist
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM

Valid starting       Expires              Service principal
2021-04-16T15:25:28  2021-04-17T15:25:28  krbtgt/KUXIAO.COM@KUXIAO.COM
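
The expiry shown by klist follows directly from ticket_lifetime = 86400 in /etc/krb5.conf: 86400 seconds is 24 hours after the "Valid starting" time. A small worked check with java.time, where the class name TicketLifetime is hypothetical:

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper: computes when a ticket expires from its start time
// and the ticket_lifetime (in seconds) configured in krb5.conf.
final class TicketLifetime {

    static LocalDateTime expiry(LocalDateTime validStarting, long ticketLifetimeSeconds) {
        return validStarting.plus(Duration.ofSeconds(ticketLifetimeSeconds));
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.parse("2021-04-16T15:25:28");
        // 86400 s = 24 h, so this prints 2021-04-17T15:25:28, matching the klist output.
        System.out.println(expiry(start, 86400));
    }
}
```

Long-running clients therefore need a keytab (or ticket renewal) to obtain a fresh ticket before the current one expires.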

Destroy the current credential cache:

kdestroy
[root@localhost app]# kdestroy
[root@localhost app]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)

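III. Configuring Kerberos authentication for Kafka

3.1. Generate the keytab

I reuse the keytab generated in 2.1.7:

kadmin.local:  ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao

3.2. Create the configuration files

In the Kafka directory, create jaas/client.properties with the following content: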

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

In the Kafka directory, create jaas/kafka_server_jaas.conf with the following content:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/app/kafka.keytab"
    principal="kafka/b.kuxiao@KUXIAO.COM";
};
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/app/kafka.keytab"
    principal="kafka/b.kuxiao@KUXIAO.COM";
};
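
Before wiring the JAAS file into Kafka, the keytab and principal can be tested from plain Java. The sketch below builds the same Krb5LoginModule options programmatically and attempts a login through the standard javax.security.auth.login API. The class name KeytabLoginCheck is hypothetical; an actual login assumes that /etc/krb5.conf is in place and the KDC is reachable.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

// Hypothetical helper: mirrors the KafkaClient JAAS section in code.
final class KeytabLoginCheck {

    // Same options as the KafkaClient section of kafka_server_jaas.conf.
    static Map<String, String> krb5Options(String keyTab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", keyTab);
        options.put("principal", principal);
        return options;
    }

    // An in-memory JAAS configuration that returns one Krb5LoginModule entry.
    static Configuration configuration(String keyTab, String principal) {
        final AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                krb5Options(keyTab, principal));
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] { entry };
            }
        };
    }

    public static void main(String[] args) throws LoginException {
        LoginContext context = new LoginContext("KafkaClient", null, null,
                configuration("/app/kafka.keytab", "kafka/b.kuxiao@KUXIAO.COM"));
        context.login(); // throws LoginException if the keytab or KDC is wrong
        System.out.println("Logged in as " + context.getSubject().getPrincipals());
        context.logout();
    }
}
```

A LoginException at this point usually indicates a problem with the keytab path, the principal name or the KDC address rather than with Kafka itself.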

3.3. Edit the configuration files

In config/server.properties, change listeners and add the following settings:

listeners=SASL_PLAINTEXT://b.kuxiao:9092

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
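
Two of these settings have to agree with each other: some listener must use the protocol named in security.inter.broker.protocol, and the inter-broker SASL mechanism must be listed in sasl.enabled.mechanisms. The following sketch checks both with java.util.Properties. The class BrokerSaslCheck is hypothetical, and it covers only these two rules, not the full validation the broker performs at startup.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical consistency check for the SASL settings in server.properties.
final class BrokerSaslCheck {

    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // Returns null when the settings are consistent, otherwise a description of the problem.
    static String check(Properties p) {
        String inter = p.getProperty("security.inter.broker.protocol", "PLAINTEXT").trim();
        boolean listenerFound = false;
        for (String listener : p.getProperty("listeners", "PLAINTEXT://:9092").split(",")) {
            int sep = listener.indexOf("://");
            if (sep > 0 && listener.substring(0, sep).trim().equals(inter)) {
                listenerFound = true;
            }
        }
        if (!listenerFound) {
            return "no listener uses the inter-broker protocol " + inter;
        }
        if (inter.startsWith("SASL_")) {
            String mechanism = p.getProperty("sasl.mechanism.inter.broker.protocol", "GSSAPI").trim();
            boolean enabled = false;
            for (String m : p.getProperty("sasl.enabled.mechanisms", "GSSAPI").split(",")) {
                if (m.trim().equals(mechanism)) {
                    enabled = true;
                }
            }
            if (!enabled) {
                return "mechanism " + mechanism + " is not in sasl.enabled.mechanisms";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Properties p = parse("listeners=SASL_PLAINTEXT://b.kuxiao:9092\n"
                + "security.inter.broker.protocol=SASL_PLAINTEXT\n"
                + "sasl.mechanism.inter.broker.protocol=GSSAPI\n"
                + "sasl.enabled.mechanisms=GSSAPI\n");
        String problem = check(p);
        System.out.println(problem == null ? "settings look consistent" : problem);
    }
}
```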

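Add the Kafka JVM options to the bin/kafka-run-class.sh script. Both options must go into a single KAFKA_OPTS assignment, because a second export KAFKA_OPTS=... would overwrite the first:

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/app/kafka_2.12-0.10.2.2/jaas/kafka_server_jaas.conf"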
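
For a standalone Java client that is not started through the Kafka scripts, the same two settings can be supplied as JVM system properties, provided this happens before the first Kerberos or JAAS class reads them. A minimal sketch, with the hypothetical class name KerberosJvmProps:

```java
// Hypothetical helper: sets the two JVM properties that KAFKA_OPTS passes with -D.
final class KerberosJvmProps {

    static void apply(String krb5ConfPath, String jaasConfPath) {
        System.setProperty("java.security.krb5.conf", krb5ConfPath);
        System.setProperty("java.security.auth.login.config", jaasConfPath);
    }

    public static void main(String[] args) {
        // Paths as used in this article; adjust them to your own installation.
        apply("/etc/krb5.conf", "/app/kafka_2.12-0.10.2.2/jaas/kafka_server_jaas.conf");
        System.out.println(System.getProperty("java.security.krb5.conf"));
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```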


3.4. Restart the Kafka services and test

Start ZooKeeper:

nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

Start Kafka:

nohup ./bin/kafka-server-start.sh ./config/server.properties &

Consumer with Kerberos authentication:

./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning --consumer.config jaas/client.properties

The consumer starts successfully.

Producer with Kerberos authentication:

./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test --producer.config jaas/client.properties

Type some data into the producer; the consumer receives it.
Kerberos authentication for Kafka is now in place.

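IV. Subscribing to Kerberos-authenticated Kafka messages from Flink in Java

4.1. Installing Flink

4.1.1. Download

Download from the official site: https://flink.apache.org/downloads.html

4.1.2. Extract

tar -zxvf flink-1.11.3-bin-scala_2.11.tgz

4.1.3. Edit the configuration file

Edit conf/flink-conf.yaml and add the following (I only have one server, so I reuse the same keytab):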

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /app/kafka.keytab
security.kerberos.login.principal: kafka/b.kuxiao@KUXIAO.COM
security.kerberos.login.contexts: Client,KafkaClient

4.2. Start Flink

./bin/start-cluster.sh

Open the web UI: http://192.168.72.130:8081/

4.3. Java implementation

Dependencies (pom.xml):

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.cn</groupId>
	<artifactId>point</artifactId>
	<version>1.0</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.4</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<java.version>1.8</java.version>
		<!--<flink.version>1.12.0</flink.version>-->
		<flink.version>1.11.3</flink.version>
<!--      <flink.version>1.7.2</flink.version>-->
	    <hadoop.version>2.7.7</hadoop.version>
	    <scala.version>2.11.8</scala.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
		<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> 
			<version>2.4.4</version> <type>pom</type> </dependency> -->

		<!-- alibaba fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.22</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<!-- The default version was 3.8.1; use 4.x instead, because 3.x tests are written programmatically while 4.x uses annotations. -->
		</dependency>
		<!-- Use the dependency version that matches the installed Kafka -->
		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>1.1.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- Flink core API -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-cep_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>commons-compress</artifactId>
					<groupId>org.apache.commons</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.11</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>scala-library</artifactId>
					<groupId>org.scala-lang</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>scala-library</artifactId>
					<groupId>org.scala-lang</groupId>
				</exclusion>
				<exclusion>
					<artifactId>scala-parser-combinators_2.11</artifactId>
					<groupId>org.scala-lang.modules</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>scala-library</artifactId>
					<groupId>org.scala-lang</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

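		<!-- Flink Kafka connector. FlinkKafkaConsumer010, which the job code uses, is provided by the 0.10 connector artifact -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.10_2.11</artifactId>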
			<version>${flink.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
				<exclusion>
					<artifactId>snappy-java</artifactId>
					<groupId>org.xerial.snappy</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		
		<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>jsr305</artifactId>
          <groupId>com.google.code.findbugs</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-cli</artifactId>
          <groupId>commons-cli</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-codec</artifactId>
          <groupId>commons-codec</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-collections</artifactId>
          <groupId>commons-collections</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-lang</artifactId>
          <groupId>commons-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-logging</artifactId>
          <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
          <artifactId>netty</artifactId>
          <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j</artifactId>
          <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-math3</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-core-asl</artifactId>
          <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-mapper-asl</artifactId>
          <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>snappy-java</artifactId>
          <groupId>org.xerial.snappy</groupId>
        </exclusion>
      </exclusions>
    </dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>8</source>
					<target>8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Flink job code:

package org.track.manager.data.verify.monitor.verify;

import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DataVerifyApp {
    final static Logger logger = LoggerFactory.getLogger(DataVerifyApp.class);
    private static final Long INITIALDELAY = 5L;
    private static final Long PERIOD = 5L;

    // Side-output tag (not used further in this demo)
    private static final OutputTag<String> countsTag = new OutputTag<String>("counts") {
    };


    public static void main(String[] args) throws Exception {

        // Read external arguments (not used further in this demo; the values below are hard-coded)
        ParameterTool pt = ParameterTool.fromArgs(args);
        String KAFKABROKER = "b.kuxiao:9092";// Kafka broker address
        String TRANSACTIONGROUP = "grouptest";// consumer group
        String TOPICNAME = "test";// topic
        String sasl_kerberos_service_name = "kafka";// Kerberos service name: the primary of the broker principal kafka/b.kuxiao
        String security_protocol = "SASL_PLAINTEXT";
        String sasl_mechanism = "GSSAPI";

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.enableCheckpointing(5000); // let Flink trigger a checkpoint every 5000 ms
        streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once checkpointing: each record affects state exactly once

        streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // leave at least 30000 ms between the end of one checkpoint and the start of the next
        streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);  // a checkpoint must finish within 60000 ms, otherwise it is discarded
        streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  // only one checkpoint may be in progress at a time
        // Keep the checkpoint when the job is cancelled (by default it is deleted). Retained checkpoints must be cleaned up manually.
        streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKABROKER);  // IP or hostname of the Kafka broker
        properties.setProperty("group.id", TRANSACTIONGROUP);   // group.id of the Flink consumer
        properties.setProperty("security.protocol", security_protocol);
        properties.setProperty("sasl.mechanism", sasl_mechanism);
        properties.setProperty("sasl.kerberos.service.name",sasl_kerberos_service_name);

        // Create the consumer
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(TOPICNAME, new SimpleStringSchema(), properties);
        // Define the Kafka stream
        logger.info("Defining the Kafka stream");
        SingleOutputStreamOperator<String> kafkaDataStream = streamEnv.addSource(kafkaConsumer)
                .filter(new RichFilterFunction<String>() {
                    // transient: the executor is created in open() on the task manager and is not serialized
                    private transient ScheduledExecutorService exec;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        exec = Executors.newSingleThreadScheduledExecutor();
                        exec.scheduleAtFixedRate(() -> reload(), INITIALDELAY, PERIOD, TimeUnit.MINUTES);
                    }

                    // Periodic reload hook (empty in this demo)
                    public void reload() {
                    }

                    @Override
                    public void close() throws Exception {
                        // Stop the scheduler thread when the task shuts down
                        if (exec != null) {
                            exec.shutdownNow();
                        }
                    }

                    @Override
                    public boolean filter(String s) throws Exception {
                        logger.info("Read from Kafka : data = {}",s);
                        return true;
                    }
                });

        streamEnv.execute("Data-Verify-APPS");
    }

}
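
One detail of the checkpoint settings in the job above: the interval is 5000 ms, but setMinPauseBetweenCheckpoints(30000) means the next checkpoint cannot start until 30000 ms after the previous one has finished, so the minimum pause dominates. The sketch below illustrates this with a simplified timing model. The class CheckpointSchedule, the fixed checkpoint duration and the rule "next start = max(previous start + interval, previous end + minimum pause)" are assumptions made for illustration, not Flink's actual scheduler code.

```java
import java.util.Arrays;

// Hypothetical, simplified model of checkpoint start times when both an
// interval and a minimum pause between checkpoints are configured.
final class CheckpointSchedule {

    static long[] startTimes(long intervalMs, long minPauseMs, long durationMs, int count) {
        long[] starts = new long[count];
        long start = intervalMs; // assumption: the first checkpoint starts after one interval
        for (int i = 0; i < count; i++) {
            starts[i] = start;
            long end = start + durationMs;
            // The next checkpoint waits for both the interval and the minimum pause.
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Interval 5000 ms and minimum pause 30000 ms as in the job; 1000 ms duration is assumed.
        System.out.println(Arrays.toString(startTimes(5000, 30000, 1000, 3)));
    }
}
```

With an assumed checkpoint duration of 1000 ms, the model yields start times of 5000, 36000 and 67000 ms, that is, one checkpoint roughly every 31 seconds rather than every 5 seconds.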

4.4. Submit the jar to Flink

./bin/flink run -c org.track.manager.data.verify.monitor.verify.DataVerifyApp /app/source/track-manager-data-verify-1.0.jar

The job starts successfully.

4.5. Test

Type a message into the producer; the Flink job reads it successfully.

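V. Demo: subscribing to Kerberos-authenticated Kafka messages from Flink in Java

The code in section 4.3 is already the complete source. If you cannot get it to build, download the project here:
https://download.csdn.net/download/weixin_43857712/16713877
References: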
https://blog.csdn.net/qq_41787086/article/details/103434489
https://blog.csdn.net/Mrerlou/article/details/114986255


