deploy.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. __author__ = 'lmstack'
  2. #coding=utf-8
  3. import os
  4. import datetime
  5. import pandas as pd
  6. from LIB.BACKEND import DBManager, Log
  7. from sqlalchemy import create_engine
  8. from sqlalchemy.orm import sessionmaker
  9. import time, datetime
  10. import dateutil.relativedelta
  11. import traceback
  12. from LIB.MIDDLE.CellStateEstimation.Common import log
  13. from pandas.core.frame import DataFrame
  14. from LIB.MIDDLE.SaftyCenter.Common import QX_BatteryParam
  15. from LIB.MIDDLE.SaftyCenter.CellValueDiag.V1_0_0.SC_SamplingSafty import SamplingSafty
  16. from LIB.MIDDLE.CellStateEstimation.Common import DBDownload
  17. from urllib import parse
  18. import pymysql
  19. import pdb
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. import datacompy
  22. def fun():
  23. global df_sn
  24. global db_res_engine
  25. global logger
  26. global df_Diag_Ram
  27. # 读取结果数据库
  28. host2='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  29. port2=3306
  30. db2='safety_platform'
  31. user2='qx_read'
  32. password2='Qx@123456'
  33. start=time.time()
  34. end_time=datetime.datetime.now()
  35. start_time=end_time-datetime.timedelta(seconds=120)
  36. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  37. end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
  38. logger.info("cycle start !!!!!!!!!!!!!!!!!!!!")
  39. for i in range(0, len(df_sn)):
  40. try:
  41. if df_sn.loc[i, 'imei'][5:9] == 'N640':
  42. celltype=1 #6040三元电芯
  43. elif df_sn.loc[i, 'imei'][5:9] == 'N440':
  44. celltype=2 #4840三元电芯
  45. elif df_sn.loc[i, 'imei'][5:9] == 'L660':
  46. celltype=99 # 6060锂电芯
  47. elif df_sn.loc[i, 'imei'][3:5] == 'LX' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  48. celltype=3 #力信 50ah三元电芯
  49. elif df_sn.loc[i, 'imei'][3:5] == 'CL' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  50. celltype=4 #CATL 50ah三元电芯
  51. else:
  52. logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "未知", sn))
  53. continue
  54. sn = df_sn.loc[i, 'sn']
  55. logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn))
  56. param=QX_BatteryParam.BatteryInfo(celltype)
  57. # sn='PK50201A000002039'
  58. # celltype=2
  59. # start_time='2021-05-02 09:12:26'
  60. # end_time='2021-06-03 19:12:26'
  61. # # df_bms= pd.read_csv(r'D:\Platform\platform_python\data_analyze_platform\USER\01qixiang\98Download\\'+'BMS_'+sn+'.csv',encoding='GB18030')
  62. #读取原始数据库数据........................................................................................................................................................
  63. dbManager = DBManager.DBManager()
  64. df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
  65. df_bms = df_data['bms']
  66. # df_bms.to_csv(r'D:\Platform\platform_python\data_analyze_platform\USER\01qixiang\99Result\\''BMS_'+sn+'.csv',encoding='GB18030')
  67. #读取结果数据库数据........................................................................................................................................................
  68. host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  69. port=3306
  70. db='qx_cas'
  71. user='qx_read'
  72. password='Qx@123456'
  73. mode=1
  74. # 读取结果数据库
  75. host2='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  76. port2=3306
  77. db2='safety_platform'
  78. user2='qx_read'
  79. password2='Qx@123456'
  80. tablename1='cellstateestimation_soh'
  81. tablename2='cellstateestimation_uniform_socvoltdiff'
  82. tablename3='cellstateestimation_soc'
  83. DBRead=DBDownload.DBDownload(host, port, db, user, password,mode)
  84. with DBRead as DBRead:
  85. df_soh=DBRead.getdata('time_st','sn','soh', 'cellsoh', tablename=tablename1, sn=sn, timename='time_sp', st=start_time, sp=end_time)
  86. df_uniform=DBRead.getdata('time','sn','cellsoc_diff','cellmin_num','cellmax_num', tablename=tablename2, sn=sn, timename='time', st=start_time, sp=end_time)
  87. # df_soc=DBRead.getdata('time','sn','packsoc', tablename=tablename3, sn=sn)
  88. #电池诊断................................................................................................................................................................
  89. #BatDiag=CBMSBatDiag.BatDiag(sn,celltype,df_bms, df_soh, df_uniform)
  90. #df_res=BatDiag.diag()
  91. #df_Diag_Ram_old=df_Diag_Ram
  92. df_Diag_Ram_Update=DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'])
  93. CellFltInfo=DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'])
  94. if not df_bms.empty:
  95. CellFltInfo=df_Diag_Ram[df_Diag_Ram['product_id']==sn]
  96. df_Diag_Ram_Update=SamplingSafty.main(sn,param,df_bms,CellFltInfo)
  97. if not df_Diag_Ram_Update.empty:
  98. sn_index=df_Diag_Ram[df_Diag_Ram['product_id']==sn].index
  99. df_Diag_Ram=df_Diag_Ram.drop(index=sn_index)
  100. df_Diag_Ram=df_Diag_Ram.append(df_Diag_Ram_Update)
  101. df_Diag_Ram.reset_index(inplace=True,drop=True)
  102. Diag_Ram_Dif=datacompy.Compare(df_Diag_Ram_Update,CellFltInfo,join_columns=['product_id','end_time', 'code'])
  103. Diag_Ram_Dif=Diag_Ram_Dif.df1_unq_rows
  104. if len(Diag_Ram_Dif)>0:
  105. Diag_Ram_Dif_New=Diag_Ram_Dif[Diag_Ram_Dif['end_time']=='0000-00-00 00:00:00']
  106. Diag_Ram_Dif_Finish=df_Diag_Ram[df_Diag_Ram['end_time']!='0000-00-00 00:00:00']
  107. if len(Diag_Ram_Dif_New)>0: # 新增
  108. Diag_Ram_Dif_New.columns = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']
  109. Diag_Ram_Dif_New['factory'] = '骑享'
  110. Diag_Ram_Dif_New.to_sql("all_fault_info",con=db_res_engine, if_exists="append",index=False)
  111. if len(Diag_Ram_Dif_Finish)>0: # 修改
  112. Diag_Ram_Dif_Finish=Diag_Ram_Dif_Finish.reset_index(drop=True)
  113. try:
  114. conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2)
  115. cursor = conn.cursor()
  116. for i in range(0,len(Diag_Ram_Dif_Finish)):
  117. cursor.execute('''
  118. update all_fault_info set end_time={} where product_id='{}' and code={}
  119. '''.format(Diag_Ram_Dif_Finish.loc[i,'end_time'], Diag_Ram_Dif_Finish.loc[i,'product_id'], Diag_Ram_Dif_Finish.loc[i,'code']))
  120. conn.commit()
  121. conn.close();
  122. except:
  123. logger.error(traceback.format_exc)
  124. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  125. conn.close();
  126. end=time.time()
  127. logger.info("pid-{} SN: {} DONE!".format(os.getpid(), sn))
  128. except:
  129. logger.error(traceback.format_exc)
  130. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  131. if __name__ == "__main__":
  132. # 时间设置
  133. # now_time = datetime.datetime.now()
  134. # pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)# 前一日
  135. # end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 00:00:00")
  136. # start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 00:00:00")
  137. history_run_flag = False # 历史数据运行标志
  138. # 更新sn列表
  139. host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  140. port=3306
  141. db='qixiang_oss'
  142. user='qixiang_oss'
  143. password='Qixiang2021'
  144. conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
  145. cursor = conn.cursor()
  146. cursor.execute("select sn, imei, add_time from app_device")
  147. res = cursor.fetchall()
  148. df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
  149. df_sn = df_sn.reset_index(drop=True)
  150. conn.close();
  151. # 数据库配置
  152. host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  153. port = 3306
  154. user = 'qx_cas'
  155. password = parse.quote_plus('Qx@123456')
  156. database = 'qx_cas'
  157. db_engine = create_engine(
  158. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  159. user, password, host, port, database
  160. ))
  161. DbSession = sessionmaker(bind=db_engine)
  162. # 运行历史数据配置
  163. df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine)
  164. # 日志配置
  165. now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
  166. log_path = 'log/' + now_str
  167. if not os.path.exists(log_path):
  168. os.makedirs(log_path)
  169. log = Log.Mylog(log_name='saftyCenter_cellvaluediag', log_level = 'info')
  170. log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
  171. log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
  172. logger = log.get_logger()
  173. logger.info("pid is {}".format(os.getpid()))
  174. # 算法参数
  175. host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  176. port=3306
  177. db='safety_platform'
  178. user='qx_read'
  179. password=parse.quote_plus('Qx@123456')
  180. tablename='all_fault_info'
  181. db_res_engine = create_engine(
  182. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  183. user, password, host, port, db
  184. ))
  185. #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取................
  186. # print("select start_time, end_time, product_id, code, level, info, advice, factory from {}".format(tablename))
  187. result=pd.read_sql("select start_time, end_time, product_id, code, level, info, advice from all_fault_info where factory = '{}'".format('骑享'), db_res_engine)
  188. result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']]
  189. df_Diag_Ram=result[result['end_time']=='0000-00-00 00:00:00']
  190. #定时任务.......................................................................................................................................................................
  191. scheduler = BlockingScheduler()
  192. scheduler.add_job(fun, 'interval', seconds=120, id='diag_job')
  193. try:
  194. scheduler.start()
  195. except Exception as e:
  196. scheduler.shutdown()
  197. logger.error(str(e))