Package core :: Module event_bus
[hide private]
[frames | no frames]

Source Code for Module core.event_bus

  1  # Copyright 2011 the original author or authors. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #      http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  import org.vertx.java.deploy.impl.VertxLocator 
 16  import org.vertx.java.core.buffer 
 17  import org.vertx.java.core 
 18  import org.vertx.java.core.json 
 19  import java.lang 
 20   
 21  from core.javautils import map_to_java, map_from_java 
 22  from core.buffer import Buffer 
 23   
 24  __author__ = "Scott Horn" 
 25  __email__ = "scott@hornmicro.com" 
 26  __credits__ = "Based entirely on work by Tim Fox http://tfox.org" 
class EventBus(object):
    """This class represents a distributed lightweight event bus which can encompass multiple vert.x instances.
    It is very useful for otherwise isolated vert.x application instances to communicate with each other.

    JSON messages sent over the event bus are represented as Python dict instances.

    The event bus implements a distributed publish / subscribe network.

    Messages are sent to an address.

    There can be multiple handlers registered against that address.
    Any handlers with a matching name will receive the message irrespective of what vert.x application
    instance and what vert.x instance they are located in.

    All messages sent over the bus are transient. On event of failure of all or part of the event bus
    messages may be lost. Applications should be coded to cope with lost messages, e.g. by resending
    them, and making application services idempotent.

    The order of messages received by any specific handler from a specific sender will match the order
    of messages sent from that sender.

    When sending a message, a reply handler can be provided. If so, it will be called when the reply
    from the receiver has been received.

    When receiving a message in a handler the received object is an instance of Message - this contains
    the converted body of the message plus a reply method which can be used to reply to it.
    """

    # Maps handler id -> InternalHandler wrapper for every handler registered
    # through register_handler / register_simple_handler.
    handler_dict = {}

    @staticmethod
    def java_eventbus():
        """Return the Java event bus obtained from the VertxLocator's vertx instance."""
        return org.vertx.java.deploy.impl.VertxLocator.vertx.eventBus()

    @staticmethod
    def send(address, message, reply_handler=None):
        """Send a message on the event bus

        Keyword arguments:
        @param address: the address to send to
        @param message: The message to send
        @param reply_handler: An optional reply handler.
        It will be called when the reply from a receiver is received.

        @raise RuntimeError: if address is empty or message is None
        """
        EventBus.send_or_pub(True, address, message, reply_handler)

    @staticmethod
    def publish(address, message):
        """Publish a message on the event bus

        Keyword arguments:
        @param address: the address to publish to
        @param message: The message to publish

        @raise RuntimeError: if address is empty or message is None
        """
        EventBus.send_or_pub(False, address, message)

    @staticmethod
    def send_or_pub(send, address, message, reply_handler=None):
        """Validate and convert a message, then send or publish it.

        Keyword arguments:
        @param send: if true the message is sent, otherwise it is published
        @param address: the target address; must be non-empty
        @param message: the message; must not be None
        @param reply_handler: optional reply handler, only used when sending

        @raise RuntimeError: if address is empty or message is None
        """
        # Validate before touching the Java event bus.
        if not address:
            raise RuntimeError("An address must be specified")
        if message is None:
            raise RuntimeError("A message must be specified")
        message = EventBus.convert_msg(message)
        bus = EventBus.java_eventbus()
        if not send:
            bus.publish(address, message)
        elif reply_handler is not None:
            # Wrap the Python callable so the Java side can invoke it.
            bus.send(address, message, InternalHandler(reply_handler))
        else:
            bus.send(address, message)

    @staticmethod
    def register_handler(address, local_only=False, handler=None):
        """Register a handler.

        Keyword arguments:
        @param address: the address to register for. Any messages sent to that address will be
        received by the handler. A single handler can be registered against many addresses.
        @param local_only: if True then handler won't be propagated across cluster
        @param handler: The handler

        @return: id of the handler which can be used in EventBus.unregister_handler
        @raise RuntimeError: if handler is None
        """
        if handler is None:
            raise RuntimeError("handler is required")
        internal = InternalHandler(handler)
        bus = EventBus.java_eventbus()
        if local_only:
            handler_id = bus.registerLocalHandler(address, internal)
        else:
            handler_id = bus.registerHandler(address, internal)
        # Remember the wrapper under its id until unregister_handler removes it.
        EventBus.handler_dict[handler_id] = internal
        return handler_id

    @staticmethod
    def register_simple_handler(local_only=False, handler=None):
        """Registers a handler against a uniquely generated address; the address is returned as the id.

        Keyword arguments:
        @param local_only: If True then handler won't be propagated across cluster
        @param handler: The handler

        @return: id of the handler which can be used in EventBus.unregister_handler
        @raise RuntimeError: if handler is None
        """
        if handler is None:
            raise RuntimeError("Handler is required")
        internal = InternalHandler(handler)
        bus = EventBus.java_eventbus()
        if local_only:
            handler_id = bus.registerLocalHandler(internal)
        else:
            handler_id = bus.registerHandler(internal)
        # Remember the wrapper under its id until unregister_handler removes it.
        EventBus.handler_dict[handler_id] = internal
        return handler_id

    @staticmethod
    def unregister_handler(handler_id):
        """Unregisters a handler

        Keyword arguments:
        @param handler_id: the id of the handler to unregister. Returned from
        EventBus.register_handler or EventBus.register_simple_handler

        @raise KeyError: if handler_id is not present in EventBus.handler_dict
        """
        # Drop our own bookkeeping entry first; an unknown id raises KeyError
        # before the Java event bus is contacted.
        EventBus.handler_dict.pop(handler_id)
        EventBus.java_eventbus().unregisterHandler(handler_id)

    @staticmethod
    def convert_msg(message):
        """Convert a Python message into the object handed to the Java event bus.

        dict -> JsonObject, Buffer -> its Java buffer, bool -> java.lang.Boolean,
        long -> java.lang.Long, float -> java.lang.Double, int -> java.lang.Integer.
        Anything else is passed through map_to_java.

        Keyword arguments:
        @param message: the Python value to convert

        @return: the converted message
        """
        if isinstance(message, dict):
            message = org.vertx.java.core.json.JsonObject(map_to_java(message))
        elif isinstance(message, Buffer):
            message = message._to_java_buffer()
        elif isinstance(message, bool):
            # bool is a subclass of int, so it must be tested before the int
            # branch; otherwise True/False would be wrapped as Integer.
            message = java.lang.Boolean(message)
        elif isinstance(message, long):
            message = java.lang.Long(message)
        elif isinstance(message, float):
            message = java.lang.Double(message)
        elif isinstance(message, int):
            message = java.lang.Integer(message)
        else:
            message = map_to_java(message)
        return message
168
class InternalHandler(org.vertx.java.core.Handler):
    """Adapter exposing a Python callable as an org.vertx.java.core.Handler."""

    def __init__(self, handler):
        """Store the callable that will be invoked for each handled message.

        @param handler: callable invoked with a single Message argument
        """
        self.handler = handler

    def handle(self, message):
        """Wrap the incoming message in a Message and pass it to the stored callable.

        @param message: the raw message object given to this handler
        """
        callback = self.handler
        callback(Message(message))
175
class Message(object):
    """Represents a message received from the event bus"""

    def __init__(self, message):
        """Wrap a received message and convert its body to a Python value.

        A JsonObject body is converted from its map form, a Java Buffer body is
        wrapped in a Buffer, and any other body goes through map_from_java.

        @param message: the underlying message object; kept as self.java_obj
        """
        self.java_obj = message
        raw_body = message.body
        if isinstance(raw_body, org.vertx.java.core.json.JsonObject):
            converted = map_from_java(raw_body.toMap())
        elif isinstance(raw_body, org.vertx.java.core.buffer.Buffer):
            converted = Buffer(raw_body)
        else:
            converted = map_from_java(raw_body)
        self.body = converted

    def reply(self, reply, handler=None):
        """Reply to this message. If the message was sent specifying a receipt handler, that handler
        will be called when it has received a reply. If the message wasn't sent specifying a receipt
        handler this method does nothing.
        Replying to a message this way is equivalent to sending a message to an address which is the
        same as the message id of the original message.

        Keyword arguments:
        @param reply: message to send as reply
        @param handler: the reply handler
        """
        outgoing = EventBus.convert_msg(reply)
        if handler is not None:
            # A reply handler was supplied: wrap it for the Java side.
            self.java_obj.reply(outgoing, InternalHandler(handler))
            return
        self.java_obj.reply(outgoing)
203