2021/10/30 23:17:33
OSSIM Agent的主要职责是收集网络上存在的各种设备发送的所有数据,然后按照一种标准方式(standardized way)有序的发送给OSSIM Server,Agent收集到数据后在发送给Server之前要对这些数据进行标准化处理,这样Server就可以依一种统一的方式来处理这些信息,并且也简化了Server的处理过程
__regexIsTranslationSection = re.compile("translation-\S+", re.UNICODE) __regex_check_for_translate2_function = re.compile("\{translate2\((?P<variable>\$[^\)]+),(?P<translate_section>\$[^\)]+)\)\}", re.UNICODE)
def rules(self): rules = {} for section in sorted(self.sections()): regexp = self.get(section,"regexp") if self.get("config","source") == "log" and (regexp is None or regexp == ""): continue if Plugin.__regexIsTranslationSection.match(section.lower()) is not None: if section.lower() not in Plugin.SECTIONS_NOT_RULES: logger.info("Loading new translation section... %s" % section.lower()) Plugin.SECTIONS_NOT_RULES.append(section.lower()) if section.lower() not in Plugin.SECTIONS_NOT_RULES: rules[section] = self.hitems(section,True) return rules
def _replace_array_variables(self, value, groups): for i in range(2): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) rvalue = value if search != []: for string in search: var = string[2:-1] var_position = 0 try: var_position= int(var) if var_position < len(groups): rvalue = rvalue.replace(string, str(groups[var_position])) except ValueError,e: rvalue = value return rvalue
def get_replace_array_value(self, value, groups): rvalue = "" rvalue = self._replace_array_variables(value, groups) if rvalue==value: rvalue = self._replace_user_array_functions(value, groups) if rvalue == value: rvalue = self._replace_custom_user_function_array(value,groups) return rvalue
def _replace_custom_user_function_array(self,value,groups): search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: var_pos =0 try: var_pos = int(var) except TypeError: logger.warning("Can not replace '%s'" % var) return value if var_pos >= len(groups): logger.warning("Can not replace '%s' " % var) return value func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id")) if func_name != Plugin.TRANSLATION_FUNCTION and \ hasattr(Plugin, func_name): args = ",".join(["groups[%s]"%s for s in vars]) try: cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args) exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, str(groups[var])) return value
def _replace_user_array_functions(self, value, groups): if value is None: return None search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if len(groups) - 1 < int(var): return value if func != Plugin.TRANSLATION_FUNCTION and \ hasattr(ParserUtil, func): f = getattr(ParserUtil, func) args = "" for v in vars: args += "str(groups[%s])," % (str(v)) try: exec "value = value.replace(string_matched," + \ "str(f(" + args + ")))" except TypeError, e: logger.error(e) else: logger.debug("Tranlation fucntion...") for v in vars: . if int(v) > (len(groups) - 1): logger.debug("Error var:%d, is greatter than groups size. (%d)" % (int(v), len(groups))) else: var_value = groups[int(v)] if self.has_section(Plugin.TRANSLATION_SECTION): if self.has_option(Plugin.TRANSLATION_SECTION, var_value): value = self.get(Plugin.TRANSLATION_SECTION, var_value) return value
def replace_config(self, conf): for section in sorted(self.sections()): for option in self.options(section): regexp = self.get(section, option) search = re.findall("(\\\\_CFG\(([\w-]+),([\w-]+)\))", regexp, re.UNICODE) if search != []: for string in search: (all, arg1, arg2) = string if conf.has_option(arg1, arg2): value = conf.get(arg1, arg2) regexp = regexp.replace(all, value) self.set(section, option, regexp)
def replace_aliases(self, aliases) for rule in self.rules().iterkeys(): regexp = self.get(rule, 'regexp') search = re.findall("\\\\\w\w+", regexp, re.UNICODE) if search != []: for string in search: repl = string[1:] if aliases.has_option("regexp", repl): value = aliases.get("regexp", repl) regexp = regexp.replace(string, value) self.set(rule, "regexp", regexp)
def _replace_variables(self, value, groups, rounds=2): for i in range(rounds): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) if search != []: for string in search: var = string[2:-1] if groups.has_key(var): value = value.replace(string, str(groups[var])) return value
def _replace_variables_assess(self, value): ret = 0 for i in range(2): search = re.findall("\{\$[^\}\{]+\}", value, re.UNICODE) if search != []: ret = i return ret
def _replace_translations(self, value, groups): regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})" search = re.findall(regexp, value, re.UNICODE) if search != []: for string in search: (string_matched, func, var) = string if groups.has_key(var): if self.has_section(Plugin.TRANSLATION_SECTION): if self.has_option(Plugin.TRANSLATION_SECTION, groups[var]): value = self.get(Plugin.TRANSLATION_SECTION, groups[var]) else: logger.warning("Can not translate '%s' value" % \ (groups[var])) if self.has_option(Plugin.TRANSLATION_SECTION, Plugin.TRANSLATION_DEFAULT): value = self.get(Plugin.TRANSLATION_SECTION, Plugin.TRANSLATION_DEFAULT) else: value = groups[var] else: logger.warning("There is no translation section") value = groups[var] return value
def _replace_translations_assess(self, value): regexp = "(\{(" + Plugin.TRANSLATION_FUNCTION + ")\(\$([^\)]+)\)\})" search = re.findall(regexp, value, re.UNICODE) if search != []: return self._MAP_REPLACE_TRANSLATIONS return 0
def _replace_user_functions(self, value, groups): search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if not groups.has_key(var): logger.warning("Can not replace '%s'" % (value)) return value if func != Plugin.TRANSLATION_FUNCTION and \ hasattr(ParserUtil, func): f = getattr(ParserUtil, func) args = "" for i in (range(len(vars))): args += "groups[vars[%s]]," % (str(i)) try: cmd = "value = value.replace(string_matched," + \ "str(f(" + args + ")))" exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, \ str(groups[var])) return value
def _replace_user_functions_assess(self, value): search = re.findall("(\{(\w+)\((\$[^\)]+)\)\})", value, re.UNICODE) if search != []: return self._MAP_REPLACE_USER_FUNCTIONS return 0
def _replace_custom_user_functions(self,value,groups): search = re.findall("(\{:(\w+)\((\$[^\)]+)?\)\})", value, re.UNICODE) if search != []: for string in search: (string_matched, func, variables) = string vars = split_variables(variables) for var in vars: if not groups.has_key(var): logger.warning("Can not replace '%s'" % (var)) return value func_name = "%s_%s" % (func,self.get("DEFAULT", "plugin_id")) if func_name != Plugin.TRANSLATION_FUNCTION and \ hasattr(Plugin, func_name): args = "" for i in (range(len(vars))): args += "groups[vars[%s]]," % (str(i)) try: cmd = "value = value.replace(string_matched, str(self.%s(%s)))" %(func_name,args) exec cmd except TypeError, e: logger.error(e) else: logger.warning( "Function '%s' is not implemented" % (func)) value = value.replace(string_matched, \ str(groups[var])) return value
def __replaceConcatFunction(self,value,groups): concat ="" m = re.match("\$CONCAT\((?P<params>.*)\)",value) if m: mdict = m.groupdict() if mdict.has_key('params'): paramlist = mdict['params'].split(',') for param in paramlist: if param.startswith('$'):#assume variable if groups.has_key(param[1:]): concat+=groups[param[1:]] else: concat+=param else: concat+=param return concat
def __checkReplaceConcatFunction(self,value): ret = 0 if re.match("\$CONCAT\((.*)\)",value): ret = self._MAP_REPLACE_CONCAT return ret
def __replace_translate2_function(self,value,groups): replaced_value ="" m = self.__regex_check_for_translate2_function.match(value) if m: mdict = m.groupdict() if 'variable' in mdict and 'translate_section' in mdict: variable = mdict['variable'].replace('$','') translate_section = mdict['translate_section'].replace('$','') if variable in groups: if self.has_section(translate_section): if self.has_option(translate_section,groups[variable]): replaced_value = self.get(translate_section, groups[variable]) return replaced_value else: logger.warning("Translate2 variable(%s) not found on the translate section %s" % (groups[variable],translate_section)) else: logger.warning("Translate2 translation_section(%s) not found" % translate_section) replaced_value = value
def __check_for_translate2_function(self,value): """Check whether we need to make a translate2 replacement or not. @param value: the string in the rule """ ret = 0 if Plugin.__regex_check_for_translate2_function.match(value) is not None: ret = self._MAP_REPLACE_TRANSLATE2 return ret
def get_replace_value(self, value, groups, replace=15): if replace > 0: if replace & self._MAP_REPLACE_TRANSLATE2: value = self.__replace_translate2_function(value, groups) if replace & 3: value = self._replace_variables(value, groups, (replace & 3)) if replace & 4: value = self._replace_translations(value, groups) if replace & 8: value = self._replace_user_functions(value, groups) if replace & self._MAP_REPLACE_CUSTOM_USER_FUNCTIONS: value = self._replace_custom_user_functions(value,groups) if replace & self._MAP_REPLACE_CONCAT: value = self.__replaceConcatFunction(value,groups) return value
class CommandLineOptions: def __init__(self): self.__options = None parser = OptionParser( usage="%prog [-v] [-q] [-d] [-f] [-g] [-c config_file]", version="OSSIM (Open Source Security Information Management) " + \ "- Agent ") parser.add_option("-v", "--verbose", dest="verbose", action="count", help="verbose mode, makes lot of noise") parser.add_option("-d", "--daemon", dest="daemon", action="store_true", help="Run agent in daemon mode") parser.add_option("-f", "--force", dest="force", action="store_true", help="Force startup overriding pidfile") parser.add_option("-s", "--stats", dest="stats", type='choice', choices=['all', 'clients', 'plugins'], default=None, help="Get stats about the agent") parser.add_option("-c", "--config", dest="config_file", action="store", help="read config from FILE", metavar="FILE") (self.__options, args) = parser.parse_args() if len(args) > 1: parser.error("incorrect number of arguments") if self.__options.verbose and self.__options.daemon: parser.error("incompatible options -v -d") def get_options(self): return self.__options
def split_variables(string): return re.findall("(?:\$([^,\s]+))+", string) def split_sids(string, separator=','): list = list_tmp = [] list = string.split(separator) for sid in list: a = sid.split('-') if len(a) == 2: list.remove(sid) for i in range(int(a[0]), int(a[1]) + 1): list_tmp.append(str(i)) list.extend(list_tmp) return list
- 2025-01-102025 蛇年,J 人直播带货内容审核团队必备的办公软件有哪 6 款?
- 2025-01-10高效运营背后的支柱:文档管理优化指南
- 2025-01-10年末压力山大?试试优化你的文档管理
- 2025-01-10跨部门协作中的进度追踪重要性解析
- 2025-01-10总结 JavaScript 中的变体函数调用方式
- 2025-01-10HR团队如何通过数据驱动提升管理效率?6个策略
- 2025-01-10WBS实战指南:如何一步步构建高效项目管理框架?
- 2025-01-10实现精准执行:团队协作新方法
- 2025-01-10如何使用工具提升活动策划团队的工作效率?几个必备工具推荐
- 2025-01-10WiX 标签使用介绍:打造专业安装程序的利器