View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import java.lang.reflect.Method;
19  import java.util.concurrent.ConcurrentMap;
20  import java.util.concurrent.Executor;
21  import java.util.concurrent.Executors;
22  import java.util.concurrent.RejectedExecutionException;
23  import java.util.concurrent.RejectedExecutionHandler;
24  import java.util.concurrent.Semaphore;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.jboss.netty.buffer.ChannelBuffer;
31  import org.jboss.netty.channel.Channel;
32  import org.jboss.netty.channel.ChannelEvent;
33  import org.jboss.netty.channel.ChannelHandlerContext;
34  import org.jboss.netty.channel.ChannelState;
35  import org.jboss.netty.channel.ChannelStateEvent;
36  import org.jboss.netty.channel.MessageEvent;
37  import org.jboss.netty.channel.WriteCompletionEvent;
38  import org.jboss.netty.logging.InternalLogger;
39  import org.jboss.netty.logging.InternalLoggerFactory;
40  import org.jboss.netty.util.DefaultObjectSizeEstimator;
41  import org.jboss.netty.util.ObjectSizeEstimator;
42  import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
43  import org.jboss.netty.util.internal.LinkedTransferQueue;
44  import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
45  
46  /**
47   * A {@link ThreadPoolExecutor} which blocks the task submission when there are
48   * too many tasks in the queue.  Both per-{@link Channel} and per-{@link Executor}
49   * limitations can be applied.
50   * <p>
51   * When a task (i.e. {@link Runnable}) is submitted,
52   * {@link MemoryAwareThreadPoolExecutor} calls {@link ObjectSizeEstimator#estimateSize(Object)}
53   * to get the estimated size of the task in bytes to calculate the amount of
54   * memory occupied by the unprocessed tasks.
55   * <p>
56   * If the total size of the unprocessed tasks exceeds either per-{@link Channel}
57   * or per-{@link Executor} threshold, any further {@link #execute(Runnable)}
58   * call will block until the tasks in the queue are processed so that the total
59   * size goes under the threshold.
60   *
61   * <h3>Using an alternative task size estimation strategy</h3>
62   *
63   * Although the default implementation does its best to guess the size of an
64   * object of unknown type, it is always a good idea to use an alternative
65   * {@link ObjectSizeEstimator} implementation instead of the
66   * {@link DefaultObjectSizeEstimator} to avoid incorrect task size calculation,
67   * especially when:
68   * <ul>
69   *   <li>you are using {@link MemoryAwareThreadPoolExecutor} independently from
70   *       {@link ExecutionHandler},</li>
71   *   <li>you are submitting a task whose type is not {@link ChannelEventRunnable}, or</li>
72   *   <li>the message type of the {@link MessageEvent} in the {@link ChannelEventRunnable}
73   *       is not {@link ChannelBuffer}.</li>
74   * </ul>
75   * Here is an example that demonstrates how to implement an {@link ObjectSizeEstimator}
76   * which understands a user-defined object:
77   * <pre>
78   * public class MyRunnable implements {@link Runnable} {
79   *
80   *     <b>private final byte[] data;</b>
81   *
82   *     public MyRunnable(byte[] data) {
83   *         this.data = data;
84   *     }
85   *
86   *     public void run() {
87   *         // Process 'data' ..
88   *     }
89   * }
90   *
91   * public class MyObjectSizeEstimator extends {@link DefaultObjectSizeEstimator} {
92   *
93   *     {@literal @Override}
94   *     public int estimateSize(Object o) {
95   *         if (<b>o instanceof MyRunnable</b>) {
96   *             <b>return ((MyRunnable) o).data.length + 8;</b>
97   *         }
98   *         return super.estimateSize(o);
99   *     }
100  * }
101  *
102  * {@link ThreadPoolExecutor} pool = new {@link MemoryAwareThreadPoolExecutor}(
103  *         16, 65536, 1048576, 30, {@link TimeUnit}.SECONDS,
104  *         <b>new MyObjectSizeEstimator()</b>,
105  *         {@link Executors}.defaultThreadFactory());
106  *
107  * <b>pool.execute(new MyRunnable(data));</b>
108  * </pre>
109  *
110  * <h3>Event execution order</h3>
111  *
112  * Please note that this executor does not maintain the order of the
113  * {@link ChannelEvent}s for the same {@link Channel}.  For example,
114  * you can even receive a {@code "channelClosed"} event before a
115  * {@code "messageReceived"} event.
116  *
117  * For example, the events can be processed as depicted below:
118  *
119  * <pre>
120  *           --------------------------------&gt; Timeline --------------------------------&gt;
121  *
122  * Thread X: --- Channel A (Event 2) --- Channel A (Event 1) ---------------------------&gt;
123  *
124  * Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) ---&gt;
125  *
126  * Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) ---&gt;
127  * </pre>
128  *
129  * To maintain the event order, you must use {@link OrderedMemoryAwareThreadPoolExecutor}.
130  *
131  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
132  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
133  *
134  * @version $Rev: 2351 $, $Date: 2010-08-26 11:55:10 +0900 (Thu, 26 Aug 2010) $
135  *
136  * @apiviz.has org.jboss.netty.util.ObjectSizeEstimator oneway - -
137  * @apiviz.has org.jboss.netty.handler.execution.ChannelEventRunnable oneway - - executes
138  */
139 public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
140 
141     private static final InternalLogger logger =
142         InternalLoggerFactory.getInstance(MemoryAwareThreadPoolExecutor.class);
143 
144     private static final SharedResourceMisuseDetector misuseDetector =
145         new SharedResourceMisuseDetector(MemoryAwareThreadPoolExecutor.class);
146 
147     private volatile Settings settings;
148 
149     private final ConcurrentMap<Channel, AtomicLong> channelCounters =
150         new ConcurrentIdentityHashMap<Channel, AtomicLong>();
151     private final AtomicLong totalCounter = new AtomicLong();
152 
153     private final Semaphore semaphore = new Semaphore(0);
154 
155     /**
156      * Creates a new instance.
157      *
158      * @param corePoolSize          the maximum number of active threads
159      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
160      *                              Specify {@code 0} to disable.
161      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
162      *                              Specify {@code 0} to disable.
163      */
164     public MemoryAwareThreadPoolExecutor(
165             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
166 
167         this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS);
168     }
169 
170     /**
171      * Creates a new instance.
172      *
173      * @param corePoolSize          the maximum number of active threads
174      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
175      *                              Specify {@code 0} to disable.
176      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
177      *                              Specify {@code 0} to disable.
178      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
179      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
180      */
181     public MemoryAwareThreadPoolExecutor(
182             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
183             long keepAliveTime, TimeUnit unit) {
184 
185         this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory());
186     }
187 
188     /**
189      * Creates a new instance.
190      *
191      * @param corePoolSize          the maximum number of active threads
192      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
193      *                              Specify {@code 0} to disable.
194      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
195      *                              Specify {@code 0} to disable.
196      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
197      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
198      * @param threadFactory         the {@link ThreadFactory} of this pool
199      */
200     public MemoryAwareThreadPoolExecutor(
201             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
202             long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
203 
204         this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory);
205     }
206 
207     /**
208      * Creates a new instance.
209      *
210      * @param corePoolSize          the maximum number of active threads
211      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
212      *                              Specify {@code 0} to disable.
213      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
214      *                              Specify {@code 0} to disable.
215      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
216      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
217      * @param threadFactory         the {@link ThreadFactory} of this pool
218      * @param objectSizeEstimator   the {@link ObjectSizeEstimator} of this pool
219      */
220     public MemoryAwareThreadPoolExecutor(
221             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
222             long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator,
223             ThreadFactory threadFactory) {
224 
225         super(corePoolSize, corePoolSize, keepAliveTime, unit,
226               new LinkedTransferQueue<Runnable>(), threadFactory, new NewThreadRunsPolicy());
227 
228         if (objectSizeEstimator == null) {
229             throw new NullPointerException("objectSizeEstimator");
230         }
231         if (maxChannelMemorySize < 0) {
232             throw new IllegalArgumentException(
233                     "maxChannelMemorySize: " + maxChannelMemorySize);
234         }
235         if (maxTotalMemorySize < 0) {
236             throw new IllegalArgumentException(
237                     "maxTotalMemorySize: " + maxTotalMemorySize);
238         }
239 
240         // Call allowCoreThreadTimeOut(true) using reflection
241         // because it is not supported in Java 5.
242         try {
243             Method m = getClass().getMethod("allowCoreThreadTimeOut", new Class[] { boolean.class });
244             m.invoke(this, Boolean.TRUE);
245         } catch (Throwable t) {
246             // Java 5
247             logger.debug(
248                     "ThreadPoolExecutor.allowCoreThreadTimeOut() is not " +
249                     "supported in this platform.");
250         }
251 
252         settings = new Settings(
253                 objectSizeEstimator, maxChannelMemorySize, maxTotalMemorySize);
254 
255         // Misuse check
256         misuseDetector.increase();
257     }
258 
259     @Override
260     protected void terminated() {
261         super.terminated();
262         misuseDetector.decrease();
263     }
264 
265     /**
266      * Returns the {@link ObjectSizeEstimator} of this pool.
267      */
268     public ObjectSizeEstimator getObjectSizeEstimator() {
269         return settings.objectSizeEstimator;
270     }
271 
272     /**
273      * Sets the {@link ObjectSizeEstimator} of this pool.
274      */
275     public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
276         if (objectSizeEstimator == null) {
277             throw new NullPointerException("objectSizeEstimator");
278         }
279 
280         settings = new Settings(
281                 objectSizeEstimator,
282                 settings.maxChannelMemorySize, settings.maxTotalMemorySize);
283     }
284 
285     /**
286      * Returns the maximum total size of the queued events per channel.
287      */
288     public long getMaxChannelMemorySize() {
289         return settings.maxChannelMemorySize;
290     }
291 
292     /**
293      * Sets the maximum total size of the queued events per channel.
294      * Specify {@code 0} to disable.
295      */
296     public void setMaxChannelMemorySize(long maxChannelMemorySize) {
297         if (maxChannelMemorySize < 0) {
298             throw new IllegalArgumentException(
299                     "maxChannelMemorySize: " + maxChannelMemorySize);
300         }
301 
302         if (getTaskCount() > 0) {
303             throw new IllegalStateException(
304                     "can't be changed after a task is executed");
305         }
306 
307         settings = new Settings(
308                 settings.objectSizeEstimator,
309                 maxChannelMemorySize, settings.maxTotalMemorySize);
310     }
311 
312     /**
313      * Returns the maximum total size of the queued events for this pool.
314      */
315     public long getMaxTotalMemorySize() {
316         return settings.maxTotalMemorySize;
317     }
318 
319     /**
320      * Sets the maximum total size of the queued events for this pool.
321      * Specify {@code 0} to disable.
322      */
323     public void setMaxTotalMemorySize(long maxTotalMemorySize) {
324         if (maxTotalMemorySize < 0) {
325             throw new IllegalArgumentException(
326                     "maxTotalMemorySize: " + maxTotalMemorySize);
327         }
328 
329         if (getTaskCount() > 0) {
330             throw new IllegalStateException(
331                     "can't be changed after a task is executed");
332         }
333 
334         settings = new Settings(
335                 settings.objectSizeEstimator,
336                 settings.maxChannelMemorySize, maxTotalMemorySize);
337     }
338 
339     @Override
340     public void execute(Runnable command) {
341         if (!(command instanceof ChannelEventRunnable)) {
342             command = new MemoryAwareRunnable(command);
343         }
344 
345         boolean pause = increaseCounter(command);
346         doExecute(command);
347         if (pause) {
348             //System.out.println("ACQUIRE: " + command);
349             semaphore.acquireUninterruptibly();
350         }
351     }
352 
353     /**
354      * Put the actual execution logic here.  The default implementation simply
355      * calls {@link #doUnorderedExecute(Runnable)}.
356      */
357     protected void doExecute(Runnable task) {
358         doUnorderedExecute(task);
359     }
360 
361     /**
362      * Executes the specified task without maintaining the event order.
363      */
364     protected final void doUnorderedExecute(Runnable task) {
365         super.execute(task);
366     }
367 
368     @Override
369     public boolean remove(Runnable task) {
370         boolean removed = super.remove(task);
371         if (removed) {
372             decreaseCounter(task);
373         }
374         return removed;
375     }
376 
377     @Override
378     protected void beforeExecute(Thread t, Runnable r) {
379         super.beforeExecute(t, r);
380         decreaseCounter(r);
381     }
382 
383     protected boolean increaseCounter(Runnable task) {
384         if (!shouldCount(task)) {
385             return false;
386         }
387 
388         Settings settings = this.settings;
389         long maxTotalMemorySize = settings.maxTotalMemorySize;
390         long maxChannelMemorySize = settings.maxChannelMemorySize;
391 
392         int increment = settings.objectSizeEstimator.estimateSize(task);
393         long totalCounter = this.totalCounter.addAndGet(increment);
394 
395         if (task instanceof ChannelEventRunnable) {
396             ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
397             eventTask.estimatedSize = increment;
398             Channel channel = eventTask.getEvent().getChannel();
399             long channelCounter = getChannelCounter(channel).addAndGet(increment);
400             //System.out.println("IC: " + channelCounter + ", " + increment);
401             if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
402                 if (channel.isReadable()) {
403                     //System.out.println("UNREADABLE");
404                     ChannelHandlerContext ctx = eventTask.getContext();
405                     if (ctx.getHandler() instanceof ExecutionHandler) {
406                         // readSuspended = true;
407                         ctx.setAttachment(Boolean.TRUE);
408                     }
409                     channel.setReadable(false);
410                 }
411             }
412         } else {
413             ((MemoryAwareRunnable) task).estimatedSize = increment;
414         }
415 
416         //System.out.println("I: " + totalCounter + ", " + increment);
417         return maxTotalMemorySize != 0 && totalCounter >= maxTotalMemorySize;
418     }
419 
420     protected void decreaseCounter(Runnable task) {
421         if (!shouldCount(task)) {
422             return;
423         }
424 
425         Settings settings = this.settings;
426         long maxTotalMemorySize = settings.maxTotalMemorySize;
427         long maxChannelMemorySize = settings.maxChannelMemorySize;
428 
429         int increment;
430         if (task instanceof ChannelEventRunnable) {
431             increment = ((ChannelEventRunnable) task).estimatedSize;
432         } else {
433             increment = ((MemoryAwareRunnable) task).estimatedSize;
434         }
435 
436         long totalCounter = this.totalCounter.addAndGet(-increment);
437 
438         //System.out.println("D: " + totalCounter + ", " + increment);
439         if (maxTotalMemorySize != 0 && totalCounter + increment >= maxTotalMemorySize) {
440             //System.out.println("RELEASE: " + task);
441             while (semaphore.hasQueuedThreads()) {
442                 semaphore.release();
443             }
444         }
445 
446         if (task instanceof ChannelEventRunnable) {
447             ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
448             Channel channel = eventTask.getEvent().getChannel();
449             long channelCounter = getChannelCounter(channel).addAndGet(-increment);
450             //System.out.println("DC: " + channelCounter + ", " + increment);
451             if (maxChannelMemorySize != 0 && channelCounter < maxChannelMemorySize && channel.isOpen()) {
452                 if (!channel.isReadable()) {
453                     //System.out.println("READABLE");
454                     ChannelHandlerContext ctx = eventTask.getContext();
455                     if (ctx.getHandler() instanceof ExecutionHandler) {
456                         // readSuspended = false;
457                         ctx.setAttachment(null);
458                     }
459                     channel.setReadable(true);
460                 }
461             }
462         }
463     }
464 
465     private AtomicLong getChannelCounter(Channel channel) {
466         AtomicLong counter = channelCounters.get(channel);
467         if (counter == null) {
468             counter = new AtomicLong();
469             AtomicLong oldCounter = channelCounters.putIfAbsent(channel, counter);
470             if (oldCounter != null) {
471                 counter = oldCounter;
472             }
473         }
474 
475         // Remove the entry when the channel closes.
476         if (!channel.isOpen()) {
477             channelCounters.remove(channel);
478         }
479         return counter;
480     }
481 
482     /**
483      * Returns {@code true} if and only if the specified {@code task} should
484      * be counted to limit the global and per-channel memory consumption.
485      * To override this method, you must call {@code super.shouldCount()} to
486      * make sure important tasks are not counted.
487      */
488     protected boolean shouldCount(Runnable task) {
489         if (task instanceof ChannelEventRunnable) {
490             ChannelEventRunnable r = (ChannelEventRunnable) task;
491             ChannelEvent e = r.getEvent();
492             if (e instanceof WriteCompletionEvent) {
493                 return false;
494             } else if (e instanceof ChannelStateEvent) {
495                 if (((ChannelStateEvent) e).getState() == ChannelState.INTEREST_OPS) {
496                     return false;
497                 }
498             }
499         }
500         return true;
501     }
502 
503     private static final class Settings {
504         final ObjectSizeEstimator objectSizeEstimator;
505         final long maxChannelMemorySize;
506         final long maxTotalMemorySize;
507 
508         Settings(ObjectSizeEstimator objectSizeEstimator,
509                  long maxChannelMemorySize, long maxTotalMemorySize) {
510             this.objectSizeEstimator = objectSizeEstimator;
511             this.maxChannelMemorySize = maxChannelMemorySize;
512             this.maxTotalMemorySize = maxTotalMemorySize;
513         }
514     }
515 
516     private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
517         NewThreadRunsPolicy() {
518             super();
519         }
520 
521         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
522             try {
523                 final Thread t = new Thread(r, "Temporary task executor");
524                 t.start();
525             } catch (Throwable e) {
526                 throw new RejectedExecutionException(
527                         "Failed to start a new thread", e);
528             }
529         }
530     }
531 
532     private static final class MemoryAwareRunnable implements Runnable {
533         final Runnable task;
534         int estimatedSize;
535 
536         MemoryAwareRunnable(Runnable task) {
537             this.task = task;
538         }
539 
540         public void run() {
541             task.run();
542         }
543     }
544 }