Attachment 'AsyncoreHTTPServer.py'
Download 1 #
2 # AsyncoreHTTPServer.py
3 #
4 # CVS/SVN Info
5 #----------------------------------
6 # $Author: cnobile $
7 # $Date: 2010-03-04 20:50:34 $
8 # $Revision: 1.17 $
9 #----------------------------------
10 ##########################################################################
11 # Copyright (c) 2010 Carl J. Nobile.
12 # All rights reserved. This program and the accompanying materials
13 # are made available under the terms of the Eclipse Public License v1.0
14 # which accompanies this distribution, and is available at
15 # http://www.eclipse.org/legal/epl-v10.html
16 #
17 # Contributors:
18 # Carl J. Nobile - initial API and implementation
19 ##########################################################################
20
import sys, os, logging, socket, time, types, thread
import asyncore, select, traceback

from StringIO import StringIO
from asynchat import async_chat
from collections import deque
from datetime import datetime, timedelta
from email.Message import Message
from email.Parser import Parser
from sgmllib import SGMLParser
from xml.sax.saxutils import escape as xmlEscape
30
31
__version__ = "0.4.1"


# HTTP response status codes mapped to their reason phrases, gleaned from
# RFC-2616 and other RFCs. The unused codes 306, 421, 508 and 509 are
# intentionally left out.
STATUS_CODES = dict([
    # 1xx Informational
    (100, "Continue"),
    (101, "Switching Protocols"),
    (102, "Processing"),
    # 2xx Successful
    (200, "OK"),
    (201, "Created"),
    (202, "Accepted"),
    (203, "Non-Authoritative Information"),
    (204, "No Content"),
    (205, "Reset Content"),
    (206, "Partial Content"),
    (207, "Multi-Status"),
    # 3xx Redirection
    (300, "Multiple Choices"),
    (301, "Moved Permanently"),
    (302, "Found"),
    (303, "See Other"),
    (304, "Not Modified"),
    (305, "Use Proxy"),
    (307, "Temporary Redirect"),
    # 4xx Client Error
    (400, "Bad Request"),
    (401, "Unauthorized"),
    (402, "Payment Required"),
    (403, "Forbidden"),
    (404, "Not Found"),
    (405, "Method Not Allowed"),
    (406, "Not Acceptable"),
    (407, "Proxy Authentication Required"),
    (408, "Request Time-out"),
    (409, "Conflict"),
    (410, "Gone"),
    (411, "Length Required"),
    (412, "Precondition Failed"),
    (413, "Request Entity Too Large"),
    (414, "Request-URI Too Large"),
    (415, "Unsupported Media Type"),
    (416, "Requested range not satisfiable"),
    (417, "Expectation Failed"),
    (418, "Resume Incomplete"),
    (419, "Insufficient Space On Resource"),
    (420, "Method Failure"),
    (422, "Unprocessable Entity"),
    (423, "Locked"),
    (424, "Failed Dependency"),
    (425, "No Code"),
    (426, "Upgrade Required"),
    # 5xx Server Error
    (500, "Internal Server Error"),
    (501, "Not Implemented"),
    (502, "Bad Gateway"),
    (503, "Service Unavailable"),
    (504, "Gateway Time-out"),
    (505, "HTTP Version not supported"),
    (507, "Insufficient Storage"),
    (510, "Not Extended"),
])

# HTML body template for error responses. It is filled with %-formatting
# using the keys 'code' (int), 'message' and 'explain'.
DEFAULT_ERROR_MESSAGE = "\n".join([
    "<head>",
    "<title>Error response</title>",
    "</head>",
    "<body>",
    "<h1>Error response</h1>",
    "<p>Error code %(code)d.",
    "<p>Message: %(message)s.",
    "<p>Error code explanation: %(explain)s.",
    "</body>",
    "",
])
112
113
class HTMLStripper(SGMLParser):
    """
    Reduce an HTML fragment to its text content by discarding every tag.
    """

    def __init__(self):
        """
        Initialise the underlying SGMLParser.
        """
        SGMLParser.__init__(self)

    def strip(self, html):
        """
        Remove all markup from C{html}.

        @param html: The HTML string to clean.
        @return: The concatenated text data of C{html}. The same value is
                 also left in C{self.result}.
        """
        self.result = ""
        self.__pieces = []
        self.feed(html)
        self.close()
        self.result = "".join(self.__pieces)
        return self.result

    def handle_data(self, data):
        """
        SGMLParser callback for each run of text; remember it for strip().
        """
        self.__pieces.append(data)
139
140
class AsyncoreHTTPRequestHandler:
    """
    This request handler conforms to the HTTP/1.1 standard (see RFC-2616).
    It should also work properly with older protocols. It is meant to be
    used with AsyncoreHTTPServer.

    One instance is created per request, and the constructor processes that
    request immediately (see C{run()}). Support for a method is added by
    defining C{do_<METHOD>()}.
    """
    errorMessageHTML = DEFAULT_ERROR_MESSAGE

    def __init__(self, channel, msgType, message):
        """
        AsyncoreHTTPRequestHandler constructor.

        @param channel: The channel class object.
        @param msgType: A sequence -- (method, path, protocol)
        @param message: An C{email.Message.Message} object.
        """
        self.server = channel.server
        self.channel = channel
        self.method = msgType[0]
        self.path = msgType[1]
        self.protocol = msgType[2]
        self.keepAlive = self.server.getHttpTimeout().seconds
        # Request headers and payload (email.Message.Message) object.
        self.requestMessage = message
        # Response headers (email.Message.Message) object.
        self.responseHeaders = self.makeResponseHeaders()
        self.__closeFlag = False
        # True once a status line and headers have been pushed. After that a
        # second (error) response would corrupt the stream.
        self.__headersPushed = False
        # Default chunk size
        self.__chunkSize = 2048
        self.initHook()
        self.run()

    def initHook(self):
        """
        Override this callback if you need to execute code in the constructor
        such as the WSGI application name or configuration data.
        """
        pass

    def run(self):
        """
        Start processing the request.

        Dispatches to C{do_<METHOD>()}. An unknown method gets a 405, and a
        failing handler is logged and reported (see C{_handleException()}).
        """
        methodName = 'do_' + self.method

        if not hasattr(self, methodName):
            self.pushError(405, "Unsupported method (%s)" % self.method)
            return

        if not self._isHostHeader(): return
        if not self._setKeepAliveTimeout(): return

        try:
            # Execute the HTTP method.
            getattr(self, methodName)()
        except Exception as e:
            self._handleException(str(e))

        self._conditionalCloseConnection()

    def _handleException(self, msg):
        """
        Log the exception being handled and report it to the client.

        If the response headers were already pushed, sending a 500 response
        would put a second status line into the stream. In that case the
        connection is marked for closing instead.

        @param msg: The error description to log (and send, if possible).
        """
        self.server.handleLogging("Channel: %s, %s", self.channel.id,
                                  msg, logger=self.server.logger,
                                  exc_info=True)

        if self.__headersPushed:
            self.__closeFlag = True
        else:
            self.pushError(500, msg)

    def _isHostHeader(self):
        """
        Check that the Host header is in the request with protocol
        version HTTP/1.1.

        @return: C{True} if the header was in the request else C{False}.
        """
        result = True
        host = self.requestMessage.get('Host')

        if self.protocol == 'HTTP/1.1' and not host:
            msg = "All HTTP/1.1 clients must include the 'Host' request header"
            self.pushError(400, msg)
            result = False

        return result

    def _setKeepAliveTimeout(self):
        """
        Set the KeepAlive timeout if it is in the request.

        @return: C{True} if the header request argument was valid else
                 C{False}.
        """
        result = True
        keepAlive = self.requestMessage.get('Keep-Alive')

        if keepAlive:
            if keepAlive.isdigit():
                self.setKeepAlive(int(keepAlive))
            else:
                msg = "Invalid keep alive argument in request."
                self.pushError(400, msg)
                result = False

        return result

    def _conditionalCloseConnection(self):
        """
        Decide from the request/response headers and the protocol version
        whether the connection must be closed after this request.
        """
        keepAlive = self.requestMessage.get('Keep-Alive')
        rsConnection = self.responseHeaders.get('Connection', '').lower()
        rqConnection = self.requestMessage.get('Connection', '').lower()

        if 'close' in (rsConnection, rqConnection) or \
               self.protocol == 'HTTP/0.9' or \
               (self.protocol == 'HTTP/1.0' and not keepAlive):
            self.__closeFlag = True

    def do_GET(self):
        """
        Respond to a GET request.
        """
        payload = self.__stressTestPayload()
        self.pushHeaders(200)
        self.pushPayload(payload)

    def do_HEAD(self):
        """
        Respond to a HEAD request (same headers as GET, no body).
        """
        self.__stressTestPayload()
        self.pushHeaders(200)

    def __stressTestPayload(self):
        """
        Test method: set Content-Type/Content-Length for a fixed plain text
        body.

        @return: payload
        """
        headers = self.responseHeaders
        headers.add_header('Content-Type', 'text/Plain')
        payload = "Stress Test "*86
        headers.add_header('Content-Length', '%s' % len(payload))
        return payload

    def do_POST(self):
        """
        Respond to a POST request by echoing the entity data in an XHTML page.
        """
        headers = self.responseHeaders
        headers.add_header('Content-Type', 'text/html')
        payload = self.requestMessage.get_payload().getvalue()

        if len(payload) <= 0: payload = "No entity data in received request."

        # The body is client-controlled; escape it so it cannot inject markup
        # or script into the page.
        html = '''<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<title>Result of AsyncoreHTTPRequestHandler for AsyncoreHTTPServer.py</title>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8" />
</head>
<body>
<p>%s</p>
</body>
</html>
''' % xmlEscape(payload)

        headers.add_header('Content-Length', '%s' % len(html))
        self.pushHeaders(200)

        try:
            self.pushPayload(html)
        except Exception as e:
            # Headers are already out, so this logs and marks the connection
            # for closing rather than sending a second response.
            self._handleException(
                "Error while pushing (POST) payload, %s" % str(e))

    def makeResponseHeaders(self):
        """
        Make the response header object with default headers already set.

        @return: An C{email.Message.Message} object.
        """
        defaultHeaders = "Date: %s\n" % self.datetimeString() + \
                         "Server: AsyncoreHTTPServer/%s\n" % __version__
        return Parser().parsestr(defaultHeaders, headersonly=True)

    def getResponseHeaders(self):
        """
        Get the current response header Message object.

        @return: The current response header C{email.Message.Message} object.
        """
        return self.responseHeaders

    def datetimeString(self, timestamp=None):
        """
        Return a date and time formatted for a message header (in GMT).

        @keyword timestamp: Seconds since the epoch. Defaults to now. An
                            explicit C{0} means the epoch itself.
        """
        format = "%a, %d %b %Y %H:%M:%S GMT"
        if timestamp is None: timestamp = time.time()
        return datetime.utcfromtimestamp(timestamp).strftime(format)

    def pushHeaders(self, code, headers=None, payload=None):
        """
        Push headers for any request type.

        @param code: The status code.
        @keyword headers: The response header object must be type
                          C{email.Message.Message}
        @keyword payload: The payload--used to determine the type of
                          connection.
        """
        self._handleHeaders(headers=headers, payload=payload)
        self.__headersPushed = True
        # Unknown codes get an empty reason phrase rather than "None".
        self.channel.pushStatus(code, STATUS_CODES.get(code, ''))
        self.channel.push_with_producer(HeaderProducer(self.responseHeaders))
        self.server.handleLogging("Channel: %s, %s", self.channel.id,
                                  self.responseHeaders.as_string(),
                                  logger=self.server.logger,
                                  level=logging.DEBUG)

    def _handleHeaders(self, headers=None, payload=None):
        """
        Fix the connection-management headers (Connection, Keep-Alive).

        Existing Connection headers are replaced rather than appended to, so
        the response never carries two of them.

        @keyword headers: The response header object must be type
                          C{email.Message.Message}. When given it becomes
                          the response header object.
        @keyword payload: The payload--used to determine the type of
                          connection. A list or file payload has no
                          Content-Length, so the connection is closed.
        """
        # An email Message with no headers is falsy, so test for None.
        if headers is not None:
            self.responseHeaders = headers
        else:
            headers = self.responseHeaders

        rsKeepAlive = headers.get('Keep-Alive')
        rsConnection = headers.get('Connection', '').lower()
        rqKeepAlive = self.requestMessage.get('Keep-Alive')
        rqConnection = self.requestMessage.get('Connection', '').lower()

        # To set Keep-Alive for protocol HTTP/1.1; the response or request
        # connections must not have been set to close.
        if self.protocol == 'HTTP/1.1':
            if rsConnection != 'close':
                del headers['Connection']

                if rqConnection == 'close' or \
                       isinstance(payload, (list, types.FileType)):
                    headers.add_header('Connection', 'close')
                else:
                    if not rsKeepAlive:
                        headers.add_header('Keep-Alive', '%s' % self.keepAlive)

                    headers.add_header('Connection', 'keep-alive')
        # To set Keep-Alive for protocol HTTP/1.0 the request Keep-Alive
        # header must have been set.
        elif self.protocol == 'HTTP/1.0' and rqKeepAlive:
            # Remove any response Connection header.
            del headers['Connection']

            if not rsKeepAlive:
                headers.add_header('Keep-Alive', '%s' % self.keepAlive)

    def pushPayload(self, payload):
        """
        Push the payload to the client.

        The C{payload} object type determines how the data is sent to
        the client:
          - a string is sent out in one piece;
          - a list is sent out one element at a time;
          - a file is sent out in chunks of the preset chunk size.

        @param payload: The entity data to send to the client.
        @raise TypeError: For any other payload type.
        """
        if isinstance(payload, str):
            self.channel.push(payload)
        elif isinstance(payload, list):
            self.channel.push_with_producer(ListProducer(payload))
        elif isinstance(payload, types.FileType):
            self.channel.push_with_producer(FileProducer(payload,
                                                         self.__chunkSize))
        else:
            raise TypeError("Cannot push type: %s" % type(payload))

    def pushError(self, code, explain='', contentType='text/html'):
        """
        Push an HTML error message to the client.

        @param code: The status code.
        @param explain: The full description of the error. Any markup is
                        stripped and the remaining text is HTML-escaped.
        @param contentType: Defaults to C{text/html}.
        """
        message = STATUS_CODES.get(code, "Invalid status %s" % code)
        stripper = HTMLStripper()
        # Stripping tags alone is not enough: the stripped text can still
        # hold markup characters (e.g. from decoded entity references).
        explainText = xmlEscape(stripper.strip(explain))
        content = self.errorMessageHTML % {'code': code,
                                           'message': message,
                                           'explain': explainText}
        # Don't use any headers already set.
        headers = self.makeResponseHeaders()
        headers.add_header('Content-Type', '%s' % contentType)
        headers.add_header('Content-Length', '%s' % len(content))
        self.pushHeaders(code, headers=headers)

        if self.method != 'HEAD' and code >= 200 and code not in (204, 304):
            self.channel.push(content)

    def endHeaders(self):
        """
        Send the last CRLF.
        NOT NEEDED if the C{pushHeaders()} method is used.
        """
        self.channel.push("\r\n")

    def setChunkSize(self, size):
        """
        Set the chunk size used when pushing file payloads.

        @param size: Chunk size.
        """
        self.__chunkSize = size

    def setKeepAlive(self, value):
        """
        Set the keep alive value (also the channel's HTTP timeout).

        @param value: The keep alive value in seconds.
        """
        self.keepAlive = value
        self.channel.setHttpTimeout(value)

    def getPeerName(self):
        """
        Get the client (peer) address and port.

        @return: The peer C{(address, port)}.
        """
        return self.channel.addr

    def getChannelID(self):
        """
        Get the channel ID.

        @return: Channel ID
        """
        return self.channel.id

    def getCloseStatus(self):
        """
        Get the status of this request

        @return: If C{True} close the connection and remove the channel else
                 C{False} keep the connection alive.
        """
        return self.__closeFlag
496
497
498 class WSGIAsynHandler(AsyncoreHTTPRequestHandler):
499 """
500 This class is a WSGI wrapper (subclass) around the
501 AsyncoreHTTPRequestHandler.
502 """
503 __scriptExtentions = ['wsgi', 'py', 'cgi']
504 __middleware = []
505
506 def __init__(self, channel, msgType, message):
507 """
508 Constructor for WSGIAsynHandler.
509
510 @param channel: The channel class object.
511 @param msgType: A tuple -- (method, path, protocol)
512 @param message: An C{email.Message.Message} object.
513 """
514 self.__status = ''
515 self.__headersSent = False
516 self.__headersSet = False
517 self.__documentRoot = None
518 self.__applicationName = "application"
519 AsyncoreHTTPRequestHandler.__init__(self, channel, msgType, message)
520
521 def setDocumentRoot(self, path):
522 """
523 Set the document root path.
524
525 @param path: The document root.
526 """
527 self.__documentRoot = path
528
529 def setScriptExtentions(self, value=[]):
530 """
531 Set the possible script extentions.
532
533 Note: Setting this value will override the defaults C{wsgi} and C{py}.
534
535 @param value: A list of possible script extentions.
536 """
537 if value: self.__scriptExtentions[:] = value
538
539 def setApplicationName(self, name):
540 """
541 Set the WSGI callable application name.
542
543 @param name: The callable application name.
544 """
545 self.__applicationName = name
546
547 def setMiddleware(self, apps):
548 """
549 Set the middleware to run.
550
551 @param apps: A list of middleware apps.
552 """
553 if apps: self.__middleware[:] = apps
554
555 def run(self):
556 """
557 Run the WSGI application
558 """
559 if not self._isHostHeader(): return
560 if not self._setKeepAliveTimeout(): return
561 iterator = None
562
563 try:
564 # The application could be at module scope.
565 if globals().get(self.__applicationName, False):
566 application = globals().get(self.__applicationName)
567 # The application could be at class scope.
568 elif hasattr(self, self.__applicationName):
569 application = getattr(self, self.__applicationName)
570 else:
571 msg = "Could not find a WSGI application to run."
572 raise AttributeError(msg)
573
574 # Apply WSGI middleware.
575 for middleware in self.__middleware:
576 application = middleware(application)
577
578 environ = self._makeEnvironment()
579 iterator = application(environ, self.startResponse)
580 except Exception, e:
581 self.server.handleLogging("Channel: %s, %s", self.channel.id,
582 str(e), logger=self.server.logger,
583 exc_info=True)
584 self.pushError(500, str(e))
585 return
586
587 try:
588 for data in iterator:
589 # Don't push headers until body appears.
590 if data: self._write(data)
591
592 self.__pushHeaders() # Push the headers if no body data.
593 finally:
594 if hasattr(iterator,'close'):
595 iterator.close()
596
597 self._conditionalCloseConnection()
598
599 def _write(self, data):
600 if not self.__headersSet:
601 raise AssertionError("_write() before start_response()")
602
603 self.__pushHeaders()
604 self.channel.push(data)
605
606 def __pushHeaders(self):
607 if not self.__headersSent:
608 # Before the first output, send the stored headers.
609 self.__headersSent = True
610 self._handleHeaders()
611 statusLine = "%s %s\r\n" % (self.protocol, self.__status)
612 self.channel.push(statusLine)
613 self.channel.push_with_producer(
614 HeaderProducer(self.responseHeaders))
615 self.server.handleLogging("Channel: %s, %s", self.channel.id,
616 self.responseHeaders.as_string(),
617 logger=self.server.logger,
618 level=logging.DEBUG)
619
620 def startResponse(self, status, responseHeaders, exc_info=None):
621 """
622 The start_response callable as per the PEP 333 specification.
623
624 @param status: The response status string.
625 @param responseHeaders: The response headers.
626 """
627 if exc_info:
628 try:
629 if self.__headersSent:
630 # Re-raise original exception if headers sent.
631 raise exc_info[0], exc_info[1], exc_info[2]
632 finally:
633 exc_info = None # Avoid dangling circular reference.
634
635 if self.__headersSet:
636 raise AssertionError("Headers already set!")
637
638 self.__status = status
639
640 for header, value in responseHeaders:
641 self.responseHeaders.add_header(header, value)
642
643 self.__headersSet = True
644 return self._write
645
646 def _makeEnvironment(self):
647 """
648 Create the CGI/1.1 environment.
649
650 @return: The CGI environment.
651 """
652 request = self.requestMessage
653
654 if not self.__documentRoot:
655 self.__documentRoot = os.getenv('AC_HTTP_DOC_ROOT', os.getcwd())
656
657 self.__documentRoot = os.path.abspath(self.__documentRoot)
658 filename, name, query, info = self.__getScriptInfo()
659
660 environ = {}
661 environ['GATEWAY_INTERFACE'] = "CGI/1.1"
662 environ['DOCUMENT_ROOT'] = self.__documentRoot
663 addr, port = self.channel.addr
664 environ['REMOTE_ADDR'] = addr
665 environ['REMOTE_PORT'] = str(port)
666 environ['REQUEST_METHOD'] = self.method
667 environ['REQUEST_URI'] = self.path
668 environ['SCRIPT_NAME'] = name
669 environ['SCRIPT_FILENAME'] = filename
670 environ['PATH'] = ':'.join(sys.path)
671 environ['PATH_INFO'] = info
672 environ['QUERY_STRING'] = query
673 environ['CONTENT_TYPE'] = request.get_content_type()
674
675 if request.has_key('Content-Length'):
676 environ['CONTENT_LENGTH'] = request.get('Content-Length')
677
678 if 'Host' in request:
679 host, port = request.get('Host').split(':')
680 addr = socket.gethostbyname(host)
681 environ['SERVER_ADDR'] = addr
682 environ['SERVER_NAME'] = host
683 environ['SERVER_PORT'] = port
684 else: # For other than protocol HTTP/1.1
685 environ['SERVER_NAME'] = self.server.host
686 environ['SERVER_PORT'] = str(self.server.port)
687
688 environ['SERVER_PROTOCOL'] = self.protocol
689 environ['SERVER_SOFTWARE'] = 'AsyncoreHTTPServer/%s' % __version__
690 environ['wsgi.input'] = self.requestMessage.get_payload()
691 environ['wsgi.errors'] = sys.stderr
692 environ['wsgi.version'] = (1, 0)
693 environ['wsgi.multithread'] = False
694 environ['wsgi.multiprocess'] = True
695 environ['wsgi.run_once'] = True
696 environ['wsgi.url_scheme'] = self.protocol.split('/')[0].lower()
697 self.__setRequestHeaderEnvironment(environ)
698 self.server.handleLogging("Channel: %s, %s", self.channel.id, environ,
699 logger=self.server.logger,
700 level=logging.DEBUG)
701 return environ
702
703 def __getScriptInfo(self):
704 """
705 Get the SCRIPT_FILENAME, SCRIPT_NAME, QUERY_STRING, and PATH_INFO.
706
707 @return: (SCRIPT_FILENAME, SCRIPT_NAME, QUERY_STRING, PATH_INFO)
708 """
709 pathFragments = self.path.strip('/').replace('/?', '?') \
710 .replace('?', '/').split('/')
711 scriptEnd, filename = self.__getScriptEndIndex(pathFragments)
712
713 if self.path.find('?') > -1:
714 queryStart = len(pathFragments)-1
715 else:
716 queryStart = -1
717
718 infoStart = self.__getInfoStartIndex(pathFragments, scriptEnd,
719 queryStart)
720 name = query = info = ""
721
722 if queryStart > -1: query = pathFragments[queryStart]
723
724 if infoStart > -1:
725 if queryStart > infoStart:
726 info = "/%s/" % '/'.join(pathFragments[infoStart:queryStart])
727 else:
728 info = "/%s/" % '/'.join(pathFragments[infoStart:])
729
730 if scriptEnd > -1:
731 name = "/%s" % '/'.join(pathFragments[0:scriptEnd])
732
733 return filename, name, query, info
734
735 def __getScriptEndIndex(self, frags):
736 """
737 Get the end index of the stript.
738
739 @param frags: A list of the path items.
740 @return: The end index.
741 """
742 path = ''
743 index = -1
744 extentions = ['.' + ext.lstrip('.') for ext in self.__scriptExtentions]
745 # An empty string added to test a file with no extension.
746 extentions.insert(0, '')
747
748 for i in range(len(frags)):
749 if frags[i].find('=') > -1 or frags[i].find('&') > -1: break
750 tmpList = frags[0:i+1]
751 tmpList.insert(0, self.__documentRoot.strip('/'))
752 tmp = "/%s" % '/'.join(tmpList)
753 pathList = [tmp + ext for ext in extentions
754 if os.path.isfile(tmp + ext)]
755
756 if pathList:
757 path = pathList[0]
758 index = i+1
759 break
760
761 return index, path
762
763 def __getInfoStartIndex(self, frags, end, start):
764 """
765 Get the start index for the info.
766
767 @param end: The end of the script.
768 @param start: The start of the query.
769 @return: The start of the info.
770 """
771 index = -1
772
773 if end > -1 and (start < 0 or start > end) and len(frags) > end:
774 index = end
775
776 return index
777
778 def __setRequestHeaderEnvironment(self, environ):
779 """
780 Set the header environment.
781
782 @param environ: The request environment.
783 """
784 request = self.requestMessage
785
786 for header in request.keys():
787 if header in ('Content-Type', 'Content-Length'):
788 continue
789
790 tmp = "HTTP_%s" % header.replace('-', '_').upper()
791 environ[tmp] = request.get(header)
792
793
class ChunkProducer:
    """
    This producer converts a string into chunks the length of size.
    """
    __SIZE = 2048

    def __init__(self, data, size=__SIZE):
        """
        Create chunks out of a continuous string.

        @param data: The string data.
        @param size: Read chunk size.
        """
        self.__size = size
        self.__buff = StringIO(data)

    def more(self):
        """
        Return the next chunk of at most C{size} characters.

        Once the data is exhausted the internal buffer is closed, and this
        and every later call return an empty string.
        """
        if self.__buff is None:
            return ''

        result = self.__buff.read(self.__size)

        if not result:
            # A StringIO object is always truthy, so the old truthiness test
            # never closed it; close explicitly once drained.
            self.__buff.close()
            self.__buff = None

        return result
818
819
class FileProducer:
    """
    This producer returns chunks the length of size of a file and closes
    the file once it has been fully read.
    """
    __SIZE = 2048

    def __init__(self, fd, size=__SIZE):
        """
        FileProducer constructor.

        @param fd: File object. The producer takes ownership and closes it
                   at end of file.
        @param size: Read chunk size.
        """
        self.__fd = fd
        self.__size = size

    def more(self):
        """
        Return the next chunk of at most C{size} bytes of the file.

        At end of file the file is closed, and this and every later call
        return an empty string.
        """
        if not self.__fd:
            return ''

        result = self.__fd.read(self.__size)

        if not result:
            # Nobody else closes the file handed to pushPayload().
            self.__fd.close()
            self.__fd = None

        return result
843
844
class ListProducer:
    """
    This producer returns chunks the length of each element of the list.
    """
    def __init__(self, list):
        """
        ListProducer constructor.

        @param list: A Python list object. It is copied, so the caller's
                     list is left untouched.
        """
        # deque: O(1) pops from the left instead of list.pop(0).
        self.__items = deque(list or ())

    def more(self):
        """
        Return the next non-empty element of the list, or an empty string
        when all elements have been returned.

        Empty elements are skipped. Returning one would be taken as the end
        of the data, and every element after it would be dropped.
        """
        while self.__items:
            result = self.__items.popleft()

            if result:
                return result

        return ''
864
865
class HeaderProducer:
    """
    Producer yielding one "Name: value" header line per call to C{more()},
    then the blank CRLF line that ends the header block, then empty
    strings.
    """
    def __init__(self, headers):
        """
        HeaderProducer constructor.

        @param headers: C{email.Message.Message} object. Its headers are
                        captured when the producer is created.
        @raise TypeError: If C{headers} is not a C{Message}.
        """
        if not isinstance(headers, Message):
            raise TypeError("Must use an email.Message.Message object.")

        lines = ["%s: %s\r\n" % pair for pair in headers.items()]
        lines.append("\r\n")
        self.__lines = lines
        self.__position = 0

    def more(self):
        """
        Return the next header line, the final CRLF, or '' when finished.
        """
        if self.__position >= len(self.__lines):
            return ''

        line = self.__lines[self.__position]
        self.__position += 1
        return line
896
897
class HTTPChannel(async_chat):
    """
    Handle the channel details and gather the data for the handler.

    Each request is read in up to three phases:
      1. the request line,
      2. the header lines, up to an empty line,
      3. for POST/PUT, a body of C{Content-Length} bytes.
    The parsed request is then passed to the server's handler, and the
    channel resets for the next request or closes.

    HTTP/1.1 Keep-Alive requests are supported.
    HTTP/1.1 Pipelining requests are not supported.
    """
    __VALID_PROTOCOLS = ('HTTP/1.1', 'HTTP/1.0', 'HTTP/0.9',)

    def __init__(self, server, sock, addr):
        async_chat.__init__(self, sock)
        self.server = server
        self.addr = addr
        self.id = server._getChannelID(self)
        self.httpTimeout = server.getHttpTimeout()
        # Set once the channel has been handed to the server for closing.
        # Any further input is then ignored.
        self.__closing = False
        self.setVariables()

    def setVariables(self):
        """
        Set the variables between request phases (ready for a new request
        line).
        """
        self.set_terminator('\r\n')
        # Fragments of the current line (header phase) or of the body.
        self.__request = []
        # Completed header lines of the current request.
        self.__headerLines = []
        self.__msgType = None
        self.__message = None
        self.__payload = StringIO()
        self.__headersDone = False
        self.server._updateChannelTime(self, self.id)

    def setHttpTimeout(self, value):
        """
        Set the HTTP Keep-Alive timeout.

        @param value: The timeout value in seconds.
        """
        self.httpTimeout = timedelta(seconds=value)

    def collect_incoming_data(self, data):
        """
        Overridden method from the async_chat class.
        """
        if not self.__closing:
            self.__request.append(data)

    def found_terminator(self):
        """
        Overridden method from the async_chat class.

        In the header phase the terminator is CRLF, so C{__request} holds
        one complete line, possibly delivered in several fragments. In the
        body phase the terminator is the body length.
        """
        if self.__closing:
            return

        if self.__headersDone:
            self.__handleBody()
            return

        line = ''.join(self.__request)
        self.__request = []

        if self.__msgType is None:
            self.__handleRequestLine(line)
        elif line:
            self.__headerLines.append(line)
        else:
            # Empty line: end of the header block.
            self.__handleEndOfHeaders()

    def __handleRequestLine(self, line):
        """
        Parse a request line such as C{GET / HTTP/1.1}.

        @param line: The request line without its CRLF.
        """
        if not line:
            # RFC 2616 section 4.1: ignore empty lines where a Request-Line
            # is expected.
            return

        self.__msgType = line.split(None, 2)
        self.server.handleLogging("Channel: %s, %s", self.id, line,
                                  logger=self.server.logger,
                                  level=logging.DEBUG)

        if len(self.__msgType) != 3:
            self.__rejectRequest(400)
        elif self.__msgType[2] not in self.__VALID_PROTOCOLS:
            self.__rejectRequest(505)

    def __handleEndOfHeaders(self):
        """
        Parse the collected header lines. Then either wait for a request
        body or dispatch the request.
        """
        self.__headersDone = True
        self.__message = Parser().parsestr('\r\n'.join(self.__headerLines),
                                           headersonly=True)
        self.__headerLines = []
        self.server.handleLogging("Channel: %s, %s", self.id,
                                  self.__message.as_string(),
                                  logger=self.server.logger,
                                  level=logging.DEBUG)
        self.__message.set_payload(self.__payload)

        # If a POST or PUT we may have additional data.
        if self.__msgType[0] in ('POST', 'PUT'):
            clen = self.__message.get('Content-Length')

            if clen:
                try:
                    length = int(clen)
                except ValueError:
                    length = -1

                if length < 0:
                    self.__rejectRequest(400)
                    return

                if length > 0: # More data, exit and get it.
                    self.set_terminator(length)
                    return
                # A zero length would make async_chat collect forever;
                # there is no body, so dispatch now.

        self.__dispatch()

    def __handleBody(self):
        """
        The whole request body has arrived; store it and dispatch.
        """
        for chunk in self.__request:
            self.__payload.write(chunk)

        self.__request = []
        self.__message.set_payload(self.__payload)
        self.__dispatch()

    def __dispatch(self):
        """
        Run the request handler, then close or reset the channel depending
        on the handler's close status.
        """
        req = self.server.handle_request(self, self.__msgType,
                                         self.__message)
        self.__payload.close()

        if req.getCloseStatus():
            self.__closeChannel()
        else:
            self.setVariables()

    def __rejectRequest(self, code):
        """
        Answer a malformed request and close the connection.

        The rest of the input stream cannot be framed reliably.

        @param code: The status code.
        """
        self.handleBadRequest(code)
        self.__payload.close()
        self.__closeChannel()

    def __closeChannel(self):
        """
        Hand the channel to the server for closing and ignore further input.
        """
        self.__closing = True
        self.__request = []
        # NOTE(review): a None terminator is intended to stop further
        # found_terminator() calls; the __closing guards cover it either way.
        self.set_terminator(None)
        self.server._closeChannel(self)

    def pushStatus(self, code, msg=""):
        """
        Push the status to the client.

        @param code: The status code.
        @param msg: Optional message.
        """
        statusLine = "%s %i %s\r\n" % (self.__msgType[2], code, msg)
        self.push(statusLine)
        self.server.handleLogging("Channel: %s, %s", self.id, statusLine,
                                  logger=self.server.logger,
                                  level=logging.DEBUG)

    def handleBadRequest(self, code, log=True):
        """
        Handle a bad request by pushing a complete, body-less response.

        The response says C{Connection: close}. Callers close the channel
        afterwards.

        @param code: The status code.
        @param log: C{True} will dump a log message else C{False} will not.
        """
        reason = STATUS_CODES.get(code, 'Bad Request')

        if log:
            msg = 'Channel: %s, %s %s (Address %s), Message Type: %s'
            self.server.handleLogging(msg, self.id, code, reason,
                                      str(self.addr), str(self.__msgType),
                                      logger=self.server.logger)

        # "HTTP/1.x" is not a valid HTTP-Version; answer as HTTP/1.1.
        self.__msgType = ['', '', 'HTTP/1.1']
        self.pushStatus(code, reason)
        # Terminate the header block so the client sees a complete response.
        self.push("Content-Length: 0\r\nConnection: close\r\n\r\n")
1046
1047
class AsyncoreHTTPServer(asyncore.dispatcher):
    """
    A basic HTTP server using asyncore.

    Open channels are tracked in C{_CHANNEL_MAP}, keyed by channel ID. A
    value is C{None} while the ID is reserved but no activity time has been
    recorded yet, otherwise a C{[datetime, channel]} pair holding the time
    of the channel's last update. Every pass of L{loop} sweeps this map for
    channels whose HTTP timeout has expired.
    """
    # NOTE(review): this is a class attribute that is mutated in place, so
    # all server instances in one process share one channel map and ID space.
    _CHANNEL_MAP = {}

    def __init__(self, host, port=80, handler=AsyncoreHTTPRequestHandler,
                 httpTimeout=30, logger=None):
        """
        AsyncoreHTTPServer constructor.

        The listening socket is not created here; call L{start} for that.

        @param host: The host we are running on.
        @param port: The port we are using.
        @param handler: The request handler to use.
        @param httpTimeout: An integer value in seconds for the HTTP timeout.
        @param logger: The logger name to log to, or C{None} to write log
                       messages to stderr.
        """
        self.host = host
        self.port = port
        self.__httpTimeout = timedelta(seconds=httpTimeout)
        self.handle_request = handler
        self.logger = logger
        self._idMutex = thread.allocate_lock()
        self._logMutex = thread.allocate_lock()
        # Increase this value to create a larger request queue. It is set
        # before initHook() so that a setBacklog() call made there is kept.
        self.__backlog = 5
        self.initHook()

    def initHook(self):
        """
        Override this callback if you need to execute code in the
        constructor. It runs after all server attributes have been set.
        """
        pass

    def getHttpTimeout(self):
        """
        Get the HTTP timeout.

        @return: The HTTP timeout C{timedelta} object.
        """
        return self.__httpTimeout

    def setBacklog(self, backlog):
        """
        Set the socket queue size. Only effective before L{start} is called.

        @param backlog: Socket queue size.
        """
        self.__backlog = backlog

    def start(self):
        """
        Start the server: create, bind and listen on the server socket.
        """
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((self.host, self.port))
        self.listen(self.__backlog)

    def handle_accept(self):
        """
        Overriding method in the base class. Accept a connection and wrap it
        in an C{HTTPChannel}.
        """
        try:
            conn, addr = self.accept()
        except socket.error:
            self.log_info('warning: server accept() threw an exception',
                          'warning')
            return
        except TypeError:
            # accept() may return None when the call would block, which
            # makes the tuple unpacking above raise TypeError.
            self.log_info('warning: server accept() threw EWOULDBLOCK',
                          'warning')
            return

        # Creates an instance of the handler class to handle the
        # request/response on the incoming connection
        HTTPChannel(self, conn, addr)

    def loop(self, timeout=30.0, use_poll=True, map=None, count=None):
        """
        The forever loop.

        @param timeout: The time out for each select/poll call.
        @param use_poll: Use the newer poll function if it exists.
        @param map: The map that holds the channel objects.
        @param count: Number of poll iterations to run; C{None} runs until
                      the map is empty.
        """
        if map is None:
            map = asyncore.socket_map

        # asyncore.poll2 is the poll(2) implementation (asyncore.poll3 is
        # only an alias of it); asyncore.poll uses select(2).
        if use_poll and hasattr(select, 'poll'):
            pollFun = asyncore.poll2
        else:
            pollFun = asyncore.poll

        while map and (count is None or count > 0):
            pollFun(timeout, map)
            self.__checkHttpTimeout()

            if count is not None:
                count -= 1

    def __checkHttpTimeout(self):
        """
        Check the HTTP timeout of all active channels. If a channel has timed
        out, send it a 408 response, call its C{close_when_done()} method and
        remove it from the channel map.
        """
        self._idMutex.acquire()

        try:
            now = datetime.now()

            # Iterate over a copy so entries can be deleted inside the loop.
            for cid, entry in list(self._CHANNEL_MAP.items()):
                # None means the ID was reserved by _getChannelID() but no
                # activity time has been recorded yet, so nothing can expire.
                if entry is None:
                    continue

                lastActivity, channel = entry

                if now - lastActivity >= channel.httpTimeout:
                    channel.handleBadRequest(408, log=False)
                    channel.close_when_done()
                    del self._CHANNEL_MAP[cid]
                    self.handleLogging("Timeout--deleted channel ID: %s, %s",
                                       channel.id, self._CHANNEL_MAP,
                                       logger=self.logger,
                                       level=logging.DEBUG)
        except Exception as e:
            msg = "System error: %s"
            self.handleLogging(msg, str(e), logger=self.logger)
        finally:
            self._idMutex.release()

    def _closeChannel(self, channel):
        """
        Close the channel when using a protocol less than HTTP/1.1.

        @param channel: The channel to close.
        """
        self._idMutex.acquire()

        try:
            channel.close_when_done()
            # The timeout sweep may already have removed this ID; a KeyError
            # here would also have left the mutex locked forever.
            self._CHANNEL_MAP.pop(channel.id, None)
            self.handleLogging("ConnClose--deleted channel ID: %s, %s",
                               channel.id, self._CHANNEL_MAP,
                               logger=self.logger, level=logging.DEBUG)
        finally:
            self._idMutex.release()

    def _getChannelID(self, channel):
        """
        Reserve and return the lowest unused positive channel ID.

        The ID is stored in the channel map with a C{None} value; the
        channel records its activity time later via L{_updateChannelTime}.

        @param channel: The channel object (currently unused).
        @return: The reserved channel ID, or C{None} if an error occurred.
        """
        self._idMutex.acquire()
        cid = None

        try:
            used = set(self._CHANNEL_MAP)
            cid = 1

            # First free slot; ends at max(used) + 1 when there is no gap.
            while cid in used:
                cid += 1

            # Set to None, the channel will set the value later.
            self._CHANNEL_MAP[cid] = None
        except Exception as e:
            msg = "System error probably a bug, %s"
            self.handleLogging(msg, str(e), logger=self.logger, exc_info=True)
        finally:
            self._idMutex.release()

        return cid

    def _updateChannelTime(self, channel, cid):
        """
        Update the channel time.

        @param channel: The channel object.
        @param cid: The channel ID.
        """
        self._idMutex.acquire()

        try:
            self._CHANNEL_MAP[cid] = [datetime.now(), channel]
        finally:
            self._idMutex.release()

    def closeAllChannels(self, map=None):
        """
        Close all the channels.

        @param map: The map that holds the channel objects.
        """
        asyncore.close_all(map)

    def handleLogging(self, msg, *args, **kwargs):
        """
        Send a message to the indicated output stream or logger. The default
        is stderr.

        Note: The C{logger} keyword takes precedence over the C{stream}
        keyword.

        @param msg: The message to handle.
        @param args: Positional arguments.
        @param kwargs: Keyword arguments are passed on to the logger.
        @keyword stream: If present in C{kwargs} its value will be used as
                         the output stream when no logger is given.
        @keyword logger: If present in C{kwargs} and not C{None}, the logger
                         associated with that name will get the output
                         message.
        @keyword level: If present in C{kwargs} its numeric value will be the
                        logging level if the C{logger} keyword is also used.
        """
        logger = kwargs.pop('logger', None)
        level = kwargs.pop('level', logging.ERROR)
        # Always removed: Logger.log() does not accept a 'stream' keyword.
        stream = kwargs.pop('stream', sys.stderr)
        self._logMutex.acquire()

        try:
            # Must test for None since the root logger is ''.
            if logger is not None:
                logging.getLogger(logger).log(level, msg, *args, **kwargs)
                return

            msg1 = msg
            msg2 = ''
            startKw = msg.find('%(')

            # Mapping placeholders ("%(name)s") are filled from the remaining
            # keyword arguments; the text before the first one is filled from
            # the positional arguments.
            if kwargs and startKw > -1:
                msg1 = msg[:startKw]
                msg2 = msg[startKw:] % kwargs

            if args:
                msg1 = msg1 % args

            msg = msg1 + msg2 + '\n'

            if kwargs.get('exc_info', False):
                msg += traceback.format_exc()

            stream.write(msg)
        finally:
            self._logMutex.release()
1294
1295
class DisplayHandler(AsyncoreHTTPRequestHandler):
    """
    This class is used to test the entire package.

    It serves regular files and directory listings from the document root
    named by the C{AC_HTTP_DOC_ROOT} environment variable, or from the
    current working directory when that variable is not set.
    """
    def __init__(self, channel, msgType, message):
        """
        The constructor for the DisplayHandler class.
        """
        AsyncoreHTTPRequestHandler.__init__(self, channel, msgType, message)

    def displayDirectory(self):
        """
        Push the response headers for the requested path and return the body
        to send, if any. Used by both L{do_GET} and L{do_HEAD}.

        Directories without a trailing slash get a 301 redirect. Directories
        get an HTML listing, regular files are opened for reading, and
        anything else (including paths resolving outside the document root)
        gets a 404 error.

        @return: A list of HTML chunks for a directory listing, an open
                 binary file object for a regular file, or C{None} when a
                 redirect or error response was pushed instead.
        """
        relRoot = os.getenv('AC_HTTP_DOC_ROOT', os.getcwd())
        docRoot = os.path.abspath(relRoot)
        fullpath = os.path.normpath(
            os.path.join(docRoot, self.path.lstrip('/')))
        # self.path comes from the client. Anything that normalizes to a
        # location outside docRoot (e.g. "/../../etc/passwd") is treated as
        # not found instead of being served.
        rootPrefix = docRoot.rstrip(os.sep) + os.sep
        insideRoot = fullpath == docRoot or fullpath.startswith(rootPrefix)
        headers = self.responseHeaders
        payload = None

        if insideRoot and os.path.isdir(fullpath):
            if not self.path.endswith('/'):
                headers.add_header('Location', self.path + '/')
                self.pushHeaders(301)
                self.server.handleLogging("Channel: %s, %s", self.channel.id,
                                          headers.as_string(),
                                          logger=self.server.logger,
                                          level=logging.DEBUG)
                return payload

            # Sorted listing with sub-directories marked by a trailing '/'.
            dirList = []

            for name in sorted(os.listdir(fullpath)):
                if os.path.isdir(os.path.join(fullpath, name)):
                    name += '/'

                dirList.append(name)

            payload = self.__dirList(self.path, dirList)
            headers.add_header('Content-Type', 'text/html')
        elif insideRoot and os.path.isfile(fullpath):
            if fullpath.lower().endswith(('.htm', '.html')):
                ctype = 'text/html'
            else:
                ctype = 'text/plain'

            payload = open(fullpath, "rb")
            headers.add_header('Content-Type', ctype)
            headers.add_header('Last-Modified',
                self.datetimeString(os.path.getmtime(fullpath)))
        else:
            msg = "File %s not found." % self.path
            self.pushError(404, msg)
            self.server.handleLogging("Channel: %s, %s %s", self.channel.id,
                                      headers.as_string(), msg,
                                      logger=self.server.logger,
                                      level=logging.DEBUG)
            return payload

        self.pushHeaders(200, payload=payload)
        return payload

    def __dirList(self, path, dirList):
        """
        Generate the HTML for a directory list.

        File names and the path are HTML-escaped so that names containing
        markup cannot inject it into the page.

        @param path: The request path of the directory.
        @param dirList: The directory entries to list.
        @return: A list containing chunks of the directory listing.
        """
        def escape(text):
            # '&' must be replaced first so the other entities survive.
            return (text.replace('&', '&amp;').replace('<', '&lt;')
                    .replace('>', '&gt;').replace('"', '&quot;'))

        head, tail = os.path.split(path.rstrip('/'))
        if not head.endswith('/'): head += '/'
        parentDir = ('<li><a href="%s">Parent Directory</a></li>\n'
                     % escape(head))
        lia = '<li><a href="%s">%s</a></li>\n'
        html = '''\
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<title>Result of test DisplayHandler for AsyncoreHTTPServer.py</title>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8" />
</head>
<body>
<h1>Index of %s</h1>
<ul>
''' % escape(path.rstrip('/'))
        if path != '/': html += parentDir
        dirHtml = [lia % (escape(f), escape(f)) for f in dirList]
        dirHtml.insert(0, html)
        dirHtml.append("</ul>\n</body>\n</html>\n")
        return dirHtml

    def do_GET(self):
        """
        This method overrides the do_GET() method in
        AsyncoreHTTPRequestHandler.
        """
        payload = self.displayDirectory()
        if payload: self.pushPayload(payload)

    def do_HEAD(self):
        """
        This method overrides the do_HEAD() method in
        AsyncoreHTTPRequestHandler.
        """
        payload = self.displayDirectory()
        # HEAD cannot send the payload, so release any file that was opened
        # for it. NOTE(review): assumes pushHeaders() is done with the
        # payload by the time it returns -- confirm.
        if hasattr(payload, 'close'):
            payload.close()
1403
1404
class WSGIStressHandler(WSGIAsynHandler):
    """
    WSGI test handler that answers GET and HEAD requests with a fixed
    plain-text "stress test" body and any other method with a 500 error.
    """
    def __init__(self, channel, msgType, message):
        """
        The constructor for the WSGIStressHandler class; it simply
        delegates to the WSGIAsynHandler constructor.
        """
        WSGIAsynHandler.__init__(self, channel, msgType, message)

    def initHook(self):
        """
        Configure the WSGIAsynHandler: the WSGI application is the C{test}
        method (instead of the default C{application}) and the C{wsgiref}
        validator is installed as middleware.
        """
        from wsgiref.validate import validator
        self.setApplicationName('test')
        self.setMiddleware([validator])

    def test(self, environ, start_response):
        """
        The WSGI application. With WSGI the application itself has to look
        at the request method and choose the response status.

        @param environ: The WSGI environment.
        @param start_response: The WSGI start_response callable.
        @return: A list holding the response body (empty for HEAD).
        """
        method = environ['REQUEST_METHOD']
        body = []

        if method in ('HEAD', 'GET'):
            # The stress-test helper supplies the headers for both methods;
            # only GET actually carries the body.
            headers, text = self.__stressTestPayload()

            if method == 'GET':
                body.append(text)

            status = "200 %s" % STATUS_CODES.get(200)
        else:
            text = "Invalid HTTP method %s" % method
            body.append(text)
            headers = [('Content-Type', 'text/plain'),
                       ('Content-Length', '%s' % len(text))]
            status = "500 %s" % STATUS_CODES.get(500)

        start_response(status, headers)
        return body

    def __stressTestPayload(self):
        """
        Build the fixed stress-test response.

        @return: A C{(headers, payload)} tuple; headers is a list of
                 (name, value) pairs describing the payload string.
        """
        text = "Stress Test " * 86
        headers = [('Content-Type', 'text/plain'),
                   ('Content-Length', '%s' % len(text))]
        return (headers, text)
1467
1468
def main():
    """
    This function is used to run this package as a server without any
    additional code.

    Usage: AsyncoreHTTPServer.py host:port [docRoot|true|false] [true|false]

    The second argument is either a document root for C{DisplayHandler}
    (default '.') or a boolean enabling the stress test. When the stress
    test is enabled, a true third argument selects C{WSGIStressHandler}
    instead of the default request handler.

    @return: Zero if no error or one if an error occurred (including a
             missing or malformed host:port argument).
    """
    usage = "You must provide a host:port argument."
    args = sys.argv[1:]

    if len(args) < 1:
        print(usage)
        return 1

    # Validate here: a malformed value used to raise ValueError outside of
    # the try block below and end in an unhandled traceback.
    parts = args[0].split(':')

    if len(parts) != 2 or not parts[1].isdigit():
        print(usage)
        return 1

    host, port = parts[0], int(parts[1])
    docRoot = '.'
    stressTest = False

    if len(args) >= 2:
        if args[1].lower() in ('true', 'false'):
            stressTest = args[1].lower() == 'true'
        else:
            docRoot = args[1]

    os.environ['AC_HTTP_DOC_ROOT'] = docRoot
    wsgiHandler = False

    if len(args) >= 3:
        if args[2].lower() in ('true', 'false'):
            wsgiHandler = args[2].lower() == 'true'

    try:
        if stressTest:
            if wsgiHandler:
                s = AsyncoreHTTPServer(host, port, handler=WSGIStressHandler)
            else:
                # Tests the request handler in the server.
                s = AsyncoreHTTPServer(host, port)
        else:
            # Tests the request handler class.
            s = AsyncoreHTTPServer(host, port, handler=DisplayHandler)

        s.start()
        print("A Simple AsyncoreHTTPServer running on port %s" % port)
        s.loop(timeout=15.0)
    except KeyboardInterrupt:
        print("Ctrl+C pressed. Shutting down.")
    except Exception:
        print("%s: %s\n" % (sys.exc_info()[0], sys.exc_info()[1]))
        tb = sys.exc_info()[2]
        traceback.print_tb(tb)
        return 1

    return 0
1523
1524
if __name__ == "__main__":
    # Exit with main()'s status code. To profile instead, wrap the call,
    # e.g. profile.run("main()", "tmp/AsyncoreHTTPServer_profile.dump").
    sys.exit(main())
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.
- [get | view] (2010-03-04 20:57:06, 48.9 KB) [[attachment:AsyncoreHTTPServer.py]]
- [get | view] (2010-10-13 19:32:21, 3.2 KB) [[attachment:ChangeLog.txt]]
- [get | view] (2010-10-13 19:57:35, 4.6 KB) [[attachment:README.txt]]
- [get | view] (2010-03-04 20:58:35, 7.3 KB) [[attachment:testACHTTPServer.py]]
You are not allowed to attach a file to this page.