[TPG] Misc changes for new dispense
[zanchey/dispense2.git] / sql-edition / servers / MIFAREDriver.py
1 #!/usr/bin/env python2.5
2
3 '''mifare - a library for interacting with MIFARE readers.
4 Written by David Adam <[email protected]>
5 Requires Python 2.5.
6
7 Licensed under an MIT-style license: see LICENSE file for details.
8 '''
9
10 import serial, logging
11
12 xor = lambda x, y: x ^ y
13 def checksum(string):
14     return chr(reduce(xor, [ord(i) for i in string]))   
15
16
17 class MIFAREException(Exception):
18     pass
19
20
21 class MIFARECommunicationException(MIFAREException):
22     pass
23
24
25 class MIFAREAuthenticationException(MIFAREException):
26     pass
27
28
29 class MIFAREReader:
30     '''An interface to a particular MIFARE reader.'''
31     
32     def __init__(self, io):
33         '''Returns an interface to a MIFARE reader given a file-like object.
34         The file-like object is generally a pyserial Serial object.'''
35         self.io = io
36         if isinstance(self.io, serial.Serial):
37             self.io.setTimeout(2)
38         self.address = '\x00\x00'
39     
40     def get_absolute_block(self, vector):
41         if vector[0] < 32:
42             return vector[0] * 4 + vector[1]
43         else:
44             # Sectors below 32 are 4 blocks
45             # Sectors above are 16 blocks
46             # Thus, sector 32 starts at block 128, 33 at 144, and so on
47             return 128 + (vector[0] - 32) * 16 + vector[1]
48         
49     def send_packet(self, data):
50         '''Constructs a packet for the supplied data string, sends it to the
51         MIFARE reader, then returns the response (if any) to the commmand.'''
52         
53         # Occasionally the reader inserts extra trailing characters into its
54         # responses, so flush the buffers if possible beforehand.
55         if isinstance(self.io, serial.Serial):
56             self.io.flushInput()
57             self.io.flushOutput()
58         
59         # XXX - Needs more error checking.
60         data = '\x00' + self.address + data
61         packet = '\xAA\xBB' + chr(len(data)) + data + checksum(data)
62         self.io.write(packet)
63         response = ''
64         header = self.io.read(2)
65         if header == '\xaa\xbb':
66             length = ord(self.io.read(1))
67             data = self.io.read(length)
68             packet_xsum = self.io.read(1)
69             if checksum(data) == packet_xsum and len(data) == length:
70                 # Strip off separator and address header
71                 return data[3:]
72             else:
73                 raise MIFARECommunicationException, "Invalid response received"
74             
75     def set_antenna(self, state = True):
76         """Turn the card reader's antenna on or off (no return value)"""
77         command = '\x0C\x01' + chr(int(state))
78         response = self.send_packet(command)
79         if response == '\x0c\x01\x00':
80             return None
81         else:
82             raise MIFAREException, 'command failed: set_antenna (%s)' % state
83         
84     def select_card(self, include_halted = False):
85         """Selects a card and returns a tuple of  (serial number, capacity).
86         
87         If include_halted is set, may select a card that halt() has previously
88         been called on."""
89         
90         # Request type of card available
91         command = command = '\x01\x02'
92         if include_halted:
93             command += '\x52'
94         else:
95             command += '\x26'
96         
97         card_type_response = self.send_packet(command)
98         
99         if card_type_response[2] == '\x14':
100             raise MIFAREException, "select_card: no card available"
101         card_type = card_type_response[3:5]
102         
103         if card_type == '\x44\x00': # MIFARE UltraLight
104             raise NotImplementedError, "UltraLight card selected - no functions available"
105         
106         else:
107         # Otherwise, must be a standard MIFARE card.
108             # Anticollision
109             command = '\x02\x02\x04'
110             # No error handling on this command
111             serial = self.send_packet(command)[3:]
112             
113             # Select the card for use
114             try:
115                 select_response = self.send_packet('\x03\x02' + serial)
116                 capacity = ord(select_response[3])
117             except IndexError:
118                 logging.warning('Tried to select card but failed: card_type %s, serial %s, select_response %s' % (card_type.__repr__(), serial.__repr__(), select_response.__repr__()))
119                 capacity = 0
120             return (serial, capacity)
121     
122     def sector_login(self, blockvect, key, keytype=0):
123         """Log in to a block using the six-byte key.
124         
125         Use a keytype of 1 to use key B."""
126         sector = self.get_absolute_block((blockvect[0], 0))
127         
128         if len(key) != 6:
129             raise ValueError, 'key must be a six-byte string'
130         
131         keytype = 96 + keytype
132         
133         data = chr(keytype) + chr(sector) + key
134         
135         result = self.send_packet('\x07\x02' + data)
136         if ord(result[2]) == 22:
137             raise MIFAREAuthenticationException, "incorrect key provided"
138         
139         return
140     
141     def read_block(self, blockvect):
142         "Read the 16-byte block at vector (sector, block)."
143         block = self.get_absolute_block(blockvect)
144         
145         result = self.send_packet('\x08\x02' + chr(block))
146         return result[3:19]
147     
148     def write_block(self, blockvect, data):
149         """Write the 16 bytes in data to the block at vector (sector, block)."""
150         block = self.get_absolute_block(blockvect)
151         if len(data) != 16:
152             raise ValueError, "invalid data length - must be 16 bytes"
153         
154         result = self.send_packet('\x09\x02' + chr(block) + data)
155         return
156     
157     def write_key(self, key):
158         pass
159     
160     def value_block_increment(self, blocknum, increment):
161         pass
162     
163     def value_block_decrement(self, blocknum, decrement):
164         pass
165     
166     def copy_block(self, source, dest):
167         pass
168     
169     def halt(self):
170         """Halt the current card - no further transactions will be performed with it."""
171         self.send_packet('\x04\x02')
172     
173     def set_led(self, red = False, green = False):
174         led_state = 0
175         if red:
176             led_state += 1
177         if green:
178             led_state += 2
179         self.send_packet('\x07\x01' + chr(led_state))
180         
181     def beep(self, length):
182         '''Beep for a specified length of milliseconds.'''
183         length = int(round(length / 10.))
184         if length > 255:
185             length = 255
186         self.send_packet('\x06\x01' + chr(length))
187         
188     def reset(self):
189         pass

UCC git Repository :: git.ucc.asn.au