20 is still causing in use errors, try 30 seconds
[uccdoor.git] / xmpp / session.py
1 ##
2 ##   XMPP server
3 ##
4 ##   Copyright (C) 2004 Alexey "Snake" Nezhdanov
5 ##
6 ##   This program is free software; you can redistribute it and/or modify
7 ##   it under the terms of the GNU General Public License as published by
8 ##   the Free Software Foundation; either version 2, or (at your option)
9 ##   any later version.
10 ##
11 ##   This program is distributed in the hope that it will be useful,
12 ##   but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ##   GNU General Public License for more details.
15
16 __version__="$Id"
17
18 """
19 When your handler is called it is getting the session instance as the first argument.
20 This is the difference from xmpppy 0.1 where you got the "Client" instance.
21 With Session class you can have "multi-session" client instead of having
22 one client for each connection. It is especially important when you are
23 writing the server.
24 """
25
26 from protocol import *
27
# State of the underlying transport (socket). Values only ever grow.
SOCKET_UNCONNECTED, SOCKET_ALIVE, SOCKET_DEAD = 0, 1, 2

# State of the XML stream carried over the socket.
(STREAM__NOT_OPENED,
 STREAM__OPENED,
 STREAM__CLOSING,
 STREAM__CLOSED) = 1, 2, 3, 4

# State of the XMPP session negotiated over the stream.
(SESSION_NOT_AUTHED,
 SESSION_AUTHED,
 SESSION_BOUND,
 SESSION_OPENED,
 SESSION_CLOSED) = 1, 2, 3, 4, 5
43
44 class Session:
45     """
46     The Session class instance is used for storing all session-related info like 
47     credentials, socket/xml stream/session state flags, roster items (in case of
48     client type connection) etc.
49     Session object has no means of discovering whether any info is ready to be read.
50     Instead you should use poll() (recommended) or select() methods for this purpose.
51     Session can be one of two types: 'server' and 'client'. 'server' session handles
52     inbound connection and 'client' one used to create an outbound one.
53     Session instance has a multitude of internal attributes. The most important is the 'peer' one.
54     It is set once the peer is authenticated (client).
55     """
56     def __init__(self,socket,owner,xmlns=None,peer=None):
57         """ When the session is created it's type (client/server) is determined from the beginning.
58             socket argument is the pre-created socket-like object.
59             It must have the following methods: send, recv, fileno, close.
60             owner is the 'master' instance that have Dispatcher plugged into it and generally
61             will take care about all session events.
62             xmlns is the stream namespace that will be used. Client must set this argument
63             If server sets this argument than stream will be dropped if opened with some another namespace.
64             peer is the name of peer instance. This is the flag that differentiates client session from
65             server session. Client must set it to the name of the server that will be connected, server must
66             leave this argument alone.
67             """
68         self.xmlns=xmlns
69         if peer:
70             self.TYP='client'
71             self.peer=peer
72             self._socket_state=SOCKET_UNCONNECTED
73         else:
74             self.TYP='server'
75             self.peer=None
76             self._socket_state=SOCKET_ALIVE
77         self._sock=socket
78         self._send=socket.send
79         self._recv=socket.recv
80         self.fileno=socket.fileno
81         self._registered=0
82
83         self.Dispatcher=owner.Dispatcher
84         self.DBG_LINE='session'
85         self.DEBUG=owner.Dispatcher.DEBUG
86         self._expected={}
87         self._owner=owner
88         if self.TYP=='server': self.ID=`random.random()`[2:]
89         else: self.ID=None
90
91         self.sendbuffer=''
92         self._stream_pos_queued=None
93         self._stream_pos_sent=0
94         self.deliver_key_queue=[]
95         self.deliver_queue_map={}
96         self.stanza_queue=[]
97
98         self._session_state=SESSION_NOT_AUTHED
99         self.waiting_features=[]
100         for feature in [NS_TLS,NS_SASL,NS_BIND,NS_SESSION]:
101             if feature in owner.features: self.waiting_features.append(feature)
102         self.features=[]
103         self.feature_in_process=None
104         self.slave_session=None
105         self.StartStream()
106
107     def StartStream(self):
108         """ This method is used to initialise the internal xml expat parser
109             and to send initial stream header (in case of client connection).
110             Should be used after initial connection and after every stream restart."""
111         self._stream_state=STREAM__NOT_OPENED
112         self.Stream=simplexml.NodeBuilder()
113         self.Stream._dispatch_depth=2
114         self.Stream.dispatch=self._dispatch
115         self.Parse=self.Stream.Parse
116         self.Stream.stream_footer_received=self._stream_close
117         if self.TYP=='client':
118             self.Stream.stream_header_received=self._catch_stream_id
119             self._stream_open()
120         else:
121             self.Stream.stream_header_received=self._stream_open
122
123     def receive(self):
124         """ Reads all pending incoming data.
125             Raises IOError on disconnection.
126             Blocks until at least one byte is read."""
127         try: received = self._recv(10240)
128         except: received = ''
129
130         if len(received): # length of 0 means disconnect
131             self.DEBUG(`self.fileno()`+' '+received,'got')
132         else:
133             self.DEBUG('Socket error while receiving data','error')
134             self.set_socket_state(SOCKET_DEAD)
135             raise IOError("Peer disconnected")
136         return received
137
138     def sendnow(self,chunk):
139         """ Put chunk into "immidiatedly send" queue.
140             Should only be used for auth/TLS stuff and like.
141             If you just want to shedule regular stanza for delivery use enqueue method.
142         """
143         if isinstance(chunk,Node): chunk = chunk.__str__().encode('utf-8')
144         elif type(chunk)==type(u''): chunk = chunk.encode('utf-8')
145         self.enqueue(chunk)
146
147     def enqueue(self,stanza):
148         """ Takes Protocol instance as argument.
149             Puts stanza into "send" fifo queue. Items into the send queue are hold until
150             stream authenticated. After that this method is effectively the same as "sendnow" method."""
151         if isinstance(stanza,Protocol):
152             self.stanza_queue.append(stanza)
153         else: self.sendbuffer+=stanza
154         if self._socket_state>=SOCKET_ALIVE: self.push_queue()
155
156     def push_queue(self,failreason=ERR_RECIPIENT_UNAVAILABLE):
157         """ If stream is authenticated than move items from "send" queue to "immidiatedly send" queue.
158             Else if the stream is failed then return all queued stanzas with error passed as argument.
159             Otherwise do nothing."""
160         # If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints
161
162         if self._stream_state>=STREAM__CLOSED or self._socket_state>=SOCKET_DEAD: # the stream failed. Return all stanzas that are still waiting for delivery.
163             self._owner.deactivatesession(self)
164             for key in self.deliver_key_queue:                                          # Not sure. May be I
165                 self._dispatch(Error(self.deliver_queue_map[key],failreason),trusted=1) # should simply re-dispatch it?
166             for stanza in self.stanza_queue:                                            # But such action can invoke
167                 self._dispatch(Error(stanza,failreason),trusted=1)                      # Infinite loops in case of S2S connection...
168             self.deliver_queue_map,self.deliver_key_queue,self.stanza_queue={},[],[]
169             return
170         elif self._session_state>=SESSION_AUTHED:       # FIXME! äÏÌÖÅΠÂÙÔØ ËÁËÏÊ-ÔÏ ÄÒÕÇÏÊ ÆÌÁÇ.
171             #### LOCK_QUEUE
172             for stanza in self.stanza_queue:
173                 txt=stanza.__str__().encode('utf-8')
174                 self.sendbuffer+=txt
175                 self._stream_pos_queued+=len(txt)       # should be re-evaluated for SSL connection.
176                 self.deliver_queue_map[self._stream_pos_queued]=stanza     # position of the stream when stanza will be successfully and fully sent
177                 self.deliver_key_queue.append(self._stream_pos_queued)
178             self.stanza_queue=[]
179             #### UNLOCK_QUEUE
180
181     def flush_queue(self):
182         """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent."""
183         if self.sendbuffer:
184             try:
185                 # LOCK_QUEUE
186                 sent=self._send(self.sendbuffer)    # âÌÏËÉÒÕÀÝÁÑ ÛÔÕÞËÁ!
187             except:
188                 # UNLOCK_QUEUE
189                 self.set_socket_state(SOCKET_DEAD)
190                 self.DEBUG("Socket error while sending data",'error')
191                 return self.terminate_stream()
192             self.DEBUG(`self.fileno()`+' '+self.sendbuffer[:sent],'sent')
193             self._stream_pos_sent+=sent
194             self.sendbuffer=self.sendbuffer[sent:]
195             self._stream_pos_delivered=self._stream_pos_sent            # Should be acquired from socket somehow. Take SSL into account.
196             while self.deliver_key_queue and self._stream_pos_delivered>self.deliver_key_queue[0]:
197                 del self.deliver_queue_map[self.deliver_key_queue[0]]
198                 self.deliver_key_queue.remove(self.deliver_key_queue[0])
199             # UNLOCK_QUEUE
200
201     def _dispatch(self,stanza,trusted=0):
202         """ This is callback that is used to pass the received stanza forth to owner's dispatcher
203             _if_ the stream is authorised. Otherwise the stanza is just dropped.
204             The 'trusted' argument is used to emulate stanza receive.
205             This method is used internally.
206         """
207         self._owner.packets+=1
208         if self._stream_state==STREAM__OPENED or trusted:               # if the server really should reject all stanzas after he is closed stream (himeself)?
209             self.DEBUG(stanza.__str__(),'dispatch')
210             stanza.trusted=trusted
211             return self.Dispatcher.dispatch(stanza,self)
212
213     def _catch_stream_id(self,ns=None,tag='stream',attrs={}):
214         """ This callback is used to detect the stream namespace of incoming stream. Used internally. """
215         if not attrs.has_key('id') or not attrs['id']:
216             return self.terminate_stream(STREAM_INVALID_XML)
217         self.ID=attrs['id']
218         if not attrs.has_key('version'): self._owner.Dialback(self)
219
220     def _stream_open(self,ns=None,tag='stream',attrs={}):
221         """ This callback is used to handle opening stream tag of the incoming stream.
222             In the case of client session it just make some validation.
223             Server session also sends server headers and if the stream valid the features node.
224             Used internally. """
225         text='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream'
226         if self.TYP=='client':
227             text+=' to="%s"'%self.peer
228         else:
229             text+=' id="%s"'%self.ID
230             if not attrs.has_key('to'): text+=' from="%s"'%self._owner.servernames[0]
231             else: text+=' from="%s"'%attrs['to']
232         if attrs.has_key('xml:lang'): text+=' xml:lang="%s"'%attrs['xml:lang']
233         if self.xmlns: xmlns=self.xmlns
234         else: xmlns=NS_SERVER
235         text+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK,NS_STREAMS,xmlns)
236         if attrs.has_key('version') or self.TYP=='client': text+=' version="1.0"'
237         self.sendnow(text+'>')
238         self.set_stream_state(STREAM__OPENED)
239         if self.TYP=='client': return
240         if tag<>'stream': return self.terminate_stream(STREAM_INVALID_XML)
241         if ns<>NS_STREAMS: return self.terminate_stream(STREAM_INVALID_NAMESPACE)
242         if self.Stream.xmlns<>self.xmlns: return self.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX)
243         if not attrs.has_key('to'): return self.terminate_stream(STREAM_IMPROPER_ADDRESSING)
244         if attrs['to'] not in self._owner.servernames: return self.terminate_stream(STREAM_HOST_UNKNOWN)
245         self.ourname=attrs['to'].lower()
246         if self.TYP=='server' and attrs.has_key('version'):
247             # send features
248             features=Node('stream:features')
249             if NS_TLS in self.waiting_features:
250                 features.NT.starttls.setNamespace(NS_TLS)
251                 features.T.starttls.NT.required
252             if NS_SASL in self.waiting_features:
253                 features.NT.mechanisms.setNamespace(NS_SASL)
254                 for mec in self._owner.SASL.mechanisms:
255                     features.T.mechanisms.NT.mechanism=mec
256             else:
257                 if NS_BIND in self.waiting_features: features.NT.bind.setNamespace(NS_BIND)
258                 if NS_SESSION in self.waiting_features: features.NT.session.setNamespace(NS_SESSION)
259             self.sendnow(features)
260
261     def feature(self,feature):
262         """ Declare some stream feature as activated one. """
263         if feature not in self.features: self.features.append(feature)
264         self.unfeature(feature)
265
266     def unfeature(self,feature):
267         """ Declare some feature as illegal. Illegal features can not be used.
268             Example: BIND feature becomes illegal after Non-SASL auth. """
269         if feature in self.waiting_features: self.waiting_features.remove(feature)
270
271     def _stream_close(self,unregister=1):
272         """ Write the closing stream tag and destroy the underlaying socket. Used internally. """
273         if self._stream_state>=STREAM__CLOSED: return
274         self.set_stream_state(STREAM__CLOSING)
275         self.sendnow('</stream:stream>')
276         self.set_stream_state(STREAM__CLOSED)
277         self.push_queue()       # decompose queue really since STREAM__CLOSED
278         self._owner.flush_queues()
279         if unregister: self._owner.unregistersession(self)
280         self._destroy_socket()
281
282     def terminate_stream(self,error=None,unregister=1):
283         """ Notify the peer about stream closure.
284             Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet -
285             open it before closure.
286             If the error condition is specified than create a stream error and send it along with
287             closing stream tag.
288             Emulate receiving 'unavailable' type presence just before stream closure.
289         """
290         if self._stream_state>=STREAM__CLOSING: return
291         if self._stream_state<STREAM__OPENED:
292             self.set_stream_state(STREAM__CLOSING)
293             self._stream_open()
294         else:
295             self.set_stream_state(STREAM__CLOSING)
296             p=Presence(typ='unavailable')
297             p.setNamespace(NS_CLIENT)
298             self._dispatch(p,trusted=1)
299         if error:
300             if isinstance(error,Node): self.sendnow(error)
301             else: self.sendnow(ErrorNode(error))
302         self._stream_close(unregister=unregister)
303         if self.slave_session:
304             self.slave_session.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED)
305
306     def _destroy_socket(self):
307         """ Break cyclic dependancies to let python's GC free memory right now."""
308         self.Stream.dispatch=None
309         self.Stream.stream_footer_received=None
310         self.Stream.stream_header_received=None
311         self.Stream.destroy()
312         self._sock.close()
313         self.set_socket_state(SOCKET_DEAD)
314
315     def start_feature(self,f):
316         """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """
317         if self.feature_in_process: raise "Starting feature %s over %s !"%(f,self.feature_in_process)
318         self.feature_in_process=f
319
320     def stop_feature(self,f):
321         """ Declare some feature as "negotiated" to allow other features start negotiating. """
322         if self.feature_in_process<>f: raise "Stopping feature %s instead of %s !"%(f,self.feature_in_process)
323         self.feature_in_process=None
324
325     def set_socket_state(self,newstate):
326         """ Change the underlaying socket state.
327             Socket starts with SOCKET_UNCONNECTED state
328             and then proceeds (possibly) to SOCKET_ALIVE
329             and then to SOCKET_DEAD """
330         if self._socket_state<newstate: self._socket_state=newstate
331
332     def set_session_state(self,newstate):
333         """ Change the session state.
334             Session starts with SESSION_NOT_AUTHED state
335             and then comes through 
336             SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states.
337         """
338         if self._session_state<newstate:
339             if self._session_state<SESSION_AUTHED and \
340                newstate>=SESSION_AUTHED: self._stream_pos_queued=self._stream_pos_sent
341             self._session_state=newstate
342
343     def set_stream_state(self,newstate):
344         """ Change the underlaying XML stream state
345             Stream starts with STREAM__NOT_OPENED and then proceeds with
346             STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states.
347             Note that some features (like TLS and SASL)
348             requires stream re-start so this state can have non-linear changes. """
349         if self._stream_state<newstate: self._stream_state=newstate

UCC git Repository :: git.ucc.asn.au