Flink处理--异步IO
2021/12/3 6:06:44
本文主要是介绍Flink处理--异步IO,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
async
import com.alibaba.fastjson.JSONObject; import org.apache.commons.io.IOUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pers.aishuang.flink.streaming.entity.ItcastDataPartObj; import pers.aishuang.flink.streaming.entity.VehicleLocationModel; import pers.aishuang.flink.streaming.utils.GaoDeMapUtils; import pers.aishuang.flink.streaming.utils.GeoHashUtil; import pers.aishuang.flink.streaming.utils.RedisUtil; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; /** * 通过异步请求获取指定经纬度的位置信息,从高德API获取位置数据 * 将指定vin某个时间的位置数据保存到redis中 */ public class AsyncHttpQueryFunction extends RichAsyncFunction<ItcastDataPartObj, ItcastDataPartObj> { //创建日志打印器 private static final Logger logger = LoggerFactory.getLogger(AsyncHttpQueryFunction.class); //实现读取异步请求的客户端 (可关闭的http异步请求客户端) private static CloseableHttpAsyncClient httpAsyncClient = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //设置HttpAsyncClient配置 RequestConfig config = RequestConfig.custom() //-- 设置连接超时时间 .setConnectTimeout(5000) //-- 设置socket超时时间 .setSocketTimeout(3000) .build(); //初始化异步Http的client httpAsyncClient = HttpAsyncClients .custom() //设置最大连接数量 .setMaxConnTotal(5) .setDefaultRequestConfig(config) .build(); //开启异步http的客户端 httpAsyncClient.start(); } //实现读取高德API获取位置数据并将位置数据保存到redis中并返回ItcastDataPartObj @Override public void asyncInvoke(ItcastDataPartObj input, ResultFuture<ItcastDataPartObj> resultFuture) throws Exception { //1. 获取当前车辆的经纬度 Double lng = input.getLng(); Double lat = input.getLat(); //2. 通过GaoDeMapUtils工具类根据参数获取请求的url String urlByLonLat = GaoDeMapUtils.getUrlByLonLat(lng,lat); //3. 创建http get请求对象 HttpGet httpGet = new HttpGet(urlByLonLat); //4. 使用刚创建的http异步客户端执行 http请求对象 Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null); //5. 从执行完成的future中获取数据,返回ItcastDataPartObj对象 CompletableFuture<ItcastDataPartObj> completableFuture = CompletableFuture.supplyAsync(new Supplier<ItcastDataPartObj>(){ //重写get方法 //成功时,Redis写入了数据,ItcastDataPartObj的相关字段数据也补齐了。 //失败时,什么也不做,原样返回 @Override public ItcastDataPartObj get() { try { String country = null; String province = null; String city = null; String district = null; String address = null; //再开个线程自己去拿 HttpResponse httpResponse = future.get(); //使用future获取到返回的值 if(httpResponse.getStatusLine().getStatusCode() == 200 ){ HttpEntity entity = httpResponse.getEntity(); InputStream contentStream = entity.getContent(); //①通过IO流工具类直接生成字符串 String content1 = IOUtils.toString(contentStream); //②通过将InputStream转换成输入Reader (转换流:字节流->字符流) InputStreamReader inputStreamReader = new InputStreamReader(contentStream); //--再读取数据流到buffer缓冲区(字符流->高效字符流) BufferedReader bufferedReader = new BufferedReader(inputStreamReader); final int bufferSize = 1024; final char[] buffer = new char[bufferSize]; final StringBuilder out = new StringBuilder(); int len; while ((len = bufferedReader.read(buffer)) != -1){ out.append(new String(buffer,0,len)); } inputStreamReader.close(); bufferedReader.close(); String content2 = out.toString(); //③Entity工具类 String content3 = EntityUtils.toString(entity); //---------------------------- //将json字符串转换成对象然后读取出来国家,省、市、区、address JSONObject jsonObject = JSONObject.parseObject(content3); JSONObject regeocode = jsonObject.getJSONObject("regeocode"); if(regeocode !=null && regeocode.size() > 0 ){ address = regeocode.getString("formatted_address"); JSONObject addressComponent = regeocode.getJSONObject("addressComponent"); if(addressComponent != null && addressComponent.size() > 0) { country = addressComponent.getString("country"); province = addressComponent.getString("province"); city = addressComponent.getString("city"); district = addressComponent.getString("district"); //将其封装为VehicleLocationModel 并写入到redis VehicleLocationModel vehicleLocationModel = new VehicleLocationModel( country, province, city, district, address, lat, lng ); //获取geohash值作为存储到redis的key String geoHash = GeoHashUtil.encode(lat,lng); RedisUtil.set( Bytes.toBytes(geoHash), //字节数组 (二进制数据) vehicleLocationModel.toJsonStringArr()//字节数组(二进制数据) ); //将当前车辆的位置信息赋值 input.setCountry(country); input.setProvince(province); input.setCity(city); input.setDistrict(district); input.setAddress(address); }else{ logger.error("当前解析出来的地理信息为空,请检查"); } }else { logger.error("当前解析出来的对象为空,请检查!"); } }else { logger.error("当前url请求返回reponse错误!"); } } catch (Exception e) { e.printStackTrace(); } return input; } }); //6. 从future的thenAccept completableFuture.thenAccept(new Consumer<ItcastDataPartObj>() { //重写accept方法,使用集合中只放一个对象 @Override public void accept(ItcastDataPartObj itcastDataPartObj) { resultFuture.complete(Collections.singleton(itcastDataPartObj)); } }); } //超时了怎么处理(如果当前请求超时,打印输出超时日志或告警信息) @Override public void timeout(ItcastDataPartObj input, ResultFuture<ItcastDataPartObj> resultFuture) throws Exception { //超时时间,打印输出异步请求的超时警告 System.out.println("当前异步请求超时!"); } //关闭当前的http异步请求客户端 @Override public void close() throws Exception { if(httpAsyncClient.isRunning()) httpAsyncClient.close(); } }
实例
import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pers.aishuang.flink.streaming.async.AsyncHttpQueryFunction; import pers.aishuang.flink.streaming.entity.ItcastDataPartObj; import pers.aishuang.flink.streaming.entity.OnlineDataObj; import pers.aishuang.flink.streaming.entity.VehicleInfoModel; import pers.aishuang.flink.streaming.function.flatmap.VehicleInfoMapMysqlFunction; import pers.aishuang.flink.streaming.function.map.LocactionInfoReidsFunction; import pers.aishuang.flink.streaming.function.window.OnlineStatisticsWindowFunction; import pers.aishuang.flink.streaming.sink.mysql.OnlineStatisticsMysqlSink; import pers.aishuang.flink.streaming.source.mysql.VehicleInfoMysqlSource; import pers.aishuang.flink.streaming.utils.JsonParsePartUtil; import java.util.HashMap; import java.util.concurrent.TimeUnit; /** * 实现车辆的实时上报故障诊断业务分析 * 1、读取车辆的数据,将jsob字符串转换成对象 * 2、读取出来正确的数据 * 3、将车辆的数据通过地理位置(经纬度)去redis中拉取(geoHash算法) * -- 如果拉取数据成功,直接封装成对象 * -- 如果拉取省市区地理位置失败,异步数据流读取高德API请求地理位置并将数据保存到redis中 * 4、将从redis和高德API拉宽的数据进行合并处理 * 5、使用窗口操作,比如30s统计一些窗口内的故障告警对象返回 * 6、读取mysql数据库中的车辆静态数据,车辆车型车系,销售时间等 * 7、窗口数据和静态数据进行connect并flatMap,拉宽数据 * 8、将数据写入到mysql中 * 9、执行任务流环境 * */ public class OnlineStatisticsTask extends BaseTask{ private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsTask.class); public static void main(String[] args) throws Exception{ //1. 初始化Flink流处理的执行环境(事件时间、checkpoint、hadoop name) StreamExecutionEnvironment env = getEnv(OnlineStatisticsTask.class.getSimpleName()); //2. 接入kafka数据源,消费kafka数据 DataStreamSource<String> kafkaStream = getKafkaStream( env, "__consumer_online_alarm_analysis_", SimpleStringSchema.class); //3. 将消费到的json字符串转换成ItcastDataPartObj对象 DataStream<ItcastDataPartObj> source = kafkaStream .map(JsonParsePartUtil::parseJsonToObject) //4. 过滤掉异常数据,根据errorDara属性判断(没有VIN号和终端时间 和json解析失败的数据都视为异常数据) .filter(obj -> StringUtils.isEmpty(obj.getErrorData())); //5. 读取redis中的位置数据<geohash,VehicleLocationModel> ,生成新的数据流 SingleOutputStreamOperator<ItcastDataPartObj> itcastDataMapStream = source.map(new LocactionInfoReidsFunction()); //6. 过滤出 redis拉宽成功的地理位置数据 SingleOutputStreamOperator<ItcastDataPartObj> okWithLocationStream = itcastDataMapStream .filter(obj -> StringUtils.isNotEmpty(obj.getProvince())); //7. 过滤出 redis拉框失败的地理位置数据 SingleOutputStreamOperator<ItcastDataPartObj> ngWithLocationStream = itcastDataMapStream .filter(obj -> StringUtils.isEmpty(obj.getProvince())); //8. 对redis拉框失败的地理位置数据使用异步IO访问高德地图地理位置查询地理位置信息,并将返回结果写入到reids中 //-- 异步数据流 :处理之后的数据(成功补齐数据和失败的ItcastDataPartObj) //-- 存在问题,http请求失败的数据还在里面,仍然缺少坐标详细信息 SingleOutputStreamOperator<ItcastDataPartObj> withLocationAsyncStream = AsyncDataStream //无序返回(可设置返回是否有序,先访问先返回,后访问后返回,设置有序会造成效率低,所以设置为无序) .unorderedWait( ngWithLocationStream, new AsyncHttpQueryFunction(), 3000, //设置超时时间,超过设定时间,认为任务请求失败,3000ms=》 3s TimeUnit.MICROSECONDS //超时单位 ); //9. 将redis拉宽的地理位置数据与高德API拉宽的地理位置数据进行上下合并(合流) //flatmap(FlatMap) / map(Map) 用于单流 // broadcast + connect + flatmap(CoFlatMap)/map(CoMap) 数据拉宽,主要用于两流的数据左右合并(不要求两流的数据类型一致) // union 数据数据上下合并,要求数据类型一致。 //FlatMap 和 Map是用于单流的,CoFlatMap和CoMap是用于两条流连接(co:connect) WindowedStream<ItcastDataPartObj, String, TimeWindow> windowStream = okWithLocationStream .union(withLocationAsyncStream) //10. 创建原始数据的30s的滚动窗口,根据vin进行分流操作 .assignTimestampsAndWatermarks( //水印乱序时间设为3s new BoundedOutOfOrdernessTimestampExtractor<ItcastDataPartObj>(Time.seconds(3)) { @Override public long extractTimestamp(ItcastDataPartObj element) { //指定JavaBean中某个字段数据作为事件时间,必须是long类型 return element.getTerminalTimeStamp(); } } ) //设置分组,指定JavaBean的vin字段作为分组字段 .keyBy(obj -> obj.getVin()) //设置窗口类型:为滚动事件时间窗口,并设置窗口大小 .window(TumblingEventTimeWindows.of(Time.seconds(30))); //11. 对原始数据的窗口流数据进行实时故障分析(区分出来告警数据和非告警数据19个告警字段) SingleOutputStreamOperator<OnlineDataObj> onlineStatisticsStream = windowStream .apply(new OnlineStatisticsWindowFunction()); //12. 加载业务中间表(7张表:车辆表、车辆类型表、车辆销售记录表、车辆用途表4张),并进行广播 DataStream<HashMap<String, VehicleInfoModel>> vehicleInfoBroadcastStream = env .addSource(new VehicleInfoMysqlSource()).broadcast(); //13. 将第11步和第12步的广播流结果进行关联,并应用拉宽操作。 //上报车辆不在库记载的直接丢了 SingleOutputStreamOperator<OnlineDataObj> result = onlineStatisticsStream .connect(vehicleInfoBroadcastStream) .flatMap(new VehicleInfoMapMysqlFunction()); //14. 将拉框后的结果数据写入到mysql数据库中 result.addSink(new OnlineStatisticsMysqlSink()); //15. 启动作业(触发执行) env.execute(OnlineStatisticsTask.class.getSimpleName()); } }
这篇关于Flink处理--异步IO的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南