import datetime
import json
import os
import pickle
import time
import traceback
import warnings
from multiprocessing import Pool

import pandas as pd
from keras.models import load_model
from sqlalchemy import text, delete, and_, or_, update

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
# from DataSplit.COMMON.service.iotp.IotpAlgoService import IotpAlgoService  # test_code
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from DataSplit.V_1_0_0 import data_status as ds               # charge-status normalization
from DataSplit.V_1_0_0 import data_split as dt                # segmentation functions
from DataSplit.V_1_0_0 import data_drive_stat as ddt          # per-drive-segment statistics
from DataSplit.V_1_0_0 import data_charge_stat as dct         # per-charge-segment statistics
from DataSplit.V_1_0_0 import data_stand_stat as dst          # per-rest-segment statistics
from DataSplit.V_1_0_0 import data_drive_stat_period as ddtp  # drive statistics per charge cycle
from DataSplit.V_1_0_0 import trans_day as trd                # handles segments that cross midnight
from DataSplit.V_1_0_0 import stand_status as ss
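
# Note: SysUtils, MysqlUtils, RedisUtils and CleanUtils used below are presumably
# exported by the wildcard import from ZlwlAlgosCommon.utils.ProUtils.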

def update_param(db_engine):
    # Parameters come from redis in the scheduled version, falling back to the
    # database when redis has nothing; this offline version queries the database directly.
    df_algo_adjustable_param = pd.read_sql("select * from algo_adjustable_param", db_engine)
    df_algo_list = pd.read_sql("select * from algo_list", db_engine)
    df_algo_pack_param = pd.read_sql("select * from algo_pack_param", db_engine)
    return df_algo_adjustable_param, df_algo_list, df_algo_pack_param
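
# Usage sketch (mirrors the hard-coded debug run in main(), where a single pack
# 'JX19220' is selected and its JSON parameter blob is parsed):
#   adjustable, algos, pack = update_param(conn)
#   pack_param = json.loads(pack[pack['pack_code'] == 'JX19220'].iloc[0]['param'])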

def main():
    # Relies on sysUtils, logger_main and log_base_path created at module level in __main__.
    process_num = 1
    # The scheduler must never stop.
    while True:
        try:
            warnings.filterwarnings("ignore")
            try:
                cleanUtils = CleanUtils()
                # Preparation before invoking the algorithms.
                mysql_algo_conn = None
                mysql_algo_engine = None
                mysql_iotp_conn = None
                mysql_iotp_engine = None
                kafka_consumer = None
                rc = None
                algo_list = ['socdiag', 'low_soc_diag', 'Sor_Diag', 'Li_Plted', 'Data_Split']  # algorithms covered by this schedule
                loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
                logger_main.info(f"process-{process_num}: configuring middleware")

                # mysql
                mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
                mysqlUtils = MysqlUtils()
                mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
                mysql_algo_conn = mysql_algo_engine.connect()
                mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
                mysqlUtils = MysqlUtils()
                mysql_iotp_engine, mysql_iopt_Session = mysqlUtils.get_mysql_engine(mysql_iotp_data)
                mysql_iotp_conn = mysql_iotp_engine.connect()

                # hbase
                hbase_params = sysUtils.get_cf_param('hbase-datafactory')  # test_code
                iotp_service = IotpAlgoService(hbase_params=hbase_params)

                # redis
                redis_params = sysUtils.get_cf_param('redis')
                redisUtils = RedisUtils()
                rc = redisUtils.get_redis_conncect(redis_params)
            except Exception as e:
                logger_main.error(f'process-{process_num}: {e}')
                logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
                cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)

            # Start preparing the scheduling run.
            # for message in kafka_consumer:
            #     KafkaConsumer.commit(self)
            try:
                logger_main.info('schedule received')
                if not mysql_algo_conn.closed:
                    mysql_algo_conn.close()
                mysql_algo_conn = mysql_algo_engine.connect()  # take a fresh mysql connection from the pool
                df_algo_adjustable_param, df_algo_list, df_algo_pack_param = update_param(mysql_algo_conn)
                df_algo_adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code'] == 'JX19220']
                df_algo_pack_param = df_algo_pack_param[df_algo_pack_param['pack_code'] == 'JX19220']
                df_algo_pack_param = json.loads(df_algo_pack_param.iloc[0]['param'])
                pack_code = 'JX19220'
                # schedule_params = json.loads(message.value)
                # if (schedule_params is None) or (schedule_params == ''):
                #     logger_main.info('{} abnormal kafka payload, skipping this run'.format(str(message.value)))
                #     continue
                # Parsing of the kafka schedule parameters:
                # df_snlist = pd.DataFrame(schedule_params['snlist'])
                # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                # df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                # df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
                # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                # start_time = schedule_params['start_time']
                # end_time = schedule_params['end_time']
                # pack_code = schedule_params['pack_code']
                # cell_type = schedule_params['cell_type']
                mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
                mysqlUtils = MysqlUtils()
                mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
                mysql_algo_conn = mysql_algo_engine.connect()
                df_snpk_list = pd.read_sql("select sn, imei, pack_model, device_cell_type, scrap_status from t_device", mysql_algo_conn)
                df_snpk_list = df_snpk_list[df_snpk_list['scrap_status'] < 4]
                df_snpk_list = df_snpk_list.rename(columns={'pack_model': 'pack_code'})
                df_snpk_list = df_snpk_list[df_snpk_list['sn'].str.contains('N234P')]
                df_sn_list = list(df_snpk_list['sn'])
                sn_list_total = df_sn_list
                sn_list = sn_list_total
                start_time = "2023-03-25 00:00:00"
                end_time = "2023-06-25 00:00:00"
                columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                           DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                           DataField.other_temp_value, DataField.bms_sta, DataField.charge_sta, DataField.latitude, DataField.longitude,
                           DataField.mileage, DataField.accum_chg_wh, DataField.accum_dschg_wh, DataField.accum_chg_ah, DataField.accum_dschg_ah, DataField.vin]
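                # Data is pulled one SN at a time over the full window; the DataField
                # constants are assumed to map to the column qualifiers understood by
                # IotpAlgoService.get_data.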
                for i in range(0, len(sn_list_total)):
                    sn_list = [sn_list_total[i]]
                    # Fetch the data.
                    time_st = time.time()
                    logger_main.info(f"process-{process_num}: fetching data, {start_time} ~ {end_time}\n{str(sn_list)}")
                    df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                    # df_data.to_excel('df_data_1.xlsx')
                    logger_main.info(f'process-{process_num}, got {len(df_data)} rows, fetch took {time.time()-time_st}s')
                    if len(df_data) == 0:
                        logger_main.info(f"process-{process_num}: data cleaning took {time.time()-time_st}s, no valid data, skipping this run")
                        continue
                    # Parse the start-time string into a datetime object,
                    date_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
                    # shift it back 8 hours,
                    new_date_time = date_time - datetime.timedelta(hours=8)
                    # and format it back into a string.
                    start_time_8h = new_date_time.strftime('%Y-%m-%d %H:%M:%S')
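                    # e.g. "2023-03-25 00:00:00" -> "2023-03-24 16:00:00". The extra 8 h
                    # of history lets segments already in progress at start_time be
                    # stitched to their earlier samples; the df_*_32h names below suggest
                    # the production job runs on a 24 h window plus this 8 h look-back.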
                    df_data_8h = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time_8h, end_time=start_time)
                    logger_main.info(f'process-{process_num}, got {len(df_data_8h)} rows, fetch took {time.time()-time_st}s')

                    # Data cleaning.
                    try:
                        time_st = time.time()
                        logger_main.info(f'process-{process_num} cleaning data')
                        if len(df_data_8h) == 0:
                            df_data_8h = pd.DataFrame(columns=['datatype', 'time', 'sn', 'pack_crnt', 'pack_volt', 'pack_soc',
                                                               'cell_voltage_count', 'cell_temp_count', 'cell_voltage', 'cell_temp', 'other_temp_value',
                                                               'bms_sta', 'charge_sta', 'latitude', 'longitude', 'mileage', 'accum_dschg_wh', 'accum_chg_wh', 'accum_chg_ah', 'accum_dschg_ah', 'vin'])
                        # df_data_8h = df_data_8h.drop(['latitude', 'longitude', 'mileage', 'accum_chg_wh', 'accum_dschg_wh', 'accum_dschg_ah', 'accum_chg_ah', 'vin'], axis=1, errors='ignore')
                        if len(df_data) == 0:
                            df_data = pd.DataFrame(columns=['datatype', 'time', 'sn', 'pack_crnt', 'pack_volt', 'pack_soc',
                                                            'cell_voltage_count', 'cell_temp_count', 'cell_voltage', 'cell_temp', 'other_temp_value',
                                                            'bms_sta', 'charge_sta', 'latitude', 'longitude', 'mileage', 'accum_dschg_wh', 'accum_chg_wh', 'accum_chg_ah', 'accum_dschg_ah', 'vin'])
                        # df_data = df_data.drop(['latitude', 'longitude', 'mileage', 'accum_chg_wh', 'accum_dschg_wh', 'accum_dschg_ah', 'accum_chg_ah', 'vin'], axis=1, errors='ignore')
                        df_data_gps = iotp_service.gps_datacleaning(df_data)
                        df_data_gps_8h = iotp_service.gps_datacleaning(df_data_8h)
                        df_data_accum = iotp_service.accum_datacleaning(df_data)
                        df_data_accum_8h = iotp_service.accum_datacleaning(df_data_8h)
                        df_data_vin = iotp_service.vin_datacleaning(df_data)
                        df_data_vin_8h = iotp_service.vin_datacleaning(df_data_8h)
                        df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data)  # clean the main signal data
                        df_data_8h, df_table_t, cellvolt_name_t, celltemp_name_t = iotp_service.datacleaning(df_algo_pack_param, df_data_8h)  # clean the look-back data
                        # df_data.to_excel('df_data_2.xlsx')
                        if len(df_data_gps_8h) == 0:
                            df_data_gps_8h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'latitude', 'longitude', 'mileage'])
                        if len(df_data_gps) == 0:
                            df_data_gps = pd.DataFrame(columns=['sn', 'time', 'datatype', 'latitude', 'longitude', 'mileage'])
                        if len(df_data_accum_8h) == 0:
                            df_data_accum_8h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'accum_chg_wh', 'accum_dschg_wh', 'accum_dschg_ah', 'accum_chg_ah'])
                        if len(df_data_accum) == 0:
                            df_data_accum = pd.DataFrame(columns=['sn', 'time', 'datatype', 'accum_chg_wh', 'accum_dschg_wh', 'accum_dschg_ah', 'accum_chg_ah'])
                        if len(df_data_vin_8h) == 0:
                            df_data_vin_8h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'vin'])
                        if len(df_data_vin) == 0:
                            df_data_vin = pd.DataFrame(columns=['sn', 'time', 'datatype', 'vin'])
                        try:
                            df_data_32h = pd.concat([df_data_8h, df_data])
                            df_data_32h = df_data_32h.reset_index(drop=True)
                            # df_data_32h.to_excel('data_32h.xlsx')
                            if len(df_data_32h) == 0:
                                logger_main.info(f"process-{process_num}: data cleaning took {time.time()-time_st}s, no valid data, skipping this run")
                                continue
                            else:
                                df_data_gps_32h = pd.concat([df_data_gps_8h, df_data_gps])
                                df_data_gps_32h = df_data_gps_32h.reset_index(drop=True)
                                if len(df_data_gps_32h) == 0:
                                    df_data_gps_32h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'latitude', 'longitude', 'mileage'])
                                df_data_accum_32h = pd.concat([df_data_accum_8h, df_data_accum])
                                df_data_accum_32h = df_data_accum_32h.reset_index(drop=True)
                                if len(df_data_accum_32h) == 0:
                                    df_data_accum_32h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'accum_chg_wh', 'accum_dschg_wh', 'accum_dschg_ah', 'accum_chg_ah'])
                                df_data_vin_32h = pd.concat([df_data_vin_8h, df_data_vin])
                                df_data_vin_32h = df_data_vin_32h.reset_index(drop=True)
                                if len(df_data_vin_32h) == 0:
                                    df_data_vin_32h = pd.DataFrame(columns=['sn', 'time', 'datatype', 'vin'])
                                # Join gps with accum first, then attach vin.
                                df_merge_ga = pd.merge(df_data_gps_32h, df_data_accum_32h, on=['sn', 'time', 'datatype'], how='outer')
                                df_merge_ga = pd.merge(df_merge_ga, df_data_vin_32h, on=['sn', 'time', 'datatype'], how='outer')
                                df_merge_ga = df_merge_ga.sort_values(["sn", "time"], ascending=[True, True])
                                df_merge_ga_filled = df_merge_ga.groupby("sn").fillna(method='ffill')
                                df_merge_ga_filled = pd.concat([df_merge_ga[['sn']], df_merge_ga_filled], axis=1)
                                df_merge_ga_filled = df_merge_ga_filled.groupby("sn").fillna(method='bfill')
                                df_merge_ga = pd.concat([df_merge_ga[['sn']], df_merge_ga_filled], axis=1)
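                                # Gap-filling sketch: after the outer merges the gps/accum/vin
                                # columns are sparse, so each SN's last known value is carried
                                # forward, then leading NaNs are back-filled, e.g. per SN:
                                #   [NaN, 30.1, NaN, 30.2] --ffill--> [NaN, 30.1, 30.1, 30.2]
                                #                           --bfill--> [30.1, 30.1, 30.1, 30.2]
                                # groupby('sn').fillna(...) drops the grouping column, hence
                                # the concat with df_merge_ga[['sn']] to restore it.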
                                # Identify rest periods; join in the non-rest data and forward-fill.
                                df_merge_ga['time'] = pd.to_datetime(df_merge_ga['time'], format='%Y-%m-%d %H:%M:%S')
                                df_merge_bga = pd.merge(df_data_32h, df_merge_ga, on=['sn', 'time', 'datatype'], how='outer')
                                # df_merge = ss.stand_status(df_merge_bga, s_order_delta=600, lon_delta=0.1)
                                df_merge = df_merge_bga.sort_values(["sn", "time"], ascending=[True, True])
                                df_merge_filled = df_merge.groupby("sn").fillna(method='ffill')
                                df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
                                df_merge_filled = df_merge.groupby("sn").fillna(method='bfill')
                                df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
                                df_merge = df_merge.dropna(subset=['pack_crnt', 'latitude', 'longitude', 'vin'], axis=0, how='all')
                                df_merge = df_merge[df_merge['datatype'] == 12]  # keep only datatype 12 (the signal records)
                                # df_merge.to_excel('merged.xlsx')
                        except Exception as e:
                            logger_main.error(f"process-{process_num}: 32-hour stitching failed")
                            logger_main.error(f"process-{process_num}: {e}")
                            logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                        if len(df_data) == 0:
                            logger_main.info(f"process-{process_num}: data cleaning took {time.time()-time_st}s, no valid data, skipping this run")
                            continue
                        else:
                            logger_main.info(f"process-{process_num}: {pack_code}, time_type: {df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, data cleaning finished in {time.time()-time_st}s")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}: data cleaning failed")
                        logger_main.error(f"process-{process_num}: {e}")
                        logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                        continue

                    try:
                        loggers['Data_Split'].info(f'running algorithms for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                        # Normalize the charge/rest status codes.
                        df_merge = ds.data_status(df_merge, c_soc_dif_p=0.05, s_soc_dif_p=0, c_order_delta=1200, s_order_delta=300)
                        # Segment the data on those status codes.
                        cell_type = 'L'
                        df_drive, df_charge, df_stand, df_data_split_rlt, df_rank_abnormal = dt.split(
                            df_merge, celltemp_name, cellvolt_name,
                            drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200,
                            single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0,
                            cell_type=cell_type, capacity=456)
                        # df_drive, df_charge, df_stand, df_data_split_rlt = dt.split(df_merge, celltemp_name, cellvolt_name, drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200, single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0, cell_type=cell_type, capacity=456)
                        df_data_split_rlt.to_excel(f'{sn_list[0]}_NEW.xlsx')
                        loggers['Data_Split'].info(f'algorithm run finished for {pack_code}, took {time.time()-time_st}s')
                    except Exception as e:
                        loggers['Data_Split'].error('algorithm run failed')
                        loggers['Data_Split'].error(str(e))
                        loggers['Data_Split'].error(traceback.format_exc())
                        df_drive = pd.DataFrame()
                        df_charge = pd.DataFrame()
                        df_stand = pd.DataFrame()
                        df_data_split_rlt = pd.DataFrame()
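                    # dt.split segments df_merge into drive / charge / rest frames plus the
                    # segment summary df_data_split_rlt and df_rank_abnormal, keyed off the
                    # status codes passed above (drive_sts=3, charge_sts=[21, 22], stand_sts=0);
                    # capacity=456 is presumably the pack capacity in Ah. On failure the frames
                    # are reset to empty so the write-back step below degrades to a no-op.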
                    try:
                        if not df_data_split_rlt.empty:
                            time_record = time.time()
                            df_data_split_rlt.reset_index(drop=True, inplace=True)
                            # df_data_split_rlt.to_sql("algo_charge_info", con=mysql_algo_conn, if_exists="append", index=False)
                            # write_mysql_time = write_mysql_time + time.time() - time_record
                            logger_main.info(f'process-{process_num} segment statistics write-back for {pack_code} complete')
                            logger_main.info(f"process-{process_num}: write-back took {time.time()-time_st}s")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}: writing segmentation results failed")
                        logger_main.error(f"process-{process_num}: {e}")
                        logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
            except Exception as e:
                logger_main.error(f"process-{process_num}: fetching raw data failed")
                logger_main.error(f"process-{process_num}: {e}")
                logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                continue
            # Algorithm 5: daily data segmentation.
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
            cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)


if __name__ == '__main__':
    while True:
        try:
            # Configuration.
            cur_env = 'dev'  # runtime environment
            app_path = "/home/shouxueqi/projects/zlwl-algos/"  # absolute application path
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
            app_name = "task_day_1"  # application name
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"main process pid: {os.getpid()}")

            # Read the configuration file (please do not modify this part).
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to 1 process
            pool = Pool(processes=processes)
            logger_main.info("allocating worker processes")
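            # Note: the pool is created but main() below runs in the current process;
            # presumably the scheduled version dispatches workers with something like
            # pool.apply_async(main, ...).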
            main()
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()