123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- from enum import Enum
- from pywebio.input import *
- from pywebio.output import *
- from pywebio.session import *
- import pywebio.pin as pin
- from pywebio.session import local
- from pywebio import start_server
- import pandas as pd
- from io import StringIO
- from utils import judge_field, image, widget
- from utils.common import *
- from utils.algo import *
- # def get_cur_state():
- # return f"当前状态--- 数据文件:{}, 字段映射:{}, 数据清洗:{}"
- def init_actions():
- if not local.state_enter.get(State.init, None):
- local.state_enter.update({State.init: True})
- buttons = {State.upload_file.value: ['我想导入数据文件', 'success']}
- res = actions('请告诉我您的需求', buttons=construct_buttons(buttons))
- put_table([['用户:', buttons[res][0]]])
- return State(res)
-
- def upload_file_actions():
- put_table([['系统:', "请选择您要导入的数据文件"]])
- file = file_upload(accept=['.csv', '.xlsx','.xls'], placeholder="请上传文件,支持 .csv, .xlsx, .xls 文件")
- filename = file['filename']
-
- if filename.endswith(".csv"):
- df = pd.read_csv(StringIO(file['content'].decode('utf-8')))
- elif filename.endswith(".xlsx"):
- df = pd.read_excel(file['content'])
- elif filename.endswith(".xls"):
- df = pd.read_excel(file['content'])
-
- columns = list(df.columns)
- put_table([
- ['系统:',put_markdown(f"{filename}文件读取成功,展示前2条数据如下")]
- ])
- put_table([
- ['系统:',
- put_table(df.to_dict(orient='records')[0:2],header=columns)]
- ])
- local.df = df
- local.columns = columns
-
- # 下一步支持的action
- if not local.state_enter.get(State.upload_file, None):
- local.state_enter.update({State.upload_file: True})
- buttons = {State.column_map.value: ['我想配置数据字段映射', 'success'], State.upload_file.value: ['我想重新导入数据文件', 'info']}
- res = actions('请告诉我您的需求', buttons=construct_buttons(buttons))
- put_table([['用户:', buttons[res][0]]])
- return State(res)
- else:
- return widget.next_actions()
-
- def column_map_actions():
-
- columns = local.columns
-
-
- # 下一步支持的action
- if not local.state_enter.get(State.column_map, None):
- local.state_enter.update({State.column_map: True})
- put_table([
- ['系统:',put_markdown(f"字段自动化识别结果如下,可手动点击进行修改")]
- ])
- widget.column_map_widget(columns)
- buttons = {State.data_clean_conf.value: ['我想进行数据清洗', 'success'],
- State.run_algo.value: ['我想进入算法执行系统', 'success'],
- State.upload_file.value: ['我想重新导入数据文件', 'info']}
- res = actions('请告诉我您的需求', buttons=construct_buttons(buttons))
- put_table([['用户:', buttons[res][0]]])
- return State(res)
- else:
- put_table([
- ['系统:',put_markdown(f"当前的字段映射结果如下,请手动点击进行修改")]
- ])
- widget.column_map_widget(columns)
- return widget.next_actions()
-
- def data_clean_conf_actions():
- if not local.state_enter.get(State.data_clean_conf, None):
- local.state_enter.update({State.data_clean_conf: True})
- buttons = {-1: ['我想自定义清洗规则', 'success'],
- State.data_clean_exec.value: ['我想使用默认规则进行数据清洗', 'success']}
- res = actions('是否需要自定义清洗规则?', buttons=construct_buttons(buttons))
- put_table([['系统:', "是否需要自定义清洗规则?"]])
- put_table([['用户:', buttons[res][0]]])
- if res == -1:
- res = widget.data_clean_conf_widget()
- else:
- res = widget.data_clean_conf_widget()
- return State(res)
-
- def data_clean_exec_actions():
- if not local.state_enter.get(State.data_clean_exec, None):
- local.state_enter.update({State.data_clean_exec: True})
- pass # 数据清洗功能
- put_table([['系统:', "数据清洗完成"]])
- return download_file_actions()
- def download_file_actions():
- local.df.to_csv("after_clean.csv")
- with open("./after_clean.csv", 'rb') as f:
- content = f.read()
- put_table([['系统:', put_file('data.csv', content, '点击下载清洗后的数据文件')]])
-
- return widget.next_actions()
- def run_algo_actions():
- if not local.state_enter.get(State.run_algo, None):
- local.state_enter.update({State.run_algo: True})
- put_table([['系统:', "您已进入算法执行系统"]])
- return widget.algo_system_actions()
- def run_algo_all_actions():
- if not local.state_enter.get(State.run_algo_all, None):
- local.state_enter.update({State.run_algo_all: True})
- put_table([['系统:', "所有算法执行完成"]])
- return widget.next_actions()
- def run_algo_1_actions():
- if not local.state_enter.get(State.run_algo_1, None):
- local.state_enter.update({State.run_algo_1: True})
- put_table([['系统:', "基本信息统计执行完成, 结果如下"]])
- put_table([['系统:', basic_info_sta()]])
-
- return widget.algo_system_actions()
- def run_algo_2_actions():
- if not local.state_enter.get(State.run_algo_2, None):
- local.state_enter.update({State.run_algo_2: True})
- put_table([['系统:', "电池信息分析执行完成, 结果如下"]])
- put_table([['系统:', bat_info_sta()]])
- return widget.algo_system_actions()
- def run_algo_3_actions():
- if not local.state_enter.get(State.run_algo_3, None):
- local.state_enter.update({State.run_algo_3: True})
- put_table([['系统:', "电池安全诊断执行完成, 结果如下"]])
- put_table([['系统:', safety_diag()]])
- return widget.algo_system_actions()
- def run_algo_4_actions():
- if not local.state_enter.get(State.run_algo_4, None):
- local.state_enter.update({State.run_algo_4: True})
- put_table([['系统:', "电池性能诊断执行完成, 结果如下"]])
- put_table([['系统:', performance_diag()]])
- return widget.algo_system_actions()
- def run_algo_5_actions():
- if not local.state_enter.get(State.run_algo_5, None):
- local.state_enter.update({State.run_algo_5: True})
- put_table([['系统:', "电池画像分析执行完成, 结果如下"]])
- put_table([['系统:', bat_portrait()]])
- return widget.algo_system_actions()
- def run_algo_exit_actions():
- if not local.state_enter.get(State.run_algo_exit, None):
- local.state_enter.update({State.run_algo_exit: True})
- put_table([['系统:', "您已退出算法执行系统"]])
- return widget.next_actions()
-
-
- def main():
- local.state_enter = {}
- cur_state = State.init
- set_env(title='神目电池安全卫士', output_animation=False)
- put_markdown(f"## 欢迎使用神目电池安全卫士系统")
- while True:
- try:
- if cur_state == State.init:
- cur_state = init_actions()
- elif cur_state == State.upload_file:
- cur_state = upload_file_actions()
- elif cur_state == State.column_map:
- cur_state = column_map_actions()
- elif cur_state == State.data_clean_conf:
- cur_state = data_clean_conf_actions()
- elif cur_state == State.data_clean_exec:
- cur_state = data_clean_exec_actions()
- elif cur_state == State.download_file:
- cur_state = download_file_actions()
- elif cur_state == State.run_algo:
- cur_state = run_algo_actions()
- elif cur_state == State.run_algo_all:
- cur_state = run_algo_all_actions()
- elif cur_state == State.run_algo_1:
- cur_state = run_algo_1_actions()
- elif cur_state == State.run_algo_2:
- cur_state = run_algo_2_actions()
- elif cur_state == State.run_algo_3:
- cur_state = run_algo_3_actions()
- elif cur_state == State.run_algo_4:
- cur_state = run_algo_4_actions()
- elif cur_state == State.run_algo_5:
- cur_state = run_algo_5_actions()
- elif cur_state == State.run_algo_exit:
- cur_state = run_algo_exit_actions()
- else:
- raise Exception("进入未知状态")
- except Exception as e:
- toast(str(e), 0, color='error')
- # cur_state = State.init
-
- if __name__ == '__main__':
- start_server(main, port=8080, debug=True, remote_access=True)
|