Package pyamf :: Package flex :: Module messaging
[hide private]
[frames | no frames]

Source Code for Module pyamf.flex.messaging

  1  # Copyright (c) 2007-2009 The PyAMF Project. 
  2  # See LICENSE.txt for details. 
  3   
  4  """ 
  5  Flex Messaging implementation. 
  6   
  7  This module contains the message classes used with Flex Data Services. 
  8   
  9  @see: U{RemoteObject on OSFlash (external) 
 10  <http://osflash.org/documentation/amf3#remoteobject>} 
 11   
 12  @since: 0.1 
 13  """ 
 14   
 15  import uuid 
 16   
 17  import pyamf.util 
 18  from pyamf import amf3 
 19   
 20   
# Names exported by ``from pyamf.flex.messaging import *``.
__all__ = [
    'RemotingMessage',
    'CommandMessage',
    'AcknowledgeMessage',
    'ErrorMessage'
]

# Package name handed to pyamf.register_package when this module is loaded.
NAMESPACE = 'flex.messaging.messages'

# Continuation bit of a small-message flag byte: read_flags keeps reading
# flag bytes while it is set, and AbstractMessage.__writeamf__ sets it on the
# first flag byte when a second one follows.
SMALL_FLAG_MORE = 0x80
 31   
 32   
class AbstractMessage(object):
    """
    Abstract base class for all Flex messages.

    Messages have two customizable sections; headers and data. The headers
    property provides access to specialized meta information for a specific
    message instance. The data property contains the instance specific data
    that needs to be delivered and processed by the decoder.

    @see: U{AbstractMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AbstractMessage.html>}

    @ivar body: Specific data that needs to be delivered to the remote
        destination.
    @type body: C{mixed}
    @ivar clientId: Indicates which client sent the message.
    @type clientId: C{str}
    @ivar destination: Message destination.
    @type destination: C{str}
    @ivar headers: Message headers. Core header names start with DS.
    @type headers: C{dict}
    @ivar messageId: Unique Message ID.
    @type messageId: C{str}
    @ivar timeToLive: How long the message should be considered valid and
        deliverable.
    @type timeToLive: C{int}
    @ivar timestamp: Timestamp when the message was generated.
    @type timestamp: C{int}
    """

    class __amf__:
        amf3 = True
        static = ('body', 'clientId', 'destination', 'headers', 'messageId',
            'timestamp', 'timeToLive')
        dynamic = False

    #: Each message pushed from the server will contain this header identifying
    #: the client that will receive the message.
    DESTINATION_CLIENT_ID_HEADER = "DSDstClientId"
    #: Messages are tagged with the endpoint id for the channel they are sent
    #: over.
    ENDPOINT_HEADER = "DSEndpoint"
    #: Messages that need to set remote credentials for a destination carry the
    #: C{Base64} encoded credentials in this header.
    REMOTE_CREDENTIALS_HEADER = "DSRemoteCredentials"
    #: The request timeout value is set on outbound messages by services or
    #: channels and the value controls how long the responder will wait for an
    #: acknowledgement, result or fault response for the message before timing
    #: out the request.
    REQUEST_TIMEOUT_HEADER = "DSRequestTimeout"

    #: Bits of the first small-message flag byte, paired one-to-one (in order)
    #: with the names in C{__amf__.static}.
    SMALL_ATTRIBUTE_FLAGS = [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40]
    SMALL_ATTRIBUTES = dict(zip(
        SMALL_ATTRIBUTE_FLAGS,
        __amf__.static
    ))

    #: Bits of the second small-message flag byte; they mark ids that are
    #: transmitted as raw UUID bytes.
    SMALL_UUID_FLAGS = [0x01, 0x02]
    SMALL_UUIDS = dict(zip(
        SMALL_UUID_FLAGS,
        ['clientId', 'messageId']
    ))

    def __init__(self, *args, **kwargs):
        """
        Populate the message attributes from keyword arguments.

        Positional arguments are accepted but ignored. Every attribute
        defaults to C{None}, except C{headers} which defaults to a new empty
        C{dict}.
        """
        self.body = kwargs.get('body', None)
        self.clientId = kwargs.get('clientId', None)
        self.destination = kwargs.get('destination', None)
        self.headers = kwargs.get('headers', {})
        self.messageId = kwargs.get('messageId', None)
        self.timestamp = kwargs.get('timestamp', None)
        self.timeToLive = kwargs.get('timeToLive', None)

    def __repr__(self):
        attrs = ''.join(
            ' %s=%r' % (k, getattr(self, k)) for k in self.__dict__)

        return '<%s ' % self.__class__.__name__ + attrs + " />"

    def decodeSmallAttribute(self, attr, input):
        """
        Read the value of one small-message attribute from C{input}.

        C{timestamp} and C{timeToLive} are divided by 1000.0 and converted
        with L{pyamf.util.get_datetime}; every other attribute is returned
        exactly as read.

        @param attr: Name of the attribute being decoded.
        @param input: Object providing C{readObject()}.
        @since: 0.5
        """
        obj = input.readObject()

        if attr in ['timestamp', 'timeToLive']:
            return pyamf.util.get_datetime(obj / 1000.0)

        return obj

    def encodeSmallAttribute(self, attr):
        """
        Return the value to write for C{attr} in the regular attribute
        section of a small message.

        Falsy values are returned untouched (the writer skips them).
        C{timestamp} and C{timeToLive} are converted with
        L{pyamf.util.get_timestamp} and multiplied by 1000.0. A C{clientId} or
        C{messageId} holding a C{uuid.UUID} yields C{None} because it is
        written in the UUID section instead.

        @param attr: Name of the attribute being encoded.
        @since: 0.5
        """
        obj = getattr(self, attr)

        if not obj:
            return obj

        if attr in ['timestamp', 'timeToLive']:
            return pyamf.util.get_timestamp(obj) * 1000.0
        elif attr in ['clientId', 'messageId']:
            if isinstance(obj, uuid.UUID):
                return None

        return obj

    def __readamf__(self, input):
        """
        Decode the AbstractMessage portion of a small message.

        The first flag byte selects regular attributes, the second selects
        ids stored as UUID bytes.

        @raise pyamf.DecodeError: If more than two flag bytes are present.
        """
        flags = read_flags(input)

        if len(flags) > 2:
            raise pyamf.DecodeError('Expected <=2 (got %d) flags for the '
                'AbstractMessage portion of the small message for %r' % (
                    len(flags), self.__class__))

        for index, byte in enumerate(flags):
            if index == 0:
                for flag in self.SMALL_ATTRIBUTE_FLAGS:
                    if flag & byte:
                        attr = self.SMALL_ATTRIBUTES[flag]
                        setattr(self, attr,
                            self.decodeSmallAttribute(attr, input))
            elif index == 1:
                for flag in self.SMALL_UUID_FLAGS:
                    if flag & byte:
                        attr = self.SMALL_UUIDS[flag]
                        setattr(self, attr, decode_uuid(input.readObject()))

    def __writeamf__(self, output):
        """
        Encode the AbstractMessage portion of a small message.

        Writes one flag byte (two when at least one id is a C{uuid.UUID}),
        followed by the selected attribute values and then the UUID ids as
        byte arrays.

        @param output: Object providing C{writeUnsignedByte()} and
            C{writeObject()}.
        """
        flag_attrs = []
        uuid_attrs = []
        byte = 0

        for flag in self.SMALL_ATTRIBUTE_FLAGS:
            value = self.encodeSmallAttribute(self.SMALL_ATTRIBUTES[flag])

            if value:
                byte |= flag
                flag_attrs.append(value)

        flags = byte
        byte = 0

        for flag in self.SMALL_UUID_FLAGS:
            attr = self.SMALL_UUIDS[flag]
            value = getattr(self, attr)

            # Only real UUIDs belong in this section. A string id has already
            # been queued with the regular attributes above and has no
            # ``bytes`` attribute, so it must be skipped here.
            if not isinstance(value, uuid.UUID):
                continue

            byte |= flag
            uuid_attrs.append(amf3.ByteArray(value.bytes))

        if not byte:
            output.writeUnsignedByte(flags)
        else:
            output.writeUnsignedByte(flags | SMALL_FLAG_MORE)
            output.writeUnsignedByte(byte)

        for attr in flag_attrs:
            output.writeObject(attr)

        for attr in uuid_attrs:
            output.writeObject(attr)

    def getSmallMessage(self):
        """
        Return a ISmallMessage representation of this object. If one is not
        available, L{NotImplementedError} will be raised.

        @since: 0.5
        """
        raise NotImplementedError
203 204
class AsyncMessage(AbstractMessage):
    """
    Base class for every asynchronous Flex message.

    @see: U{AsyncMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AsyncMessage.html>}

    @ivar correlationId: Correlation id of the message.
    @type correlationId: C{str}
    """

    #: Header carrying the target subtopic of a message that was sent with a
    #: subtopic property defined.
    SUBTOPIC_HEADER = "DSSubtopic"

    class __amf__:
        static = ('correlationId',)

    def __init__(self, *args, **kwargs):
        """
        Initialise the base message, then take C{correlationId} from the
        keyword arguments (C{None} when absent).
        """
        AbstractMessage.__init__(self, *args, **kwargs)

        self.correlationId = kwargs.get('correlationId')

    def __readamf__(self, input):
        """
        Decode the AbstractMessage portion, then the AsyncMessage portion.

        @raise pyamf.DecodeError: If more than one flag byte is present for
            the AsyncMessage portion.
        """
        AbstractMessage.__readamf__(self, input)

        flag_bytes = read_flags(input)
        count = len(flag_bytes)

        if count > 1:
            raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
                'AsyncMessage portion of the small message for %r' % (
                    count, self.__class__))

        bits = flag_bytes[0]

        # 0x01: correlation id stored as a plain object.
        if bits & 0x01:
            self.correlationId = input.readObject()

        # 0x02: correlation id stored as UUID bytes.
        if bits & 0x02:
            self.correlationId = decode_uuid(input.readObject())

    def __writeamf__(self, output):
        """
        Encode the AbstractMessage portion, then a flag byte and the
        correlation id (as a byte array when it is a C{uuid.UUID}).
        """
        AbstractMessage.__writeamf__(self, output)

        correlation = self.correlationId

        if isinstance(correlation, uuid.UUID):
            output.writeUnsignedByte(0x02)
            output.writeObject(pyamf.amf3.ByteArray(correlation.bytes))
        else:
            output.writeUnsignedByte(0x01)
            output.writeObject(correlation)

    def getSmallMessage(self):
        """
        Build an L{AsyncMessageExt} (the ISmallMessage form) from this
        instance's attributes.

        @since: 0.5
        """
        return AsyncMessageExt(**vars(self))
263 264
class AcknowledgeMessage(AsyncMessage):
    """
    Acknowledges the receipt of a message that was sent previously.

    Every message sent within the messaging system must receive an
    acknowledgement.

    @see: U{AcknowledgeMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AcknowledgeMessage.html>}
    """

    #: Header indicating that the acknowledgement is for a message that
    #: generated an error.
    ERROR_HINT_HEADER = "DSErrorHint"

    def __readamf__(self, input):
        """
        Decode the AsyncMessage portion, then consume the flag bytes of the
        AcknowledgeMessage portion (no attributes are read from them).

        @raise pyamf.DecodeError: If more than one flag byte is present.
        """
        AsyncMessage.__readamf__(self, input)

        count = len(read_flags(input))

        if count > 1:
            raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
                'AcknowledgeMessage portion of the small message for %r' % (
                    count, self.__class__))

    def __writeamf__(self, output):
        """
        Encode the AsyncMessage portion followed by a single zero flag byte.
        """
        AsyncMessage.__writeamf__(self, output)

        output.writeUnsignedByte(0)

    def getSmallMessage(self):
        """
        Build an L{AcknowledgeMessageExt} (the ISmallMessage form) from this
        instance's attributes.

        @since: 0.5
        """
        return AcknowledgeMessageExt(**vars(self))
302 303
class CommandMessage(AsyncMessage):
    """
    Provides a mechanism for sending commands related to publish/subscribe
    messaging, ping, and cluster operations.

    @see: U{CommandMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/CommandMessage.html>}

    @ivar operation: The command
    @type operation: C{int}
    @ivar messageRefType: Remote destination belonging to a specific service,
        based upon whether this message type matches the message type the
        service handles.
    @type messageRefType: C{str}
    """

    #: The server message type for authentication commands.
    AUTHENTICATION_MESSAGE_REF_TYPE = "flex.messaging.messages.AuthenticationMessage"
    #: This is used to test connectivity over the current channel to the remote
    #: endpoint.
    PING_OPERATION = 5
    #: This is used by a remote destination to sync missed or cached messages
    #: back to a client as a result of a client issued poll command.
    SYNC_OPERATION = 4
    #: This is used to request a list of failover endpoint URIs for the remote
    #: destination based on cluster membership.
    CLUSTER_REQUEST_OPERATION = 7
    #: This is used to send credentials to the endpoint so that the user can be
    #: logged in over the current channel. The credentials need to be C{Base64}
    #: encoded and stored in the body of the message.
    LOGIN_OPERATION = 8
    #: This is used to log the user out of the current channel, and will
    #: invalidate the server session if the channel is HTTP based.
    LOGOUT_OPERATION = 9
    #: This is used to poll a remote destination for pending, undelivered
    #: messages.
    POLL_OPERATION = 2
    #: Subscribe commands issued by a consumer pass the consumer's C{selector}
    #: expression in this header.
    SELECTOR_HEADER = "DSSelector"
    #: This is used to indicate that the client's session with a remote
    #: destination has timed out.
    SESSION_INVALIDATE_OPERATION = 10
    #: This is used to subscribe to a remote destination.
    SUBSCRIBE_OPERATION = 0
    #: This is the default operation for new L{CommandMessage} instances.
    UNKNOWN_OPERATION = 1000
    #: This is used to unsubscribe from a remote destination.
    UNSUBSCRIBE_OPERATION = 1
    #: This operation is used to indicate that a channel has disconnected.
    DISCONNECT_OPERATION = 12

    class __amf__:
        static = ('operation',)

    def __init__(self, *args, **kwargs):
        """
        Initialise the async message, then take C{operation} and
        C{messageRefType} from the keyword arguments (C{None} when absent).
        """
        AsyncMessage.__init__(self, *args, **kwargs)

        self.operation = kwargs.get('operation', None)
        #: Remote destination belonging to a specific service, based upon
        #: whether this message type matches the message type the service
        #: handles.
        self.messageRefType = kwargs.get('messageRefType', None)

    def __readamf__(self, input):
        """
        Decode the AsyncMessage portion, then the CommandMessage portion.

        @raise pyamf.DecodeError: If more than one flag byte is present for
            the CommandMessage portion.
        """
        AsyncMessage.__readamf__(self, input)

        flags = read_flags(input)

        if not flags:
            return

        if len(flags) > 1:
            raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
                'CommandMessage portion of the small message for %r' % (
                    len(flags), self.__class__))

        byte = flags[0]

        if byte & 0x01:
            self.operation = input.readObject()

    def __writeamf__(self, output):
        """
        Encode the AsyncMessage portion, then a flag byte and, when an
        operation is set, the operation itself.
        """
        AsyncMessage.__writeamf__(self, output)

        # Compare against None rather than relying on truthiness:
        # SUBSCRIBE_OPERATION is 0 and must still be written.
        if self.operation is not None:
            output.writeUnsignedByte(0x01)
            output.writeObject(self.operation)
        else:
            output.writeUnsignedByte(0)

    def getSmallMessage(self):
        """
        Return a ISmallMessage representation of this command message.

        @since: 0.5
        """
        return CommandMessageExt(**self.__dict__)
400 401
class ErrorMessage(AcknowledgeMessage):
    """
    The Flex error message that is returned to the client.

    This class is used to report errors within the messaging system.

    @see: U{ErrorMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/ErrorMessage.html>}
    """

    #: The faultCode contains this constant when a message may not have been
    #: delivered.
    MESSAGE_DELIVERY_IN_DOUBT = "Client.Error.DeliveryInDoubt"
    #: Header name for the retryable hint header.
    #:
    #: It indicates that the operation that generated the error may be
    #: retryable rather than fatal.
    RETRYABLE_HINT_HEADER = "DSRetryableErrorHint"

    class __amf__:
        static = ('extendedData', 'faultCode', 'faultDetail', 'faultString',
            'rootCause')

    def __init__(self, *args, **kwargs):
        """
        Initialise the acknowledge message, then take the fault attributes
        from the keyword arguments.
        """
        AcknowledgeMessage.__init__(self, *args, **kwargs)

        option = kwargs.get

        #: Extended data that the remote destination has chosen to associate
        #: with this error to facilitate custom error processing on the client.
        self.extendedData = option('extendedData', {})
        #: Fault code for the error.
        self.faultCode = option('faultCode')
        #: Detailed description of what caused the error.
        self.faultDetail = option('faultDetail')
        #: A simple description of the error.
        self.faultString = option('faultString')
        #: Should a traceback exist for the error, this property contains the
        #: message.
        self.rootCause = option('rootCause', {})

    def getSmallMessage(self):
        """
        No ISmallMessage form is provided for error messages; always raises
        L{NotImplementedError}.

        @since: 0.5
        """
        raise NotImplementedError
447 448
class RemotingMessage(AbstractMessage):
    """
    Message used to send RPC requests to a remote endpoint.

    @see: U{RemotingMessage on Livedocs (external)
    <http://livedocs.adobe.com/flex/201/langref/mx/messaging/messages/RemotingMessage.html>}
    """

    class __amf__:
        static = ('operation', 'source')

    def __init__(self, *args, **kwargs):
        """
        Initialise the base message, then take C{operation} and C{source}
        from the keyword arguments (C{None} when absent).
        """
        AbstractMessage.__init__(self, *args, **kwargs)

        option = kwargs.get

        #: Name of the remote method/operation that should be called.
        self.operation = option('operation')
        #: Name of the service to be called including package name.
        #: This property is provided for backwards compatibility.
        self.source = option('source')
467 468
class AcknowledgeMessageExt(AcknowledgeMessage):
    """
    C{ISmallMessage} flavour of L{AcknowledgeMessage}.

    The only difference from the parent class is that C{__amf__.external}
    is set.

    @since: 0.5
    """

    class __amf__:
        external = True
478 479
class CommandMessageExt(CommandMessage):
    """
    C{ISmallMessage} flavour of L{CommandMessage}.

    The only difference from the parent class is that C{__amf__.external}
    is set.

    @since: 0.5
    """

    class __amf__:
        external = True
489 490
class AsyncMessageExt(AsyncMessage):
    """
    C{ISmallMessage} flavour of L{AsyncMessage}.

    The only difference from the parent class is that C{__amf__.external}
    is set.

    @since: 0.5
    """

    class __amf__:
        external = True
500 501
def read_flags(input):
    """
    Read consecutive flag bytes from C{input}.

    Bytes are read with C{readUnsignedByte()} until one arrives without the
    C{SMALL_FLAG_MORE} bit. That bit is cleared from every byte that carried
    it before the byte is stored.

    @param input: Object providing C{readUnsignedByte()}.
    @return: The flag bytes in the order read; always at least one entry.
    @rtype: C{list}
    @since: 0.5
    """
    collected = []

    while True:
        value = input.readUnsignedByte()
        has_more = value & SMALL_FLAG_MORE

        if has_more:
            value ^= SMALL_FLAG_MORE

        collected.append(value)

        if not has_more:
            return collected
521 522
def decode_uuid(obj):
    """
    Decode a L{ByteArray} contents to a C{uuid.UUID} instance.

    Raw C{bytes}/C{bytearray} payloads are used as they are; any other object
    is converted with C{str()} first.

    @param obj: The 16 byte payload, or an object whose C{str()} yields it.
    @return: The decoded identifier.
    @rtype: C{uuid.UUID}
    @raise ValueError: If the payload is not exactly 16 bytes long.
    @since: 0.5
    """
    if isinstance(obj, (bytes, bytearray)):
        # str() of a bytes object is its repr on Python 3, which would never
        # be 16 bytes long, so pass byte strings through untouched.
        raw = bytes(obj)
    else:
        raw = str(obj)

    return uuid.UUID(bytes=raw)


# Hand this module's globals to PyAMF together with the NAMESPACE package
# name -- presumably this aliases each message class as
# 'flex.messaging.messages.<ClassName>'; confirm against pyamf.register_package.
pyamf.register_package(globals(), package=NAMESPACE)
# The ISmallMessage variants are registered separately under the short
# aliases 'DSK', 'DSC' and 'DSA'.
pyamf.register_class(AcknowledgeMessageExt, 'DSK')
pyamf.register_class(CommandMessageExt, 'DSC')
pyamf.register_class(AsyncMessageExt, 'DSA')