2 # UCC Door Server - hardware interface server
3 # reads status of reed switches and provides a D-BUS interface to read them
6 # Released under an MIT-style license; see COPYING for details.
8 # statuses: 1 means open circuit, 0 means closed circuit, -1 means error
10 import dbus, dbus.service
12 from LATClient import LATClient
13 from select import select
14 from threading import Timer
16 def check_LAT_service(servicename, retries=3):
17 latclient = LATClient(service=servicename.upper())
18 rfh, wfh = latclient.get_fh()
19 results = {'error': retries, 'closed': retries, 'open': retries}
20 test_string = "got wombles?"
22 # Only return after n(retries) consistent results, although not necessarily
23 # in order. This means we poll at least (retries) and up to (retries ** 2) times.
24 # This is mtearle's original code. No idea if it's the Right Thing.
27 wfh.write(test_string)
31 if results['error'] == 0: return -1
33 rr, wr, er = select([rfh], [], [], 3.0)
36 if results['open'] == 0: return 1
38 recv = rfh.read(len(test_string))
39 if recv <> test_string:
41 if results['error'] == 0: return -1
43 results['closed'] -= 1
44 if results['closed'] == 0: return 0
46 class Door(dbus.service.Object):
48 def __init__(self, doorname, bus):
49 self.interval = 10 # seconds
50 self.service = doorname
53 if 'pir' in self.service:
56 # set up D-BUS service name
57 object_path = '/au/asn/ucc/doors/%s' % doorname
58 dbus.service.Object.__init__(self, bus, object_path)
60 # get initial state in a new thread
61 self.timeout = Timer(0, self.poll)
62 # daemon threads will be killed when the mainloop exits
63 # the timeout threads will inherit this value
64 self.timeout.setDaemon(True)
69 newstatus = check_LAT_service(self.service, self.retries)
73 if newstatus != self.status:
74 self.status = newstatus
76 self.status_changed(newstatus)
78 # set up timeout again
79 self.timeout = Timer(self.interval, self.poll)
82 @dbus.service.signal('au.asn.ucc.DoorInterface', signature='n')
83 def status_changed(self, newstatus):
86 @dbus.service.method('au.asn.ucc.DoorInterface', in_signature='',
91 if __name__ == '__main__':
92 gobject.threads_init()
93 doors = ('uccdoor', 'unisfadoor', 'chdoor', 'mrdoor', 'uccpir')
95 from dbus.mainloop.glib import DBusGMainLoop
96 DBusGMainLoop(set_as_default=True)
98 system_bus = dbus.SystemBus()
99 system_bus.request_name('au.asn.ucc.DoorServer')
104 door_objects.append(Door(door, system_bus))
107 loop = gobject.MainLoop()
110 except KeyboardInterrupt: