用大型语言模型在Amazon Bedrock上分类Jira工单

2024/11/14 21:03:21

本文主要是介绍用大型语言模型在Amazon Bedrock上分类Jira工单,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

用提示工程和大型语言模型(LLM)等来代替传统的NLP方法对Jira工单文本进行分类。代码示例解析

这张照片由 Annie Spratt 在 Unsplash 拍摄.

还记得那时候进行文本分类意味着开始了一段机器学习之旅吗?如果你在这个领域待了足够长的时间,你可能见过至少有一个团队掉进了构建“完美”文本分类系统的无底洞。这样的故事通常是这样的:

  • 第一个月: “我们只要快速训练一个NLP模型就行了!”
  • 第二个月: “我们需要更多的训练数据才行。”
  • 第三个月: “这已经差不多了”

多年来,文本分类一直是经典机器学习的一部分。在我职业生涯的早期阶段,我记得曾经用支持向量机(SVM)这种模型进行邮件分类的训练。这需要大量的预处理、迭代、数据搜集和标上标签。这些工作量很大。

但有一个转折:现在已经到了2024年,生成式AI模型可以直接对文本进行一般的分类!你可以构建一个稳健的工单分类系统,而无需收集数千个标记的训练样本,管理机器学习训练流程,或维护自己的模型。

在这篇帖子中,我们将首先展示如何利用亚马逊Bedrock及其他AWS服务上的大语言模型来搭建Jira票证分类系统。

免责声明:我是在 AWS 工作的一名生成AI架构师,以下观点仅代表我个人的看法。

为什么给 Jira 问题分类?

公司常希望能了解团队如何分配时间。Jira 提供了一些标签功能,但有时由于人为错误或粒度不够细,这些标签功能可能显得不足。通过这项练习,组织可以获得更深入的团队活动洞察,从而能够做出更基于数据的资源分配、项目投资和项目淘汰决策。

为什么不试试其他的自然语言处理方法呢?

传统的机器学习模型和较小的Transformer(如 BERT)需要数百(或数千)个标记样本,而LLMs可以直接分类文本。在我们的Jira票证分类测试中,通过提示工程的方法表现与传统机器学习模型相当或更优,在每年处理超过10,000张票证的情况下,使用Claude Haiku的成本仅为每年约10美元(不计算其他AWS服务费用)。此外,提示更容易更新,无需重新训练模型。

代码样例

这个GitHub仓库(https://github.com/aws-samples/jira-ticket-classification)包含一个样本应用,该应用连接到Jira Cloud,分类票单,并输出为可以被您喜欢的仪表板工具(如Tableau、Quicksight或其他支持CSV的工具)使用的CSV格式。

重要提示:此项目使用Terraform在您的AWS环境中部署资源。您将为此AWS资源支付费用。请留意您所在AWS区域中特别是Lambda、Bedrock、Glue和S3等服务的价格。请注意这些费用。

前置条件

您需要在计划部署代码的环境里安装Terraform和AWS CLI才行。

  • 使用 tfenv 安装 Terraform
  • 安装并设置 AWS CLI
建筑

架构非常简单明了。详情请见下文。

作者供图

首先,一个 AWS Lambda 函数会在定时任务触发时,根据时间窗口获取 Jira 工单。然后,将这些工单格式化并推送到带有 /unprocessed 前缀的 S3 存储桶中。

步骤2:/unprocessed 对象上有新文件时,会触发一个 Glue 作业。此作业运行一个 PySpark 去重任务,以确保没有重复的工单到达仪表板。去重后的工单将被放置在以 /staged 为前缀的文件夹中。这在手动上传工单和依赖自动获取工单的情况下都非常有用。如果你能确保没有重复项,就可以省去这一步。

步骤3: 通过调用Amazon Bedrock,开始对新工单记录进行分类任务,根据提示向大型语言模型(LLM)分类工单记录。分类完成后,将处理结果推送到/processed前缀路径。在这里,你可以使用任何能够读取CSV的仪表板工具来查看处理过的CSV。

开始吧

要开始,请先克隆上面的 GitHub 代码库,然后进入 /terraform 目录

请运行以下命令克隆代码库

$ git clone https://github.com/aws-samples/jira-ticket-classification.git  

$ cd jira-ticket-classification/terraform

请确保您已切换到terraform文件夹以继续操作。

依次执行 terraform initterraform planterraform apply。请确保已在您的计算机上安装了 Terraform 并配置好了 AWS CLI。

初始化 Terraform 以准备您的工作目录。
$ terraform init  
规划 Terraform 资源以查看将要执行的操作。
$ terraform plan  
应用 Terraform 配置以将更改应用到实际资源。
$ terraform apply

一旦基础设施部署到您的帐户,您可以导航到 AWS Secrets Manager 并更新带有您的 Jira Cloud 凭证的密钥凭证。您需要 API 密钥、基础 URL 和电子邮件来启用自动拉取。

作者的图片

就这样啦!

你可以(1)等待Cron自动执行任务,(2)将工单导出为CSV文件,并上传到/unprocessed S3桶的前缀下,还可以(3)通过测试手动触发Lambda函数。

这怎么用?
Jira 拉取

Jira抓取使用一个Lambda函数,并通过Cloudwatch的cron事件触发。Lambda函数获取AWS密钥并进行请求,在while循环中使用get请求分页获取结果,直到JQL查询完成并返回所有结果。

    def fetch_jira_issues(base_url, project_id, email, api_key):  
        url = f"{base_url}/rest/api/3/search"  

        # 计算并获取8天前的日期  
        eight_days_ago = (datetime.now() - timedelta(days=8)).strftime("%Y-%m-%d")  

        # 创建JQL  
        jql = f"project = {project_id} AND created >= '{eight_days_ago}' ORDER BY created DESC"  

        # 提交到请求参数中  
        params = {  
            "jql": jql,  
            "startAt": 0  
        }  
        all_issues = []  

        auth = HTTPBasicAuth(email, api_key)  
        headers = {"Accept": "application/json"}  

        while True:  
            response = requests.get(url, headers=headers, params=params, auth=auth)  
            if response.status_code != 200:  
                raise Exception(f"获取项目 {project_id} 的问题失败:{response.text}")  

            data = json.loads(response.text)  
            issues = data['issues']  
            all_issues.extend(issues)  

            if len(all_issues) >= data['total']:  
                break  

            params['startAt'] = len(all_issues)  

        return all_issues

它然后生成CSV的字符串表示,并将它上传到S3:

    def 上传到S3存储(csv_string, bucket, key):  
        try:  
            s3_client.put_object(  
                Bucket=bucket,  
                Key=key,  
                Body=csv_string,  
                ContentType='text/csv'  
            )  
        except Exception as e:  
            raise Exception(f"未能将CSV文件上传到S3: {str(e)}")
修补工作

当S3事件在/unprocessed前缀上触发时,启动了第二个lambda函数,该函数启动一个AWS Glue作业。这在当Jira工单可以通过多个入口点进入系统时非常有用,例如,当你需要进行数据回填时。

    import boto3   

    # 初始化 Boto3 Glue 客户端  
    glue_client = boto3.client('glue')  

    # 处理程序
    def handler(event, context):  
        # 打印 event 用于调试  
        print(f"Received event: {json.dumps(event)}")  

        # 从 S3 事件中提取 bucket 名称和 object key(文件名)
        try:  
            s3_event = event['Records'][0]['s3']  
            s3_bucket = s3_event['bucket']['name']  
            s3_key = s3_event['object']['key']  
        except KeyError as e:  
            print(f"出了问题: {str(e)}")  
            raise  

        # 启动 Glue 作业运行
        response = glue_client.start_job_run(  
            JobName=glue_job_name,  # glue作业名称
            Arguments={  # 参数字典
                '--S3_BUCKET': s3_bucket,  
                '--NEW_CSV_FILE': s3_key  
            }  
        )

Glue 作业本身是用 PySpark 编写的代码,可以在代码仓库的此链接找到 here。重要的一点是,它进行了 leftanti 类型的连接,使用 issue Id 来对比新 CSV 中的项目与所有 /staged CSV 中的 Id。

结果会被推送到/staged路径下。

给 Jira 分类工单:

这里就变得有趣了。实际上,利用提示工程可以达到与使用几种技术的文本分类器相当,甚至有时甚至更好的效果。

  • 可以在一个提示中定义分类及其描述内容,
  • 让模型逐步思考(Chain of Thought)。
  • 然后输出分类,而无需单独训练模型。如下是一个示例提示:

注意: 使用人工整理的分类和标注过的工单子集来验证您的提示语非常重要。您应该用验证数据集来运行该提示,以确保其分类方式符合您的预期标准。

    SYSTEM_PROMPT = '''  
    你是一个支持工单助手。你会根据给出的Jira工单字段来分类这个工单。

    以下是可能的分类以及这些分类的描述。
    <classifications>  
    ACCESS_PERMISSIONS_REQUEST: 当有人没有写权限、无法登录到某个东西,或者无法获得正确的IAM凭证来使服务工作时使用。  
    BUG_FIXING: 当某件事情失败或者发现了一个bug时使用。通常描述中会包含日志或技术信息。  
    CREATING_UPDATING_OR_DEPRECATING_DOCUMENTATION: 当文档过时的时候使用。通常描述中会提到文档。  
    MINOR_REQUEST: 这通常是一个微小的bug修复。即使是稍微复杂的情况,也应使用BUG_FIXING。  
    SUPPORT_TROUBLESHOOTING: 当需要对于某个工程事件寻求支持时使用。也可能看起来像一个自动化工单。  
    NEW_FEATURE_WORK: 通常描述一个新功能需求或者还没有运行的功能。  
    </classifications>  

    可用的字段及其描述如下。
    <fields>  
    Summary: 这是工单的摘要或标题  
    Description: 该问题的自然语言描述。大多数分类所需的信息都来自这个字段  
    </fields>  

    <rules>  

* 可能某些字段是空的,在这种情况下,请忽略这些字段来进行分类。  

* 在进行分类之前,请仔细考虑你的理由,并在<thinking></thinking>标签中写下你的思考过程。这是你思考和理解决定工单分类的区域。  

* 一旦完成思考以后,仅使用上述列出的分类对工单进行分类,并将分类放在<answer></answer>标签中。  
    </rules>'''  

    USER_PROMPT = '''  
    使用以下的票证字段:

    <summary_field>  
    {summary}  
    </summary_field>  

    <description_field>  
    {description}  
    </description_field>  

    仅使用系统提示中列出的分类之一对工单进行分类。记得在分类之前逐步思考,并将你的想法放在<thinking></thinking>标签中。
    当你完成思考以后,请将你的分类放在<answer></answer>标签中。仅放置分类,不包含其他内容。
    '''

新增了一个帮助类来并行执行对Bedrock的调用,以提高速度。

import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
from typing import List, Dict
from prompts import USER_PROMPT, SYSTEM_PROMPT

class TicketClassifier:
    SONNET_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
    HAIKU_ID = "anthropic.claude-3-haiku-20240307-v1:0"
    HYPER_PARAMS = {"temperature": 0.35, "topP": .3}  # 控制采样多样性的参数
    REASONING_PATTERN = r'<thinking>(.*?)</thinking>'
    CORRECTNESS_PATTERN = r'<answer>(.*?)</answer>'

    def __init__(self):
        self.bedrock = boto3.client('bedrock-runtime')

    def classify_tickets(self, tickets: List[Dict[str, str]]) -> List[Dict[str, str]]:
        prompts = [self._create_chat_payload(t) for t in tickets]
        responses = self._call_threaded(prompts, self._call_bedrock)
        formatted_responses = [self._format_results(r) for r in responses]
        # 将tickets和formatted_responses合并
        return [{**d1, **d2} for d1, d2 in zip(tickets, formatted_responses)]

    def _call_bedrock(self, message_list: list[dict]) -> str:
        response = self.bedrock.converse(
            modelId=self.HAIKU_ID,
            messages=message_list,
            inferenceConfig=self.HYPER_PARAMS,
            system=[{'text': SYSTEM_PROMPT}]  # 系统提示的文本
        )
        return response['output']['message']['content'][0]['text']

    def _call_threaded(self, requests, function):
        future_to_position = {}
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i, request in enumerate(requests):
                future = executor.submit(function, request)
                future_to_position[future] = i
            responses = [None] * len(requests)
            for future in as_completed(future_to_position):
                position = future_to_position[future]
                try:
                    response = future.result()
                    responses[position] = response
                except Exception as exc:
                    print(f"打印异常信息: 位置 {position} 的请求生成了异常: {exc}")
                    responses[position] = None
        return responses

    def _create_chat_payload(self, ticket: dict) -> dict:
        user_prompt = USER_PROMPT.format(summary=ticket['Summary'], description=ticket['Description'])
        user_msg = {"role": "user", "content": [{"text": user_prompt}]}
        return [user_msg]

    def _format_results(self, model_response: str) -> dict:
        reasoning = self._extract_with_regex(model_response, self.REASONING_PATTERN)
        correctness = self._extract_with_regex(model_response, self.CORRECTNESS_PATTERN)
        return {'Model Answer': correctness, 'Reasoning': reasoning}

    @staticmethod
    def _extract_with_regex(response, regex):
        matches = re.search(regex, response, re.DOTALL)
        return matches.group(1).strip() if matches else None

最终,将分类的票转换为CSV并将其上传到S3

import boto3  
import io  
import csv  

s3 = boto3.client('s3')  

def upload_csv(data: List[Dict[str, str]]) -> None:  
      csv_buffer = io.StringIO()  
      writer = csv.DictWriter(csv_buffer, fieldnames=data[0].keys())  
      writer.writeheader()  
      writer.writerows(data)  

      current_time = datetime.now().strftime("%Y%m%d_%H%M%S")  
      filename = f"processed/processed_{current_time}.csv"  

      s3.put_object(  
          Bucket=self.bucket_name,  
          Key=filename,  
          Body=csv_buffer.getvalue()  
      )
基于仪表盘

该项目不依赖于特定的仪表板工具。任何常用的工具或服务都可以使用,只要它能够处理 CSV 文件即可。Amazon QuickSight、Tableau 或任何其他类似工具都可以。

结论部分

在这篇博客中,我们讨论了如何使用Bedrock自动分类Jira工单。这些分类后的Jira工单可以用于创建仪表板或使用其他工具。最重要的是,自从采用大语言模型(LLMs)以来,文本分类变得简单得多,以前可能需要几周才能完成的任务现在可以在几天内搞定。

如果你喜欢这篇文章,欢迎在LinkedIn上与我建立联系。



这篇关于用大型语言模型在Amazon Bedrock上分类Jira工单的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程