20 is still causing in use errors, try 30 seconds
[uccdoor.git] / xmpp / filetransfer.py
1 ##   filetransfer.py 
2 ##
3 ##   Copyright (C) 2004 Alexey "Snake" Nezhdanov
4 ##
5 ##   This program is free software; you can redistribute it and/or modify
6 ##   it under the terms of the GNU General Public License as published by
7 ##   the Free Software Foundation; either version 2, or (at your option)
8 ##   any later version.
9 ##
10 ##   This program is distributed in the hope that it will be useful,
11 ##   but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 ##   GNU General Public License for more details.
14
15 # $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $
16
17 """
18 This module contains IBB class that is the simple implementation of JEP-0047.
19 Note that this is just a transport for data. You have to negotiate data transfer before
20 (via StreamInitiation most probably). Unfortunately SI is not implemented yet.
21 """
22
23 from protocol import *
24 from dispatcher import PlugIn
25 import base64
26
27 class IBB(PlugIn):
28     """ IBB used to transfer small-sized data chunk over estabilished xmpp connection.
29         Data is split into small blocks (by default 3000 bytes each), encoded as base 64
30         and sent to another entity that compiles these blocks back into the data chunk.
31         This is very inefficiend but should work under any circumstances. Note that 
32         using IBB normally should be the last resort.
33     """
34     def __init__(self):
35         """ Initialise internal variables. """
36         PlugIn.__init__(self)
37         self.DBG_LINE='ibb'
38         self._exported_methods=[self.OpenStream]
39         self._streams={}
40         self._ampnode=Node(NS_AMP+' amp',payload=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])
41
42     def plugin(self,owner):
43         """ Register handlers for receiving incoming datastreams. Used internally. """
44         self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler) # Move to StreamOpen and specify stanza id
45         self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
46         self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)
47
48     def IqHandler(self,conn,stanza):
49         """ Handles streams state change. Used internally. """
50         typ=stanza.getType()
51         self.DEBUG('IqHandler called typ->%s'%typ,'info')
52         if typ=='set' and stanza.getTag('open',namespace=NS_IBB): self.StreamOpenHandler(conn,stanza)
53         elif typ=='set' and stanza.getTag('close',namespace=NS_IBB): self.StreamCloseHandler(conn,stanza)
54         elif typ=='result': self.StreamCommitHandler(conn,stanza)
55         elif typ=='error': self.StreamOpenReplyHandler(conn,stanza)
56         else: conn.send(Error(stanza,ERR_BAD_REQUEST))
57         raise NodeProcessed
58
59     def StreamOpenHandler(self,conn,stanza):
60         """ Handles opening of new incoming stream. Used internally. """
61         """
62 <iq type='set' 
63     from='romeo@montague.net/orchard'
64     to='juliet@capulet.com/balcony'
65     id='inband_1'>
66   <open sid='mySID' 
67         block-size='4096'
68         xmlns='http://jabber.org/protocol/ibb'/>
69 </iq>
70 """
71         err=None
72         sid,blocksize=stanza.getTagAttr('open','sid'),stanza.getTagAttr('open','block-size')
73         self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
74         try: blocksize=int(blocksize)
75         except: err=ERR_BAD_REQUEST
76         if not sid or not blocksize: err=ERR_BAD_REQUEST
77         elif sid in self._streams.keys(): err=ERR_UNEXPECTED_REQUEST
78         if err: rep=Error(stanza,err)
79         else:
80             self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
81             rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
82             self._streams[sid]={'direction':'<'+str(stanza.getFrom()),'block-size':blocksize,'fp':open('/tmp/xmpp_file_'+sid,'w'),'seq':0,'syn_id':stanza.getID()}
83         conn.send(rep)
84
85     def OpenStream(self,sid,to,fp,blocksize=3000):
86         """ Start new stream. You should provide stream id 'sid', the endpoind jid 'to',
87             the file object containing info for send 'fp'. Also the desired blocksize can be specified.
88             Take into account that recommended stanza size is 4k and IBB uses base64 encoding
89             that increases size of data by 1/3."""
90         if sid in self._streams.keys(): return
91         if not JID(to).getResource(): return
92         self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
93         self._owner.RegisterCycleHandler(self.SendHandler)
94         syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
95         self._owner.send(syn)
96         self._streams[sid]['syn_id']=syn.getID()
97         return self._streams[sid]
98
99     def SendHandler(self,conn):
100         """ Send next portion of data if it is time to do it. Used internally. """
101         self.DEBUG('SendHandler called','info')
102         for sid in self._streams.keys():
103             stream=self._streams[sid]
104             if stream['direction'][:2]=='|>': cont=1
105             elif stream['direction'][0]=='>':
106                 chunk=stream['fp'].read(stream['block-size'])
107                 if chunk:
108                     datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
109                     stream['seq']+=1
110                     if stream['seq']==65536: stream['seq']=0
111                     conn.send(Protocol('message',stream['direction'][1:],payload=[datanode,self._ampnode]))
112                 else:
113                     """ notify the other side about stream closing
114                         notify the local user about sucessfull send
115                         delete the local stream"""
116                     conn.send(Protocol('iq',stream['direction'][1:],'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
117                     conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
118                     del self._streams[sid]
119                     self._owner.UnregisterCycleHandler(self.SendHandler)
120
121                     """
122 <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
123   <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
124     qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
125     WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu
126     IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P
127     AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH
128     kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA
129   </data>
130   <amp xmlns='http://jabber.org/protocol/amp'>
131     <rule condition='deliver-at' value='stored' action='error'/>
132     <rule condition='match-resource' value='exact' action='error'/>
133   </amp>
134 </message>
135 """
136
137     def ReceiveHandler(self,conn,stanza):
138         """ Receive next portion of incoming datastream and store it write
139             it to temporary file. Used internally.
140         """
141         sid,seq,data=stanza.getTagAttr('data','sid'),stanza.getTagAttr('data','seq'),stanza.getTagData('data')
142         self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
143         try: seq=int(seq); data=base64.decodestring(data)
144         except: seq=''; data=''
145         err=None
146         if not sid in self._streams.keys(): err=ERR_ITEM_NOT_FOUND
147         else:
148             stream=self._streams[sid]
149             if not data: err=ERR_BAD_REQUEST
150             elif seq<>stream['seq']: err=ERR_UNEXPECTED_REQUEST
151             else:
152                 self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
153                 stream['seq']+=1
154                 stream['fp'].write(data)
155         if err:
156             self.DEBUG('Error on receive: %s'%err,'error')
157             conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))
158
159     def StreamCloseHandler(self,conn,stanza):
160         """ Handle stream closure due to all data transmitted.
161             Raise xmpppy event specifying successfull data receive. """
162         sid=stanza.getTagAttr('close','sid')
163         self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
164         if sid in self._streams.keys():
165             conn.send(stanza.buildReply('result'))
166             conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',self._streams[sid])
167             del self._streams[sid]
168         else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))
169
170     def StreamBrokenHandler(self,conn,stanza):
171         """ Handle stream closure due to all some error while receiving data.
172             Raise xmpppy event specifying unsuccessfull data receive. """
173         syn_id=stanza.getID()
174         self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
175         for sid in self._streams.keys():
176             stream=self._streams[sid]
177             if stream['syn_id']==syn_id:
178                 if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
179                 else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
180                 del self._streams[sid]
181
182     def StreamOpenReplyHandler(self,conn,stanza):
183         """ Handle remote side reply about is it agree or not to receive our datastream.
184             Used internally. Raises xmpppy event specfiying if the data transfer
185             is agreed upon."""
186         syn_id=stanza.getID()
187         self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
188         for sid in self._streams.keys():
189             stream=self._streams[sid]
190             if stream['syn_id']==syn_id:
191                 if stanza.getType()=='error':
192                     if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
193                     else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
194                     del self._streams[sid]
195                 elif stanza.getType()=='result':
196                     if stream['direction'][0]=='|':
197                         stream['direction']=stream['direction'][1:]
198                         conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
199                     else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))

UCC git Repository :: git.ucc.asn.au