xmpp: import xmppy-0.5.0rc1
[uccdoor.git] / xmpp / dispatcher.py
1 ##   transports.py
2 ##
3 ##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
4 ##
5 ##   This program is free software; you can redistribute it and/or modify
6 ##   it under the terms of the GNU General Public License as published by
7 ##   the Free Software Foundation; either version 2, or (at your option)
8 ##   any later version.
9 ##
10 ##   This program is distributed in the hope that it will be useful,
11 ##   but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 ##   GNU General Public License for more details.
14
15 # $Id: dispatcher.py,v 1.42 2007/05/18 23:18:36 normanr Exp $
16
17 """
18 Main xmpppy mechanism. Provides library with methods to assign different handlers
19 to different XMPP stanzas.
20 Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that 
21 Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
22 """
23
24 import simplexml,time,sys
25 from protocol import *
26 from client import PlugIn
27
28 DefaultTimeout=25
29 ID=0
30
31 class Dispatcher(PlugIn):
32     """ Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
33         Can be plugged out/in to restart these headers (used for SASL f.e.). """
34     def __init__(self):
35         PlugIn.__init__(self)
36         DBG_LINE='dispatcher'
37         self.handlers={}
38         self._expected={}
39         self._defaultHandler=None
40         self._pendingExceptions=[]
41         self._eventHandler=None
42         self._cycleHandlers=[]
43         self._exported_methods=[self.Process,self.RegisterHandler,self.RegisterDefaultHandler,\
44         self.RegisterEventHandler,self.UnregisterCycleHandler,self.RegisterCycleHandler,\
45         self.RegisterHandlerOnce,self.UnregisterHandler,self.RegisterProtocol,\
46         self.WaitForResponse,self.SendAndWaitForResponse,self.send,self.disconnect,\
47         self.SendAndCallForResponse, ]
48
49     def dumpHandlers(self):
50         """ Return set of user-registered callbacks in it's internal format.
51             Used within the library to carry user handlers set over Dispatcher replugins. """
52         return self.handlers
53     def restoreHandlers(self,handlers):
54         """ Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
55             Used within the library to carry user handlers set over Dispatcher replugins. """
56         self.handlers=handlers
57
58     def _init(self):
59         """ Registers default namespaces/protocols/handlers. Used internally.  """
60         self.RegisterNamespace('unknown')
61         self.RegisterNamespace(NS_STREAMS)
62         self.RegisterNamespace(self._owner.defaultNamespace)
63         self.RegisterProtocol('iq',Iq)
64         self.RegisterProtocol('presence',Presence)
65         self.RegisterProtocol('message',Message)
66         self.RegisterDefaultHandler(self.returnStanzaHandler)
67         self.RegisterHandler('error',self.streamErrorHandler,xmlns=NS_STREAMS)
68
69     def plugin(self, owner):
70         """ Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally."""
71         self._init()
72         for method in self._old_owners_methods:
73             if method.__name__=='send': self._owner_send=method; break
74         self._owner.lastErrNode=None
75         self._owner.lastErr=None
76         self._owner.lastErrCode=None
77         self.StreamInit()
78
79     def plugout(self):
80         """ Prepares instance to be destructed. """
81         self.Stream.dispatch=None
82         self.Stream.DEBUG=None
83         self.Stream.features=None
84         self.Stream.destroy()
85
86     def StreamInit(self):
87         """ Send an initial stream header. """
88         self.Stream=simplexml.NodeBuilder()
89         self.Stream._dispatch_depth=2
90         self.Stream.dispatch=self.dispatch
91         self.Stream.stream_header_received=self._check_stream_start
92         self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
93         self.Stream.DEBUG=self._owner.DEBUG
94         self.Stream.features=None
95         self._metastream=Node('stream:stream')
96         self._metastream.setNamespace(self._owner.Namespace)
97         self._metastream.setAttr('version','1.0')
98         self._metastream.setAttr('xmlns:stream',NS_STREAMS)
99         self._metastream.setAttr('to',self._owner.Server)
100         self._owner.send("<?xml version='1.0'?>%s>"%str(self._metastream)[:-2])
101
102     def _check_stream_start(self,ns,tag,attrs):
103         if ns<>NS_STREAMS or tag<>'stream':
104             raise ValueError('Incorrect stream start: (%s,%s). Terminating.'%(tag,ns))
105
106     def Process(self, timeout=0):
107         """ Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
108             Returns:
109             1) length of processed data if some data were processed;
110             2) '0' string if no data were processed but link is alive;
111             3) 0 (zero) if underlying connection is closed.
112             Take note that in case of disconnection detect during Process() call
113             disconnect handlers are called automatically.
114         """
115         for handler in self._cycleHandlers: handler(self)
116         if len(self._pendingExceptions) > 0:
117             _pendingException = self._pendingExceptions.pop()
118             raise _pendingException[0], _pendingException[1], _pendingException[2]
119         if self._owner.Connection.pending_data(timeout):
120             try: data=self._owner.Connection.receive()
121             except IOError: return
122             self.Stream.Parse(data)
123             if len(self._pendingExceptions) > 0:
124                 _pendingException = self._pendingExceptions.pop()
125                 raise _pendingException[0], _pendingException[1], _pendingException[2]
126             if data: return len(data)
127         return '0'      # It means that nothing is received but link is alive.
128         
129     def RegisterNamespace(self,xmlns,order='info'):
130         """ Creates internal structures for newly registered namespace.
131             You can register handlers for this namespace afterwards. By default one namespace
132             already registered (jabber:client or jabber:component:accept depending on context. """
133         self.DEBUG('Registering namespace "%s"'%xmlns,order)
134         self.handlers[xmlns]={}
135         self.RegisterProtocol('unknown',Protocol,xmlns=xmlns)
136         self.RegisterProtocol('default',Protocol,xmlns=xmlns)
137
138     def RegisterProtocol(self,tag_name,Proto,xmlns=None,order='info'):
139         """ Used to declare some top-level stanza name to dispatcher.
140            Needed to start registering handlers for such stanzas.
141            Iq, message and presence protocols are registered by default. """
142         if not xmlns: xmlns=self._owner.defaultNamespace
143         self.DEBUG('Registering protocol "%s" as %s(%s)'%(tag_name,Proto,xmlns), order)
144         self.handlers[xmlns][tag_name]={type:Proto, 'default':[]}
145
146     def RegisterNamespaceHandler(self,xmlns,handler,typ='',ns='', makefirst=0, system=0):
147         """ Register handler for processing all stanzas for specified namespace. """
148         self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system)
149
150     def RegisterHandler(self,name,handler,typ='',ns='',xmlns=None, makefirst=0, system=0):
151         """Register user callback as stanzas handler of declared type. Callback must take
152            (if chained, see later) arguments: dispatcher instance (for replying), incomed
153            return of previous handlers.
154            The callback must raise xmpp.NodeProcessed just before return if it want preven
155            callbacks to be called with the same stanza as argument _and_, more importantly
156            library from returning stanza to sender with error set (to be enabled in 0.2 ve
157             Arguments:
158               "name" - name of stanza. F.e. "iq".
159               "handler" - user callback.
160               "typ" - value of stanza's "type" attribute. If not specified any value match
161               "ns" - namespace of child that stanza must contain.
162               "chained" - chain together output of several handlers.
163               "makefirst" - insert handler in the beginning of handlers list instead of
164                 adding it to the end. Note that more common handlers (i.e. w/o "typ" and "
165                 will be called first nevertheless.
166               "system" - call handler even if NodeProcessed Exception were raised already.
167             """
168         if not xmlns: xmlns=self._owner.defaultNamespace
169         self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)'%(handler,name,typ,ns,xmlns), 'info')
170         if not typ and not ns: typ='default'
171         if not self.handlers.has_key(xmlns): self.RegisterNamespace(xmlns,'warn')
172         if not self.handlers[xmlns].has_key(name): self.RegisterProtocol(name,Protocol,xmlns,'warn')
173         if not self.handlers[xmlns][name].has_key(typ+ns): self.handlers[xmlns][name][typ+ns]=[]
174         if makefirst: self.handlers[xmlns][name][typ+ns].insert(0,{'func':handler,'system':system})
175         else: self.handlers[xmlns][name][typ+ns].append({'func':handler,'system':system})
176
177     def RegisterHandlerOnce(self,name,handler,typ='',ns='',xmlns=None,makefirst=0, system=0):
178         """ Unregister handler after first call (not implemented yet). """
179         if not xmlns: xmlns=self._owner.defaultNamespace
180         self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
181
182     def UnregisterHandler(self,name,handler,typ='',ns='',xmlns=None):
183         """ Unregister handler. "typ" and "ns" must be specified exactly the same as with registering."""
184         if not xmlns: xmlns=self._owner.defaultNamespace
185         if not self.handlers.has_key(xmlns): return
186         if not typ and not ns: typ='default'
187         for pack in self.handlers[xmlns][name][typ+ns]:
188             if handler==pack['func']: break
189         else: pack=None
190         try: self.handlers[xmlns][name][typ+ns].remove(pack)
191         except ValueError: pass
192
193     def RegisterDefaultHandler(self,handler):
194         """ Specify the handler that will be used if no NodeProcessed exception were raised.
195             This is returnStanzaHandler by default. """
196         self._defaultHandler=handler
197
198     def RegisterEventHandler(self,handler):
199         """ Register handler that will process events. F.e. "FILERECEIVED" event. """
200         self._eventHandler=handler
201
202     def returnStanzaHandler(self,conn,stanza):
203         """ Return stanza back to the sender with <feature-not-implemennted/> error set. """
204         if stanza.getType() in ['get','set']:
205             conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED))
206
207     def streamErrorHandler(self,conn,error):
208         name,text='error',error.getData()
209         for tag in error.getChildren():
210             if tag.getNamespace()==NS_XMPP_STREAMS:
211                 if tag.getName()=='text': text=tag.getData()
212                 else: name=tag.getName()
213         if name in stream_exceptions.keys(): exc=stream_exceptions[name]
214         else: exc=StreamError
215         raise exc((name,text))
216
217     def RegisterCycleHandler(self,handler):
218         """ Register handler that will be called on every Dispatcher.Process() call. """
219         if handler not in self._cycleHandlers: self._cycleHandlers.append(handler)
220
221     def UnregisterCycleHandler(self,handler):
222         """ Unregister handler that will is called on every Dispatcher.Process() call."""
223         if handler in self._cycleHandlers: self._cycleHandlers.remove(handler)
224
225     def Event(self,realm,event,data):
226         """ Raise some event. Takes three arguments:
227             1) "realm" - scope of event. Usually a namespace. 
228             2) "event" - the event itself. F.e. "SUCESSFULL SEND".
229             3) data that comes along with event. Depends on event."""
230         if self._eventHandler: self._eventHandler(realm,event,data)
231
232     def dispatch(self,stanza,session=None,direct=0):
233         """ Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
234             Called internally. """
235         if not session: session=self
236         session.Stream._mini_dom=None
237         name=stanza.getName()
238
239         if not direct and self._owner._route:
240             if name == 'route':
241                 if stanza.getAttr('error') == None:
242                     if len(stanza.getChildren()) == 1:
243                         stanza = stanza.getChildren()[0]
244                         name=stanza.getName()
245                     else:
246                         for each in stanza.getChildren():
247                             self.dispatch(each,session,direct=1)
248                         return
249             elif name == 'presence':
250                 return
251             elif name in ('features','bind'):
252                 pass
253             else:
254                 raise UnsupportedStanzaType(name)
255
256         if name=='features': session.Stream.features=stanza
257
258         xmlns=stanza.getNamespace()
259         if not self.handlers.has_key(xmlns):
260             self.DEBUG("Unknown namespace: " + xmlns,'warn')
261             xmlns='unknown'
262         if not self.handlers[xmlns].has_key(name):
263             self.DEBUG("Unknown stanza: " + name,'warn')
264             name='unknown'
265         else:
266             self.DEBUG("Got %s/%s stanza"%(xmlns,name), 'ok')
267
268         if stanza.__class__.__name__=='Node': stanza=self.handlers[xmlns][name][type](node=stanza)
269
270         typ=stanza.getType()
271         if not typ: typ=''
272         stanza.props=stanza.getProperties()
273         ID=stanza.getID()
274
275         session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID),'ok')
276
277         list=['default']                                                     # we will use all handlers:
278         if self.handlers[xmlns][name].has_key(typ): list.append(typ)                # from very common...
279         for prop in stanza.props:
280             if self.handlers[xmlns][name].has_key(prop): list.append(prop)
281             if typ and self.handlers[xmlns][name].has_key(typ+prop): list.append(typ+prop)  # ...to very particular
282
283         chain=self.handlers[xmlns]['default']['default']
284         for key in list:
285             if key: chain = chain + self.handlers[xmlns][name][key]
286
287         output=''
288         if session._expected.has_key(ID):
289             user=0
290             if type(session._expected[ID])==type(()):
291                 cb,args=session._expected[ID]
292                 session.DEBUG("Expected stanza arrived. Callback %s(%s) found!"%(cb,args),'ok')
293                 try: cb(session,stanza,**args)
294                 except Exception, typ:
295                     if typ.__class__.__name__<>'NodeProcessed': raise
296             else:
297                 session.DEBUG("Expected stanza arrived!",'ok')
298                 session._expected[ID]=stanza
299         else: user=1
300         for handler in chain:
301             if user or handler['system']:
302                 try:
303                     handler['func'](session,stanza)
304                 except Exception, typ:
305                     if typ.__class__.__name__<>'NodeProcessed':
306                         self._pendingExceptions.insert(0, sys.exc_info())
307                         return
308                     user=0
309         if user and self._defaultHandler: self._defaultHandler(session,stanza)
310
311     def WaitForResponse(self, ID, timeout=DefaultTimeout):
312         """ Block and wait until stanza with specific "id" attribute will come.
313             If no such stanza is arrived within timeout, return None.
314             If operation failed for some reason then owner's attributes
315             lastErrNode, lastErr and lastErrCode are set accordingly. """
316         self._expected[ID]=None
317         has_timed_out=0
318         abort_time=time.time() + timeout
319         self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID,timeout),'wait')
320         while not self._expected[ID]:
321             if not self.Process(0.04):
322                 self._owner.lastErr="Disconnect"
323                 return None
324             if time.time() > abort_time:
325                 self._owner.lastErr="Timeout"
326                 return None
327         response=self._expected[ID]
328         del self._expected[ID]
329         if response.getErrorCode():
330             self._owner.lastErrNode=response
331             self._owner.lastErr=response.getError()
332             self._owner.lastErrCode=response.getErrorCode()
333         return response
334
335     def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
336         """ Put stanza on the wire and wait for recipient's response to it. """
337         return self.WaitForResponse(self.send(stanza),timeout)
338
339     def SendAndCallForResponse(self, stanza, func, args={}):
340         """ Put stanza on the wire and call back when recipient replies.
341             Additional callback arguments can be specified in args. """
342         self._expected[self.send(stanza)]=(func,args)
343
344     def send(self,stanza):
345         """ Serialise stanza and put it on the wire. Assign an unique ID to it before send.
346             Returns assigned ID."""
347         if type(stanza) in [type(''), type(u'')]: return self._owner_send(stanza)
348         if not isinstance(stanza,Protocol): _ID=None
349         elif not stanza.getID():
350             global ID
351             ID+=1
352             _ID=`ID`
353             stanza.setID(_ID)
354         else: _ID=stanza.getID()
355         if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from',self._owner._registered_name)
356         if self._owner._route and stanza.getName()!='bind':
357             to=self._owner.Server
358             if stanza.getTo() and stanza.getTo().getDomain():
359                 to=stanza.getTo().getDomain()
360             frm=stanza.getFrom()
361             if frm.getDomain():
362                 frm=frm.getDomain()
363             route=Protocol('route',to=to,frm=frm,payload=[stanza])
364             stanza=route
365         stanza.setNamespace(self._owner.Namespace)
366         stanza.setParent(self._metastream)
367         self._owner_send(stanza)
368         return _ID
369
370     def disconnect(self):
371         """ Send a stream terminator and and handle all incoming stanzas before stream closure. """
372         self._owner_send('</stream:stream>')
373         while self.Process(1): pass

UCC git Repository :: git.ucc.asn.au