Chargement du script prennant en entrée un fichier K12 plutot qu'un pcapng

- Ajout de la version prennant en entrée un fichier text K12 exporté depuis wireshark plutot qu'un pcapng
This commit is contained in:
2025-04-28 08:21:07 +00:00
parent aa54016b9a
commit eb9f2fcbf1
2 changed files with 580 additions and 0 deletions

351
decode_modbus_k12.py Normal file
View File

@@ -0,0 +1,351 @@
import re
import json
class Settings:
def __init__(self, file):
self.file = file
with open(self.file) as f:
self.data = json.load(f)
self.log_file = self.data['log_file']
self.output_file = self.data['log_file']+'.out'
self.ip_ats = self.data['ip_ats']
self.ip_srvr = self.data['ip_srvr']
self.scada_ene = self.data['scada_ene']
self.scada_eqpt = self.data['scada_eqpt']
self.plc = self.data['plc']
self.eth_l = self.data["eth_l"]
self.ip_l = self.data["ip_l"]
self.tcp_l = self.data["tcp_l"]
self.registersNames = self.data['registerNames']
class Ip:
def __init__(self, array_bytes):
self.telegram=array_bytes
self.src='.'.join(str(int(i,16)) for i in array_bytes[12:16])
self.dst='.'.join(str(int(i,16)) for i in array_bytes[16:])
class Frame:
def __init__(self, timestamp, array_bytes, ip_ats, ip_srvr, scada_ene, scada_eqpt, plc, eth_l, ip_l, tcp_l, registersNames):
#computing index from to for array in frame
self.eth_i = eth_l
self.ip_i = self.eth_i + ip_l
self.tcp_i = self.ip_i + tcp_l
self.timestamp = timestamp
self.ethernet=array_bytes[:self.eth_i]
self.ip=Ip(array_bytes[self.eth_i:self.ip_i])
self.tcp=array_bytes[self.ip_i:self.tcp_i]
self.modbus='N/A'
if plc:
if len(array_bytes) >= self.tcp_i+12:
if ip_ats == self.ip.src and ip_srvr == self.ip.dst:
self.modbus=Modbus_request(array_bytes[self.tcp_i:], scada_ene, scada_eqpt, plc, registersNames)
elif ip_ats == self.ip.dst and ip_srvr == self.ip.src:
self.modbus=Modbus_response(array_bytes[self.tcp_i:], scada_ene, scada_eqpt, plc, registersNames)
# 66 = header ETH+IP+TCP + 8 min length of Modbus frame
else:
if len(array_bytes) >= self.tcp_i+12:
if ip_ats == self.ip.src and ip_srvr == self.ip.dst:
self.modbus=Modbus_request(array_bytes[self.tcp_i:], scada_ene, scada_eqpt, plc, registersNames)
elif ip_ats == self.ip.dst and ip_srvr == self.ip.src:
self.modbus=Modbus_response(array_bytes[self.tcp_i:], scada_ene, scada_eqpt, plc, registersNames)
def __str__(self):
s = 'Frame:{'
s += '\n Ethernet: '+''.join(self.ethernet)
s += '\n IP : '+''.join(self.ip.telegram)
s += '\n TCP : '+''.join(self.tcp)
if self.modbus == 'N/A':
s += '\n MODBUS : '+''.join(self.modbus)
elif isinstance(self.modbus, Modbus_msg):
s += '\n MODBUS : '+''.join(self.modbus.__str__())
s += '\n}'
return s
class Modbus_msg:
def __init__(self,array_bytes, registersNames):
self.msg = ''.join(array_bytes)
self.tr_id = int(self.msg[:4],16)
self.id = int(self.msg[4:8],16)
self.long_sup = int(self.msg[8:10],16)
self.long_inf_nb_octets = int(self.msg[10:12],16)
self.addr_slave = int(self.msg[12:14],16)
self.func_id = int(self.msg[14:16],16)
def _str_(self):
s = '\n transaction_id = '+str(self.tr_id)
s += '\n id = '+str(self.id)
s += '\n long_sup = '+str(self.long_sup)
s += '\n size (bytes) = '+str(self.long_inf_nb_octets)
s += '\n addr_slave = '+str(self.addr_slave)
if self.func_id == 1:
s += '\n function_id = '+str(self.func_id)+' -> Read Coil'
elif self.func_id == 2:
s += '\n function_id = '+str(self.func_id)+' -> Read Discrete Input'
elif self.func_id == 3:
s += '\n function_id = '+str(self.func_id)+' -> Read Holding Registers'
elif self.func_id == 4:
s += '\n function_id = '+str(self.func_id)+' -> Read Input Registers'
elif self.func_id == 5:
s += '\n function_id = '+str(self.func_id)+' -> Write Single Coil'
elif self.func_id == 6:
s += '\n function_id = '+str(self.func_id)+' -> Write Single Holding Register'
elif self.func_id == 15:
s += '\n function_id = '+str(self.func_id)+' -> Write Multiple Coils'
elif self.func_id == 16:
s += '\n function_id = '+str(self.func_id)+' -> Write Multiple Register'
elif self.func_id == 129:
s += '\n function_id = '+str(self.func_id)+' -> Exception'
elif self.func_id == 130 or self.func_id == 131 or self.func_id == 132 or self.func_id == 133 or self.func_id == 134 or self.func_id == 143 or self.func_id == 144:
s += '\n function_id = '+str(self.func_id)+' -> Functional error code in response'
else:
s += '\n function_id = '+str(self.func_id)+' -> ILLEGAL_FUNCTION | NON-MODBUS PACKET'
s += '\n'
return s
class Modbus_request(Modbus_msg):
def __init__(self, array_bytes, scada_ene, scada_eqpt, plc, registersNames):
msg = 'MODBUS_REQUEST'
Modbus_msg.__init__(self, array_bytes, registersNames)
self.scada_ene = scada_ene
self.scada_eqpt = scada_eqpt
self.plc = plc
self.registersNames = registersNames
if self.msg[16:20] != '':
self.addr_fst_register = int(self.msg[16:20],16)
if self.func_id == 3:
self.nb_registers = int(self.msg[20:],16)
elif self.func_id == 16:
self.nb_registers = int(self.msg[20:24],16)
self.nb_octets = int(self.msg[24:26],16)
self.registers = self.msg[26:]
elif self.func_id == 15:
self.nb_registers = int(self.msg[20:24],16)
self.nb_octets = int(self.msg[24:26],16)
self.registers = self.msg[26:]
else:
msg += ' -> ILLEGAL PACKET'
self.addr_fst_register = -1
self.nb_registers = -1
self.nb_octets = -1
self.registers = -1
print(msg)
def __str__(self):
s='PACKET: '+self.msg+'\n'
if self.func_id == 3:
s += 'Modbus TX:{'+self._str_()
s += ' fst_r_register = '+str(self.addr_fst_register)
s += '\n nb_registers = '+str(self.nb_registers)
s += '\n}'
elif self.func_id == 16:
s += 'Modbus TX:{'+self._str_()
s += ' fst_w_register = '+str(self.addr_fst_register)+' - '+self.registersNames[str(self.addr_fst_register)]
if self.plc and (self.scada_ene or self.scada_eqpt):
s += ' -> '+str(self.registersNames[str(self.addr_fst_register)])
s += '\n nb_registers = '+str(self.nb_registers)
s += '\n nb_octets = '+str(self.nb_octets)
s += '\n value(s) = 0x'+str(self.registers)+' ('+str(int(self.registers, 16))+')'
if self.plc and (self.scada_ene or self.scada_eqpt):
if self.nb_registers < 1:
s +=' -> '
if '1' in self.registers:
if self.scada_ene and self.scada_eqpt:
s += "ENERGIZATION | BREL"
elif self.scada_ene and not self.scada_eqpt:
s += "ENERGIZATION"
else:
s += "BREL"
else:
if self.scada_ene and self.scada_eqpt:
s += "DE-ENERGIZATION | PARC"
elif self.scada_ene and not self.scada_eqpt:
s += "DE-ENERGIZATION"
else:
s += "PARC"
else:
s += "{"
dic = self.decodeValuesForRead()
for i in range(self.addr_fst_register, (self.nb_octets/2)+self.addr_fst_register):
if i%1==0:
s += '\n '
s += ' '+str(i)+' ('+str(self.registersNames[str(i)])+') -> '+dic[i]+" ("+self.computeState(dic[i])+")"
s += "\n }"
s += '\n}'
else:
s += 'Modbus TX:{'+self._str_()
s += ' fst_w_register = '+str(self.addr_fst_register)
s += '\n nb_registers = '+str(self.nb_registers) if hasattr(self, 'nb_registers') else '\n nb_registers = None'
s += '\n nb_octets = '+str(self.nb_octets) if hasattr(self, 'nb_octets') else '\n nb_octets = None'
s += '\n value(s) = 0x'+str(self.registers)+' -> ' if hasattr(self, 'registers') else '\n value(s) = None'
s += '\n}'
return s
def decodeValuesForRead(self):
dicRegisters = {}
j=0
for i in range(self.addr_fst_register, (self.nb_octets/2)+self.addr_fst_register):
dicRegisters[i]=self.registers[j:j+4]
j+=4
return dicRegisters
def computeState(self, state):
s = ""
if state[:2] == "00":
s += "KNOWN-COMMANDABLE"
elif state[:2] == "01":
s += "UNKNOWN-COMMANDABLE"
elif state[:2] == "02":
s += "KNOWN-UNCOMMANDABLE"
elif state[:2] == "03":
s += "UNKNOWN-UNCOMMANDABLE"
s += "-"
if state[2:] == "00":
if self.scada_ene and self.scada_eqpt:
s += "0V | PARC"
elif self.scada_ene and not self.scada_eqpt:
s += "0V"
else:
s += "PARC"
elif state[2:] == "01":
if self.scada_ene and self.scada_eqpt:
s += "900V | BREL"
elif self.scada_ene and not self.scada_eqpt:
s += "900V"
else:
s += "BREL"
return s
class Modbus_response(Modbus_msg):
def __init__(self, array_bytes, scada_ene, scada_eqpt, plc, registersNames):
print('MODBUS_RESPONSE')
Modbus_msg.__init__(self, array_bytes, registersNames)
self.request = None
self.scada_ene = scada_ene
self.scada_eqpt = scada_eqpt
self.plc = plc
self.registersNames = registersNames
self.exception=''
if self.func_id == 131:
self.exception= int(self.msg[16:],16)
else:
if self.func_id == 3:
self.nb_octets = int(self.msg[16:18],16)
self.registers = self.msg[18:]
elif self.func_id == 16:
self.addr_fst_register = int(self.msg[16:20],16)
self.nb_w_registers = int(self.msg[20:24],16)
elif self.func_id == 15:
self.addr_fst_register = int(self.msg[16:20],16)
self.nb_w_registers = int(self.msg[20:24],16)
def __str__(self):
s='PACKET: '+self.msg+'\n'
if self.func_id == 131 or self.func_id == 144:
s += 'Modbus RX:{'+self._str_()
s += ' Exception = '+str(self.exception)
if self.exception == 1:
s += ' -> ILLEGAL_FUNCTION'
elif self.exception == 2:
s += ' -> ILLEGAL_DATA_ADDRESS'
elif self.exception == 3:
s += ' -> ILLEGAL_DATA_VALUE'
elif self.exception == 4:
s += ' -> SLAVE_DEVICE_FAILURE'
elif self.exception == 6:
s += ' -> SLAVE_DEVICE_BUSY'
elif self.exception == 10:
s += ' -> GATEWAY_PATH_UNAVAILABLE'
elif self.exception == 11:
s += ' -> GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND'
s += '\n}'
elif self.func_id == 3:
s += 'Modbus RX:{'+self._str_()
s += ' nb_octets = '+str(self.nb_octets)+' -> '+str(int(self.nb_octets/2))+' registers'
# s += '\n value(s) = 0x'+str(self.registers)
s += '\n value(s) = {'
if self.request is not None:
if self.plc or (self.scada_ene or self.scada_eqpt):
dic = self.decodeValuesForRead()
for i in range(self.request.addr_fst_register, int(self.nb_octets/2)+self.request.addr_fst_register):
if i%1==0:
s += '\n '
s += ' '+str(i)+' ('+str(self.registersNames[str(i)])+') -> '+dic[i]+" ("+self.computeState(dic[i])+")"
s += "\n }"
else:
s += 'LINKED REQUEST NOT FOUND}'
s += '\n}'
elif self.func_id == 16:
s += 'Modbus RX:{'+self._str_()
s += ' addr_fst_rgstr = '+str(self.addr_fst_register)
s += '\n nb_w_registers = '+str(self.nb_w_registers)
s += '\n}'
elif self.func_id == 15:
s += 'Modbus RX:{'+self._str_()
s += ' addr_fst_rgstr = '+str(self.addr_fst_register)
s += '\n nb_w_registers = '+str(self.nb_w_registers)
s += '\n}'
return s
def decodeValuesForRead(self):
dicRegisters = {}
j=0
for i in range(self.request.addr_fst_register, int(self.nb_octets/2)+self.request.addr_fst_register):
dicRegisters[i]=self.registers[j:j+4]
j+=4
return dicRegisters
def computeState(self, state):
s = ""
if state[:2] == "00":
s += "KNOWN-COMMANDABLE"
elif state[:2] == "01":
s += "UNKNOWN-COMMANDABLE"
elif state[:2] == "02":
s += "KNOWN-UNCOMMANDABLE"
elif state[:2] == "03":
s += "UNKNOWN-UNCOMMANDABLE"
s += "-"
if state[2:] == "00":
if self.scada_ene and self.scada_eqpt:
s += "0V | PARC"
elif self.scada_ene and not self.scada_eqpt:
s += "0V"
else:
s += "PARC"
elif state[2:] == "01":
if self.scada_ene and self.scada_eqpt:
s += "900V | BREL"
elif self.scada_ene and not self.scada_eqpt:
s += "900V"
else:
s += "BREL"
return s
if __name__ == '__main__':
s = Settings('decode_scada_modbus.json')
f = open(s.log_file, 'r')
dModbus = []
timestamp = None
frame = None
catch_next = False
for line in f.readlines():
if not catch_next and re.match(r'[0-9]{2}:.*', line):
timestamp = re.match(r'([0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3},[0-9]{3}).*', line).groups()[0]
catch_next = True
if catch_next and re.match(r'\|.*',line):
frame = Frame(timestamp, re.match(r'(\|.*)', line).groups()[0][6:-1].split('|'), s.ip_ats, s.ip_srvr, s.scada_ene, s.scada_eqpt, s.plc, s.eth_l, s.ip_l, s.tcp_l, s.registersNames)
# checking if frame is the response of previously request and adding this request to the response linked.
if isinstance(frame.modbus, Modbus_response) and len(dModbus) > 1:
for i in range(len(dModbus)):
if isinstance(dModbus[-i].modbus, Modbus_request):
if frame.modbus.tr_id == dModbus[-i].modbus.tr_id:
frame.modbus.request = dModbus[-i].modbus
break
dModbus.append(frame)
catch_next = False
o = open(s.output_file, 'w')
for frame in dModbus:
if frame.modbus != 'N/A':
o.write(frame.timestamp + ': ' + frame.ip.src + ' -> ' + frame.ip.dst+'\n')
o.write(frame.modbus.__str__()+'\n\n')
#o.write(frame.__str__()+'\n')
o.close()
f.close()
print('TERMINATED')
input()