From 3d1168a537169794cb3dda12269dd0d7c00d28d2 Mon Sep 17 00:00:00 2001 From: Bernard Blackham Date: Fri, 25 Jun 2004 15:33:11 +0000 Subject: [PATCH 1/1] Split this code out. --- sql-edition/servers/VendServer.py | 112 +------------------------ sql-edition/servers/VendingMachine.py | 113 ++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 111 deletions(-) create mode 100644 sql-edition/servers/VendingMachine.py diff --git a/sql-edition/servers/VendServer.py b/sql-edition/servers/VendServer.py index c6a9416..3f806bc 100755 --- a/sql-edition/servers/VendServer.py +++ b/sql-edition/servers/VendServer.py @@ -4,117 +4,7 @@ import sys, os, string, socket, time, re from popen2 import popen2 from pyPgSQL import PgSQL from LATClient import LATClient -from CRC import do_crc -from binascii import unhexlify - -asynchronous_responses = [ '400', '401', # door open/closed - '610', # switches changed - ] - -class VendingMachine: - def __init__(self, rfh, wfh): - self.secret = 'AAAAAAAAAAAAAAAA' - self.rfh = rfh - self.wfh = wfh - self.challenge = None - # Initialise ourselves into a known state - self.wfh.write('\n') - self.await_prompt() - self.wfh.write('echo off\n') - self.await_prompt() - self.wfh.write('PING\n') - code = '' - while code != '000': - (code, _) = self.get_response() - - def await_prompt(self): - self.wfh.flush() - state = 1 - prefix = '' - s = '' - while True: - s = self.rfh.read(1) - if s == '': raise Exception - if s == '\n' or s == '\r': - state = 1 - prefix = '' - if (s == '#' or s == '%') and state == 1: state = 2 - if s == ' ' and state == 2: - if prefix == '': - self.challenge = None - return - if re.search('^[0-9a-fA-F]{4}$', prefix): - self.challenge = unhexlify(prefix) - return - - def get_response(self): - self.wfh.flush() - while True: - s = '' - while s == '': - s = self.rfh.readline() - if s == '': return None - s = s.strip('\r\n') - code = s[0:3] - text = s[4:] - if code in asynchronous_responses: - self.handle_event(code, text) - else: - self.await_prompt() - return (code, text) - - def handle_event(self, code, text): - pass - - def authed_message(self, message): - if self.challenge == None: - return message - crc = do_crc('%c%c'%(self.challenge >> 8, self.challenge & 0xff)) - crc = do_crc(self.secret, crc) - crc = do_crc(message, crc) - return message+'|'+('%04x'%crc) - - def ping(self): - self.wfh.write('PING\n') - (code, string) = self.get_response() - return (code == '000', code, string) - - def vend(self, item): - if not re.search('^[0-9][0-9]$', item): - return (False, 'Invalid item requested (%s)'%item) - self.wfh.write(self.authed_message(('V%s\n'%item)+'\n')) - (code, string) = self.get_response() - return (code, string) - - def beep(self, duration = None, synchronous = True): - msg = 'B' - if synchronous: msg += 'S' - if duration != None: - if duration > 255: duration = 255 - if duration < 1: duration = 1 - msg += '%02x'%duration - self.wfh.write(msg+'\n') - (code, string) = self.get_response() - return (code == '500', code, string) - - def silence(self, duration = None, synchronous = True): - msg = 'C' - if synchronous: msg += 'S' - if duration != None: - if duration > 255: duration = 255 - if duration < 1: duration = 1 - msg += '%02x'%duration - self.wfh.write(msg+'\n') - (code, string) = self.get_response() - # FIXME: workaround a bug in rom W. should be just: return (code == '500', code, string) - return (code == '500' or code == '501', code, string) - - def display(self, string): - if len(string) > 10: - string = string[0:10] - self.wfh.write('D'+string+'\n') - (code, string) = self.get_response() - return (code == '300', code, string) +from VendingMachine import VendingMachine if __name__ == '__main__': # Open vending machine via LAT diff --git a/sql-edition/servers/VendingMachine.py b/sql-edition/servers/VendingMachine.py new file mode 100644 index 0000000..3046360 --- /dev/null +++ b/sql-edition/servers/VendingMachine.py @@ -0,0 +1,113 @@ +import re +from CRC import do_crc +from binascii import unhexlify + +asynchronous_responses = [ '400', '401', # door open/closed + '610', # switches changed + ] + +class VendingMachine: + def __init__(self, rfh, wfh): + self.secret = 'AAAAAAAAAAAAAAAA' + self.rfh = rfh + self.wfh = wfh + self.challenge = None + # Initialise ourselves into a known state + self.wfh.write('\n') + self.await_prompt() + self.wfh.write('echo off\n') + self.await_prompt() + self.wfh.write('PING\n') + code = '' + while code != '000': + (code, _) = self.get_response() + + def await_prompt(self): + self.wfh.flush() + state = 1 + prefix = '' + s = '' + while True: + s = self.rfh.read(1) + if s == '': raise Exception + if s == '\n' or s == '\r': + state = 1 + prefix = '' + if (s == '#' or s == '%') and state == 1: state = 2 + if s == ' ' and state == 2: + if prefix == '': + self.challenge = None + return + if re.search('^[0-9a-fA-F]{4}$', prefix): + self.challenge = unhexlify(prefix) + return + + def get_response(self): + self.wfh.flush() + while True: + s = '' + while s == '': + s = self.rfh.readline() + if s == '': return None + s = s.strip('\r\n') + code = s[0:3] + text = s[4:] + if code in asynchronous_responses: + self.handle_event(code, text) + else: + self.await_prompt() + return (code, text) + + def handle_event(self, code, text): + pass + + def authed_message(self, message): + if self.challenge == None: + return message + crc = do_crc('%c%c'%(self.challenge >> 8, self.challenge & 0xff)) + crc = do_crc(self.secret, crc) + crc = do_crc(message, crc) + return message+'|'+('%04x'%crc) + + def ping(self): + self.wfh.write('PING\n') + (code, string) = self.get_response() + return (code == '000', code, string) + + def vend(self, item): + if not re.search('^[0-9][0-9]$', item): + return (False, 'Invalid item requested (%s)'%item) + self.wfh.write(self.authed_message(('V%s\n'%item)+'\n')) + (code, string) = self.get_response() + return (code, string) + + def beep(self, duration = None, synchronous = True): + msg = 'B' + if synchronous: msg += 'S' + if duration != None: + if duration > 255: duration = 255 + if duration < 1: duration = 1 + msg += '%02x'%duration + self.wfh.write(msg+'\n') + (code, string) = self.get_response() + return (code == '500', code, string) + + def silence(self, duration = None, synchronous = True): + msg = 'C' + if synchronous: msg += 'S' + if duration != None: + if duration > 255: duration = 255 + if duration < 1: duration = 1 + msg += '%02x'%duration + self.wfh.write(msg+'\n') + (code, string) = self.get_response() + # FIXME: workaround a bug in rom W. should be just: return (code == '500', code, string) + return (code == '500' or code == '501', code, string) + + def display(self, string): + if len(string) > 10: + string = string[0:10] + self.wfh.write('D'+string+'\n') + (code, string) = self.get_response() + return (code == '300', code, string) + -- 2.20.1