277 lines
8.3 KiB
Python
277 lines
8.3 KiB
Python
#! /usr/bin/env python
|
|
|
|
# Copyright (C) 2014 Gayner Technical Services Pty Ltd
|
|
|
|
from ctypes import *
|
|
|
|
# PLC TYPES
|
|
Unknow=0
|
|
PLC=1
|
|
SLC500=2
|
|
LGX=3
|
|
|
|
# EIP DATA TYPES
|
|
PLC_BIT=1
|
|
PLC_BIT_STRING=2
|
|
PLC_BYTE_STRING=3
|
|
PLC_INTEGER=4
|
|
PLC_TIMER=5
|
|
PLC_COUNTER=6
|
|
PLC_CONTROL=7
|
|
PLC_FLOATING=8
|
|
PLC_ARRAY=9
|
|
PLC_ADRESS=15
|
|
PLC_BCD=16
|
|
|
|
# LOGIX DATA TYPES
|
|
LGX_BOOL=0xC1
|
|
LGX_BITARRAY=0xD3
|
|
LGX_SINT=0xC2
|
|
LGX_INT=0xC3
|
|
LGX_DINT=0xC4
|
|
LGX_REAL=0xCA
|
|
|
|
class Eip_Session(Structure):
|
|
_fields_ = [
|
|
('sock',c_int),
|
|
('Session_Handle', c_uint),
|
|
('Sender_ContextL',c_int),
|
|
('Sender_ContextH',c_int),
|
|
('timeout', c_int),
|
|
('references', c_int),
|
|
('Data', c_void_p),
|
|
]
|
|
|
|
class Eip_Connection(Structure):
|
|
_fields_ = [
|
|
('Eip_Session', Eip_Session),
|
|
('references', c_int),
|
|
('Data', c_void_p),
|
|
('ConnectionSerialNumber', c_uint),
|
|
('OriginatorVendorID', c_uint),
|
|
('OriginatorSerialNumber', c_int),
|
|
('OT_ConnID', c_int),
|
|
('TO_ConnID', c_int),
|
|
('packet', c_short),
|
|
('Path_size', c_byte)
|
|
]
|
|
|
|
class Eip_PLC_Read(Structure):
|
|
_fields_ = [
|
|
('type', c_int),
|
|
('Varcount', c_int),
|
|
('totalise', c_int),
|
|
('elementsize', c_int),
|
|
('mask', c_uint),
|
|
]
|
|
|
|
class TuxEIPException(Exception):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
class TuxEIP:
|
|
|
|
def __init__(self, **kwargs):
|
|
self.__libpath = kwargs.get("libpath", "libtuxeip.dylib")
|
|
self.__tuxeip = CDLL(self.__libpath)
|
|
self.__tuxeip._cip_err_msg.restype = c_char_p
|
|
|
|
def __del__(self):
|
|
del self.__tuxeip
|
|
|
|
def OpenSession(self, slaveip_, slaveport_=44818, slavetimeout_=1000):
|
|
self.__tuxeip._OpenSession.restype = POINTER(Eip_Session)
|
|
|
|
# Convert params to C types
|
|
slaveip = c_char_p(slaveip_)
|
|
slaveport = c_int(slaveport_)
|
|
slavetimeout = c_int(slavetimeout_)
|
|
|
|
session = self.__tuxeip._OpenSession(slaveip, slaveport, slavetimeout)
|
|
|
|
#print self.__tuxeip._cip_err_msg, self.__tuxeip._cip_errno, self.__tuxeip._cip_ext_errno
|
|
|
|
if bool(session) == False:
|
|
raise TuxEIPException("Could not open session to " + str(slaveip) + ":" + str(slaveport))
|
|
|
|
return session
|
|
|
|
def RegisterSession(self, sess_):
|
|
self.__tuxeip._RegisterSession.restype = c_int
|
|
reg = self.__tuxeip._RegisterSession(sess_)
|
|
|
|
if reg != False:
|
|
raise TuxEIPException("Could not register session")
|
|
|
|
return reg
|
|
|
|
def ConnectPLCOverCNET(self, sess_, plctype_, priority_, timeoutticks_, connid_, conserial_,
|
|
vendorid_, serialnum_, timeoutmult_, rpi_, transport_, slavepath_):
|
|
# Convert params to C types
|
|
priority = c_byte(priority_)
|
|
timeoutticks = c_byte(timeoutticks_)
|
|
connid = c_uint(connid_)
|
|
conserial = c_ushort(conserial_)
|
|
vendorid = c_ushort(vendorid_)
|
|
serialnum = c_uint(serialnum_)
|
|
timeutmult = c_byte(timeoutmult_)
|
|
rpi = c_uint(rpi_)
|
|
transport = c_byte(transport_)
|
|
slavepath = c_char_p(slavepath_)
|
|
pathlength = len(slavepath_)
|
|
|
|
self.__tuxeip._ConnectPLCOverCNET.restype = POINTER(Eip_Connection)
|
|
|
|
connection = self.__tuxeip._ConnectPLCOverCNET(
|
|
sess_,
|
|
plctype_,
|
|
priority,
|
|
timeoutticks,
|
|
connid,
|
|
conserial,
|
|
vendorid,
|
|
serialnum,
|
|
timeutmult,
|
|
rpi,
|
|
transport,
|
|
slavepath,
|
|
pathlength
|
|
)
|
|
|
|
if bool(connection) == False:
|
|
raise TuxEIPException("Could not connect to CPU")
|
|
|
|
return connection
|
|
|
|
def ReadLgxData(self, sess_, conn_, var_, num_):
|
|
self.__tuxeip._ReadLgxData.restype = POINTER(Eip_PLC_Read)
|
|
readdata = self.__tuxeip._ReadLgxData(sess_, conn_, var_, num_)
|
|
|
|
if bool(readdata) == False:
|
|
raise TuxEIPException("Read data failed")
|
|
|
|
return readdata
|
|
|
|
def WriteLGXData(self, sess_, conn_, address_, datatype_, data_, num_ ):
|
|
if datatype_ == LGX_INT or datatype_ == LGX_BOOL or datatype_ == LGX_DINT or datatype_ == LGX_SINT:
|
|
data = c_int(data_)
|
|
elif datatype_ == LGX_REAL:
|
|
data = c_float(data_)
|
|
else:
|
|
raise TuxEIPException("Write data failed")
|
|
|
|
data = self.__tuxeip._WriteLgxData(sess_, conn_, address_, datatype_, byref(data), num_)
|
|
|
|
return data
|
|
|
|
def ReadLGXDataAsFloat(self, sess_, conn_, var_, num_):
|
|
data = self.ReadLgxData(sess_, conn_, var_, num_)
|
|
d = self.GetLGXValueAsFloat(data)
|
|
self.FreePLCRead(data)
|
|
return d
|
|
|
|
def ReadLGXDataAsInteger(self, sess_, conn_, var_, num_):
|
|
data = self.ReadLgxData(sess_, conn_, var_, num_)
|
|
d = self.GetLGXValueAsInteger(data)
|
|
self.FreePLCRead(data)
|
|
return d
|
|
|
|
def ReadPLCDataAsFloat(self, sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, number_):
|
|
data = self.ReadPLCData(sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, number_)
|
|
d = self.PCCC_GetValueAsFloat(data)
|
|
self.FreePLCRead(data)
|
|
return d
|
|
|
|
def ReadPLCDataAsInteger(self, sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, number_):
|
|
data = self.ReadPLCData(sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, number_)
|
|
d = self.PCCC_GetValueAsInteger(data)
|
|
self.FreePLCRead(data)
|
|
return d
|
|
|
|
def ReadPLCData(self, sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, number_):
|
|
self.__tuxeip._ReadPLCData.restype = POINTER(Eip_PLC_Read)
|
|
readdata = self.__tuxeip._ReadPLCData(sess_, conn_, dhp_, routepath_, routesize_, plctype_,
|
|
tns_, address_, number_)
|
|
|
|
if bool(readdata) == False:
|
|
raise TuxEIPException("Read data failed")
|
|
|
|
return readdata
|
|
|
|
def GetLGXValueAsFloat(self, readdata_):
|
|
if bool(readdata_) == False:
|
|
return None
|
|
|
|
self.__tuxeip._GetLGXValueAsFloat.restype = c_float
|
|
values = []
|
|
for i in range(0, readdata_.contents.Varcount):
|
|
v = self.__tuxeip._GetLGXValueAsFloat(readdata_, i)
|
|
values.append(v)
|
|
|
|
return values
|
|
|
|
def GetLGXValueAsInteger(self, readdata_):
|
|
if bool(readdata_) == False:
|
|
return None
|
|
|
|
self.__tuxeip._GetLGXValueAsInteger.restype = c_int
|
|
values = []
|
|
for i in range(0, readdata_.contents.Varcount):
|
|
v = self.__tuxeip._GetLGXValueAsInteger(readdata_, i)
|
|
values.append(v)
|
|
|
|
return values
|
|
|
|
def PCCC_GetValueAsFloat(self, readdata_):
|
|
if bool(readdata_) == False:
|
|
return None
|
|
|
|
self.__tuxeip._PCCC_GetValueAsFloat.restype = c_float
|
|
values = []
|
|
for i in range(0, readdata_.contents.Varcount):
|
|
v = self.__tuxeip._PCCC_GetValueAsFloat(readdata_, i)
|
|
values.append(v)
|
|
|
|
return values
|
|
|
|
def PCCC_GetValueAsInteger(self, readdata_):
|
|
if bool(readdata_) == False:
|
|
return None
|
|
|
|
self.__tuxeip._PCCC_GetValueAsInteger.restype = c_int
|
|
values = []
|
|
for i in range(0, readdata_.contents.Varcount):
|
|
v = self.__tuxeip._PCCC_GetValueAsInteger(readdata_, i)
|
|
values.append(v)
|
|
|
|
return values
|
|
|
|
def WritePLCData(self, sess_, conn_, dhp_, routepath_, routesize_, plctype_, tns_, address_, datatype_, data_, number_):
|
|
|
|
if datatype_ == PLC_INTEGER:
|
|
data = c_int(data_)
|
|
elif datatype_ == PLC_FLOATING:
|
|
data = c_float(data_)
|
|
else:
|
|
raise TuxEIPException("Variable type not supported" + str(datatype_))
|
|
|
|
result = self.__tuxeip._WritePLCData(sess_, conn_, dhp_, routepath_, routesize_, plctype_,
|
|
tns_, address_, datatype_, byref(data), number_)
|
|
|
|
return result
|
|
|
|
def Forward_Close(self, conn_):
|
|
self.__tuxeip._Forward_Close(conn_)
|
|
|
|
def UnRegisterSession(self, sess_):
|
|
self.__tuxeip._UnRegisterSession(sess_)
|
|
|
|
def CloseSession(self, sess_):
|
|
self.__tuxeip.CloseSession(sess_)
|
|
|
|
def FreePLCRead(self, data_):
|
|
self.__tuxeip._FreePLCRead(data_)
|