__author__ = 'lmstack'
#coding=utf-8
# Scheduled battery fault diagnosis: every cycle, each known battery SN is
# diagnosed on its most recent BMS data and the resulting fault records are
# appended to / updated in the `all_fault_info` table.
import os
import datetime
import pandas as pd
from LIB.BACKEND import DBManager, Log
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import time, datetime
import dateutil.relativedelta
import traceback
from LIB.MIDDLE.CellStateEstimation.Common import log
from pandas.core.frame import DataFrame
from LIB.MIDDLE.SaftyCenter.Common import QX_BatteryParam
from LIB.MIDDLE.SaftyCenter.diagfault.SC_SamplingSafty import SamplingSafty
from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import DBDownload
from urllib import parse
import pymysql
import pdb
from apscheduler.schedulers.blocking import BlockingScheduler
import datacompy
from LIB.MIDDLE.SaftyCenter.Common import FeiShuData
from LIB.MIDDLE.SaftyCenter.Common import QX_BatteryParam
from LIB.MIDDLE.SaftyCenter.diagfault import CBMSBatDiag
from LIB.MIDDLE.SaftyCenter.diagfault.SC_SamplingSafty import SamplingSafty


# Value of `end_time` used for a fault that has not ended yet.
_OPEN_END_TIME = '0000-00-00 00:00:00'

# Columns that identify one fault row in the in-memory fault table.
_FAULT_KEY = ['product_id', 'code', 'start_time', 'Batpos', 'info']

# Cell type selected by imei[5:9]; the N750 model additionally depends on the
# vendor code in imei[3:5].
_CELLTYPE_BY_MODEL = {
    'N640': 1,    # 6040 ternary cell
    'N440': 2,    # 4840 ternary cell
    'L660': 99,   # 6060 lithium cell
    'L420': 101,  # 20 Ah LFP cell
    'L264': 102,  # 120 Ah LFP cell
}
_CELLTYPE_N750_BY_VENDOR = {
    'LX': 3,  # Lixin 50 Ah ternary cell
    'CL': 4,  # CATL 50 Ah ternary cell
}

# Connection settings used for the row-by-row UPDATE of `all_fault_info`.
# NOTE(review): credentials are hard-coded in this file; consider moving them
# to configuration / environment variables.
_FAULT_DB_CONFIG = dict(
    host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com',
    port=3306,
    user='qx_read',
    password='Qx@123456',
    database='safety_platform',
)


def _celltype_from_imei(imei):
    """Return the cell-type code encoded in *imei*, or None when unknown."""
    model = imei[5:9]
    if model == 'N750':
        return _CELLTYPE_N750_BY_VENDOR.get(imei[3:5])
    return _CELLTYPE_BY_MODEL.get(model)


def _anti_join(left, right, subset):
    """Return the rows of *left* whose *subset* key does not occur in *right*.

    *right* is appended twice so that every one of its keys is duplicated and
    therefore removed by ``drop_duplicates(keep=False)``. Rows whose key is
    repeated inside *left* itself are removed as well.
    """
    return pd.concat([left, right, right]).drop_duplicates(subset=subset, keep=False)


def _write_fault_changes(df_changes, sn, start_time, end_time):
    """Write changed `end_time` / `Batpos` values back to `all_fault_info`.

    One UPDATE statement is issued per row of *df_changes* and all of them are
    committed together. Any error is logged (not re-raised); the connection is
    closed on every path.
    """
    conn = None
    try:
        conn = pymysql.connect(**_FAULT_DB_CONFIG)
        cursor = conn.cursor()
        logger.info(df_changes)
        for row in df_changes.itertuples(index=False):
            # NOTE(review): the statement is built by string formatting; the
            # values come from the diagnosis results, not from user input.
            sql = '''
                update all_fault_info set end_time='{}', Batpos={}
                where product_id='{}' and code={} and start_time='{}'
                '''.format(row.end_time, row.Batpos, row.product_id, row.code, row.start_time)
            logger.info(sql)
            cursor.execute(sql)
        conn.commit()
    except Exception:
        logger.error(traceback.format_exc())
        logger.error(u"{} :{},{} 任务运行错误\n".format(sn, start_time, end_time), exc_info=True)
    finally:
        # `conn` stays None when connect() itself failed.
        if conn is not None:
            conn.close()


def fun():
    """Run one diagnosis cycle over every SN in the global ``df_sn``.

    For each SN the last 130 seconds of BMS data and the matching SOH /
    uniformity results are loaded, both diagnosis routines are run, and the
    outcome is compared with the in-memory fault table ``df_Diag_Ram``:
    faults not yet in the table are appended to `all_fault_info`, faults whose
    `Batpos` changed are updated there. ``df_Diag_Ram`` is rebound to the new
    in-memory state. A failure for one SN is logged and does not stop the
    cycle.

    Reads the module globals ``df_sn``, ``db_res_engine`` and ``logger``;
    reads and rebinds ``df_Diag_Ram``. Returns None.
    """
    global df_sn
    global db_res_engine
    global logger
    global df_Diag_Ram

    logger.info("cycle start !!!!!!!!!!!!!!!!!!!!")
    now = datetime.datetime.now()
    start_time = (now - datetime.timedelta(seconds=130)).strftime('%Y-%m-%d %H:%M:%S')
    end_time = now.strftime('%Y-%m-%d %H:%M:%S')

    # Maintenance sheet data from FeiShu, renamed to the fault-table column names.
    df_read_Yunw = FeiShuData.getFeiShuDATA()
    df_read_Yunw.rename(
        columns={
            '电池编码': 'product_id',
            '内容描述': 'info',
            '发生时间': 'start_time',
            '维修信息': 'advice',
        },
        inplace=True,
    )

    # Settings of the database holding the SOH / uniformity results
    # (identical for every SN, so defined once outside the loop).
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    db = 'qx_cas'
    user = 'qx_read'
    password = 'Qx@123456'
    mode = 1
    tablename1 = 'cellstateestimation_soh'
    tablename2 = 'cellstateestimation_uniform_socvoltdiff'

    # `sn` is bound by the loop itself so that it is always available to the
    # log messages, including the SKIP branch and the error handler.
    for sn, imei in zip(df_sn['sn'], df_sn['imei']):
        try:
            celltype = _celltype_from_imei(imei)
            if celltype is None:
                logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "未知", sn))
                continue

            logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn))
            param = QX_BatteryParam.BatteryInfo(celltype)

            # Raw BMS data for the current window.
            dbManager = DBManager.DBManager()
            df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
            df_bms = df_data['bms']

            # Previously computed SOH and uniformity results for the same window.
            with DBDownload.DBDownload(host, port, db, user, password, mode) as DBRead:
                df_soh = DBRead.getdata('time_st,sn,soh,cellsoh', tablename=tablename1, sn=sn,
                                        timename='time_sp', st=start_time, sp=end_time)
                df_uniform = DBRead.getdata('time,sn,cellsoc_diff,cellmin_num,cellmax_num', tablename=tablename2,
                                            sn=sn, timename='time', st=start_time, sp=end_time)

            # Split the in-memory fault table into this SN's rows and all other rows.
            df_Diag_Ram = df_Diag_Ram.drop_duplicates(subset=_FAULT_KEY, keep='first')
            df_Diag_Ram_sn = df_Diag_Ram.loc[df_Diag_Ram['product_id'] == sn]  # known faults of this SN
            df_Diag_Ram_sn_else = _anti_join(df_Diag_Ram, df_Diag_Ram_sn, _FAULT_KEY)  # faults of other SNs
            CellFltInfo = df_Diag_Ram_sn.drop('Batpos', axis=1)
            df_Diag_Ram_add = pd.DataFrame()
            df_Diag_Ram_Update_change = pd.DataFrame()

            if not df_bms.empty:
                # Two independent diagnosis routines; their results are merged
                # and identical fault rows are collapsed.
                df_Diag_Batdiag_update_xq = SamplingSafty.main(sn, param, df_bms, CellFltInfo)
                BatDiag = CBMSBatDiag.BatDiag(sn, celltype, df_bms, df_soh, df_uniform, CellFltInfo)
                df_Diag_Batdiag_update = BatDiag.diag()
                df_Diag_Cal_Update = pd.concat(
                    [df_Diag_Batdiag_update_xq, df_Diag_Batdiag_update]
                ).drop_duplicates(subset=['product_id', 'start_time', 'end_time', 'code', 'info'], keep='first')

                # NOTE(review): the original indentation of this file was lost;
                # the merge below is placed inside this branch because it needs
                # the columns produced here - confirm against the deployed version.
                if not df_Diag_Cal_Update.empty:
                    # Finished faults get Batpos 1, still-open faults Batpos 0.
                    # .copy() so the column assignment never targets a view.
                    df_Diag_Cal_finish = df_Diag_Cal_Update.loc[df_Diag_Cal_Update['end_time'] != _OPEN_END_TIME].copy()
                    df_Diag_Cal_new = df_Diag_Cal_Update.loc[df_Diag_Cal_Update['end_time'] == _OPEN_END_TIME].copy()
                    df_Diag_Cal_finish['Batpos'] = 1
                    df_Diag_Cal_new['Batpos'] = 0

                    df_Diag_cal_early_fix = pd.DataFrame()
                    df_feishu_sta = df_read_Yunw.loc[df_read_Yunw['product_id'] == sn]  # FeiShu rows of this SN
                    if df_feishu_sta.empty:
                        # No FeiShu record for this SN: every open fault stays open.
                        df_Diag_cal_early_unfix = df_Diag_Cal_new
                    else:
                        feishu_times = pd.to_datetime(df_feishu_sta['start_time'])
                        latest_feishu_time = max(feishu_times)
                        # Open faults that started after the latest FeiShu record ...
                        df_Diag_cal_later = df_Diag_Cal_new.loc[
                            pd.to_datetime(df_Diag_Cal_new['start_time']) > latest_feishu_time
                        ]
                        # ... and those that did not.
                        df_Diag_cal_early = _anti_join(
                            df_Diag_Cal_new, df_Diag_cal_later, ['product_id', 'code', 'start_time']
                        )
                        latest_advice = df_feishu_sta.loc[feishu_times == latest_feishu_time, 'advice']
                        still_unfixed = (latest_advice == '需正常返仓') | (latest_advice == '需紧急返仓')
                        if still_unfixed.any():
                            df_Diag_cal_early_unfix = df_Diag_Cal_new
                        else:
                            df_Diag_cal_early_fix = df_Diag_cal_early
                            df_Diag_cal_early_unfix = df_Diag_cal_later
                    if not df_Diag_cal_early_fix.empty:
                        df_Diag_cal_early_fix['Batpos'] = 1

                    # Fault state of this SN after the current diagnosis.
                    df_Diag_Ram_Update = pd.concat(
                        [df_Diag_cal_early_unfix, df_Diag_cal_early_fix, df_Diag_Cal_finish]
                    )
                    df_Diag_Ram_Update['start_time'] = pd.to_datetime(df_Diag_Ram_Update['start_time'])
                    df_Diag_Ram_Update.sort_values(by=['start_time'], axis=0, ascending=True, inplace=True)

                    # Faults not yet in the in-memory table.
                    df_Diag_Ram_add = _anti_join(df_Diag_Ram_Update, df_Diag_Ram_sn, ['start_time', 'code'])
                    # Faults that were already in the in-memory table ...
                    df_Diag_Ram_Update_old = _anti_join(df_Diag_Ram_Update, df_Diag_Ram_add, ['start_time', 'code'])
                    # ... and, among those, the ones whose Batpos differs now.
                    df_Diag_Ram_Update_change = _anti_join(
                        df_Diag_Ram_Update_old, df_Diag_Ram_sn, ['start_time', 'code', 'Batpos']
                    )
                    # Only rows with Batpos 0 are kept in memory for the next cycle.
                    df_Diag_Ram_sav = df_Diag_Ram_Update.loc[df_Diag_Ram_Update['Batpos'] == 0]
                    df_Diag_Ram = pd.concat([df_Diag_Ram_sn_else, df_Diag_Ram_sav])

            if len(df_Diag_Ram_add) > 0:  # new faults
                df_Diag_Ram_add.columns = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info',
                                           'advice', 'Batpos']
                df_Diag_Ram_add['factory'] = '骑享'
                df_Diag_Ram_add.to_sql("all_fault_info", con=db_res_engine, if_exists="append", index=False)

            if len(df_Diag_Ram_Update_change) > 0:  # changed faults
                _write_fault_changes(
                    df_Diag_Ram_Update_change.reset_index(drop=True), sn, start_time, end_time
                )

            logger.info("pid-{} celltype-{} SN: {} DONE!".format(os.getpid(), celltype, sn))
        except Exception:
            # One failing SN must not abort the whole cycle.
            logger.error(traceback.format_exc())
            logger.error(u"{} :{},{} 任务运行错误\n".format(sn, start_time, end_time), exc_info=True)

    logger.info("cycle DONE !!!!!!!!!!!!!!!!!!!!")


if __name__ == "__main__":
    history_run_flag = False  # flag for running on historical data

    # Load the SN list.
    host = 'rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
    port = 3306
    db = 'qixiang_oss'
    user = 'qixiang_oss'
    password = 'Qixiang2021'
    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
    try:
        cursor = conn.cursor()
        cursor.execute("select sn, imei, add_time from app_device")
        res = cursor.fetchall()
    finally:
        conn.close()
    df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
    df_sn = df_sn.reset_index(drop=True)

    # Database configuration.
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    user = 'qx_cas'
    password = parse.quote_plus('Qx@123456')
    database = 'qx_cas'

    db_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, database
        ))
    DbSession = sessionmaker(bind=db_engine)

    # Configuration for running on historical data.
    df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine)

    # Logging configuration: one directory per start time, separate info / error files.
    now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":", "_")
    log_path = 'log/' + now_str
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    log = Log.Mylog(log_name='saftyCenter_diagfault', log_level='info')
    log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024 * 1024 * 100)
    log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024 * 1024 * 100)
    logger = log.get_logger()

    logger.info("pid is {}".format(os.getpid()))

    # Engine for the fault-result database.
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    db = 'safety_platform'
    user = 'qx_read'
    password = parse.quote_plus('Qx@123456')
    tablename = 'all_fault_info'
    db_res_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, db
        ))

    # Before the first cycle, seed the in-memory fault table from the database.
    df_Diag_Ram = pd.read_sql(
        "select start_time, end_time, product_id, code, level, info, advice, Batpos "
        "from all_fault_info where factory = '{}'".format('骑享'),
        db_res_engine,
    )

    # Run once immediately, then every 600 seconds.
    fun()
    scheduler = BlockingScheduler()
    scheduler.add_job(fun, 'interval', seconds=600, id='diag_job')

    try:
        scheduler.start()
    except Exception as e:
        scheduler.shutdown()
        logger.error(str(e))