View Javadoc

1   // ========================================================================
2   // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.thread;
16  
17  import java.io.Serializable;
18  import java.util.ArrayList;
19  import java.util.HashSet;
20  import java.util.Iterator;
21  import java.util.List;
22  import java.util.Set;
23  
24  import org.mortbay.component.AbstractLifeCycle;
25  import org.mortbay.log.Log;
26  
27  /* ------------------------------------------------------------ */
28  /** A pool of threads.
29   * <p>
30   * Avoids the expense of thread creation by pooling threads after
31   * their run methods exit for reuse.
32   * <p>
33   * If an idle thread is available a job is directly dispatched,
34   * otherwise the job is queued.  After queuing a job, if the total
35   * number of threads is less than the maximum pool size, a new thread 
36   * is spawned.
37   * <p>
38  
39   * @author Greg Wilkins <gregw@mortbay.com>
40   */
41  public class QueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
42  {
43      private static int __id;
44      
45      private String _name;
46      private Set _threads;
47      private List _idle;
48      private Runnable[] _jobs;
49      private int _nextJob;
50      private int _nextJobSlot;
51      private int _queued;
52      private int _maxQueued;
53      
54      private boolean _daemon;
55      private int _id;
56  
57      private final Object _lock = new Lock();
58      private final Object _threadsLock = new Lock();
59      private final Object _joinLock = new Lock();
60  
61      private long _lastShrink;
62      private int _maxIdleTimeMs=60000;
63      private int _maxThreads=250;
64      private int _minThreads=2;
65      private boolean _warned=false;
66      private int _lowThreads=0;
67      private int _priority= Thread.NORM_PRIORITY;
68      private int _spawnOrShrinkAt=0;
69      private int _maxStopTimeMs;
70  
71      
72      /* ------------------------------------------------------------------- */
73      /* Construct
74       */
75      public QueuedThreadPool()
76      {
77          _name="qtp"+__id++;
78      }
79      
80      /* ------------------------------------------------------------------- */
81      /* Construct
82       */
83      public QueuedThreadPool(int maxThreads)
84      {
85          this();
86          setMaxThreads(maxThreads);
87      }
88  
89      /* ------------------------------------------------------------ */
90      /** Run job.
91       * @return true 
92       */
93      public boolean dispatch(Runnable job) 
94      {  
95          if (!isRunning() || job==null)
96              return false;
97  
98          PoolThread thread=null;
99          boolean spawn=false;
100             
101         synchronized(_lock)
102         {
103             // Look for an idle thread
104             int idle=_idle.size();
105             if (idle>0)
106                 thread=(PoolThread)_idle.remove(idle-1);
107             else
108             {
109                 // queue the job
110                 _queued++;
111                 if (_queued>_maxQueued)
112                     _maxQueued=_queued;
113                 _jobs[_nextJobSlot++]=job;
114                 if (_nextJobSlot==_jobs.length)
115                     _nextJobSlot=0;
116                 if (_nextJobSlot==_nextJob)
117                 {
118                     // Grow the job queue
119                     Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
120                     int split=_jobs.length-_nextJob;
121                     if (split>0)
122                         System.arraycopy(_jobs,_nextJob,jobs,0,split);
123                     if (_nextJob!=0)
124                         System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
125                     
126                     _jobs=jobs;
127                     _nextJob=0;
128                     _nextJobSlot=_queued;
129                 }
130                   
131                 spawn=_queued>_spawnOrShrinkAt;
132             }
133         }
134         
135         if (thread!=null)
136         {
137             thread.dispatch(job);
138         }
139         else if (spawn)
140         {
141             newThread();
142         }
143         return true;
144     }
145 
146     /* ------------------------------------------------------------ */
147     /** Get the number of idle threads in the pool.
148      * @see #getThreads
149      * @return Number of threads
150      */
151     public int getIdleThreads()
152     {
153         return _idle==null?0:_idle.size();
154     }
155     
156     /* ------------------------------------------------------------ */
157     /**
158      * @return low resource threads threshhold
159      */
160     public int getLowThreads()
161     {
162         return _lowThreads;
163     }
164     
165     /* ------------------------------------------------------------ */
166     /**
167      * @return maximum queue size
168      */
169     public int getMaxQueued()
170     {
171         return _maxQueued;
172     }
173     
174     /* ------------------------------------------------------------ */
175     /** Get the maximum thread idle time.
176      * Delegated to the named or anonymous Pool.
177      * @see #setMaxIdleTimeMs
178      * @return Max idle time in ms.
179      */
180     public int getMaxIdleTimeMs()
181     {
182         return _maxIdleTimeMs;
183     }
184     
185     /* ------------------------------------------------------------ */
186     /** Set the maximum number of threads.
187      * Delegated to the named or anonymous Pool.
188      * @see #setMaxThreads
189      * @return maximum number of threads.
190      */
191     public int getMaxThreads()
192     {
193         return _maxThreads;
194     }
195 
196     /* ------------------------------------------------------------ */
197     /** Get the minimum number of threads.
198      * Delegated to the named or anonymous Pool.
199      * @see #setMinThreads
200      * @return minimum number of threads.
201      */
202     public int getMinThreads()
203     {
204         return _minThreads;
205     }
206 
207     /* ------------------------------------------------------------ */
208     /** 
209      * @return The name of the BoundedThreadPool.
210      */
211     public String getName()
212     {
213         return _name;
214     }
215 
216     /* ------------------------------------------------------------ */
217     /** Get the number of threads in the pool.
218      * @see #getIdleThreads
219      * @return Number of threads
220      */
221     public int getThreads()
222     {
223         return _threads.size();
224     }
225 
226     /* ------------------------------------------------------------ */
227     /** Get the priority of the pool threads.
228      *  @return the priority of the pool threads.
229      */
230     public int getThreadsPriority()
231     {
232         return _priority;
233     }
234 
235     /* ------------------------------------------------------------ */
236     public int getQueueSize()
237     {
238         return _queued;
239     }
240     
241     /* ------------------------------------------------------------ */
242     /**
243      * @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed 
244      * before the thread pool is grown (or shrunk)
245      */
246     public int getSpawnOrShrinkAt()
247     {
248         return _spawnOrShrinkAt;
249     }
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed 
254      * before the thread pool is grown (or shrunk)
255      */
256     public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
257     {
258         _spawnOrShrinkAt=spawnOrShrinkAt;
259     }
260 
261     /* ------------------------------------------------------------ */
262     /**
263      * @return maximum total time that stop() will wait for threads to die.
264      */
265     public int getMaxStopTimeMs()
266     {
267         return _maxStopTimeMs;
268     }
269 
270     /* ------------------------------------------------------------ */
271     /**
272      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
273      */
274     public void setMaxStopTimeMs(int stopTimeMs)
275     {
276         _maxStopTimeMs = stopTimeMs;
277     }
278 
279     /* ------------------------------------------------------------ */
280     /** 
281      * Delegated to the named or anonymous Pool.
282      */
283     public boolean isDaemon()
284     {
285         return _daemon;
286     }
287 
288     /* ------------------------------------------------------------ */
289     public boolean isLowOnThreads()
290     {
291         return _queued>_lowThreads;
292     }
293 
294     /* ------------------------------------------------------------ */
295     public void join() throws InterruptedException
296     {
297         synchronized (_joinLock)
298         {
299             while (isRunning())
300                 _joinLock.wait();
301         }
302         
303         // TODO remove this semi busy loop!
304         while (isStopping())
305             Thread.sleep(100);
306     }
307 
308     /* ------------------------------------------------------------ */
309     /** 
310      * Delegated to the named or anonymous Pool.
311      */
312     public void setDaemon(boolean daemon)
313     {
314         _daemon=daemon;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /**
319      * @param lowThreads low resource threads threshhold
320      */
321     public void setLowThreads(int lowThreads)
322     {
323         _lowThreads = lowThreads;
324     }
325     
326     /* ------------------------------------------------------------ */
327     /** Set the maximum thread idle time.
328      * Threads that are idle for longer than this period may be
329      * stopped.
330      * Delegated to the named or anonymous Pool.
331      * @see #getMaxIdleTimeMs
332      * @param maxIdleTimeMs Max idle time in ms.
333      */
334     public void setMaxIdleTimeMs(int maxIdleTimeMs)
335     {
336         _maxIdleTimeMs=maxIdleTimeMs;
337     }
338 
339     /* ------------------------------------------------------------ */
340     /** Set the maximum number of threads.
341      * Delegated to the named or anonymous Pool.
342      * @see #getMaxThreads
343      * @param maxThreads maximum number of threads.
344      */
345     public void setMaxThreads(int maxThreads)
346     {
347         if (isStarted() && maxThreads<_minThreads)
348             throw new IllegalArgumentException("!minThreads<maxThreads");
349         _maxThreads=maxThreads;
350     }
351 
352     /* ------------------------------------------------------------ */
353     /** Set the minimum number of threads.
354      * Delegated to the named or anonymous Pool.
355      * @see #getMinThreads
356      * @param minThreads minimum number of threads
357      */
358     public void setMinThreads(int minThreads)
359     {
360         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
361             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
362         _minThreads=minThreads;
363         synchronized (_threadsLock)
364         {
365             while (isStarted() && _threads.size()<_minThreads)
366             {
367                 newThread();   
368             }
369         }
370     }
371 
372     /* ------------------------------------------------------------ */
373     /** 
374      * @param name Name of the BoundedThreadPool to use when naming Threads.
375      */
376     public void setName(String name)
377     {
378         _name= name;
379     }
380 
381     /* ------------------------------------------------------------ */
382     /** Set the priority of the pool threads.
383      *  @param priority the new thread priority.
384      */
385     public void setThreadsPriority(int priority)
386     {
387         _priority=priority;
388     }
389 
390     /* ------------------------------------------------------------ */
391     /* Start the BoundedThreadPool.
392      * Construct the minimum number of threads.
393      */
394     protected void doStart() throws Exception
395     {
396         if (_maxThreads<_minThreads || _minThreads<=0)
397             throw new IllegalArgumentException("!0<minThreads<maxThreads");
398         
399         _threads=new HashSet();
400         _idle=new ArrayList();
401         _jobs=new Runnable[_maxThreads];
402         
403         for (int i=0;i<_minThreads;i++)
404         {
405             newThread();
406         }   
407     }
408 
409     /* ------------------------------------------------------------ */
410     /** Stop the BoundedThreadPool.
411      * New jobs are no longer accepted,idle threads are interrupted
412      * and stopJob is called on active threads.
413      * The method then waits 
414      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
415      * stop, at which time killJob is called.
416      */
417     protected void doStop() throws Exception
418     {   
419         super.doStop();
420         
421         long start=System.currentTimeMillis();
422         for (int i=0;i<100;i++)
423         {
424             synchronized (_threadsLock)
425             {
426                 Iterator iter = _threads.iterator();
427                 while (iter.hasNext())
428                     ((Thread)iter.next()).interrupt();
429             }
430             
431             Thread.yield();
432             if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
433                break;
434             
435             try
436             {
437                 Thread.sleep(i*100);
438             }
439             catch(InterruptedException e){}
440             
441             
442         }
443 
444         // TODO perhaps force stops
445         if (_threads.size()>0)
446             Log.warn(_threads.size()+" threads could not be stopped");
447         
448         synchronized (_joinLock)
449         {
450             _joinLock.notifyAll();
451         }
452     }
453 
454     /* ------------------------------------------------------------ */
455     protected void newThread()
456     {
457         synchronized (_threadsLock)
458         {
459             if (_threads.size()<_maxThreads)
460             {
461                 PoolThread thread =new PoolThread();
462                 _threads.add(thread);
463                 thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
464                 thread.start(); 
465             }
466             else if (!_warned)    
467             {
468                 _warned=true;
469                 Log.debug("Max threads for {}",this);
470             }
471         }
472     }
473 
474     /* ------------------------------------------------------------ */
475     /** Stop a Job.
476      * This method is called by the Pool if a job needs to be stopped.
477      * The default implementation does nothing and should be extended by a
478      * derived thread pool class if special action is required.
479      * @param thread The thread allocated to the job, or null if no thread allocated.
480      * @param job The job object passed to run.
481      */
482     protected void stopJob(Thread thread, Object job)
483     {
484         thread.interrupt();
485     }
486     
487 
488     /* ------------------------------------------------------------ */
489     /** Pool Thread class.
490      * The PoolThread allows the threads job to be
491      * retrieved and active status to be indicated.
492      */
493     public class PoolThread extends Thread 
494     {
495         Runnable _job=null;
496 
497         /* ------------------------------------------------------------ */
498         PoolThread()
499         {
500             setDaemon(_daemon);
501             setPriority(_priority);
502         }
503         
504         /* ------------------------------------------------------------ */
505         /** BoundedThreadPool run.
506          * Loop getting jobs and handling them until idle or stopped.
507          */
508         public void run()
509         {
510             boolean idle=false;
511             Runnable job=null;
512             try
513             {
514                 while (isRunning())
515                 {   
516                     // Run any job that we have.
517                     if (job!=null)
518                     {
519                         final Runnable todo=job;
520                         job=null;
521                         idle=false;
522                         todo.run();
523                     }
524                     
525                     synchronized(_lock)
526                     {
527                         // is there a queued job?
528                         if (_queued>0)
529                         {
530                             _queued--;
531                             job=_jobs[_nextJob++];
532                             if (_nextJob==_jobs.length)
533                                 _nextJob=0;
534                             continue;
535                         }
536 
537                         // Should we shrink?
538                         final int threads=_threads.size();
539                         if (threads>_minThreads && 
540                             (threads>_maxThreads || 
541                              _idle.size()>_spawnOrShrinkAt))   
542                         {
543                             long now = System.currentTimeMillis();
544                             if ((now-_lastShrink)>getMaxIdleTimeMs())
545                             {
546                                 _lastShrink=now;
547                                 _idle.remove(this);
548                                 return;
549                             }
550                         }
551 
552                         if (!idle)
553                         {   
554                             // Add ourselves to the idle set.
555                             _idle.add(this);
556                             idle=true;
557                         }
558                     }
559 
560                     // We are idle
561                     // wait for a dispatched job
562                     synchronized (this)
563                     {
564                         if (_job==null)
565                             this.wait(getMaxIdleTimeMs());
566                         job=_job;
567                         _job=null;
568                     }
569                 }
570             }
571             catch (InterruptedException e)
572             {
573                 Log.ignore(e);
574             }
575             finally
576             {
577                 synchronized (_lock)
578                 {
579                     _idle.remove(this);
580                 }
581                 synchronized (_threadsLock)
582                 {
583                     _threads.remove(this);
584                 }
585                 synchronized (this)
586                 {
587                     job=_job;
588                 }
589                 
590                 // we died with a job! reschedule it
591                 if (job!=null)
592                 {
593                     QueuedThreadPool.this.dispatch(job);
594                 }
595             }
596         }
597         
598         /* ------------------------------------------------------------ */
599         void dispatch(Runnable job)
600         {
601             synchronized (this)
602             {
603                 _job=job;
604                 this.notify();
605             }
606         }
607     }
608 
609     private class Lock{}
610 }