Files
datalogger-POC/SQLite/writeTag.py
2016-03-31 16:41:49 -05:00

132 lines
4.0 KiB
Python

from pycomm.ab_comm.clx import Driver as ClxDriver
import sys
from time import sleep
import sqlite3 as lite
# Shared SQLite connection, opened as soon as this module is imported.
# getPLCIP() reads the `config` table through it.
con = lite.connect("/mnt/usb/data.db")
# Default PLC address and type. getPLCIP() overwrites both from the newest
# `config` row and falls back to these same values when that lookup fails.
PLC_IP_ADDRESS = "192.168.1.10"
PLC_TYPE = "VFD"
def closeEnough(a, b, tolerance=0.1):
    """Return True when ``a`` and ``b`` differ by at most ``tolerance``.

    writeTagAndVerify() uses this to compare the value read back from the
    PLC with the value that was written.

    Args:
        a: First numeric value.
        b: Second numeric value.
        tolerance: Largest absolute difference that still counts as a
            match. Defaults to 0.1, the threshold that used to be
            hard-coded here.

    Returns:
        bool: ``abs(a - b) <= tolerance``.
    """
    return abs(a - b) <= tolerance
def getPLCIP():
    """Refresh ``PLC_IP_ADDRESS`` and ``PLC_TYPE`` from the newest config row.

    Runs ``SELECT * FROM config ORDER BY dateChanged DESC LIMIT 1`` on the
    module-level connection ``con``. Column index 1 of the row is stored in
    ``PLC_TYPE`` and column index 2 in ``PLC_IP_ADDRESS``. When the table
    yields no row, or the row has fewer than three columns, both globals are
    set to the defaults "192.168.1.10" and "VFD".

    Returns:
        None. The result is published through the two module globals.

    Errors raised by the query itself are not caught here.
    """
    global PLC_IP_ADDRESS, PLC_TYPE
    with con:
        cur = con.cursor()
        cur.execute("SELECT * FROM config ORDER BY dateChanged DESC LIMIT 1;")
        # The query is limited to one row, so fetchone() is sufficient; it
        # returns None when the table is empty.
        row = cur.fetchone()
    # Explicit shape check instead of a bare ``except:`` so that unrelated
    # errors are no longer silently swallowed.
    if row is not None and len(row) > 2:
        PLC_IP_ADDRESS = row[2]
        PLC_TYPE = row[1]
    else:
        PLC_IP_ADDRESS = "192.168.1.10"
        PLC_TYPE = "VFD"
def readTag(tagName):
    """Read one tag from the PLC at ``PLC_IP_ADDRESS``.

    Args:
        tagName: Name of the tag to read.

    Returns:
        dict: One of
            ``{'status': 'success', 'value': <value>, 'type': <type>}``
            when the read succeeds;
            ``{'status': 'error', 'message': 'Tag Not Found'}`` when the
            driver reports no data type for the tag or the read raises;
            ``{'status': 'error', 'message': 'not connected to PLC'}`` when
            the connection cannot be opened. (This case used to fall through
            and return None, which made every caller fail on
            ``result['status']``.)
    """
    c = ClxDriver()
    if not c.open(PLC_IP_ADDRESS):
        return {"status": 'error', "message": "not connected to PLC"}
    try:
        # read_tag is given a one-element list, so the answer is a list
        # whose first entry is indexed as [1] = value and [2] = data type.
        result = c.read_tag([tagName])
        tag_type = result[0][2]
        if tag_type is None:
            return {'status': "error", 'message': "Tag Not Found"}
        return {'status': "success", 'value': result[0][1], 'type': tag_type}
    except Exception:
        # Best effort, as before: any failure while reading is reported to
        # the caller as a missing tag rather than raised.
        return {'status': "error", 'message': "Tag Not Found"}
    finally:
        # Always release the connection, whichever path returned.
        c.close()
def main(tag, value):
    """Write ``value`` to ``tag`` on the PLC and return a fresh read of it.

    The tag is read first to learn its data type, ``value`` is converted to
    match (``int`` for BOOL and types ending in "INT", ``float`` for REAL,
    unchanged otherwise), written, and after a two second pause the tag is
    read again.

    Args:
        tag: Name of the tag to write.
        value: New value; may be a string such as a command-line argument.

    Returns:
        dict: The result of the final ``readTag(tag)`` on success, or a
        ``{'status': 'error', 'message': ...}`` dict when the tag cannot be
        read, the value cannot be converted, or the PLC cannot be reached.
        None if the initial read reports a status other than "error" or
        "success".
    """
    getPLCIP()
    readObj = readTag(tag)
    if readObj is None:
        # No result dict at all: treat it as a connection failure instead of
        # crashing on the subscript below.
        return {"status": 'error', "message": "not connected to PLC"}
    if readObj['status'] == "error":
        return readObj
    if readObj['status'] != 'success':
        return None

    tagType = readObj['type']
    try:
        # Compare the LAST three characters so INT, DINT, SINT, ... match.
        # The previous ``tagType[:-3]`` dropped them instead and therefore
        # never matched, leaving integer values unconverted.
        if tagType[-3:] == "INT" or tagType == "BOOL":
            value = int(value)
        elif tagType == "REAL":
            value = float(value)
    except (TypeError, ValueError):
        return {"status": 'error',
                "message": "value %r cannot be converted for tag type %s" % (value, tagType)}

    c = ClxDriver()
    if not c.open(PLC_IP_ADDRESS):
        return {"status": 'error', "message": "not connected to PLC"}
    try:
        c.write_tag(tag, value, tagType)
    finally:
        # Close even when write_tag raises so the connection is not leaked.
        c.close()
    # Pause before reading the tag back.
    sleep(2)
    return readTag(tag)
def writeTagAndVerify(tag, value, sleepValue=2):
    """Write ``value`` to ``tag`` and confirm that the value has been set.

    The tag is read first to learn its data type and previous value,
    ``value`` is converted to match (``int`` for BOOL and types ending in
    "INT", ``float`` for REAL, unchanged otherwise) and written. After
    ``sleepValue`` seconds the tag is read back and compared with
    closeEnough().

    Args:
        tag: Name of the tag to write.
        value: New value; may be a string.
        sleepValue: Seconds to wait between the write and the read-back.
            Anything ``float()`` accepts.

    Returns:
        dict: On success ``{'status': 'success', 'value': <read-back>,
        'type': <type>, 'verified': 'true', 'prevValue': <old value>}``.
        Otherwise a ``{'status': 'error', 'message': ...}`` dict; the
        mismatch case also carries the read-back under ``'value'``.
        None if a read reports a status other than "error" or "success".
    """
    getPLCIP()
    readObj = readTag(tag)
    if readObj is None:
        # No result dict at all: treat it as a connection failure instead of
        # crashing on the subscript below.
        return {"status": 'error', "message": "not connected to PLC"}
    if readObj['status'] == "error":
        return readObj
    if readObj['status'] != 'success':
        return None

    tagType = readObj['type']
    prevValue = readObj['value']
    try:
        # Compare the LAST three characters so INT, DINT, SINT, ... match.
        # The previous ``tagType[:-3]`` dropped them instead and therefore
        # never matched, leaving integer values unconverted.
        if tagType[-3:] == "INT" or tagType == "BOOL":
            value = int(value)
        elif tagType == "REAL":
            value = float(value)
    except (TypeError, ValueError):
        return {"status": 'error',
                "message": "value %r cannot be converted for tag type %s" % (value, tagType)}

    c = ClxDriver()
    if not c.open(PLC_IP_ADDRESS):
        return {"status": 'error', "message": "not connected to PLC"}
    try:
        c.write_tag(tag, value, tagType)
    finally:
        # Release the write connection before the read-back. Previously the
        # close sat after branches that all return, so the handle leaked on
        # every normal path and stayed open while readTag() opened a second
        # connection.
        c.close()

    sleep(float(sleepValue))
    newObj = readTag(tag)
    if newObj is None:
        return {"status": 'error', "message": "not connected to PLC"}
    if newObj['status'] == "error":
        return newObj
    if newObj['status'] != 'success':
        return None

    newTagType = newObj['type']
    newValue = newObj['value']
    if newTagType != tagType:
        return {"status": 'error', "message": "Somehow the tag type has changed"}
    if not closeEnough(newValue, value):
        return {"status": 'error',
                "message": "The tag value does not match the specified value",
                "value": newValue}
    return {'status': 'success', 'value': newValue, 'type': newTagType,
            'verified': 'true', 'prevValue': prevValue}
def toggle(tag, toggleVal, timeToggled=2):
    """Set ``tag`` to ``toggleVal`` for a while, then restore its old value.

    Both writes go through writeTagAndVerify() with a one second read-back
    delay. The value restored is the ``prevValue`` reported by the first
    write.

    Args:
        tag: Name of the tag to toggle.
        toggleVal: Value to hold temporarily; must be accepted by ``int()``.
        timeToggled: Seconds to hold ``toggleVal`` before restoring.
            Anything ``float()`` accepts, so a string such as "2" works the
            same way ``sleepValue`` does in writeTagAndVerify().

    Returns:
        dict: ``{'status': 'success'}`` when both writes verify, otherwise
        ``{'status': 'error', 'message': ...}`` with the message of the
        write that failed.

    Raises:
        ValueError, TypeError: if ``toggleVal`` is not accepted by ``int()``.
    """
    # Fail fast on a non-integer toggle value before touching the PLC. The
    # converted number is not needed: writeTagAndVerify() converts the value
    # itself according to the tag's type.
    int(toggleVal)

    set_tag = writeTagAndVerify(tag, toggleVal, sleepValue=1)
    if set_tag['status'] != 'success':
        return {'status': 'error', 'message': set_tag['message']}

    sleep(float(timeToggled))
    reset_tag = writeTagAndVerify(tag, set_tag['prevValue'], sleepValue=1)
    if reset_tag['status'] != "success":
        return {'status': 'error', 'message': reset_tag['message']}
    return {'status': 'success'}
if __name__ == '__main__':
    # Command-line entry point: <script> <tag> <value>
    if len(sys.argv) < 3:
        sys.stderr.write("usage: %s <tag> <value>\n" % sys.argv[0])
        sys.exit(2)
    # Call form of print with a single argument behaves the same on
    # Python 2 and Python 3.
    print(main(sys.argv[1], sys.argv[2]))