Attachment 'clientqueuetransport.py'
Download 1 #!/usr/bin/env python
2 #
3 # clientqueuetransport.py
4 #
5 # CVS/SVN Info
6 #----------------------------------
7 # $Author: $
8 # $Date: $
9 # $Revision: $
10 #----------------------------------
11
12 import logging, errno, select, time, socket, re
13 import random
14 from datetime import datetime, timedelta
15
16
17 class TransportException(Exception):
18 def __init__(self, msg):
19 super(TransportException, self).__init__(msg)
20
21
22 class StompException(Exception):
23 def __init__(self, msg):
24 super(StompException, self).__init__(msg)
25
26
27 class ClientTransport(object):
28 """
29 This class acts as the client transport for the backend service.
30 """
31 __slots__ = ('__POLL_FLAGS', '__conns', '__services', '__log')
32
33 __POLL_FLAGS = select.POLLPRI | select.POLLIN | select.POLLOUT
34
35 def __init__(self, services, loggerName=''):
36 """
37 Set up the transport.
38
39 Example:: [((host, port), handler instance)...]
40
41 @param services: A list of tuples. Each tuple contains a tuple of the
42 host and port and a handler instance.
43 @keyword loggerName: The logger name, the default is the root logger,
44 if C{None} is passed no logging will occur.
45 """
46 self.__services = services
47 self.__conns = {}
48 if loggerName is not None: self.__log = logging.getLogger(loggerName)
49
50 def connect(self, timeout=5):
51 """
52 Connect to the backend services.
53
54 @param timeout: The connection timeout, defaults to 5 seconds.
55 """
56 try:
57 for server in self.__services:
58 later = datetime.now() + timedelta(seconds=timeout)
59 hp , handler = server
60
61 while later > datetime.now():
62 conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
63
64 if not conn:
65 time.sleep(1)
66 continue
67 else:
68 conn.connect(hp)
69 conn.setblocking(0)
70 fd = conn.fileno()
71 self.__conns[fd] = handler
72
73 if self.__log:
74 self.__log.debug("Opened connection: %s, fd: %s",
75 conn, fd)
76
77 handler.setConnection(conn)
78 handler.start()
79 break
80 else:
81 msg = "Error, no connection could be made to: %s" % hp
82 raise TransportException(msg)
83 except socket.error, e:
84 msg = "Could not connect to the socket, %s\n" % e
85 raise TransportException(msg)
86 except Exception, e:
87 msg = "Unknown error, %s\n" % e
88 raise TransportException(msg)
89
90 def close(self):
91 """
92 Close all server connections.
93 """
94 for fd, handler in self.__conns.items():
95 handler.end()
96 conn = handler.getConnection()
97
98 if self.__log:
99 self.__log.debug("Closing connection: %s, fd: %s", conn, fd)
100
101 conn.close()
102
103 self.__conns.clear()
104
105 def poll(self, timeout=0.0):
106 """
107 Find anything in the queue then respond through the processEvent()
108 method.
109
110 @param timeout: The time to allow poll to find a message.
111 """
112 if timeout is not None:
113 # Convert to milliseconds.
114 timeout = int(timeout*1000)
115
116 poller = select.poll()
117
118 for fd in self.__conns:
119 poller.register(fd, self.__POLL_FLAGS)
120
121 fdt = []
122 receipt = False
123
124 while True:
125 try:
126 fdt[:] = poller.poll(timeout)
127 except select.error, e:
128 if e[0] != errno.EINTR:
129 msg = "Could not poll for data, %s" % e
130 raise TransportException(msg)
131
132 if fdt:
133 try:
134 for fd, event in fdt:
135 reciept = self.__conns.get(fd).processEvent(event)
136 except Exception, e:
137 conn = self.__conns.get(fd).getConnection()
138 msg = "Handler error, connection: %s, fd: %s, %s" % \
139 (conn, fd, e)
140 if self.__log: self.__log.error(msg, exc_info=True)
141 raise TransportException(msg)
142
143 if reciept: break
144
145
146 class BaseClientHandler(object):
147 """
148 This is the base client handler.
149 """
150 __slots__ = ('__MSG_REGEX', '__msgRegEx', '_log', '_conn',)
151
152 __MSG_REGEX = r"(\n[ \t]+)|([ \t]+\n)"
153
154 def __init__(self, loggerName=''):
155 """
156 The BaseClientHandler constructor.
157
158 @keyword loggerName: The logger name, the default is the root logger,
159 if C{None} is passed no logging will occur.
160 Note: If C{None} is passed you cannot use the
161 C{self._log} member object or you must test for
162 it being C{is not None} in your code.
163 """
164 self.__msgRegEx = re.compile(self.__MSG_REGEX)
165 self._log = None
166 if loggerName is not None: self._log = logging.getLogger(loggerName)
167 self._conn = None
168
169 def _cleanText(self, value):
170 """
171 Remove extra whitespace preceded by or succeeded by a linefeed from
172 multiline text then add a single newline. This method can be used in
173 multi-line strings to lop off the leading and trailing white space
174 before the actual text.
175
176 @param value: The Multiline text statement.
177 @return: the striped text.
178 """
179 return self.__msgRegEx.sub('\n', value).lstrip('\n')
180
181 def setConnection(self, conn):
182 """
183 Called by ClientTransport to set the connection.
184
185 @param conn: The socket connection.
186 """
187 self._conn = conn
188
189 def getConnection(self):
190 """
191 Called by ClientTransport to get the connection.
192
193 @return: The socket connection.
194 """
195 return self._conn
196
197 def start(self):
198 """
199 Called by ClientTransport to initialize the connection.
200 """
201 msg = "Programming error, must impliment the start() method."
202 raise NotImplementedError(msg)
203
204 def end(self):
205 """
206 Called by ClientTransport to terminate the connection.
207 """
208 msg = "Programming error, must impliment the end() method."
209 raise NotImplementedError(msg)
210
211 def processEvent(self, event):
212 """
213 Called by ClientTransport to process the incomming connection state.
214
215 @return: Status, C{True} is completed else C{False}.
216 """
217 receipt = False
218
219 if event & select.POLLIN:
220 receipt = self.incomingEvent()
221 elif event & select.POLLPRI:
222 receipt = self.incomingPriorityEvent()
223 elif event & select.POLLOUT:
224 receipt = self.outgoingEvent()
225 elif event & select.POLLERR:
226 receipt = self.errorEvent()
227 elif evenr & select.POLLHUP:
228 receipt = self.hungupEvent()
229 else: # POLLNVAL
230 receipt = self.invalidEvent()
231
232 return receipt
233
234 ##############################################
235 # These methods need not all be inplimented. #
236 ##############################################
237 def incomingEvent(self):
238 """
239 Handle an incoming event.
240
241 @return: Status, C{True} is completed else C{False}.
242 """
243 return False
244
245 def incomingPriorityEvent(self):
246 """
247 Handle an incoming priority event.
248
249 @return: Status, C{True} is completed else C{False}.
250 """
251 return False
252
253 def outgoingEvent(self):
254 """
255 Handle an outgoing event.
256
257 @return: Status, C{True} is completed else C{False}.
258 """
259 return False
260
261 def errorEvent(self):
262 """
263 Handle an error event.
264
265 @return: Status, C{True} is completed else C{False}.
266 """
267 return False
268
269 def hungupEvent(self):
270 """
271 Handle a hung up event.
272
273 @return: Status, C{True} is completed else C{False}.
274 """
275 return False
276
277 def invalidEvent(self):
278 """
279 Handle an invalid event.
280
281 @return: Status, C{True} is completed else C{False}.
282 """
283 return False
284
285 class StompHandler(BaseClientHandler):
286 """
287 This class sends data to the queue through the STOMP protocol.
288 """
289
290 __slots__ = ('__TRANS_ID_PREFIX', '__RECEIPT_ID_PREFIX', '__queue',
291 '__username', '__passcode', '__data', '__connected', '__sent',
292 '__transId', '__receiptId',)
293
294 __TRANS_ID_PREFIX = "transaction-%s"
295 __RECEIPT_ID_PREFIX = "receipt-%s"
296
297 def __init__(self, queue, data, username='', passcode='', loggerName=''):
298 """
299 StompHandler constructor
300
301 @param queue: The queue to use.
302 @param data: The data to send.
303 @keyword username: The user name to sign into the queue with.
304 @keyword passcode: The password to sign into the queue with.
305 @keyword loggerName: The logger name, the default is the root logger,
306 if C{None} is passed no logging will occur.
307 """
308 super(StompHandler, self).__init__(loggerName=loggerName)
309 self.__queue = queue
310 self.__data = data
311 self.__username = username
312 self.__passcode = passcode
313 self.__connected = False
314 self.__sent = False
315 self.__transId = None
316 self.__receiptId = None
317
318 def start(self):
319 """
320 Called by ClientTransport to initialize the stomp connection.
321 """
322 self._connect()
323 self._subscribe()
324
325 def end(self):
326 """
327 Called by ClientTransport to terminate the stomp connection.
328 """
329 self._unsubscribe()
330 self._disconnect()
331
332 def incomingEvent(self):
333 """
334 Get the data from the queue and parse it based on type.
335 """
336 receipt = False
337 message = self._conn.recv(4096)
338
339 if 'CONNECTED' == message[0:9]:
340 self.__connected = True
341 elif 'RECEIPT' == message[0:7]:
342 headers = dict([(header.split(':'))
343 for header in message.split('\n')
344 if len(header.split(':')) == 2])
345 receipt = headers.get('receipt-id')
346
347 if self.__receiptId != receipt:
348 msg = "Invalid receipt should be: %s, received: %s" % \
349 (self.__receiptId, receipt)
350 raise StompException(msg)
351
352 receipt = True
353 elif 'MESSAGE' == message[0:7]:
354 pass
355 else:
356 msg = "Received an invalid message from the queue: %s" % \
357 message
358 raise StompException(msg)
359
360 return receipt
361
362 def incomingPriorityEvent(self):
363 self.incomingEvent()
364
365 def outgoingEvent(self):
366 if not self.__sent and self.__connected:
367 self.__sent = True
368 self._send()
369
370 return False
371
372 def _send(self):
373 """
374 Send the data to the queue.
375 """
376 number = random.randint(1, 10000000)
377 self.__receiptId = self.__RECEIPT_ID_PREFIX % number
378 contentLength = len(self.__data)
379 send = """
380 SEND
381 destination: %s
382 receipt: %s
383 Content-Length: %s
384
385 %s
386 \x00
387 """ % (self.__queue, self.__receiptId, contentLength, self.__data)
388 self._conn.send(self._cleanText(send))
389
390 def _begin(self):
391 """
392 Send a bigin transaction.
393 """
394 number = random.randint(1, 10000000)
395 self.__transId = self.__TRANS_ID_PREFIX % number
396 begin = """
397 BIGIN
398 transaction: %s
399
400 \x00
401 """ % self.__transId
402 self._conn.send(self._cleanText(begin))
403
404 def _commit(self):
405 """
406 Send a commit transaction.
407 """
408 commit = """
409 COMMIT
410 transaction: %s
411
412 \x00
413 """ % self.__transId
414 self._conn.send(self._cleanText(commit))
415
416 def _connect(self):
417 """
418 Connect to the queue through STOMP.
419 """
420 connect = """
421 CONNECT
422 login: %s
423 passcode: %s
424
425 \x00
426 """ % (self.__username, self.__passcode)
427 self._conn.send(self._cleanText(connect))
428
429 def _subscribe(self):
430 """
431 Subscribe to the queue through STOMP.
432 """
433 subscribe = """
434 SUBSCRIBE
435 destination: %s
436 activemq.prefetchSize: 1
437 activemq.noLocal: true
438
439 \x00
440 """ % self.__queue
441 self._conn.send(self._cleanText(subscribe))
442
443 def _unsubscribe(self):
444 """
445 Unsubscribe from the queue through STOMP.
446 """
447 unsubscribe = """
448 UNSUBSCRIBE
449 destination: %s
450
451 \x00
452 """ % self.__queue
453 self._conn.send(self._cleanText(unsubscribe))
454
455 def _disconnect(self):
456 """
457 Disconnect from the queue through STOMP.
458 """
459 disconnect = """
460 DISCONNECT
461
462 \x00
463 """
464 self._conn.send(self._cleanText(disconnect))
465
466
467 if __name__ == '__main__':
468 import sys, traceback
469
470 data = "My first data in the queue!!"
471 queueName = "/queue/test"
472
473 try:
474 sh = StompHandler(queueName, data)
475 servers = [(('localhost', 61613), sh),]
476 ct = ClientTransport(servers)
477 ct.connect(timeout=0.25)
478 ct.poll(timeout=5)
479 ct.close()
480 except Exception, e:
481 print "%s: %s\n" % (sys.exc_info()[0], sys.exc_info()[1])
482 traceback.print_tb(sys.exc_info()[2])
483 sys.exit(1)
484 finally:
485 logging.shutdown()
486
487 sys.exit(0)
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.