Attachment 'clientqueuetransport.py'

Download

   1 #!/usr/bin/env python
   2 #
   3 # clientqueuetransport.py
   4 #
   5 # CVS/SVN Info
   6 #----------------------------------
   7 # $Author: $
   8 # $Date: $
   9 # $Revision: $
  10 #----------------------------------
  11 
  12 import logging, errno, select, time, socket, re
  13 import random
  14 from datetime import datetime, timedelta
  15 
  16 
  17 class TransportException(Exception):
  18     def __init__(self, msg):
  19         super(TransportException, self).__init__(msg)
  20 
  21 
  22 class StompException(Exception):
  23     def __init__(self, msg):
  24         super(StompException, self).__init__(msg)
  25 
  26 
  27 class ClientTransport(object):
  28     """
  29     This class acts as the client transport for the backend service.
  30     """
  31     __slots__ = ('__POLL_FLAGS', '__conns', '__services', '__log')
  32 
  33     __POLL_FLAGS = select.POLLPRI | select.POLLIN | select.POLLOUT
  34 
  35     def __init__(self, services, loggerName=''):
  36         """
  37         Set up the transport.
  38 
  39         Example:: [((host, port), handler instance)...]
  40 
  41         @param services: A list of tuples. Each tuple contains a tuple of the
  42                          host and port and a handler instance.
  43         @keyword loggerName: The logger name, the default is the root logger,
  44                              if C{None} is passed no logging will occur.
  45         """
  46         self.__services = services
  47         self.__conns = {}
  48         if loggerName is not None: self.__log = logging.getLogger(loggerName)
  49 
  50     def connect(self, timeout=5):
  51         """
  52         Connect to the backend services.
  53 
  54         @param timeout: The connection timeout, defaults to 5 seconds.
  55         """
  56         try:
  57             for server in self.__services:
  58                 later = datetime.now() + timedelta(seconds=timeout)
  59                 hp , handler = server
  60 
  61                 while later > datetime.now():
  62                     conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  63 
  64                     if not conn:
  65                         time.sleep(1)
  66                         continue
  67                     else:
  68                         conn.connect(hp)
  69                         conn.setblocking(0)
  70                         fd = conn.fileno()
  71                         self.__conns[fd] = handler
  72 
  73                         if self.__log:
  74                             self.__log.debug("Opened connection: %s, fd: %s",
  75                                              conn, fd)
  76 
  77                         handler.setConnection(conn)
  78                         handler.start()
  79                         break
  80                 else:
  81                     msg = "Error, no connection could be made to: %s" % hp
  82                     raise TransportException(msg)
  83         except socket.error, e:
  84             msg = "Could not connect to the socket, %s\n" % e
  85             raise TransportException(msg)
  86         except Exception, e:
  87             msg = "Unknown error, %s\n" % e
  88             raise TransportException(msg)
  89 
  90     def close(self):
  91         """
  92         Close all server connections.
  93         """
  94         for fd, handler in self.__conns.items():
  95             handler.end()
  96             conn = handler.getConnection()
  97 
  98             if self.__log:
  99                 self.__log.debug("Closing connection: %s, fd: %s", conn, fd)
 100 
 101             conn.close()
 102 
 103         self.__conns.clear()
 104 
 105     def poll(self, timeout=0.0):
 106         """
 107         Find anything in the queue then respond through the processEvent()
 108         method.
 109 
 110         @param timeout: The time to allow poll to find a message.
 111         """
 112         if timeout is not None:
 113             # Convert to milliseconds.
 114             timeout = int(timeout*1000)
 115 
 116         poller = select.poll()
 117 
 118         for fd in self.__conns:
 119             poller.register(fd, self.__POLL_FLAGS)
 120 
 121         fdt = []
 122         receipt = False
 123 
 124         while True:
 125             try:
 126                 fdt[:] = poller.poll(timeout)
 127             except select.error, e:
 128                 if e[0] != errno.EINTR:
 129                     msg = "Could not poll for data, %s" % e
 130                     raise TransportException(msg)
 131 
 132             if fdt:
 133                 try:
 134                     for fd, event in fdt:
 135                         reciept = self.__conns.get(fd).processEvent(event)
 136                 except Exception, e:
 137                     conn = self.__conns.get(fd).getConnection()
 138                     msg = "Handler error, connection: %s, fd: %s, %s" % \
 139                           (conn, fd, e)
 140                     if self.__log: self.__log.error(msg, exc_info=True)
 141                     raise TransportException(msg)
 142 
 143             if reciept: break
 144 
 145 
 146 class BaseClientHandler(object):
 147     """
 148     This is the base client handler.
 149     """
 150     __slots__ = ('__MSG_REGEX', '__msgRegEx', '_log', '_conn',)
 151 
 152     __MSG_REGEX = r"(\n[ \t]+)|([ \t]+\n)"
 153 
 154     def __init__(self, loggerName=''):
 155         """
 156         The BaseClientHandler constructor.
 157 
 158         @keyword loggerName: The logger name, the default is the root logger,
 159                              if C{None} is passed no logging will occur.
 160                              Note: If C{None} is passed you cannot use the
 161                              C{self._log} member object or you must test for
 162                              it being C{is not None} in your code.
 163         """
 164         self.__msgRegEx = re.compile(self.__MSG_REGEX)
 165         self._log = None
 166         if loggerName is not None: self._log = logging.getLogger(loggerName)
 167         self._conn = None
 168 
 169     def _cleanText(self, value):
 170         """
 171         Remove extra whitespace preceded by or succeeded by a linefeed from
 172         multiline text then add a single newline. This method can be used in
 173         multi-line strings to lop off the leading and trailing white space
 174         before the actual text.
 175 
 176         @param value: The Multiline text statement.
 177         @return: the striped text.
 178         """
 179         return self.__msgRegEx.sub('\n', value).lstrip('\n')
 180 
 181     def setConnection(self, conn):
 182         """
 183         Called by ClientTransport to set the connection.
 184 
 185         @param conn: The socket connection.
 186         """
 187         self._conn = conn
 188 
 189     def getConnection(self):
 190         """
 191         Called by ClientTransport to get the connection.
 192 
 193         @return: The socket connection.
 194         """
 195         return self._conn
 196 
 197     def start(self):
 198         """
 199         Called by ClientTransport to initialize the connection.
 200         """
 201         msg = "Programming error, must impliment the start() method."
 202         raise NotImplementedError(msg)
 203 
 204     def end(self):
 205         """
 206         Called by ClientTransport to terminate the connection.
 207         """
 208         msg = "Programming error, must impliment the end() method."
 209         raise NotImplementedError(msg)
 210 
 211     def processEvent(self, event):
 212         """
 213         Called by ClientTransport to process the incomming connection state.
 214 
 215         @return: Status, C{True} is completed else C{False}.
 216         """
 217         receipt = False
 218 
 219         if event & select.POLLIN:
 220             receipt = self.incomingEvent()
 221         elif event & select.POLLPRI:
 222             receipt = self.incomingPriorityEvent()
 223         elif event & select.POLLOUT:
 224             receipt = self.outgoingEvent()
 225         elif event & select.POLLERR:
 226             receipt = self.errorEvent()
 227         elif evenr & select.POLLHUP:
 228             receipt = self.hungupEvent()
 229         else:  # POLLNVAL
 230             receipt = self.invalidEvent()
 231 
 232         return receipt
 233 
 234     ##############################################
 235     # These methods need not all be inplimented. #
 236     ##############################################
 237     def incomingEvent(self):
 238         """
 239         Handle an incoming event.
 240 
 241         @return: Status, C{True} is completed else C{False}.
 242         """
 243         return False
 244 
 245     def incomingPriorityEvent(self):
 246         """
 247         Handle an incoming priority event.
 248 
 249         @return: Status, C{True} is completed else C{False}.
 250         """
 251         return False
 252 
 253     def outgoingEvent(self):
 254         """
 255         Handle an outgoing event.
 256 
 257         @return: Status, C{True} is completed else C{False}.
 258         """
 259         return False
 260 
 261     def errorEvent(self):
 262         """
 263         Handle an error event.
 264 
 265         @return: Status, C{True} is completed else C{False}.
 266         """
 267         return False
 268 
 269     def hungupEvent(self):
 270         """
 271         Handle a hung up event.
 272 
 273         @return: Status, C{True} is completed else C{False}.
 274         """
 275         return False
 276 
 277     def invalidEvent(self):
 278         """
 279         Handle an invalid event.
 280 
 281         @return: Status, C{True} is completed else C{False}.
 282         """
 283         return False
 284 
 285 class StompHandler(BaseClientHandler):
 286     """
 287     This class sends data to the queue through the STOMP protocol.
 288     """
 289 
 290     __slots__ = ('__TRANS_ID_PREFIX', '__RECEIPT_ID_PREFIX', '__queue',
 291                  '__username', '__passcode', '__data', '__connected', '__sent',
 292                  '__transId', '__receiptId',)
 293 
 294     __TRANS_ID_PREFIX = "transaction-%s"
 295     __RECEIPT_ID_PREFIX = "receipt-%s"
 296 
 297     def __init__(self, queue, data, username='', passcode='', loggerName=''):
 298         """
 299         StompHandler constructor
 300 
 301         @param queue: The queue to use.
 302         @param data: The data to send.
 303         @keyword username: The user name to sign into the queue with.
 304         @keyword passcode: The password to sign into the queue with.
 305         @keyword loggerName: The logger name, the default is the root logger,
 306                              if C{None} is passed no logging will occur.
 307         """
 308         super(StompHandler, self).__init__(loggerName=loggerName)
 309         self.__queue = queue
 310         self.__data = data
 311         self.__username = username
 312         self.__passcode = passcode
 313         self.__connected = False
 314         self.__sent = False
 315         self.__transId = None
 316         self.__receiptId = None
 317 
 318     def start(self):
 319         """
 320         Called by ClientTransport to initialize the stomp connection.
 321         """
 322         self._connect()
 323         self._subscribe()
 324 
 325     def end(self):
 326         """
 327         Called by ClientTransport to terminate the stomp connection.
 328         """
 329         self._unsubscribe()
 330         self._disconnect()
 331 
 332     def incomingEvent(self):
 333         """
 334         Get the data from the queue and parse it based on type.
 335         """
 336         receipt = False
 337         message = self._conn.recv(4096)
 338 
 339         if 'CONNECTED' == message[0:9]:
 340             self.__connected = True
 341         elif 'RECEIPT' == message[0:7]:
 342             headers = dict([(header.split(':'))
 343                             for header in message.split('\n')
 344                             if len(header.split(':')) == 2])
 345             receipt = headers.get('receipt-id')
 346 
 347             if self.__receiptId != receipt:
 348                 msg = "Invalid receipt should be: %s, received: %s" % \
 349                       (self.__receiptId, receipt)
 350                 raise StompException(msg)
 351 
 352             receipt = True
 353         elif 'MESSAGE' == message[0:7]:
 354             pass
 355         else:
 356             msg = "Received an invalid message from the queue: %s" % \
 357                   message
 358             raise StompException(msg)
 359 
 360         return receipt
 361 
 362     def incomingPriorityEvent(self):
 363         self.incomingEvent()
 364 
 365     def outgoingEvent(self):
 366         if not self.__sent and self.__connected:
 367             self.__sent = True
 368             self._send()
 369 
 370         return False
 371 
 372     def _send(self):
 373         """
 374         Send the data to the queue.
 375         """
 376         number = random.randint(1, 10000000)
 377         self.__receiptId = self.__RECEIPT_ID_PREFIX % number
 378         contentLength = len(self.__data)
 379         send = """
 380         SEND
 381         destination: %s
 382         receipt: %s
 383         Content-Length: %s
 384 
 385         %s
 386         \x00
 387         """ % (self.__queue, self.__receiptId, contentLength, self.__data)
 388         self._conn.send(self._cleanText(send))
 389 
 390     def _begin(self):
 391         """
 392         Send a bigin transaction.
 393         """
 394         number = random.randint(1, 10000000)
 395         self.__transId = self.__TRANS_ID_PREFIX % number
 396         begin = """
 397         BIGIN
 398         transaction: %s
 399 
 400         \x00
 401         """ % self.__transId
 402         self._conn.send(self._cleanText(begin))
 403 
 404     def _commit(self):
 405         """
 406         Send a commit transaction.
 407         """
 408         commit = """
 409         COMMIT
 410         transaction: %s
 411 
 412         \x00
 413         """ % self.__transId
 414         self._conn.send(self._cleanText(commit))
 415 
 416     def _connect(self):
 417         """
 418         Connect to the queue through STOMP.
 419         """
 420         connect = """
 421         CONNECT
 422         login: %s
 423         passcode: %s
 424 
 425         \x00
 426         """ % (self.__username, self.__passcode)
 427         self._conn.send(self._cleanText(connect))
 428 
 429     def _subscribe(self):
 430         """
 431         Subscribe to the queue through STOMP.
 432         """
 433         subscribe = """
 434         SUBSCRIBE
 435         destination: %s
 436         activemq.prefetchSize: 1
 437         activemq.noLocal: true
 438 
 439         \x00
 440         """ % self.__queue
 441         self._conn.send(self._cleanText(subscribe))
 442 
 443     def _unsubscribe(self):
 444         """
 445         Unsubscribe from the queue through STOMP.
 446         """
 447         unsubscribe = """
 448         UNSUBSCRIBE
 449         destination: %s
 450 
 451         \x00
 452         """ % self.__queue
 453         self._conn.send(self._cleanText(unsubscribe))
 454 
 455     def _disconnect(self):
 456         """
 457         Disconnect from the queue through STOMP.
 458         """
 459         disconnect = """
 460         DISCONNECT
 461 
 462         \x00
 463         """
 464         self._conn.send(self._cleanText(disconnect))
 465 
 466 
 467 if __name__ == '__main__':
 468     import sys, traceback
 469 
 470     data = "My first data in the queue!!"
 471     queueName = "/queue/test"
 472 
 473     try:
 474         sh = StompHandler(queueName, data)
 475         servers = [(('localhost', 61613), sh),]
 476         ct = ClientTransport(servers)
 477         ct.connect(timeout=0.25)
 478         ct.poll(timeout=5)
 479         ct.close()
 480     except Exception, e:
 481         print "%s: %s\n" % (sys.exc_info()[0], sys.exc_info()[1])
 482         traceback.print_tb(sys.exc_info()[2])
 483         sys.exit(1)
 484     finally:
 485         logging.shutdown()
 486 
 487     sys.exit(0)

Attached Files

To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.

You are not allowed to attach a file to this page.