Ouverture du repo avec les fichiers existants du decode

Ajout des deux fichiers principaux:
- decode_modbus.json
    > fichier de configuration du script python
- decode_modbus.py
    > script python (version 3.9+)
This commit is contained in:
2025-04-28 08:17:46 +00:00
parent 1ef36d1d93
commit aa54016b9a
2 changed files with 529 additions and 0 deletions

303
decode_modbus.py Normal file
View File

@@ -0,0 +1,303 @@
import os
from scapy.all import *
os.system('cls')
from art import *
import json
import datetime
import binascii
class Settings:
def __init__(self, file):
self.file = file
with open(self.file) as f:
self.data = json.load(f)
self.log_file = self.data['log_file']
self.output_file = self.data['log_file']+'.out'
self.ip_ats = self.data['ip_ats']
self.ip_srvr = self.data['ip_srvr']
self.scada_ene = self.data['scada_ene']
self.scada_eqpt = self.data['scada_eqpt']
self.plc = self.data['plc']
self.registersNames = self.data['registersNames']
class ModbusPacket:
def __init__(self, isTx, src, dst, src_p, dst_p, infotime, data, registersNames, scada_ene, scada_eqpt, plc):
self.isTx = isTx
self.src = src
self.dst = dst
self.src_p = src_p
self.dst_p = dst_p
self.infotime = infotime
self.data = data
self.registersNames = registersNames
self.scada_ene = scada_ene
self.scada_eqpt = scada_eqpt
self.plc = plc
self.transaction_id = None
self.proto_id = None
self.length = None
self.size = None
self.addr_slave = None
self.function_id = None
self.nb_octets = None
self.registers = None
self.fst_register = None
self.nb_registers = None
self.request = None
self.extractData()
def __str__(self):
txt = str(self.infotime)+": "+self.src+"["+self.src_p+"] -> "+self.dst+"["+self.dst_p+"]\n"
txt += "PACKET: "+' '.join(self.data[i:i+2] for i in range(0, len(self.data), 2))+"\n"
if self.isTx:
txt += "Modbus TX:{\n"
txt += "\ttransaction_id = "+str(self.transaction_id)+"\n"
txt += "\tproto_id = "+str(self.proto_id)+"\n"
txt += "\tlength (bytes) = "+str(self.length)+"\n"
txt += "\taddr_slave = "+str(self.addr_slave)+"\n"
if self.function_id == 1:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Coil\n"
elif self.function_id == 2:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Discrete Input\n"
elif self.function_id == 3:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Holding Registers\n"
txt += "\tfst_r_register = "+str(self.fst_register)+"\n"
txt += "\tnb_registers = "+str(self.nb_registers)+"\n"
elif self.function_id == 4:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Input Registers\n"
elif self.function_id == 5:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Single Coil\n"
elif self.function_id == 6:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Single Holding Register\n"
elif self.function_id == 16:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Multiple Registers\n"
if str(self.fst_register) in self.registersNames.keys():
txt += "\tfst_w_register = "+str(self.fst_register)+" - "+self.registersNames[str(self.fst_register)]+"\n"
else:
txt += "\tfst_w_register = "+str(self.fst_register)+" -> NOT LISTED\n"
txt += "\tnb_registers = "+str(self.nb_registers)+"\n"
txt += "\tnb_octets = "+str(self.nb_octets)+"\n"
txt += "\tvalues = 0x"+str(self.registers)+"\n"
if self.nb_registers > 0:
txt += "\t\t{\n"
v = [(self.registers[i:i+4]) for i in range(0, len(self.registers), 4)]
for i in range(self.nb_registers):
if int(v[i]) == 0:
txt+="\t\t\t"+str(self.fst_register+i)+" - "+self.registersNames[str(self.fst_register+i)]+" -> 0x"+v[i]+" (RESET)\n"
else:
txt+="\t\t\t"+str(self.fst_register+i)+" - "+self.registersNames[str(self.fst_register+i)]+" -> 0x"+v[i]+" (DECLENCHEMENT)\n"
txt += "\t\t}\n"
elif self.function_id == 15:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Multiple Coils\n"
txt += "\tfst_w_register = "+str(self.fst_register)+"\n"
txt += "\tnb_registers = "+str(self.nb_registers)+"\n"
txt += "\tnb_octets = "+str(self.nb_octets)+"\n"
txt += "\tvalues = "+str(self.registers)+"\n"
elif self.function_id == 129:
txt += "\tfunction_id = "+str(self.function_id)+" -> Exception\n"
elif self.function_id == 130 or self.function_id == 131 or self.function_id == 132 or self.function_id == 133 or self.function_id == 134 or self.function_id == 143 or self.function_id == 144:
txt += "\tfunction_id = "+str(self.function_id)+" -> Functional error code in response\n"
else:
txt += "\tfunction_id = "+str(self.function_id)+" -> ILLEGAL_FUNCTION | NON-MODBUS PACKET\n"
txt += "}\n"
else:
txt += "Modbus RX:{\n"
txt += "\ttransaction_id = "+str(self.transaction_id)+"\n"
txt += "\tproto_id = "+str(self.proto_id)+"\n"
txt += "\tlength (bytes) = "+str(self.length)+"\n"
txt += "\taddr_slave = "+str(self.addr_slave)+"\n"
if self.function_id == 1:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Coil\n"
elif self.function_id == 2:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Discrete Input\n"
elif self.function_id == 3:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Holding Registers\n"
txt += "\tnb_octets = "+str(self.nb_octets)+"\n"
txt += "\tvalue(s) = {"
if self.request is not None and self.request.fst_register is not None and self.nb_octets is not None and self.request.function_id == self.function_id:
dic = self.decodeValuesForRead()
for i in range(int(self.request.fst_register), int(self.nb_octets/2)+int(self.request.fst_register)):
if i%1==0:
txt += '\n'
txt += '\t\t\t'+str(i)+' ('+str(self.registersNames[str(i)])+') -> '+dic[i]+" ("+self.computeState(dic[i])+")"
txt += "\n\t\t}\n"
elif self.function_id == 4:
txt += "\tfunction_id = "+str(self.function_id)+" -> Read Input Registers\n"
elif self.function_id == 5:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Single Coil\n"
elif self.function_id == 6:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Single Holding Register\n"
elif self.function_id == 16:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Multiple Registers\n"
if str(self.fst_register) in self.registersNames.keys():
txt += "\tfst_w_register = "+str(self.fst_register)+" - "+self.registersNames[str(self.fst_register)]+"\n"
else:
txt += "\tfst_w_register = "+str(self.fst_register)+" -> NOT LISTED\n"
txt += "\tnb_registers = "+str(self.nb_registers)+"\n"
elif self.function_id == 15:
txt += "\tfunction_id = "+str(self.function_id)+" -> Write Multiple Coils\n"
txt += "\tfst_w_register = "+str(self.fst_register)+"\n"
txt += "\tnb_registers = "+str(self.nb_registers)+"\n"
txt += "\tnb_octets = "+str(self.nb_octets)+"\n"
txt += "\tvalues = "+str(self.registers)+"\n"
elif self.function_id == 129:
txt += "\tfunction_id = "+str(self.function_id)+" -> Exception\n"
elif self.function_id == 130 or self.function_id == 131 or self.function_id == 132 or self.function_id == 133 or self.function_id == 134 or self.function_id == 143 or self.function_id == 144:
txt += "\tfunction_id = "+str(self.function_id)+" -> Functional error code in response\n"
else:
txt += "\tfunction_id = "+str(self.function_id)+" -> ILLEGAL_FUNCTION | NON-MODBUS PACKET\n"
txt += "}\n"
return(txt)
def decodeValuesForRead(self):
dicRegisters = {}
j=0
for i in range(self.request.fst_register, int(self.nb_octets/2)+self.request.fst_register):
dicRegisters[i]=self.registers[j:j+4]
j+=4
return dicRegisters
def computeState(self, state):
s = ""
if state[:2] == "00":
s += "KNOWN-COMMANDABLE"
elif state[:2] == "01":
s += "UNKNOWN-COMMANDABLE"
elif state[:2] == "02":
s += "KNOWN-UNCOMMANDABLE"
elif state[:2] == "03":
s += "UNKNOWN-UNCOMMANDABLE"
s += "-"
if state[2:] == "00":
if self.scada_ene and self.scada_eqpt:
s += "0V | PARC"
elif self.scada_ene and not self.scada_eqpt:
s += "0V"
else:
s += "PARC"
elif state[2:] == "01":
if self.scada_ene and self.scada_eqpt:
s += "900V | BREL"
elif self.scada_ene and not self.scada_eqpt:
s += "900V"
else:
s += "BREL"
return s
def extractData(self):
self.cleanData()
try:
if len(self.data) > 16:
self.transaction_id = int(self.data[:4],16)
self.proto_id = int(self.data[4:8],16)
self.length = int(self.data[8:12],16)
self.addr_slave = int(self.data[12:14],16)
self.function_id = int(self.data[14:16],16)
if self.isTx:
self.fst_register = int(self.data[16:20],16)
if self.function_id == 3:
self.nb_registers = int(self.data[20:],16)
elif self.function_id == 16:
self.nb_registers = int(self.data[20:24],16)
self.nb_octets = int(self.data[24:26],16)
self.registers = self.data[26:]
elif self.function_id == 15:
self.nb_registers = int(self.data[20:24],16)
self.nb_octets = int(self.data[24:26],16)
self.registers = self.data[26:]
else:
if self.function_id == 3:
self.nb_octets = int(self.data[16:18],16)
self.registers = self.data[18:]
elif self.function_id == 16:
self.fst_register = int(self.data[16:20],16)
self.nb_registers = int(self.data[20:],16)
elif self.function_id == 15:
self.fst_register = int(self.data[16:20],16)
self.nb_registers = int(self.data[20:],16)
except Exception as e:
print('Wrong data to make a modbus message')
print(e)
# scapy is crap script .. he's sending Hex string with char when it's detected. E.g. you can have \x00V ... V is 56, then you can miss a byte :)
# So, we need to clean that shit
def cleanData(self):
chars="0123456789abcdef"
temp = ""
for b in self.data:
if b not in chars:
temp += str(binascii.hexlify(bytes(b, 'utf-8')))[1:]
else:
temp += b
self.data = temp
self.data = str(self.data).replace("'","")
def getListOfPacketsFromPcapng(file, ip_ats, ip_srvr):
packets = rdpcap(file)
listofpackets = []
layer = ""
for packet in packets:
sortedPacket = {}
sortedPacket['timestamp']=packet.time
raw_modbus =""
if 'Raw' in packet.show(dump=True):
for b in str(bytes_hex(packet['Raw'].load)).replace('x','').replace('\\','')[2:-1]:
raw_modbus+=str(b)
sortedPacket['raw_modbus']=raw_modbus
try:
for line in packet.show2(dump=True).split('\n'):
if '###' in line:
layer = line.strip('#[] ')
sortedPacket[layer] = {}
elif '=' in line:
key, val = line.split('=', 1)
sortedPacket[layer][key.strip()] = val.strip()
else:
pass
#Delete packet where IP are not correct and where there is no modbus data
if 'IP' in sortedPacket.keys():
if (sortedPacket['IP']['src'] == ip_ats and sortedPacket['IP']['dst'] == ip_srvr) or (sortedPacket['IP']['src'] == ip_srvr and sortedPacket['IP']['dst'] == ip_ats):
if 'TCP' in sortedPacket.keys() and 'Raw' in sortedPacket.keys():
listofpackets.append(sortedPacket)
except Exception as e:
print('***EXCEPTION***')
print(e)
return listofpackets
def buildModbusObjects(packets, settings):
ListOfModbusObjects = []
# All packets are taken in ATS sys (ATS = source of all)
for packet in packets:
if packet['IP']['src'] == settings.ip_ats and packet['IP']['dst'] == settings.ip_srvr:
tx = ModbusPacket(True, packet['IP']['src'], packet['IP']['dst'], packet['TCP']['sport'], packet['TCP']['dport'], datetime.datetime.fromtimestamp(int(packet['timestamp'])), packet['raw_modbus'], settings.registersNames, settings.scada_ene, settings.scada_eqpt, settings.plc)
ListOfModbusObjects.append(tx)
elif packet['IP']['src'] == settings.ip_srvr and packet['IP']['dst'] == settings.ip_ats:
tx = ModbusPacket(False, packet['IP']['src'], packet['IP']['dst'], packet['TCP']['sport'], packet['TCP']['dport'], datetime.datetime.fromtimestamp(int(packet['timestamp'])), packet['raw_modbus'], settings.registersNames, settings.scada_ene, settings.scada_eqpt, settings.plc)
ListOfModbusObjects.append(tx)
else:
pass
return ListOfModbusObjects
if __name__ == '__main__':
print(text2art("DecodeModbusLog"))
print("Getting settings ...")
s = Settings('decode_modbus.json')
print("Parsing PCAPNG file: "+s.log_file+" ...")
packets = getListOfPacketsFromPcapng(s.log_file, s.ip_ats, s.ip_srvr)
print("Building Modbus Objetcs ...")
objects = buildModbusObjects(packets, s)
print("Printing all packets in output file:"+ s.output_file+" ...")
output_log = open(s.output_file, 'w')
output_log.write(text2art("DecodeModbusLog"))
if len(objects) == 0 :
output_log.write(str(len(objects))+" MODBUS PACKETS WERE FOUND :( !\n\n")
output_log.write("PLEASE CHECK JSON SETTINGS FILE (Are IP correct ?)")
else:
output_log.write(str(len(objects))+" MODBUS PACKETS WERE FOUND :) !\n\n")
for o in objects:
if not o.isTx:
for i in objects:
if i.transaction_id == o.transaction_id:
o.request = i
break
output_log.write(str(o))
output_log.close()
print("OK.")
input()