132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
from pycomm.ab_comm.clx import Driver as ClxDriver
|
|
import sys
|
|
from time import sleep
|
|
import sqlite3 as lite
|
|
|
|
con = lite.connect("/mnt/usb/data.db")
|
|
PLC_IP_ADDRESS = "192.168.1.10"
|
|
PLC_TYPE = "VFD"
|
|
|
|
def closeEnough(a,b):
|
|
return abs(a - b) <= 0.1
|
|
|
|
def getPLCIP():
|
|
global PLC_IP_ADDRESS, PLC_TYPE
|
|
with con:
|
|
cur = con.cursor()
|
|
query = "SELECT * FROM config ORDER BY dateChanged DESC LIMIT 1;"
|
|
cur.execute(query)
|
|
setup = cur.fetchall()
|
|
try:
|
|
PLC_IP_ADDRESS = setup[0][2]
|
|
PLC_TYPE = setup[0][1]
|
|
except:
|
|
PLC_IP_ADDRESS = "192.168.1.10"
|
|
PLC_TYPE = "VFD"
|
|
return
|
|
|
|
def readTag(tagName):
|
|
global PLC_IP_ADDRESS
|
|
|
|
c = ClxDriver()
|
|
|
|
def readString(tag):
|
|
read_vals = c.read_array(tag, 82)
|
|
string = filter(lambda b: b != "",map(lambda a: chr(a[1]),read_vals))
|
|
return "".join(string)
|
|
|
|
if c.open(PLC_IP_ADDRESS):
|
|
out = {}
|
|
try:
|
|
result = c.read_tag([tagName])
|
|
if result[0][2] == None:
|
|
raise ValueError('Tag not found')
|
|
out['status'] = "success"
|
|
out['value'] = result[0][1]
|
|
out['type'] = result[0][2]
|
|
except Exception, e:
|
|
out['status'] = "error"
|
|
out['message'] = "Tag Not Found"
|
|
c.close()
|
|
return out
|
|
|
|
def main(tag, value):
|
|
global PLC_IP_ADDRESS
|
|
getPLCIP()
|
|
|
|
r = 0
|
|
|
|
readObj = readTag(tag)
|
|
if readObj['status'] == "error":
|
|
return readObj
|
|
elif readObj['status'] == 'success':
|
|
tagType = readObj['type']
|
|
|
|
if tagType[:-3] == "INT" or tagType == "BOOL":
|
|
value = int(value)
|
|
elif tagType == "REAL":
|
|
value = float(value)
|
|
c = ClxDriver()
|
|
if c.open(PLC_IP_ADDRESS):
|
|
r = c.write_tag(tag, value, tagType)
|
|
else:
|
|
return {"status": 'error', "message": "not connected to PLC"}
|
|
c.close()
|
|
|
|
sleep(2)
|
|
|
|
return readTag(tag)
|
|
|
|
def writeTagAndVerify(tag,value, sleepValue=2):
|
|
"""Writes the specified value to tag and confirms that the value has been set"""
|
|
global PLC_IP_ADDRESS
|
|
r = 0
|
|
getPLCIP()
|
|
|
|
readObj = readTag(tag)
|
|
if readObj['status'] == "error":
|
|
return readObj
|
|
elif readObj['status'] == 'success':
|
|
tagType = readObj['type']
|
|
prevValue = readObj['value']
|
|
if tagType[:-3] == "INT" or tagType == "BOOL":
|
|
value = int(value)
|
|
elif tagType == "REAL":
|
|
value = float(value)
|
|
c = ClxDriver()
|
|
if c.open(PLC_IP_ADDRESS):
|
|
r = c.write_tag(tag, value, tagType)
|
|
sleep(float(sleepValue))
|
|
newObj = readTag(tag)
|
|
if newObj['status'] == "error":
|
|
return newObj
|
|
elif newObj['status'] == 'success':
|
|
newTagType = newObj['type']
|
|
newValue = newObj['value']
|
|
if (newTagType == tagType):
|
|
if (closeEnough(newValue, value)):
|
|
return {'status':'success', 'value':newValue, 'type':newTagType, 'verified':'true','prevValue':prevValue}
|
|
else:
|
|
return {"status": 'error', "message": "The tag value does not match the specified value", "value":newValue}
|
|
else:
|
|
return {"status": 'error', "message": "Somehow the tag type has changed"}
|
|
else:
|
|
return {"status": 'error', "message": "not connected to PLC"}
|
|
c.close()
|
|
|
|
def toggle(tag, toggleVal, timeToggled=2):
|
|
val = int(toggleVal)
|
|
set_tag = writeTagAndVerify(tag,toggleVal,sleepValue=1)
|
|
if set_tag['status']== 'success':
|
|
sleep(timeToggled)
|
|
reset_tag = writeTagAndVerify(tag, set_tag['prevValue'],sleepValue=1)
|
|
if reset_tag['status'] == "success":
|
|
return {'status':'success'}
|
|
else:
|
|
return {'status':'error', 'message':reset_tag['message']}
|
|
else:
|
|
return {'status':'error', 'message':set_tag['message']}
|
|
|
|
if __name__ == '__main__':
|
|
print main(sys.argv[1], sys.argv[2])
|