Package pyamf :: Package remoting :: Package gateway :: Module twisted
[hide private]
[frames] | no frames]

Source Code for Module pyamf.remoting.gateway.twisted

  1  # Copyright (c) 2007-2009 The PyAMF Project. 
  2  # See LICENSE.txt for details. 
  3   
  4  """ 
  5  Twisted server implementation. 
  6   
  7  This gateway allows you to expose functions in Twisted to AMF clients and 
  8  servers. 
  9   
 10  @see: U{Twisted homepage (external)<http://twistedmatrix.com>} 
 11   
 12  @since: 0.1.0 
 13  """ 
 14   
 15  import sys 
 16  import os.path 
 17   
 18  try: 
 19      sys.path.remove('') 
 20  except ValueError: 
 21      pass 
 22   
 23  try: 
 24      sys.path.remove(os.path.dirname(os.path.abspath(__file__))) 
 25  except ValueError: 
 26      pass 
 27   
 28  twisted = __import__('twisted') 
 29  __import__('twisted.internet.defer') 
 30  __import__('twisted.internet.threads') 
 31  __import__('twisted.web.resource') 
 32  __import__('twisted.web.server') 
 33   
 34  defer = twisted.internet.defer 
 35  threads = twisted.internet.threads 
 36  resource = twisted.web.resource 
 37  server = twisted.web.server 
 38   
 39  from pyamf import remoting 
 40  from pyamf.remoting import gateway, amf0, amf3 
 41   
 42  __all__ = ['TwistedGateway'] 
 43   
 44   
45 -class AMF0RequestProcessor(amf0.RequestProcessor):
46 """ 47 A Twisted friendly implementation of 48 L{amf0.RequestProcessor<pyamf.remoting.amf0.RequestProcessor>} 49 """ 50
51 - def __call__(self, request, *args, **kwargs):
52 """ 53 Calls the underlying service method. 54 55 @return: A C{Deferred} that will contain the AMF L{Response}. 56 @rtype: C{twisted.internet.defer.Deferred} 57 """ 58 try: 59 service_request = self.gateway.getServiceRequest( 60 request, request.target) 61 except gateway.UnknownServiceError: 62 return defer.succeed(self.buildErrorResponse(request)) 63 64 response = remoting.Response(None) 65 deferred_response = defer.Deferred() 66 67 def eb(failure): 68 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 69 70 if self.gateway.logger: 71 self.gateway.logger.error(errMesg) 72 self.gateway.logger.info(failure.getTraceback()) 73 74 deferred_response.callback(self.buildErrorResponse( 75 request, (failure.type, failure.value, failure.tb)))
76 77 def response_cb(result): 78 if self.gateway.logger: 79 self.gateway.logger.debug("AMF Response: %s" % (result,)) 80 81 response.body = result 82 83 deferred_response.callback(response)
84 85 def preprocess_cb(result): 86 d = defer.maybeDeferred(self._getBody, request, response, 87 service_request, **kwargs) 88 89 d.addCallback(response_cb).addErrback(eb) 90 91 def auth_cb(result): 92 if result is not True: 93 response.status = remoting.STATUS_ERROR 94 response.body = remoting.ErrorFault(code='AuthenticationError', 95 description='Authentication failed') 96 97 deferred_response.callback(response) 98 99 return 100 101 d = defer.maybeDeferred(self.gateway.preprocessRequest, 102 service_request, *args, **kwargs) 103 104 d.addCallback(preprocess_cb).addErrback(eb) 105 106 # we have a valid service, now attempt authentication 107 d = defer.maybeDeferred(self.authenticateRequest, request, 108 service_request, **kwargs) 109 d.addCallback(auth_cb).addErrback(eb) 110 111 return deferred_response 112 113
114 -class AMF3RequestProcessor(amf3.RequestProcessor):
115 """ 116 A Twisted friendly implementation of 117 L{amf3.RequestProcessor<pyamf.remoting.amf3.RequestProcessor>} 118 """ 119
120 - def _processRemotingMessage(self, amf_request, ro_request, **kwargs):
121 ro_response = amf3.generate_acknowledgement(ro_request) 122 amf_response = remoting.Response(ro_response, status=remoting.STATUS_OK) 123 124 try: 125 service_name = ro_request.operation 126 127 if hasattr(ro_request, 'destination') and ro_request.destination: 128 service_name = '%s.%s' % (ro_request.destination, service_name) 129 130 service_request = self.gateway.getServiceRequest(amf_request, 131 service_name) 132 except gateway.UnknownServiceError: 133 return defer.succeed(remoting.Response( 134 self.buildErrorResponse(ro_request), 135 status=remoting.STATUS_ERROR)) 136 137 deferred_response = defer.Deferred() 138 139 def eb(failure): 140 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 141 142 if self.gateway.logger: 143 self.gateway.logger.error(errMesg) 144 self.gateway.logger.info(failure.getTraceback()) 145 146 ro_response = self.buildErrorResponse(ro_request, (failure.type, 147 failure.value, failure.tb)) 148 deferred_response.callback(remoting.Response(ro_response, 149 status=remoting.STATUS_ERROR))
150 151 def response_cb(result): 152 ro_response.body = result 153 res = remoting.Response(ro_response) 154 155 if self.gateway.logger: 156 self.gateway.logger.debug("AMF Response: %r" % (res,)) 157 158 deferred_response.callback(res)
159 160 def process_cb(result): 161 d = defer.maybeDeferred(self.gateway.callServiceRequest, 162 service_request, *ro_request.body, **kwargs) 163 d.addCallback(response_cb).addErrback(eb) 164 165 d = defer.maybeDeferred(self.gateway.preprocessRequest, service_request, 166 *ro_request.body, **kwargs) 167 d.addCallback(process_cb).addErrback(eb) 168 169 return deferred_response 170
171 - def __call__(self, amf_request, **kwargs):
172 """ 173 Calls the underlying service method. 174 175 @return: A C{deferred} that will contain the AMF L{Response}. 176 @rtype: C{Deferred<twisted.internet.defer.Deferred>} 177 """ 178 deferred_response = defer.Deferred() 179 ro_request = amf_request.body[0] 180 181 def cb(amf_response): 182 deferred_response.callback(amf_response)
183 184 def eb(failure): 185 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 186 187 if self.gateway.logger: 188 self.gateway.logger.error(errMesg) 189 self.gateway.logger.info(failure.getTraceback()) 190 191 deferred_response.callback(self.buildErrorResponse(ro_request, 192 (failure.type, failure.value, failure.tb))) 193 194 d = defer.maybeDeferred(self._getBody, amf_request, ro_request, **kwargs) 195 d.addCallback(cb).addErrback(eb) 196 197 return deferred_response 198 199
200 -class TwistedGateway(gateway.BaseGateway, resource.Resource):
201 """ 202 Twisted Remoting gateway for C{twisted.web}. 203 204 @ivar expose_request: Forces the underlying HTTP request to be the first 205 argument to any service call. 206 @type expose_request: C{bool} 207 """ 208 209 allowedMethods = ('POST',) 210
211 - def __init__(self, *args, **kwargs):
212 if 'expose_request' not in kwargs: 213 kwargs['expose_request'] = True 214 215 gateway.BaseGateway.__init__(self, *args, **kwargs) 216 resource.Resource.__init__(self)
217
218 - def _finaliseRequest(self, request, status, content, mimetype='text/plain'):
219 """ 220 Finalises the request. 221 222 @param request: The HTTP Request. 223 @type request: C{http.Request} 224 @param status: The HTTP status code. 225 @type status: C{int} 226 @param content: The content of the response. 227 @type content: C{str} 228 @param mimetype: The MIME type of the request. 229 @type mimetype: C{str} 230 """ 231 request.setResponseCode(status) 232 233 request.setHeader("Content-Type", mimetype) 234 request.setHeader("Content-Length", str(len(content))) 235 request.setHeader("Server", gateway.SERVER_NAME) 236 237 request.write(content) 238 request.finish()
239
240 - def render_POST(self, request):
241 """ 242 Read remoting request from the client. 243 244 @type request: The HTTP Request. 245 @param request: C{twisted.web.http.Request} 246 """ 247 def handleDecodeError(failure): 248 """ 249 Return HTTP 400 Bad Request. 250 """ 251 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 252 253 if self.logger: 254 self.logger.error(errMesg) 255 self.logger.info(failure.getTraceback()) 256 257 body = "400 Bad Request\n\nThe request body was unable to " \ 258 "be successfully decoded." 259 260 if self.debug: 261 body += "\n\nTraceback:\n\n%s" % failure.getTraceback() 262 263 self._finaliseRequest(request, 400, body)
264 265 request.content.seek(0, 0) 266 timezone_offset = self._get_timezone_offset() 267 268 d = threads.deferToThread(remoting.decode, request.content.read(), 269 strict=self.strict, logger=self.logger, 270 timezone_offset=timezone_offset) 271 272 def cb(amf_request): 273 if self.logger: 274 self.logger.debug("AMF Request: %r" % amf_request) 275 276 x = self.getResponse(request, amf_request) 277 278 x.addCallback(self.sendResponse, request)
279 280 # Process the request 281 d.addCallback(cb).addErrback(handleDecodeError) 282 283 return server.NOT_DONE_YET 284
285 - def sendResponse(self, amf_response, request):
286 def cb(result): 287 self._finaliseRequest(request, 200, result.getvalue(), 288 remoting.CONTENT_TYPE)
289 290 def eb(failure): 291 """ 292 Return 500 Internal Server Error. 293 """ 294 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 295 296 if self.logger: 297 self.logger.error(errMesg) 298 self.logger.info(failure.getTraceback()) 299 300 body = "500 Internal Server Error\n\nThere was an error encoding " \ 301 "the response." 302 303 if self.debug: 304 body += "\n\nTraceback:\n\n%s" % failure.getTraceback() 305 306 self._finaliseRequest(request, 500, body) 307 308 timezone_offset = self._get_timezone_offset() 309 d = threads.deferToThread(remoting.encode, amf_response, 310 strict=self.strict, logger=self.logger, 311 timezone_offset=timezone_offset) 312 313 d.addCallback(cb).addErrback(eb) 314
315 - def getProcessor(self, request):
316 """ 317 Determines the request processor, based on the request. 318 319 @param request: The AMF message. 320 @type request: L{Request<pyamf.remoting.Request>} 321 """ 322 if request.target == 'null': 323 return AMF3RequestProcessor(self) 324 325 return AMF0RequestProcessor(self)
326
327 - def getResponse(self, http_request, amf_request):
328 """ 329 Processes the AMF request, returning an AMF L{Response}. 330 331 @param http_request: The underlying HTTP Request 332 @type http_request: C{twisted.web.http.Request} 333 @param amf_request: The AMF Request. 334 @type amf_request: L{Envelope<pyamf.remoting.Envelope>} 335 """ 336 response = remoting.Envelope(amf_request.amfVersion, 337 amf_request.clientType) 338 dl = [] 339 340 def cb(body, name): 341 response[name] = body
342 343 for name, message in amf_request: 344 processor = self.getProcessor(message) 345 346 http_request.amf_request = message 347 348 d = defer.maybeDeferred( 349 processor, message, http_request=http_request) 350 351 dl.append(d.addCallback(cb, name)) 352 353 def cb2(result): 354 return response 355 356 def eb(failure): 357 """ 358 Return 500 Internal Server Error. 359 """ 360 errMesg = "%s: %s" % (failure.type, failure.getErrorMessage()) 361 362 if self.logger: 363 self.logger.error(errMesg) 364 self.logger.info(failure.getTraceback()) 365 366 body = "500 Internal Server Error\n\nThe request was unable to " \ 367 "be successfully processed." 368 369 if self.debug: 370 body += "\n\nTraceback:\n\n%s" % failure.getTraceback() 371 372 self._finaliseRequest(http_request, 500, body) 373 374 d = defer.DeferredList(dl) 375 376 return d.addCallback(cb2).addErrback(eb) 377
378 - def authenticateRequest(self, service_request, username, password, **kwargs):
379 """ 380 Processes an authentication request. If no authenticator is supplied, 381 then authentication succeeds. 382 383 @return: C{Deferred}. 384 @rtype: C{twisted.internet.defer.Deferred} 385 """ 386 authenticator = self.getAuthenticator(service_request) 387 388 if self.logger: 389 self.logger.debug('Authenticator expands to: %r' % authenticator) 390 391 if authenticator is None: 392 return defer.succeed(True) 393 394 args = (username, password) 395 396 if hasattr(authenticator, '_pyamf_expose_request'): 397 http_request = kwargs.get('http_request', None) 398 args = (http_request,) + args 399 400 return defer.maybeDeferred(authenticator, *args)
401
402 - def preprocessRequest(self, service_request, *args, **kwargs):
403 """ 404 Preprocesses a request. 405 """ 406 processor = self.getPreprocessor(service_request) 407 408 if self.logger: 409 self.logger.debug('Preprocessor expands to: %r' % processor) 410 411 if processor is None: 412 return 413 414 args = (service_request,) + args 415 416 if hasattr(processor, '_pyamf_expose_request'): 417 http_request = kwargs.get('http_request', None) 418 args = (http_request,) + args 419 420 return defer.maybeDeferred(processor, *args)
421