Split this code out.
authorBernard Blackham <[email protected]>
Fri, 25 Jun 2004 15:33:11 +0000 (15:33 +0000)
committerBernard Blackham <[email protected]>
Fri, 25 Jun 2004 15:33:11 +0000 (15:33 +0000)
sql-edition/servers/VendServer.py
sql-edition/servers/VendingMachine.py [new file with mode: 0644]

index c6a9416..3f806bc 100755 (executable)
@@ -4,117 +4,7 @@ import sys, os, string, socket, time, re
 from popen2 import popen2
 from pyPgSQL import PgSQL
 from LATClient import LATClient
 from popen2 import popen2
 from pyPgSQL import PgSQL
 from LATClient import LATClient
-from CRC import do_crc
-from binascii import unhexlify
-
-asynchronous_responses = [     '400', '401', # door open/closed
-                                                       '610',        # switches changed
-                                                ]
-
-class VendingMachine:
-       def __init__(self, rfh, wfh):
-               self.secret = 'AAAAAAAAAAAAAAAA'
-               self.rfh = rfh
-               self.wfh = wfh
-               self.challenge = None
-               # Initialise ourselves into a known state
-               self.wfh.write('\n')
-               self.await_prompt()
-               self.wfh.write('echo off\n')
-               self.await_prompt()
-               self.wfh.write('PING\n')
-               code = ''
-               while code != '000':
-                       (code, _) = self.get_response()
-
-       def await_prompt(self):
-               self.wfh.flush()
-               state = 1
-               prefix = ''
-               s = ''
-               while True:
-                       s = self.rfh.read(1)
-                       if s == '': raise Exception
-                       if s == '\n' or s == '\r':
-                               state = 1
-                               prefix = ''
-                       if (s == '#' or s == '%') and state == 1: state = 2
-                       if s == ' ' and state == 2:
-                               if prefix == '':
-                                       self.challenge = None
-                                       return
-                               if re.search('^[0-9a-fA-F]{4}$', prefix):
-                                       self.challenge = unhexlify(prefix)
-                                       return
-
-       def get_response(self):
-               self.wfh.flush()
-               while True:
-                       s = ''
-                       while s == '':
-                               s = self.rfh.readline()
-                               if s == '': return None
-                               s = s.strip('\r\n')
-                       code = s[0:3]
-                       text = s[4:]
-                       if code in asynchronous_responses:
-                               self.handle_event(code, text)
-                       else:
-                               self.await_prompt()
-                               return (code, text)
-
-       def handle_event(self, code, text):
-               pass
-
-       def authed_message(self, message):
-               if self.challenge == None:
-                       return message
-               crc = do_crc('%c%c'%(self.challenge >> 8, self.challenge & 0xff))
-               crc = do_crc(self.secret, crc)
-               crc = do_crc(message, crc)
-               return message+'|'+('%04x'%crc)
-
-       def ping(self):
-               self.wfh.write('PING\n')
-               (code, string) = self.get_response()
-               return (code == '000', code, string)
-
-       def vend(self, item):
-               if not re.search('^[0-9][0-9]$', item):
-                       return (False, 'Invalid item requested (%s)'%item)
-               self.wfh.write(self.authed_message(('V%s\n'%item)+'\n'))
-               (code, string) = self.get_response()
-               return (code, string)
-
-       def beep(self, duration = None, synchronous = True):
-               msg = 'B'
-               if synchronous: msg += 'S'
-               if duration != None:
-                       if duration > 255: duration = 255
-                       if duration < 1: duration = 1
-                       msg += '%02x'%duration
-               self.wfh.write(msg+'\n')
-               (code, string) = self.get_response()
-               return (code == '500', code, string)
-
-       def silence(self, duration = None, synchronous = True):
-               msg = 'C'
-               if synchronous: msg += 'S'
-               if duration != None:
-                       if duration > 255: duration = 255
-                       if duration < 1: duration = 1
-                       msg += '%02x'%duration
-               self.wfh.write(msg+'\n')
-               (code, string) = self.get_response()
-               # FIXME: workaround a bug in rom W. should be just: return (code == '500', code, string)
-               return (code == '500' or code == '501', code, string)
-
-       def display(self, string):
-               if len(string) > 10:
-                       string = string[0:10]
-               self.wfh.write('D'+string+'\n')
-               (code, string) = self.get_response()
-               return (code == '300', code, string)
+from VendingMachine import VendingMachine
 
 if __name__ == '__main__':
        # Open vending machine via LAT
 
 if __name__ == '__main__':
        # Open vending machine via LAT
diff --git a/sql-edition/servers/VendingMachine.py b/sql-edition/servers/VendingMachine.py
new file mode 100644 (file)
index 0000000..3046360
--- /dev/null
@@ -0,0 +1,113 @@
+import re
+from CRC import do_crc
+from binascii import unhexlify
+
+asynchronous_responses = [     '400', '401', # door open/closed
+                               '610',        # switches changed
+                        ]
+
+class VendingMachine:
+       def __init__(self, rfh, wfh):
+               self.secret = 'AAAAAAAAAAAAAAAA'
+               self.rfh = rfh
+               self.wfh = wfh
+               self.challenge = None
+               # Initialise ourselves into a known state
+               self.wfh.write('\n')
+               self.await_prompt()
+               self.wfh.write('echo off\n')
+               self.await_prompt()
+               self.wfh.write('PING\n')
+               code = ''
+               while code != '000':
+                       (code, _) = self.get_response()
+
+       def await_prompt(self):
+               self.wfh.flush()
+               state = 1
+               prefix = ''
+               s = ''
+               while True:
+                       s = self.rfh.read(1)
+                       if s == '': raise Exception
+                       if s == '\n' or s == '\r':
+                               state = 1
+                               prefix = ''
+                       if (s == '#' or s == '%') and state == 1: state = 2
+                       if s == ' ' and state == 2:
+                               if prefix == '':
+                                       self.challenge = None
+                                       return
+                               if re.search('^[0-9a-fA-F]{4}$', prefix):
+                                       self.challenge = unhexlify(prefix)
+                                       return
+
+       def get_response(self):
+               self.wfh.flush()
+               while True:
+                       s = ''
+                       while s == '':
+                               s = self.rfh.readline()
+                               if s == '': return None
+                               s = s.strip('\r\n')
+                       code = s[0:3]
+                       text = s[4:]
+                       if code in asynchronous_responses:
+                               self.handle_event(code, text)
+                       else:
+                               self.await_prompt()
+                               return (code, text)
+
+       def handle_event(self, code, text):
+               pass
+
+       def authed_message(self, message):
+               if self.challenge == None:
+                       return message
+               crc = do_crc('%c%c'%(self.challenge >> 8, self.challenge & 0xff))
+               crc = do_crc(self.secret, crc)
+               crc = do_crc(message, crc)
+               return message+'|'+('%04x'%crc)
+
+       def ping(self):
+               self.wfh.write('PING\n')
+               (code, string) = self.get_response()
+               return (code == '000', code, string)
+
+       def vend(self, item):
+               if not re.search('^[0-9][0-9]$', item):
+                       return (False, 'Invalid item requested (%s)'%item)
+               self.wfh.write(self.authed_message(('V%s\n'%item)+'\n'))
+               (code, string) = self.get_response()
+               return (code, string)
+
+       def beep(self, duration = None, synchronous = True):
+               msg = 'B'
+               if synchronous: msg += 'S'
+               if duration != None:
+                       if duration > 255: duration = 255
+                       if duration < 1: duration = 1
+                       msg += '%02x'%duration
+               self.wfh.write(msg+'\n')
+               (code, string) = self.get_response()
+               return (code == '500', code, string)
+
+       def silence(self, duration = None, synchronous = True):
+               msg = 'C'
+               if synchronous: msg += 'S'
+               if duration != None:
+                       if duration > 255: duration = 255
+                       if duration < 1: duration = 1
+                       msg += '%02x'%duration
+               self.wfh.write(msg+'\n')
+               (code, string) = self.get_response()
+               # FIXME: workaround a bug in rom W. should be just: return (code == '500', code, string)
+               return (code == '500' or code == '501', code, string)
+
+       def display(self, string):
+               if len(string) > 10:
+                       string = string[0:10]
+               self.wfh.write('D'+string+'\n')
+               (code, string) = self.get_response()
+               return (code == '300', code, string)
+

UCC git Repository :: git.ucc.asn.au