代码回现 | 如何实现交易反欺诈?
2021/4/14 18:55:19
本文主要是介绍代码回现 | 如何实现交易反欺诈?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、背景概述
交易反欺诈是VoltDB适用场景之一,是典型的事件驱动的业务,核心是摄取高频的交易数据,并逐条对交易进行一系列复杂的反欺诈规则校验,最终生成评判交易可疑度的分值,发送给下游业务系统,触发交易拦截动作。
反欺诈规则中涉及大量的通过分析历史交易生成的指标项,在VoltDB中进行流式计算,可基于本地保存的丰富的上下文数据对事件进行分析决策,使实时计算靠近上下文数据,获得性能优势。
二、实例回现
下面我们通过一个刷卡的应用,展示VoltDB是如何实现一个简单的反欺诈用例的。为了让示例代码更加简洁,又能突出VoltDB的功能,这里使用一个地铁刷卡的场景替代金融交易(如信用卡刷卡),以避免引入过多专业的金融业务知识。同时一个繁忙地铁系统产生的交易吞吐量不可小觑,定义的反欺诈规则也更容易理解。
可以通过这个链接来访问详细的代码https://github.com/ssomagani/event-driven-transactions
在这个应用中,模拟如下几个场景:
- 多辆列车在地铁站点之间运行,生成列车进站事件。通过这个场景可以了解,如何将数据发布到VoltDB Topic中,以及如何消费Topic中的数据。
- 公交卡充值操作。通过这个场景,可以了解,如何使用一个包含自定义业务规则的procedure来处理Topic中的数据,同时使用Stream对象将数据导出到Topic中,并通过视图对Stream中的数据流进行统计,生成实时的统计报表。视图会逐条统计Stream中的流数据,将处理结果保存到视图中,是VoltDB实现流式计算的方式之一。
- 乘客刷卡乘车,生成高频交易数据。通过这个场景,可以了解,如何使用VoltDB数据库客户端api直接操作数据表(区别与将数据发送到Topic中),保存交易数据。如何通过VoltDB的java procedure定制反欺诈校验规则,并调用java procedure进行交易校验和反欺诈行为。
让我们来具体了解一下,在VoltDB中运行这个用例的过程。
2.1准备工作
1. 启用VoltDB Topic功能
VoltDB提供一个统一的配置文件,主要的特性都可以在其中进行定义,如:持久化、高可用、安全性等等,这里主要介绍与案例相关的VoltDB Topic功能。如下配置开启了Topic服务,并在服务器上开启端口9999,用于接受客户端发来的消息。
<Topics enabled="true"> <properties> <property name="port">9999</property> <property name="group.initial.rebalance.delay.ms">0</property> <property name="retention.policy.threads">1</property> </properties> <profiles> <profile name="retain_compact"> <retention policy="compact" limit="2048" /> </profile> </profiles> </Topics>
2.根据特定配置文件启动VoltDB
3.创建Topic,Topic的用途后面的代码分析中提到
CREATE Topic TRAINTOPIC execute procedure train_events.insert; CREATE TOPIC RECHARGE execute procedure RechargeCard; CREATE TOPIC using stream CARD_ALERT_EXPORT properties(topic.format=avro); create topic using stream FRAUD properties(topic.format=avro,consumer.keys=TRANS_ID);
4.创建数据表
在处理实时事件流时,可以充分利用底层的数据库引擎,充分利用本地关系型数据进行数据分析,得到反欺诈业务指标。在本例中将创建如下数据表和视图(省略具体DDL)
5.初始化数据
通过VoltDB的数据导入功能,从csv文件中初始化站点和列车
csvloader --file $PROJ_HOME/data/redline.csv --reportdir log stations csvloader --file $PROJ_HOME/data/trains.csv --reportdir log trains
2.2 代码分析-列车运行
在这个场景中,客户端模拟8辆列车在17个站点之间运行,产生进站事件并发送到Topic。由于设定的列车进出站时间比较短(微秒为单位),所以会产生高频事件流。
在服务端,VoltDB完成:
1.消息接收
2.消费消息
3.将列车进站事件记录到数据库中
在客户端,通过java类TrainProducer生成多辆列车进站事件,并将事件发送到VoltDB Topic中。TrainProducer的执行命令如下:
java metro.pub.TrainProducer localhost:9999 TRAINTOPIC 8
TrainProducer类接收四个参数:
- .指定接收列车进站和离站事件的VoltDB服务器端口。这里假设在同一台机器上运行client代码和VoltDB,而前面在VoltDB配置文件中我们已经指定Topic的监听端口是9999。
- 指定VoltDB broker
- 指定数据发送的Topic名称。
- 指定要模拟的列车数量。
分析一下TrainProducer的主要方法,main方法生成10个线程,每50毫秒执行一次publish()方法,将列车进出站时间发送到Topic“TRAINTOPIC”中。
public static void main(String[] args) { ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10); TrainProducer producer = new TrainProducer(args[0], args[1], Integer.parseInt(args[2])); System.out.println("Scheduling trains"); EXECUTOR.scheduleAtFixedRate ( () -> { producer.publish(producer.getNewEvents()); }, 1, 50, MILLISECONDS); }
跟踪代码找到producer的定义,它其实就是原生的KafkaProducer,所以可以看到VoltDB Topic完全兼容kafka api。而brokers即是main方法中的传参localhost:9999,因此上面producer.getNewEvents()方法生成的数据将被发送到VoltDB Topic中。
private Producer<String, TrainEvent> createProducer() { Properties props = new Properties(); props.put("bootstrap.servers", brokers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "metro.serde.TrainEventSer"); Producer<String, TrainEvent> producer = new KafkaProducer <String, TrainEvent>(props); return producer; }
Publish方法所发送的消息由producer.getNewEvents()方法生成。有必要提前看一下Stations类,其中定义了17个火车站点,包括每个站点的到下一个站点的运行时间(Station.nextStnDuration)和本站点停车时间(Station.stnWaitDuration),时间以微秒为单位。所有列车将依次在这些站点中运行。
static HashMap<Integer, Station> idToStationMap = new HashMap<>(); static { idToStationMap.put(1, new Station(1, 1200000, 450000)); idToStationMap.put(2, new Station(2, 1050000, 250000)); idToStationMap.put(3, new Station(3, 850000, 300000)); idToStationMap.put(4, new Station(4, 900000, 350000)); idToStationMap.put(5, new Station(5, 500000, 260000)); idToStationMap.put(6, new Station(6, 950000, 190000)); idToStationMap.put(7, new Station(7, 450000, 130000)); idToStationMap.put(8, new Station(8, 200000, 280000)); idToStationMap.put(9, new Station(9, 200000, 110000)); idToStationMap.put(10, new Station(10, 450000, 300000)); idToStationMap.put(11, new Station(11, 550000, 200000)); idToStationMap.put(12, new Station(12, 550000, 200000)); idToStationMap.put(13, new Station(13, 800000, 150000)); idToStationMap.put(14, new Station(14, 950000, 100000)); idToStationMap.put(15, new Station(15, 1000000, 130000)); idToStationMap.put(16, new Station(16, 1200000, 220000)); idToStationMap.put(17, new Station(17, 1500000, 500000)); } public static class Station { public final int stationId; public final int nextStnDuration; public final int stnWaitDuration; public Station(int stationId, int nextStnDuration, int stnWaitDuration) { this.stationId = stationId; this.nextStnDuration = nextStnDuration; this.stnWaitDuration = stnWaitDuration; } }
所以getNewEvents主要的逻辑是首先随机设定列车从任意站点出发,然后调用next()根据系统当前时间和站点的Station.nextStnDuration、Station.stnWaitDuration来判断每辆列车目前运行到哪个站点,如果next返回的LastKnownLocation对象有变化,则判断列车已进入下一站,将列车进站事件trainEvent放到records中,用于发送给Topic。(注:列车调度不是本样例的重点,因此next方法不会考虑列车的冲突问题,它假设站点之间由足够多的轨道,可以供多个列车并行)。
public List<TrainEvent> getNewEvents() { ArrayList<TrainEvent> records = new ArrayList<>(); for(TrainEvent trainEvent : idToTrainMap.values()) { LastKnownLocation prevLoc = trainEvent.location; LastKnownLocation curLoc = next(prevLoc, LocalDateTime.now()); if(!prevLoc.equals(curLoc)) { trainEvent = new TrainEvent(trainEvent.trainId, curLoc); idToTrainMap.put(trainEvent.trainId, trainEvent); records.add(trainEvent); } } return records; }
Topic TRAINTOPIC定义如下,train_events.insert是VoltDB为表创建的默认存储过程,命名规则为[tablename].insert。Topic与存储过程连用,表示存储过程train_events.insert消费该Topic TRAINTOPIC中的trainEvent数据,并写入train_events表中。
CREATE Topic TRAINTOPIC execute procedure train_events.insert;
2.23 代码分析-公交卡充值
在这个场景中,客户端将完成充值消息发送。
在服务端,VoltDB完成:
- 消息接收
- 消费消息
- 使用自定义逻辑处理消息 将充值数据更新到数据库中
- 生成充值消息,并将数据写入stream对象中
- 基于stream对象创建视图,来生成实时的充值统计报表
- 将stream中的充值消息发布到Topic中,供后续(VoltDB之外的)数据处理逻辑进行消费。例如被spark消费,由于进行后续的批处理逻辑。
在客户端通过执行java类CardsProducer,首先初始化公交卡记录,并将记录写入数据库表中。然后随机生成卡片充值事件,发送事件到Topic RECHARGE中。CardsProducer的执行命令如下:
java metro.pub.CardsProducer --mode=recharge --servers=localhost:9999 --Topic=RECHARGE
CardsProducer类接收三个参数:
- 执行模式,用于指定是初始化公交卡记录还是生成充值事件。
- 指定VoltDB broker
- 指定数据发送的Topic名称
分析一下CardsProducer的主要方法,main方法生成10个线程,每5毫秒执行一次publish()方法,将列车进出站时间发送到Topic“RECHARGE”中。
public static void main(String[] args) throws IOException { CONFIG.parse("CardsProducer", args); if(CONFIG.mode.equals("new")) { genCards(CONFIG); return; } ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10); CardsProducer producer = new CardsProducer(CONFIG.servers, CONFIG.Topic); System.out.println("Recharging Cards"); EXECUTOR.scheduleAtFixedRate ( () -> { producer.publish(producer.getRechargeActivityRecords(1)); }, 1, 5, MILLISECONDS); }
和前面TrainProducer一样,CardsProducer中的 producer也是KafkaProducer,不多介绍。getRechargeActivityRecords方法用来生成一条随机的充值事件,包括卡号、充值金额和充值站点。每5毫秒执行一次。
public List<CardEvent> getRechargeActivityRecords(int count) { final ArrayList<CardEvent> records = new ArrayList<>(); int amt = (ThreadLocalRandom.current().nextInt(18)+2)*1000; int stationId = ThreadLocalRandom.current().nextInt(1, 18); ThreadLocalRandom.current().ints(count, 0, CONFIG.cardcount).forEach((cardId) -> { records.add(new CardEvent(cardId, amt, stationId)); } ); return records; }
这个场景中,Client端的代码非常简单,到此为止。更多的逻辑在服务端定义,请看以下。
Topic用于接收充值事件,它的定义如下:
CREATE TOPIC RECHARGE execute procedure RechargeCard;
其中RechargeCard用于消费Topic中的数据,而RechargeCard是一个java procedure,它通过java+sql的方式,自定义了业务逻辑。java procedure是VoltDB在处理流数据时经常用到的对象,它是一个运行在VoltDB服务端的java类,而非client端代码。它需要提前编译成jar包(如下procs.jar),并加载到VoltDB java 运行时环境中。之后使用如下DDL定义。定义了RechargeCard后,在上面的CREATE TOPIC中才能被引用。
sqlcmd --query="load classes $PROJ_HOME/dist/procs.jar" CREATE PROCEDURE PARTITION ON TABLE cards COLUMN card_id PARAMETER 0 FROM CLASS metro.cards.RechargeCard;
让我们看一下RechargeCard中的逻辑,重点关注如何将java业务逻辑与SQL进行结合。其中定义run()方法和四个sql语句。RechargeCard从Topic RECHARGE中消费数据,进行反序列化之后,逐条将数据(即充值事件)作为传参交给run()方法,run()是procedure的入口方法。
voltQueueSQL是VoltDB的server 端api,用来执行sql并返回结果。Sql getCard和getStationName首先根据从Topic中获取的数据进行充值事件合法性校验,如果数据库中没有对应的充值站点或公交卡记录,则执行sql exportNotif写入一条错误信息。否则,update VoltDB数据库中对应公交卡,增加余额,并执行sql exportNotif写入一条成功信息。
public class RechargeCard extends VoltProcedure { public final SQLStmt updateBalance = new SQLStmt("UPDATE cards SET balance = balance + ? WHERE card_id = ? AND card_type = 0"); public final SQLStmt getCard = new SQLStmt("SELECT * from cards WHERE card_id = ?"); public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)"); public final SQLStmt getStationName = new SQLStmt("SELECT name FROM stations WHERE station_id = ?"); public long run(int cardId, int amt, int stationId) { voltQueueSQL(getStationName, stationId); voltQueueSQL(getCard, cardId); String station = "UNKNOWN"; final VoltTable[] results = voltExecuteSQL(); if(results.length == 0) exportError(cardId, station); VoltTable stationResult = results[0]; if(stationResult.advanceRow()) station = stationResult.getString(0); VoltTable card = results[1]; if(card.advanceRow()) { voltQueueSQL(updateBalance, amt, cardId); String name = card.getString(5); String phone = card.getString(6); String email = card.getString(7); int notify = (int) card.getLong(8); voltQueueSQL(updateBalance, amt, cardId); voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, "Card recharged successfully"); voltExecuteSQL(true); } else { exportError(cardId, station); } return 0; } private void exportError(int cardId, String station) { exportError(cardId, station, "", "", "", 0, "Could not locate details of card for recharge"); } private void exportError(int cardId, String station, String name, String phone, String email, int notify, String msg) { voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, msg); voltExecuteSQL(true); } }
exportNotif的定义如下,其中CARD_ALERT_EXPORT是VoltDB的stream数据库对象,一种数据管道,insert进去的数据逐一流过。
public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
可以在CARD_ALERT_EXPORT上添加数据处理逻辑,实现流计算效果。这个场景中,简单的在Stream上创建了一个视图,用于生成实时统计报表。视图的定义如下:
CREATE VIEW card_export_stats(card_id, station_name, rechargeCount) AS SELECT card_id, station_name, count(*) from CARD_ALERT_EXPORT GROUP BY card_id, station_name;
最后,我们定义Stream中的数据最终流向另外的Topic,该Topic可以让VoltDB之外的大数据产品进行消费,完成下游数据处理逻辑。
CREATE TOPIC using stream CARD_ALERT_EXPORT properties(Topic.format=avro);
2.4 代码分析-乘客刷卡乘车
这个场景中,客户端随机生成大量乘客刷卡进站记录,并发送给数据库处理。
服务端完成如下操作:
1.首先进行一系列校验,如验证卡信息,卡余额,是否盗刷等反欺诈操作。
2.将所有刷卡行为都记录到数据表中。并将余额不足和复合欺诈逻辑的刷卡事件分别发布到不同的Topic中,供其他下游系统订阅。
在客户端通过执行java类RidersProducer,与前面两个场景不同,RidersProducer类直接连接VoltDB数据库将数据写入数据表中,而不是将数据发送到VoltDB Topic中。用来展示VoltDB的多种使用方式。
connectToOneServerWithRetry使用VoltDB client api连接指定ip的VoltDB数据库。
void connectToOneServerWithRetry(String server, Client client) { int sleep = 1000; while (true) { try { client.createConnection(server); break; } catch (Exception e) { System.err.printf("Connection failed - retrying in %d second(s).\n", sleep / 1000); try { Thread.sleep(sleep); } catch (Exception interruted) {} if (sleep < 8000) sleep += sleep; } } System.out.printf("Connected to VoltDB node at: %s.\n", server); }
RidersProducer类创建100个线程,runBenchmark方法中每200毫秒这些线程执行一次getEntryActivityRecords。getEntryActivityRecords随机生成一条乘客进站乘车记录,记录内容包括卡号、当前时间、进站站点id等
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(100); public void runBenchmark() throws Exception { int microsPerTrans = 1000000/RidersProducer.config.rate; EXECUTOR.scheduleAtFixedRate ( () -> { List<Object[]> entryRecords = getEntryActivityRecords(config.cardcount);//生成随机的进站记录 call(config.cardEntry, entryRecords);//将数据发送到VoltDB数据库 }, 10000, microsPerTrans, MICROSECONDS); } public static List<Object[]> getEntryActivityRecords(int count) { final ArrayList<Object[]> records = new ArrayList<>(); long curTime = System.currentTimeMillis(); ThreadLocalRandom.current().ints(1, 0, count).forEach((cardId) -> { records.add(new Object[] {cardId, curTime, Stations.getRandomStation().stationId, ENTER.value, 0}); } ); return records; }
接着调用call方法,将数据records发送到数据库进行处理。Call方法定义如下,callProcedure是VoltDB的client端api,用于将数据发送给指定名称的procedure进行处理,可以通过同步和异步IO两种方式进行调用,异步调用时需要指定回调函数对数据库调用的返回结果进行处理,即本例中的自定义了BenchmarkCallback。
protected static void call(String proc, Object[] args) { try { client.callProcedure(new BenchmarkCallback(proc, args), procName, args); } catch (IOException e) { e.printStackTrace(); } }
Call方法将数据发送给procedure,procedure名称由如下代码指定。一起看看procedure中的具体逻辑。
@Option(desc = "Proc for card entry swipes") String cardEntry = "ValidateEntry";
Procedure ValidateEntry的部分定义,首先定义了6个SQL。
//查询公交卡是否存在 public final SQLStmt checkCard = new SQLStmt( "SELECT enabled, card_type, balance, expires, name, phone, email, notify FROM cards WHERE card_id = ?;"); //卡充值 public final SQLStmt chargeCard = new SQLStmt( "UPDATE cards SET balance = ? WHERE card_id = ?;"); //查询指定站点的入站费用 public final SQLStmt checkStationFare = new SQLStmt( "SELECT fare, name FROM stations WHERE station_id = ?;"); //记录进站事件 public final SQLStmt insertActivity = new SQLStmt( "INSERT INTO card_events (card_id, date_time, station_id, activity_code, amount, accept) VALUES (?,?,?,?,?,?);"); //再次用到card_alert_export 这个stream对象,用于发送公交卡欠费消息 public final SQLStmt exportActivity = new SQLStmt( "INSERT INTO card_alert_export (card_id, export_time, station_name, name, phone, email, notify, alert_message) VALUES (?,?,?,?,?,?,?,?);"); //将刷卡欺诈行为写入stream对象fraud中 public final SQLStmt publishFraud = new SQLStmt( "INSERT INTO fraud (trans_id, card_id, date_time, station, activity_type, amt) values (?, ?, ?, ?, ?, ?)" );
值得说明的,上面最后一个sql中用到的fraud是另外一个stream对象,用于插入刷卡欺诈事件,通过DDL定义其中的刷卡欺诈行为最终会发布到VoltDB Topic中,用于下游处理产品消费。
CREATE STREAM FRAUD partition on column CARD_ID ( TRANS_ID varchar not null, CARD_ID integer not null, DATE_TIME timestamp not null, STATION integer not null, ACTIVITY_TYPE TINYINT not null, AMT integer not null ); create Topic using stream FRAUD properties(Topic.format=avro,consumer.keys=TRANS_ID);
前面已经提到run方法是procedure的入口方法,VoltDB运行procedure时,自动调用该方法。前面客户端传进的records记录,被逐一传递到run方法到参数中进行处理。run方法定义如下
public VoltTable run(int cardId, long tsl, int stationId, byte activity_code, int amt) throws VoltAbortException { //查询公交卡是否存在 voltQueueSQL(checkCard, EXPECT_ZERO_OR_ONE_ROW, cardId); //查询指定站点的交通费用 voltQueueSQL(checkStationFare, EXPECT_ONE_ROW, stationId); VoltTable[] checks = voltExecuteSQL(); VoltTable cardInfo = checks[0]; VoltTable stationInfo = checks[1]; byte accepted = 0; //如果公交卡记录等于0,说明卡不存在 if (cardInfo.getRowCount() == 0) { //记录刷卡行为到数据库表中,将accept字段置为拒绝“REJECTED” voltQueueSQL(insertActivity, cardId, tsl, stationId, ACTIVITY_ENTER, amt, ACTIVITY_REJECTED); voltExecuteSQL(true); //返回“被拒绝”消息给客户端。 return buildResult(accepted,"Card Invalid"); } // 如果卡存在,则取出卡信息。 cardInfo.advanceRow(); //卡状态,0不可用,1可用 int enabled = (int)cardInfo.getLong(0); int cardType = (int)cardInfo.getLong(1); //卡余额 int balance = (int)cardInfo.getLong(2); TimestampType expires = cardInfo.getTimestampAsTimestamp(3); String owner = cardInfo.getString(4); String phone = cardInfo.getString(5); String email = cardInfo.getString(6); int notify = (int)cardInfo.getLong(7); // 查询指定站点的进站费用 stationInfo.advanceRow(); //指定站点的进站费用 int fare = (int)stationInfo.getLong(0); String stationName = stationInfo.getString(1); // 刷卡时间 TimestampType ts = new TimestampType(tsl); // 如果卡状态为不可用 if (enabled == 0) { //向客户端返回“此卡不可用” return buildResult(accepted,"Card Disabled"); } // 如果卡类型为“非月卡” if (cardType == 0) { // 如果卡内余额充足 if (balance > fare) { //isFrand为反欺诈策略,后面介绍 if (isFraud(cardId, ts, stationId)) { // 如果认定为欺诈,记录刷卡记录,记录类型为“欺诈刷卡” voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_FRAUD); //并且把欺诈事件写入stream,并最终被发布到VoltDB Topic中。见前面STREAM FRAUD到ddl定义 voltQueueSQL(publishFraud, generateId(cardId, tsl), cardId, ts, stationId, ACTIVITY_ENTER, amt); voltExecuteSQL(true); //向客户端返回“欺诈交易”消息 return buildResult(0, "Fraudulent transaction"); } else { // 如果不是欺诈行为,则减少卡内余额,完成正常消费 voltQueueSQL(chargeCard, balance - fare, cardId); //记录正常的刷卡事件 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_ACCEPTED); voltExecuteSQL(true); //向客户端返回卡内余额 return buildResult(1, "Remaining Balance: " + intToCurrency(balance - fare)); } } else { // 如果卡内余额不足,记录刷卡失败事件。 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, 0, ACTIVITY_REJECTED); if (notify != 0) { //再次用到card_alert_export 这个stream对象,用于发送公交卡欠费消息 voltQueueSQL(exportActivity, cardId, getTransactionTime().getTime(), stationName, owner, phone, email, notify, "Insufficient Balance"); } voltExecuteSQL(true); //向客户端返回“余额不足“消息 return buildResult(0,"Card has insufficient balance: "+intToCurrency(balance)); } } }
以上代码中有一个isFraud方法,用于判定是否为欺诈性刷卡。这里定义了一些简单反欺诈规则
- 如果一秒钟内相同的卡片有1次以上的刷卡记录,认定为欺诈。因为不可能存在时间间隔如此短的刷卡行为,可能是由于有多张伪造卡片在同时刷卡。
- 同一张卡在过去一小时内,在5个或5个以上站点刷卡进站。假设这同样被认为是由于有多张伪造卡片在同时刷卡。
-
同一张卡在过去一小时内,有过10次以上刷卡进站记录。进出站次数太多,暂停使用一段时间。
isFraud方法根据当前刷卡记录中的数据,结合数据库中的历史记录实现以上反欺诈规则。历史刷卡记录被保存在card_events表中,另外基于这张表创建了视图,统计每张卡在一秒钟内是否有过刷卡记录。
CREATE VIEW CARD_HISTORY_SECOND as select card_id, TRUNCATE(SECOND, date_time) scnd from card_events group by card_id, scnd; isFraud方法的定义 public final SQLStmt cardHistoryAtStations = new SQLStmt( "SELECT activity_code, COUNT(DISTINCT station_id) AS stations " + "FROM card_events " + "WHERE card_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " + "GROUP BY activity_code;" ); public final SQLStmt cardEntries = new SQLStmt( "SELECT activity_code " + "FROM card_events " + "WHERE card_id = ? AND station_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " + "ORDER BY date_time;" ); public final SQLStmt instantaneousCardActivity = new SQLStmt( "SELECT count(*) as activity_count " + "FROM CARD_HISTORY_SECOND " + "WHERE card_id = ? " + "AND scnd = TRUNCATE(SECOND, ?) " + "GROUP BY scnd;" ); public boolean isFraud(int cardId, TimestampType ts, int stationId) { voltQueueSQL(instantaneousCardActivity, cardId, ts); voltQueueSQL(cardHistoryAtStations, cardId, ts); voltQueueSQL(cardEntries, cardId, stationId, ts); final VoltTable[] results = voltExecuteSQL(); final VoltTable cardInstantaneousActivity = results[0]; final VoltTable cardHistoryAtStationisTable = results[1]; final VoltTable cardEntriesTable = results[2]; //一秒钟之内已经有一次刷卡记录的话,返回true while (cardInstantaneousActivity.advanceRow()) { if(cardInstantaneousActivity.getLong("activity_count") > 0) { return true; } } while (cardHistoryAtStationisTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code"); final long stations = cardHistoryAtStationisTable.getLong("stations"); if (activity_code == ACTIVITY_ENTER) { // 过去1小时之内在五个站点刷卡进站,返回true if (stations >= 5) { return true; } } } byte prevActivity = ACTIVITY_INVALID; int entranceCount = 0; while (cardEntriesTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code"); if (prevActivity == ACTIVITY_INVALID || prevActivity == activity_code) { if (activity_code == ACTIVITY_ENTER) { prevActivity = activity_code; entranceCount++; } else { prevActivity = ACTIVITY_INVALID; } } } // 如果在过去1小时内有10次连续的刷卡记录,返回true。 if (entranceCount >= 10) { return true; } return false; }
您看好VoltDB吗? 马上行动吧!
欢迎私信,与更多小伙伴一起探讨。
关于VoltDB
VoltDB支持强ACID和实时智能决策的应用程序,以实现互联世界。没有其它数据库产品可以像VoltDB这样,可以同时需要低延时、大规模、高并发数和准确性相结合的应用程序加油。
VoltDB由2014年图灵奖获得者Mike Stonebraker博士创建,他对关系数据库进行了重新设计,以应对当今不断增长的实时操作和机器学习挑战。Stonebraker博士对数据库技术研究已有40多年,在快速数据,流数据和内存数据库方面带来了众多创新理念。
在VoltDB的研发过程中,他意识到了利用内存事务数据库技术挖掘流数据的全部潜力,不但可以满足处理数据的延迟和并发需求,还能提供实时分析和决策。VoltDB是业界可信赖的名称,在诺基亚、金融时报、三菱电机、HPE、巴克莱、华为等领先组织合作有实际场景落地案例。
这篇关于代码回现 | 如何实现交易反欺诈?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-26Java语音识别项目资料:新手入门教程
- 2024-11-26JAVA语音识别项目资料:新手入门教程
- 2024-11-26Java语音识别项目资料:入门与实践指南
- 2024-11-26Java云原生资料入门教程
- 2024-11-26Java云原生资料入门教程
- 2024-11-26Java云原生资料:新手入门教程
- 2024-11-25Java创意资料:新手入门的创意学习指南
- 2024-11-25JAVA对接阿里云智能语音服务资料详解:新手入门指南
- 2024-11-25Java对接阿里云智能语音服务资料详解
- 2024-11-25Java对接阿里云智能语音服务资料详解