pywebio更新数据库系统

2022/4/6 2:49:05

本文主要是介绍pywebio更新数据库系统,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

import pywebio
from pywebio.input import *
from pywebio.output import *
from pywebio import start_server
import pywebio.pin as pin
from pywebio.session import hold
import pandas as pd
from functools import partial
import pymysql as mdb
import sys

def read_file(filename):

    if filename.endswith('xlsx'):
        df = pd.read_excel(filename)

    elif filename.endswith('csv'):
        df = pd.read_csv(filename)
    return df


def chongfu(df, res_table):
    df1 = df[df.values[:,1].duplicated() == True]

    put_scrollable(res_table,horizon_scroll=True)
    res_table.reset(put_html(df1.to_html(border=0)))

def updatesql(df, res_table,data):
    urls=df.values[:,1]
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where url like '%{}%'").format(data['resource'],url)
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows))

def query_used(res_table,data):
    #urls=df.values[:,1]
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            #cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where used = 'yes'").format(data['resource'])
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows))

def query_not_used(res_table,data):
    #urls=df.values[:,1]
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            #cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where used is null").format(data['resource'])
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows))

def query_locked(res_table,data):
    #urls=df.values[:,1]
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            #cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where locked is not null").format(data['resource'])
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows))

def query_not_locked(res_table,data):
    #urls=df.values[:,1]
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            #cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where locked is null").format(data['resource'])
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows))

def get_resource(res_table,data):
    res = input("选择数量", type=NUMBER, name="number")
    con = mdb.connect('127.0.0.1', 'root', 'root', 'test');
    cur = con.cursor()
    for url in urls:
        try:
            #cur.execute("update {} set used='yes' and locked=null where url like '%{}%'").format(data['resource'],url)
            cur.execute("select * from {} where locked is null and used is null").format(data['resource'])
            con.commit()
        except:
            con.rollback()
    rows = cur.fetchall()

    #df1 = df.drop_duplicates()

    put_scrollable(res_table,horizon_scroll=True,height=450)
    res_table.reset(put_html(rows[:res[number]]))
    put_buttons(['返回主页面'], onclick=[
        lambda: main()
        ])
    
'''
def chaxun(res_table,df, key):


    res_table.reset()
    put_scrollable(res_table,horizon_scroll=True,height=450)

    df1 = df[df['国家奥委会'] == key]
    res_table.reset(put_html(df1.to_html(border=0)))

def other():
    popup('功能未开发', [

        put_html(f'啊哦,这个按钮的功能还没有开发,你可以仿照上面代码补充该功能'),
        put_text('\n'),
        put_buttons(['关闭'], onclick=lambda _: close_popup())
    ])
'''

def return_resource():
    '''
    数据查询系统 - 早起Python    '''

    #put_markdown('# 资源数据处理系统')
    #file = file_upload('请选择需要加载的数据')
    #df = read_file(file['filename'])

    data = input_group("Basic info", [ # 1      
        select('选择你的身份', [''], name="user"),
        select('选择资源库', [''], name="resource"),
        file_upload('请选择需要加载的数据',name='file')
    ])
    df = read_file(data['file']['filename'])
    put_markdown('## 资源数据处理-归还资源')
    put_markdown('下面是一些常见的数据处理操作,点击对应按钮实现不同操作')

    res_table = output()
    
    put_markdown('### 当前选择:'+'\t'+data['user']+'\t'+data['resource']+'\t'+data['file']['filename'])
    put_buttons(['检查重复值','更新数据库','查看已使用资源','查看未使用资源','查看已占用资源情况','查看未占用资源情况'], onclick=[
        lambda: chongfu(df,res_table),
        lambda: updatesql(df,res_table,data),
        lambda: query_used(res_table,data),
        lambda: query_not_used(res_table,data),
        lambda: query_locked(res_table,data),
        lambda: query_not_locked(res_table,data)])

    put_markdown('## 数据查询')
    pin.put_input('res', label='请在下方输入框要查询的关键字', type=TEXT)
    put_buttons(['提交查询'], lambda _: chaxun(res_table,df,pin.pin['res']))
    put_scrollable(res_table, horizon_scroll=True,height=450)
    res_table.reset(put_html(df.to_html(border=0)))
    
def accept_resource():
    '''
    数据查询系统 - 早起Python    '''

    #put_markdown('# 资源数据处理系统')
    #file = file_upload('请选择需要加载的数据')
    #df = read_file(file['filename'])

    data = input_group("Basic info", [ # 1      
        select('选择你的身份', [''], name="user"),
        select('选择资源库', [''], name="resource"),
        #file_upload('请选择需要加载的数据',name='file')
    ])
    #df = read_file(data['file']['filename'])
    put_markdown('## 资源数据处理-获取资源')
    put_markdown('下面是一些常见的数据处理操作,点击对应按钮实现不同操作')

    res_table = output()
    
    put_markdown('### 当前选择:'+'\t'+data['user']+'\t'+data['resource'])
    put_buttons(['查看已使用资源','查看未使用资源','查看已占用资源情况','查看未占用资源情况','获取资源'], onclick=[
        lambda: query_used(res_table,data),
        lambda: query_not_used(res_table,data),
        lambda: query_locked(res_table,data),
        lambda: query_not_locked(res_table,data),
        lambda: get_resource(res_table,data)])

    put_markdown('## 数据查询')
    pin.put_input('res', label='请在下方输入框要查询的关键字', type=TEXT)
    put_buttons(['提交查询'], lambda _: chaxun(res_table,df,pin.pin['res']))
    put_scrollable(res_table, horizon_scroll=True,height=450)
    res_table.reset(put_html(df.to_html(border=0)))

def main():
    put_markdown('# 资源数据处理系统')
    target=select('选择操作', ['归还资源','获取资源'])
    #print(type(target))
    if target=='归还资源':
        return_resource()
    if target=='获取资源':
        accept_resource()

if __name__ == '__main__':

    start_server(main, port=8080 ,debug=True, cdn=False, auto_open_webbrowser=True)


这篇关于pywebio更新数据库系统的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程