/Users/lyon/j4p/src/classUtils/pack/util/tp/ThreadPool.java

1    package classUtils.pack.util.tp; 
2     
3    import java.io.*; 
4     
5    /** 
6     * A thread pooling class. Ensures that no more than <i>n</i> thread are alive 
7     * at the same time, while queueing incoming requests. 
8     * <p> 
9     * The pooled threads can be created as daemon or not - if they're daemon, the 
10    * JVM will exit when only pooled threads are running. 
11    * <p> 
12    * Note that a thread pool is <i>not</i> a thread in itself, i.e. is executed 
13    * in the thread of the caller. 
14    * <p> 
15    * Use the {@link ThreadPoolThread ThreadPoolThread} for a pool which runs in a 
16    * thread on its own and on emtpy queue just sleeps. 
17    * 
18    * @author Cris Sadun 
19    * @version 1.0 
20    */ 
21   public class ThreadPool { 
22    
23       private boolean shutdown=false; 
24       private int size; 
25       private Queue queue; 
26       private PooledThread [] pool; 
27       public boolean verbose=( System.getProperty("org.sadun.verbose") != null); 
28    
29       /** 
30        * A thread class that waits() undefinitely unless explicitly 
31        * notified. When notified, it attempts to run the associated 
32        * runnable. If no runnable exists, or when the runnable exits or 
33        * fails, it goes back to waiting. 
34        */ 
35       public class PooledThread extends Thread { 
36    
37           private Runnable runnable; 
38           private boolean waiting=true; 
39           int i; 
40    
41           /** 
42            * Create a PooledThread on the given group, with the given 
43            * identifier and the given daemon property 
44            */ 
45           PooledThread(ThreadGroup tg, int i, boolean daemon) { 
46               super(tg, "pooled-thread-"+i); 
47               //super(tg, "pooled-thread-"+i+" (free)"); 
48               this.i=i; 
49               setDaemon(daemon); 
50           } 
51    
52           private synchronized void setRunnable(Runnable r) { 
53               if (! isFree()) 
54                   throw new IllegalStateException("Thread "+getName()+" in the pool is not free"); 
55               if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Thread "+this+" is free, associating "+r); 
56               //setName("pooled-thread-"+i+" hosting "+r); 
57               runnable=r; 
58               waiting=false; 
59               notify(); 
60           } 
61    
62           /** 
63            * Runs the available runnable object, or waits. 
64            */ 
65           public final synchronized void run() { 
66               do { 
67                   if (runnable != null) { 
68                       waiting=false; 
69                       try { 
70                           runnable.run(); 
71                       } catch(Exception exc) { 
72                           // An exception must just set the pooled thread to free 
73                           exc.printStackTrace(); 
74                           runnable=null; 
75                           //setName("pooled-thread-"+i+" (free)"); 
76                       } 
77                   } 
78    
79                   // Notify the pool the thread is free and ask for a new runnable, 
80                   // if any 
81                   runnable = getNextThread(); 
82                   if (runnable != null) { 
83                       if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Dequeueed "+runnable+" ("+queue.size()+" in queue)"); 
84                       continue; 
85                   } 
86    
87                   waiting=true; 
88                   try { 
89                       if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" going to wait."); 
90                       wait(0); 
91                       if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" notified."); 
92                   } catch(InterruptedException e) { 
93                       if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" INTERRUPTED!"); 
94                   } 
95                   waiting=false; 
96               } while(!shutdown); 
97           } 
98    
99           /** 
100           * Return true if the thread is not associated to any runnable 
101           */ 
102          protected boolean isFree() { return waiting; } 
103   
104          /** 
105           * Return the associated Runnable, or <b>null</b> 
106           * @return the associated Runnable, or <b>null</b> 
107           */ 
108          protected Runnable getRunnable() { return runnable; } 
109   
110          public String toString() { return getName(); } 
111   
112          /** 
113           * Finalizes terminates the controlled threads. 
114           */ 
115          public void finalize() { 
116              terminate(); 
117          } 
118      } 
119   
120      /** 
121       * Return the next thread to run, as scheduled by the queue 
122       */ 
123      private Runnable getNextThread() { 
124          if (queue.isEmpty()) return null; 
125          return (Runnable)queue.get(); 
126      } 
127   
128      /** 
129       * Create a pool with the given size using the given queue object. 
130       * @param size the size of the pool 
131       * @param daemon if <b>true</b> the pools will be daemon 
132       * @param queue the queue to use for determining the next thread to 
133       *        instantiate when there are pending start requests. 
134       * 
135       */ 
136      public ThreadPool(int size, boolean daemon, Queue queue) { 
137          if (verbose) System.out.println("Creating thread pool of size "+size); 
138          this.queue=queue; 
139          this.pool=new PooledThread[size]; 
140          ThreadGroup tg = new ThreadGroup("thread-pool"); 
141          for(int i=0;i<size;i++) { 
142              pool[i]= new PooledThread(tg, i, daemon); 
143              pool[i].start(); 
144          } 
145   
146      } 
147   
148      /** 
149       * Create a pool with the given size with a FIFO queue 
150       * @param size the size of the pool 
151       * @param daemon if <b>true</b> the pools will be daemon 
152       */ 
153      public ThreadPool(int size, boolean daemon) { 
154          this(size, daemon, new FIFOQueue()); 
155      } 
156   
157      /** 
158       * Create a pool of daemon threads with the given size and a FIFO waiting queue 
159       * @param size the size of the pool 
160       */ 
161      public ThreadPool(int size) { 
162          this(size, true); 
163      } 
164   
165   
166      /** 
167       * Return the size of the pool 
168       * @return the size of the pool 
169       */ 
170      public int size() { return pool.length; } 
171   
172      /** 
173       * Return the number of thread currently queued 
174       * @return the number of thread currently queued 
175       */ 
176      public int getQueueSize() { return queue.size(); } 
177   
178      /** 
179       * Adds a runnable object to the pool. 
180       * If there's a thread available, the runnable is associated to the thread 
181       * and started. Else, it is queued, and will run as soon as one thread becomes 
182       * available. 
183       * @param runnable the Runnable object to execute 
184       * @return <b>true</b> if the runnable is started, </b>false</b> if it's queued. 
185       */ 
186      public boolean start(Runnable runnable) { 
187          // Pick up an available thread 
188          for(int i=0;i<size();i++) { 
189              if (pool[i].isFree()) { 
190                  pool[i].setRunnable(runnable); 
191                  return true; 
192              } 
193          } 
194          // Queue the thread 
195          if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Queueing "+runnable); 
196          queue.put(runnable); 
197          return false; 
198      } 
199   
200      /** 
201       * Return <b>true</b> if the Runnable is associated with any pooled thread. 
202       * @return <b>true</b> if the Runnable is associated with any pooled thread. 
203       */ 
204      public synchronized boolean isAlive(Runnable r) { 
205          for(int i=0;i<pool.length;i++) { 
206              if (r == pool[i].getRunnable()) return true; 
207          } 
208          return false; 
209      } 
210   
211      /** 
212       * Unconditionally terminates all the threads in the pool. 
213       * Free threads simply exit. Threads associated to a Runnable 
214       * are interrupt()ed. 
215       */ 
216      public void terminate() { 
217          shutdown=true; 
218          for(int i=0;i<pool.length;i++) { 
219              if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Interrupting "+pool[i].getName()); 
220              pool[i].interrupt(); 
221          } 
222      } 
223  }