# UCC Door Server - hardware interface server
# reads status of reed switches and provides a D-BUS interface to read them
# Released under an MIT-style license; see COPYING for details.
# statuses: 1 means open circuit, 0 means closed circuit, -1 means error
import dbus, dbus.service
import gobject
+from LATClient import LATClient
+from select import select
+
+def check_LAT_service(servicename, retries=3):
+ latclient = LATClient(service=servicename.upper())
+ rfh, wfh = latclient.get_fh()
+ results = {'error': retries, 'closed': retries, 'open': retries}
+ test_string = "got wombles?"
+
+ # Only return after n(retries) consistent results, although not necessarily
+ # in order. This means we poll at least (retries) and up to (retries ** 2) times.
+ # This is mtearle's original code. No idea if it's the Right Thing.
+ while True:
+ try:
+ wfh.write(test_string)
+ wfh.flush()
+ except:
+ results['error'] -= 1
+ if results['error'] == 0: return -1
+ continue
+ rr, wr, er = select([rfh], [], [], 3.0)
+ if rfh not in rr:
+ results['open'] -= 1
+ if results['open'] == 0: return 1
+ continue
+ recv = rfh.read(len(test_string))
+ if recv <> test_string:
+ results['error'] -= 1
+ if results['error'] == 0: return -1
+ continue
+ results['closed'] -= 1
+ if results['closed'] == 0: return 0
class Door(dbus.service.Object):
self.interval = 10 # seconds
self.service = doorname
self.status = -1
+ self.retries = 3
+ if 'pir' in self.service:
+ self.retries = 1
# set up D-BUS service name
object_path = '/au/asn/ucc/doors/%s' % doorname
self.poll()
def poll(self):
- # check LAT
- # XXX to be added
+ try:
+ newstatus = check_LAT_service(self.service, self.retries)
+ except:
+ newstatus = -1
- newstatus = -1
if newstatus != self.status:
self.status = newstatus
# emit signal