# Download 6.8 KB
  # Raw label-data acquisition: download per-SN monthly data, clean and merge it, keep charge segments.
#
# Flow of this script:
#   1. Build the HBase / MySQL service objects from the configured environment.
#   2. Read the SN list from the Excel sheet and the pack parameters from MySQL.
#   3. For every SN and every calendar month in the date range, download the raw
#      data, clean it, merge BMS / accumulated / GPS / VIN data, fill gaps per SN,
#      label and split the rows, and keep the charge segments.
#   4. After each chunk that produced charge rows, rewrite the feather output with
#      everything collected so far.
#
# NOTE: the import order below is deliberately unchanged. The star imports may
# define names that later imports rebind, so reordering could change behaviour.
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
import pandas as pd
import traceback
import datetime
import os  # used for the log path below; previously only reachable via a star import
import sys
sys.path.append("/home/zhuxi/project/zlwl-algos")
from ALGOS.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_split as dt
from ALGOS.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_status as ds
from dateutil.relativedelta import *

# --------------------------------------------------------------------------
# Environment and service setup
# --------------------------------------------------------------------------
cur_env = 'dev'  # runtime environment
app_path = "/home/zhuxi/project/zlwl-algos"  # project root used by SysUtils
sysUtils = SysUtils(cur_env, app_path)
hbase_params = sysUtils.get_cf_param('hbase')
hbase_datafactory_params = sysUtils.get_cf_param('hbase-datafactory')
iotp_service = IotpAlgoService(hbase_params=hbase_params)
iotp_datafactory_service = IotpAlgoService(hbase_params=hbase_datafactory_params)
mysql_datafactory_params = sysUtils.get_cf_param('mysql-datafactory')
mysqlUtils = MysqlUtils()
mysql_datafactory_engine, mysql_datafactory_Session = mysqlUtils.get_mysql_engine(mysql_datafactory_params)

# --------------------------------------------------------------------------
# Inputs: SN list, requested columns, pack parameters
# --------------------------------------------------------------------------
dataSOH = pd.read_excel('sn-20210903.xlsx', sheet_name='科易6040')
fileNames = list(dataSOH['SN号'])

# Fields requested from the data set for every SN.
columns = [
    DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
    DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count,
    DataField.cell_voltage, DataField.cell_temp,
    DataField.pack_soc, DataField.other_temp_value, DataField.cell_balance,
    DataField.latitude, DataField.longitude,
    DataField.pack_soh, DataField.charge_sta, DataField.mileage,
    DataField.accum_chg_wh, DataField.accum_dschg_wh,
    DataField.accum_chg_ah, DataField.accum_dschg_ah, DataField.vin,
]

sql = "select pack_code, param from algo_pack_param"
# The connection is only needed for this single query, so release it right after.
# NOTE(review): assumes the object returned by engine.connect() exposes close()
# (SQLAlchemy-style) - confirm against MysqlUtils.get_mysql_engine.
mysql_datafactory_conn = mysql_datafactory_engine.connect()
try:
    df_algo_pack_param = pd.read_sql(sql, mysql_datafactory_conn)
finally:
    mysql_datafactory_conn.close()

pack_code = 'KY01710'  # an earlier run used 'GM02010'
pack_param = (
    df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code]
    .drop(['pack_code'], axis=1)
    .to_dict("records")
)
if not pack_param:
    # Same exception type the bare pack_param[0] lookup raised, but with a message.
    raise IndexError(f"no row in algo_pack_param for pack_code={pack_code!r}")
# SECURITY NOTE(review): eval() executes whatever text is stored in the `param`
# column. This is only acceptable while that table is fully trusted; consider
# ast.literal_eval or JSON if the stored value is a plain literal.
df_algo_pack_param = eval(pack_param[0]['param'])

log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory next to this file
app_name = "download"  # application name used for the logger
logger_main = sysUtils.get_logger(app_name, log_base_path)

# Index of the first SN to process; the SNs before it are skipped.
# NOTE(review): presumably a resume offset from an earlier partial run - confirm.
SN_START_INDEX = 3
CHARGE_OUTPUT_PATH = 'data_chargePK5002.feather'
MERGE_KEYS = ['sn', 'time', 'datatype']
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# First day of every month from 2021-05 through 2023-05; each one starts a one-month window.
datelist = list(pd.date_range('20210501', '20230531', freq='MS'))


def _fill_within_sn(df):
    """Sort by (sn, time), then forward- and backward-fill gaps within each sn.

    The grouped fill drops the grouping column, so ``sn`` is re-attached as the
    first column after each pass.
    """
    df = df.sort_values(["sn", "time"], ascending=[True, True])
    filled = df.groupby("sn").ffill()
    filled = pd.concat([df[['sn']], filled], axis=1)
    filled = filled.groupby("sn").bfill()
    return pd.concat([df[['sn']], filled], axis=1)


def _extract_charge_segments(df_data):
    """Clean one downloaded chunk and return its charge-segment rows.

    Args:
        df_data: non-empty raw data returned by ``get_data`` for one SN and window.

    Returns:
        The charge frame produced by ``dt.split`` (its second return value), with
        the 'cell_balance', 'cell_temp', 'cell_voltage' and 'other_temp_value'
        columns removed when present. May be empty.
    """
    df_bms, _df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data)
    df_data_accum = iotp_service.accum_datacleaning(df_data)
    df_data_gps = iotp_service.gps_datacleaning(df_data)
    df_data_vin = iotp_service.vin_datacleaning(df_data)
    if len(df_data_vin) == 0:
        # Keep the merge below working when no VIN rows exist for this chunk.
        df_data_vin = pd.DataFrame(columns=['sn', 'time', 'datatype', 'vin'])

    for frame in (df_data_accum, df_data_gps, df_data_vin):
        frame['time'] = pd.to_datetime(frame['time'], format=TIME_FORMAT)

    # Outer-join all sources so no timestamp from any of them is lost.
    df_merge = df_bms
    for frame in (df_data_accum, df_data_gps, df_data_vin):
        df_merge = pd.merge(df_merge, frame, on=MERGE_KEYS, how='outer')
    df_merge['time'] = pd.to_datetime(df_merge['time'], format=TIME_FORMAT)

    # One ffill + bfill pass per sn. The earlier version repeated the identical
    # pass a second time, which cannot change anything once gaps are filled.
    df_merge = _fill_within_sn(df_merge)

    # Drop rows where all four of these fields are still missing, then keep datatype 12 only.
    df_merge = df_merge.dropna(subset=['pack_crnt', 'latitude', 'longitude', 'vin'], axis=0, how='all')
    df_merge = df_merge[df_merge['datatype'] == 12]

    df_merge = ds.data_status(df_merge, c_soc_dif_p=0.05, s_soc_dif_p=0, c_order_delta=1200, s_order_delta=300)
    df_merge.reset_index(drop=True, inplace=True)
    _df_drive, df_charge, _df_stand, _df_data_split_rlt = dt.split(
        df_merge, celltemp_name, cellvolt_name,
        drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200,
        single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0,
    )
    return df_charge.drop(['cell_balance', 'cell_temp', 'cell_voltage', 'other_temp_value'], axis=1, errors='ignore')


# --------------------------------------------------------------------------
# Main loop: one download per (SN, month)
# --------------------------------------------------------------------------
# Charge frames are collected as whole DataFrames and concatenated by column
# name. The earlier version kept 100 positional lists, which failed for frames
# with more than 100 columns and misaligned data when column order differed.
charge_frames = []
for sn in fileNames[SN_START_INDEX:]:
    for date in datelist:
        try:
            start_time = str(date)
            end_time = str(date.to_pydatetime() + relativedelta(months=+1))
            df_data = iotp_datafactory_service.get_data(
                sn_list=[sn], columns=columns, start_time=start_time, end_time=end_time)
            if len(df_data) > 0:
                df_charge = _extract_charge_segments(df_data)
                if len(df_charge) > 0:
                    charge_frames.append(df_charge)
                    # Rewrite the output after every productive chunk so a crash
                    # keeps everything collected so far.
                    # NOTE(review): the original nesting was lost in the paste;
                    # writing inside this branch is the reading that avoids
                    # using undefined names - confirm.
                    data_charge = pd.concat(charge_frames, ignore_index=True)
                    data_charge.to_feather(CHARGE_OUTPUT_PATH)
        except Exception as e:
            # Best effort per chunk: log which SN / window failed and continue.
            logger_main.error(f"process-0:{sn} {date}数据清洗出错")
            logger_main.error(f"process-0:{e}")
            logger_main.error(f"process-0:{traceback.format_exc()}")