Hive Task Parsing Flow
2021/7/29 6:06:20
This article traces how the Hive CLI parses and executes a task, following the call chain from the entry class down to the MapReduce job.
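1. Locate the entry class
The hive launcher script and ext/cli.sh show that the main class is org.apache.hadoop.hive.cli.CliDriver.
2. Run the main method
3. Run the run method
3.1 Parse system parameters such as hiveconf and hive.root.logger
The process_stage1 method is as follows: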
public boolean process_stage1(String[] argv) {
  try {
    commandLine = new GnuParser().parse(options, argv);
    Properties confProps = commandLine.getOptionProperties("hiveconf");
    for (String propKey : confProps.stringPropertyNames()) {
      // with HIVE-11304, hive.root.logger cannot have both logger name and log level.
      // if we still see it, split logger and level separately for hive.root.logger
      // and hive.log.level respectively
      if (propKey.equalsIgnoreCase("hive.root.logger")) {
        CommonCliOptions.splitAndSetLogger(propKey, confProps);
      } else {
        System.setProperty(propKey, confProps.getProperty(propKey));
      }
    }

    Properties hiveVars = commandLine.getOptionProperties("define");
    for (String propKey : hiveVars.stringPropertyNames()) {
      hiveVariables.put(propKey, hiveVars.getProperty(propKey));
    }

    Properties hiveVars2 = commandLine.getOptionProperties("hivevar");
    for (String propKey : hiveVars2.stringPropertyNames()) {
      hiveVariables.put(propKey, hiveVars2.getProperty(propKey));
    }
  } catch (ParseException e) {
    System.err.println(e.getMessage());
    printUsage();
    return false;
  }
  return true;
}
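The routing performed by process_stage1 can be illustrated without Commons CLI. The sketch below is a simplified stand-in, not Hive code: the class Stage1Sketch and its hand-written argument loop are hypothetical, it stores hiveconf pairs in a local Properties object instead of calling System.setProperty, it rejects a pair without '=', and it omits the hive.root.logger special case.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical, simplified stand-in for the stage-1 option routing.
final class Stage1Sketch {
    // Receives --hiveconf pairs (the real method sets system properties instead).
    final Properties confProps = new Properties();
    // Receives --define and --hivevar pairs.
    final Map<String, String> hiveVariables = new LinkedHashMap<>();

    /** Returns false on a malformed option, mirroring the boolean result of stage 1. */
    boolean parse(String[] argv) {
        for (int i = 0; i < argv.length; i++) {
            String opt = argv[i];
            boolean isConf = opt.equals("--hiveconf");
            boolean isVar = opt.equals("--define") || opt.equals("--hivevar");
            if (!isConf && !isVar) {
                continue; // other options such as -e and -f belong to stage 2
            }
            if (i + 1 >= argv.length) {
                return false; // option given without a key=value argument
            }
            String pair = argv[++i];
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                return false; // assumption of this sketch: a key and '=' are required
            }
            String key = pair.substring(0, eq);
            String value = pair.substring(eq + 1);
            if (isConf) {
                confProps.setProperty(key, value);
            } else {
                hiveVariables.put(key, value);
            }
        }
        return true;
    }
}
```

Both --define and --hivevar feed the same map, which matches the two loops in process_stage1 that write into hiveVariables.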
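3.2 Set up the streams
Set up the standard input, output, and error streams used to read HQL and to print messages.
3.3 Parse user-supplied options such as -e and -f
The process_stage2 method is as follows: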
public boolean process_stage2(CliSessionState ss) {
  ss.getConf();

  if (commandLine.hasOption('H')) {
    printUsage();
    return false;
  }

  ss.setIsSilent(commandLine.hasOption('S'));

  ss.database = commandLine.getOptionValue("database");

  ss.execString = commandLine.getOptionValue('e');

  ss.fileName = commandLine.getOptionValue('f');

  ss.setIsVerbose(commandLine.hasOption('v'));

  String[] initFiles = commandLine.getOptionValues('i');
  if (null != initFiles) {
    ss.initFiles = Arrays.asList(initFiles);
  }

  if (ss.execString != null && ss.fileName != null) {
    System.err.println("The '-e' and '-f' options cannot be specified simultaneously");
    printUsage();
    return false;
  }

  if (commandLine.hasOption("hiveconf")) {
    Properties confProps = commandLine.getOptionProperties("hiveconf");
    for (String propKey : confProps.stringPropertyNames()) {
      ss.cmdProperties.setProperty(propKey, confProps.getProperty(propKey));
    }
  }

  return true;
}
3.4 Run the CLI driver work
4. The executeDriver method
The code is as follows:
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {

  CliDriver cli = new CliDriver();
  cli.setHiveVariables(oproc.getHiveVariables());

  // use the specified database if specified
  cli.processSelectDatabase(ss);

  // Execute -i init files (always in silent mode)
  cli.processInitFiles(ss);

  if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
  }

  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
    return 3;
  }
  if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
    console.printInfo(HiveConf.generateMrDeprecationWarning());
  }

  setupConsoleReader();

  String line;
  int ret = 0;
  String prefix = "";
  String curDB = getFormattedDb(conf, ss);
  String curPrompt = prompt + curDB;
  String dbSpaces = spacesForString(curDB);

  // 1. read the HQL input line by line
  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!prefix.equals("")) {
      prefix += '\n';
    }
    if (line.trim().startsWith("--")) {
      continue;
    }
    // a trailing ';' that is not escaped as '\;' ends the statement
    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line;
      // process the accumulated statement
      ret = cli.processLine(line, true);
      prefix = "";
      curDB = getFormattedDb(conf, ss);
      curPrompt = prompt + curDB;
      dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
    } else {
      prefix = prefix + line;
      curPrompt = prompt2 + dbSpaces;
      continue;
    }
  }

  return ret;
}
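The accumulation rule in the interactive loop of executeDriver can be isolated from the console handling. The following is a minimal sketch under that assumption: StatementAccumulator and its accumulate method are hypothetical names, and the input is a plain list of lines rather than a console reader. It keeps the same three rules as the loop: lines starting with "--" are skipped, a trailing ";" that is not written as "\;" ends a statement, and anything else is carried over as a prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the prefix handling of the interactive loop.
final class StatementAccumulator {

    /** Groups raw input lines into complete statements. */
    static List<String> accumulate(List<String> lines) {
        List<String> statements = new ArrayList<>();
        String prefix = "";
        for (String line : lines) {
            if (!prefix.isEmpty()) {
                prefix += '\n'; // continuation lines are joined with a newline
            }
            String trimmed = line.trim();
            if (trimmed.startsWith("--")) {
                continue; // comment-only line
            }
            if (trimmed.endsWith(";") && !trimmed.endsWith("\\;")) {
                statements.add(prefix + line); // statement is complete
                prefix = "";
            } else {
                prefix = prefix + line; // wait for the terminating ';'
            }
        }
        return statements;
    }
}
```

As in the original loop, the newline is appended before the comment check, so a comment line in the middle of a statement leaves an empty line in the accumulated text.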
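The processLine method is called to handle each complete HQL statement.
5. The processLine method
It in turn calls the processCmd method.
6. The processCmd method
The method branches on the command:
- If the command is quit or exit, close the session and exit
- If it is a source command, execute the named file
- If it starts with !, execute it as a shell command
- Otherwise, hand it to a command processor for normal parsing
The code is as follows: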
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);

  ss.updateThreadName();

  // Flush the print stream, so it doesn't include output from the last command
  ss.err.flush();
  String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
  String[] tokens = tokenizeCmd(cmd_trimmed);
  int ret = 0;

  // 1. quit or exit: close the session and exit
  if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

    // if we have come this far - either the previous commands
    // are all successful or this is command line. in either case
    // this counts as a successful run
    ss.close();
    System.exit(0);

  // 2. source: execute an HQL file
  } else if (tokens[0].equalsIgnoreCase("source")) {
    String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
    cmd_1 = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(ss.getConf(), cmd_1);

    File sourceFile = new File(cmd_1);
    if (! sourceFile.isFile()){
      console.printError("File: "+ cmd_1 + " is not a file.");
      ret = 1;
    } else {
      try {
        ret = processFile(cmd_1);
      } catch (IOException e) {
        console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    }

  // 3. leading '!': execute a shell command
  } else if (cmd_trimmed.startsWith("!")) {
    // for shell commands, use unstripped command
    String shell_cmd = cmd.trim().substring(1);
    shell_cmd = new VariableSubstitution(new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    }).substitute(ss.getConf(), shell_cmd);

    // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
    try {
      ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
      ret = executor.execute();
      if (ret != 0) {
        console.printError("Command failed with exit code = " + ret);
      }
    } catch (Exception e) {
      console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
          stringifyException(e));
      ret = 1;
    }

  // 4. none of the above: hand the command to a command processor
  } else { // local mode
    try {

      try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
        if (proc instanceof IDriver) {
          // Let Driver strip comments using sql parser
          ret = processLocalCmd(cmd, proc, ss);
        } else {
          ret = processLocalCmd(cmd_trimmed, proc, ss);
        }
      }
    } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  ss.resetThreadName();
  return ret;
}
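The four-way branch in processCmd amounts to classifying the command text before doing any work. A minimal sketch of that classification follows; CommandDispatchSketch, Kind, and classify are hypothetical names, and the sketch skips the comment stripping and variable substitution that the real method performs.

```java
import java.util.Locale;

// Hypothetical classifier that mirrors the order of the checks in processCmd.
final class CommandDispatchSketch {

    enum Kind { QUIT, SOURCE, SHELL, COMMAND_PROCESSOR }

    static Kind classify(String cmd) {
        String trimmed = cmd.trim();
        // First whitespace-delimited token; an empty command yields "".
        String firstToken = trimmed.split("\\s+")[0];
        String lower = trimmed.toLowerCase(Locale.ROOT);

        if (lower.equals("quit") || lower.equals("exit")) {
            return Kind.QUIT;              // branch 1: leave the CLI
        } else if (firstToken.equalsIgnoreCase("source")) {
            return Kind.SOURCE;            // branch 2: run a script file
        } else if (trimmed.startsWith("!")) {
            return Kind.SHELL;             // branch 3: run a shell command
        }
        return Kind.COMMAND_PROCESSOR;     // branch 4: everything else
    }
}
```

The order matters: quit and exit are matched against the whole command, while source is matched against the first token only, so "source /tmp/a.hql" is a SOURCE command but "exit now" falls through to the last branch.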
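7. The processLocalCmd method
It calls the run method of IDriver.
8. The qp.run method
run is declared on the IDriver interface; the implementing class is org.apache.hadoop.hive.ql.Driver.
9. The runInternal method
It consists of two main steps:
- Compile the HQL statement
- Execute it
9.1 The compileInternal method
It calls the compile method.
9.1.1 The compile method
9.1.1.1 Call ParseUtils.parse to generate the ASTNode
9.1.1.2 The ParseUtils.parse method
The work ends up in ParseDriver, in four steps:
- Build the lexer
- Wrap the lexer in a TokenRewriteStream, a token stream that can be rewritten later
- Run the parser
- Get the resulting ASTNode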
public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
    throws ParseException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Parsing command: " + command);
  }

  // 1. build the lexer
  HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
  // 2. wrap the lexer in a rewritable token stream
  TokenRewriteStream tokens = new TokenRewriteStream(lexer);
  if (ctx != null) {
    if (viewFullyQualifiedName == null) {
      // Top level query
      ctx.setTokenRewriteStream(tokens);
    } else {
      // It is a view
      ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);
    }
    lexer.setHiveConf(ctx.getConf());
  }
  HiveParser parser = new HiveParser(tokens);
  if (ctx != null) {
    parser.setHiveConf(ctx.getConf());
  }
  parser.setTreeAdaptor(adaptor);
  HiveParser.statement_return r = null;
  try {
    // 3. parse
    r = parser.statement();
  } catch (RecognitionException e) {
    e.printStackTrace();
    throw new ParseException(parser.errors);
  }

  if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
    LOG.debug("Parse Completed");
  } else if (lexer.getErrors().size() != 0) {
    throw new ParseException(lexer.getErrors());
  } else {
    throw new ParseException(parser.errors);
  }

  // 4. get the resulting ASTNode
  ASTNode tree = (ASTNode) r.getTree();
  tree.setUnknownTokenBoundaries();
  return tree;
}
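The lexer in parse is fed through ANTLRNoCaseStringStream, which is what lets HQL keywords be written in any case. The sketch below illustrates the general idea only and does not use ANTLR: it assumes the stream upper-cases characters when the lexer looks ahead, while the stored text keeps its original case. NoCaseLookaheadSketch and its methods are hypothetical names.

```java
// Hypothetical illustration of case-insensitive lookahead over unchanged input.
final class NoCaseLookaheadSketch {
    static final int EOF = -1;

    private final String input;
    private int pos = 0;

    NoCaseLookaheadSketch(String input) {
        this.input = input;
    }

    /** Looks ahead i characters (1-based) and returns the upper-cased character. */
    int la(int i) {
        int idx = pos + i - 1;
        if (idx < 0 || idx >= input.length()) {
            return EOF;
        }
        return Character.toUpperCase(input.charAt(idx));
    }

    /** Advances past one character. */
    void consume() {
        if (pos < input.length()) {
            pos++;
        }
    }

    /** Tests whether the upcoming characters spell the given upper-case keyword. */
    boolean matchesKeyword(String upperCaseKeyword) {
        for (int i = 0; i < upperCaseKeyword.length(); i++) {
            if (la(i + 1) != upperCaseKeyword.charAt(i)) {
                return false;
            }
        }
        return true;
    }

    /** Returns the original text, with its case preserved. */
    String text(int start, int stop) {
        return input.substring(start, stop);
    }
}
```

With this split, grammar rules only need upper-case keywords, and identifiers and string literals still reach the AST exactly as the user typed them.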
9.1.2 The sem.analyze method
Inside compile, the analyze method is called to analyze the AST.
Implementing class: org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
9.1.2.1 The analyzeInternal method
void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {
  LOG.info("Starting Semantic Analysis");
  // 1. Generate Resolved Parse tree from syntax tree
  boolean needsTransform = needsTransform();
  //change the location of position alias process here
  processPositionAlias(ast);
  PlannerContext plannerCtx = pcf.create();
  // convert the AST into a QueryBlock (QB)
  if (!genResolvedParseTree(ast, plannerCtx)) {
    return;
  }

  if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {
    for (String alias : qb.getSubqAliases()) {
      removeOBInSubQuery(qb.getSubqForAlias(alias));
    }
  }

  // Check query results cache.
  // If no masking/filtering required, then we can check the cache now, before
  // generating the operator tree and going through CBO.
  // Otherwise we have to wait until after the masking/filtering step.
  boolean isCacheEnabled = isResultsCacheEnabled();
  QueryResultsCache.LookupInfo lookupInfo = null;
  if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {
    lookupInfo = createLookupInfoForQuery(ast);
    if (checkResultsCache(lookupInfo)) {
      return;
    }
  }

  ASTNode astForMasking;
  if (isCBOExecuted() && needsTransform &&
      (qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {
    // If we use CBO and we may apply masking/filtering policies, we create a copy of the ast.
    // The reason is that the generation of the operator tree may modify the initial ast,
    // but if we need to parse for a second time, we would like to parse the unmodified ast.
    astForMasking = (ASTNode) ParseDriver.adaptor.dupTree(ast);
  } else {
    astForMasking = ast;
  }

  // 2. Gen OP Tree from resolved Parse Tree
  Operator sinkOp = genOPTree(ast, plannerCtx);

  boolean usesMasking = false;
  if (!unparseTranslator.isEnabled() &&
      (tableMask.isEnabled() && analyzeRewrite == null)) {
    // Here we rewrite the * and also the masking table
    ASTNode rewrittenAST = rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),
        ctx, db, tabNameToTabObject, ignoredTokens);
    if (astForMasking != rewrittenAST) {
      usesMasking = true;
      plannerCtx = pcf.create();
      ctx.setSkipTableMasking(true);
      init(true);
      //change the location of position alias process here
      processPositionAlias(rewrittenAST);
      genResolvedParseTree(rewrittenAST, plannerCtx);
      if (this instanceof CalcitePlanner) {
        ((CalcitePlanner) this).resetCalciteConfiguration();
      }
      sinkOp = genOPTree(rewrittenAST, plannerCtx);
    }
  }

  // Check query results cache
  // In the case that row or column masking/filtering was required, we do not support caching.
  // TODO: Enable caching for queries with masking/filtering
  if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) {
    lookupInfo = createLookupInfoForQuery(ast);
    if (checkResultsCache(lookupInfo)) {
      return;
    }
  }

  // 3. Deduce Resultset Schema
  // determine the result schema
  if (createVwDesc != null && !this.ctx.isCboSucceeded()) {
    resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
  } else {
    // resultSchema will be null if
    // (1) cbo is disabled;
    // (2) or cbo is enabled with AST return path (whether succeeded or not,
    // resultSchema will be re-initialized)
    // It will only be not null if cbo is enabled with new return path and it
    // succeeds.
    if (resultSchema == null) {
      resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),
          HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
    }
  }

  // 4. Generate Parse Context for Optimizer & Physical compiler
  copyInfoToQueryProperties(queryProperties);
  ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
      new HashSet<JoinOperator>(joinContext.keySet()),
      new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
      loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
      listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
      globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
      viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
      analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
      queryProperties, viewProjectToTableSchema, acidFileSinks);

  // Set the semijoin hints in parse context
  pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));
  // Set the mapjoin hint if it needs to be disabled.
  pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));

  // 5. Take care of view creation
  if (createVwDesc != null) {
    if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
      return;
    }

    if (!ctx.isCboSucceeded()) {
      saveViewDefinition();
    }

    // validate the create view statement at this point, the createVwDesc gets
    // all the information for semanticcheck
    validateCreateView();

    if (createVwDesc.isMaterialized()) {
      createVwDesc.setTablesUsed(getTablesUsed(pCtx));
    } else {
      // Since we're only creating a view (not executing it), we don't need to
      // optimize or translate the plan (and in fact, those procedures can
      // interfere with the view creation). So skip the rest of this method.
      ctx.setResDir(null);
      ctx.setResFile(null);

      try {
        PlanUtils.addInputsForView(pCtx);
      } catch (HiveException e) {
        throw new SemanticException(e);
      }

      // Generate lineage info for create view statements
      // if LineageLogger hook is configured.
      // Add the transformation that computes the lineage information.
      Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults()
          .omitEmptyStrings()
          .split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));
      if (postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
          || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
          || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
        ArrayList<Transform> transformations = new ArrayList<Transform>();
        transformations.add(new HiveOpConverterPostProc());
        transformations.add(new Generator(postExecHooks));
        for (Transform t : transformations) {
          pCtx = t.transform(pCtx);
        }
        // we just use view name as location.
        queryState.getLineageState()
            .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
      }
      return;
    }
  }

  // 6. Generate table access stats if required
  if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
    TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
    setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
  }

  // 7. Perform Logical optimization
  // run logical optimization
  if (LOG.isDebugEnabled()) {
    LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
  }
  Optimizer optm = new Optimizer();
  optm.setPctx(pCtx);
  optm.initialize(conf);
  // run the optimizer
  pCtx = optm.optimize();
  if (pCtx.getColumnAccessInfo() != null) {
    // set ColumnAccessInfo for view column authorization
    setColumnAccessInfo(pCtx.getColumnAccessInfo());
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
  }

  // 8. Generate column access stats if required - wait until column pruning
  // takes place during optimization
  boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
      && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
  if (isColumnInfoNeedForAuth
      || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
    ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
    // view column access info is carried by this.getColumnAccessInfo().
    setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));
  }

  // 9. Optimize Physical op tree & Translate to target execution engine (MR,
  // TEZ..)
  // run physical optimization
  if (!ctx.getExplainLogical()) {
    TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
    compiler.init(queryState, console, db);
    compiler.compile(pCtx, rootTasks, inputs, outputs);
    fetchTask = pCtx.getFetchTask();
  }
  //find all Acid FileSinkOperatorS
  QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());

  // 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers
  final Optional<TezTask> optionalTezTask =
      rootTasks.stream().filter(task -> task instanceof TezTask).map(task -> (TezTask) task)
          .findFirst();
  if (optionalTezTask.isPresent()) {
    final TezTask tezTask = optionalTezTask.get();
    rootTasks.stream()
        .filter(task -> task.getWork() instanceof DDLWork)
        .map(task -> (DDLWork) task.getWork())
        .filter(ddlWork -> ddlWork.getPreInsertTableDesc() != null)
        .map(ddlWork -> ddlWork.getPreInsertTableDesc())
        .map(ddlPreInsertTask -> new InsertCommitHookDesc(ddlPreInsertTask.getTable(),
            ddlPreInsertTask.isOverwrite()))
        .forEach(insertCommitHookDesc -> tezTask.addDependentTask(
            TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));
  }

  LOG.info("Completed plan generation");

  // 11. put accessed columns to readEntity
  if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
    putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
  }

  if (isCacheEnabled && lookupInfo != null) {
    if (queryCanBeCached()) {
      QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);

      // Specify that the results of this query can be cached.
      setCacheUsage(new CacheUsage(
          CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
    }
  }
}
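The numbered comments in analyzeInternal describe a fixed sequence of phases with two early exits that are easy to miss in the full listing: creating a non-materialized view returns after the parse context is built, and an EXPLAIN of the logical plan skips physical compilation. The sketch below models only that ordering; AnalysisPipelineSketch, Phase, and phases are hypothetical names, it calls no Hive code, and it leaves out the results cache and the statistics steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the phase ordering in analyzeInternal.
final class AnalysisPipelineSketch {

    enum Phase {
        RESOLVE_PARSE_TREE,      // step 1: AST to query block
        GENERATE_OPERATOR_TREE,  // step 2
        DEDUCE_RESULT_SCHEMA,    // step 3
        BUILD_PARSE_CONTEXT,     // step 4
        LOGICAL_OPTIMIZATION,    // step 7
        PHYSICAL_COMPILATION     // step 9
    }

    /** Lists the phases that run for the given kind of statement. */
    static List<Phase> phases(boolean createsPlainView, boolean explainLogical) {
        List<Phase> run = new ArrayList<>();
        run.add(Phase.RESOLVE_PARSE_TREE);
        run.add(Phase.GENERATE_OPERATOR_TREE);
        run.add(Phase.DEDUCE_RESULT_SCHEMA);
        run.add(Phase.BUILD_PARSE_CONTEXT);
        if (createsPlainView) {
            return run; // the view is only defined, so no plan is optimized
        }
        run.add(Phase.LOGICAL_OPTIMIZATION);
        if (!explainLogical) {
            run.add(Phase.PHYSICAL_COMPILATION);
        }
        return run;
    }
}
```

For an ordinary query, phases(false, false) yields all six phases in order.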
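9.2 The execute method
- Build the MR job
- Launch the tasks
9.2.1 The launchTask method
9.2.2 The runSequential method
9.2.3 The executeTask method
9.2.4 The execute method
The concrete implementing class is MapRedTask:
1. Set the classes used to run the MR job
2. Build the command that runs the MR job
3. Run ExecDriver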