/Users/lyon/j4p/src/classUtils/pack/util/tp/ThreadPool.java
|
1 package classUtils.pack.util.tp;
2
3 import java.io.*;
4
5 /**
6 * A thread pooling class. Ensures that no more than <i>n</i> thread are alive
7 * at the same time, while queueing incoming requests.
8 * <p>
9 * The pooled threads can be created as daemon or not - if they're daemon, the
10 * JVM will exit when only pooled threads are running.
11 * <p>
12 * Note that a thread pool is <i>not</i> a thread in itself, i.e. is executed
13 * in the thread of the caller.
14 * <p>
15 * Use the {@link ThreadPoolThread ThreadPoolThread} for a pool which runs in a
16 * thread on its own and on emtpy queue just sleeps.
17 *
18 * @author Cris Sadun
19 * @version 1.0
20 */
21 public class ThreadPool {
22
23 private boolean shutdown=false;
24 private int size;
25 private Queue queue;
26 private PooledThread [] pool;
27 public boolean verbose=( System.getProperty("org.sadun.verbose") != null);
28
29 /**
30 * A thread class that waits() undefinitely unless explicitly
31 * notified. When notified, it attempts to run the associated
32 * runnable. If no runnable exists, or when the runnable exits or
33 * fails, it goes back to waiting.
34 */
35 public class PooledThread extends Thread {
36
37 private Runnable runnable;
38 private boolean waiting=true;
39 int i;
40
41 /**
42 * Create a PooledThread on the given group, with the given
43 * identifier and the given daemon property
44 */
45 PooledThread(ThreadGroup tg, int i, boolean daemon) {
46 super(tg, "pooled-thread-"+i);
47 //super(tg, "pooled-thread-"+i+" (free)");
48 this.i=i;
49 setDaemon(daemon);
50 }
51
52 private synchronized void setRunnable(Runnable r) {
53 if (! isFree())
54 throw new IllegalStateException("Thread "+getName()+" in the pool is not free");
55 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Thread "+this+" is free, associating "+r);
56 //setName("pooled-thread-"+i+" hosting "+r);
57 runnable=r;
58 waiting=false;
59 notify();
60 }
61
62 /**
63 * Runs the available runnable object, or waits.
64 */
65 public final synchronized void run() {
66 do {
67 if (runnable != null) {
68 waiting=false;
69 try {
70 runnable.run();
71 } catch(Exception exc) {
72 // An exception must just set the pooled thread to free
73 exc.printStackTrace();
74 runnable=null;
75 //setName("pooled-thread-"+i+" (free)");
76 }
77 }
78
79 // Notify the pool the thread is free and ask for a new runnable,
80 // if any
81 runnable = getNextThread();
82 if (runnable != null) {
83 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Dequeueed "+runnable+" ("+queue.size()+" in queue)");
84 continue;
85 }
86
87 waiting=true;
88 try {
89 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" going to wait.");
90 wait(0);
91 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" notified.");
92 } catch(InterruptedException e) {
93 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+this+" INTERRUPTED!");
94 }
95 waiting=false;
96 } while(!shutdown);
97 }
98
99 /**
100 * Return true if the thread is not associated to any runnable
101 */
102 protected boolean isFree() { return waiting; }
103
104 /**
105 * Return the associated Runnable, or <b>null</b>
106 * @return the associated Runnable, or <b>null</b>
107 */
108 protected Runnable getRunnable() { return runnable; }
109
110 public String toString() { return getName(); }
111
112 /**
113 * Finalizes terminates the controlled threads.
114 */
115 public void finalize() {
116 terminate();
117 }
118 }
119
120 /**
121 * Return the next thread to run, as scheduled by the queue
122 */
123 private Runnable getNextThread() {
124 if (queue.isEmpty()) return null;
125 return (Runnable)queue.get();
126 }
127
128 /**
129 * Create a pool with the given size using the given queue object.
130 * @param size the size of the pool
131 * @param daemon if <b>true</b> the pools will be daemon
132 * @param queue the queue to use for determining the next thread to
133 * instantiate when there are pending start requests.
134 *
135 */
136 public ThreadPool(int size, boolean daemon, Queue queue) {
137 if (verbose) System.out.println("Creating thread pool of size "+size);
138 this.queue=queue;
139 this.pool=new PooledThread[size];
140 ThreadGroup tg = new ThreadGroup("thread-pool");
141 for(int i=0;i<size;i++) {
142 pool[i]= new PooledThread(tg, i, daemon);
143 pool[i].start();
144 }
145
146 }
147
148 /**
149 * Create a pool with the given size with a FIFO queue
150 * @param size the size of the pool
151 * @param daemon if <b>true</b> the pools will be daemon
152 */
153 public ThreadPool(int size, boolean daemon) {
154 this(size, daemon, new FIFOQueue());
155 }
156
157 /**
158 * Create a pool of daemon threads with the given size and a FIFO waiting queue
159 * @param size the size of the pool
160 */
161 public ThreadPool(int size) {
162 this(size, true);
163 }
164
165
166 /**
167 * Return the size of the pool
168 * @return the size of the pool
169 */
170 public int size() { return pool.length; }
171
172 /**
173 * Return the number of thread currently queued
174 * @return the number of thread currently queued
175 */
176 public int getQueueSize() { return queue.size(); }
177
178 /**
179 * Adds a runnable object to the pool.
180 * If there's a thread available, the runnable is associated to the thread
181 * and started. Else, it is queued, and will run as soon as one thread becomes
182 * available.
183 * @param runnable the Runnable object to execute
184 * @return <b>true</b> if the runnable is started, </b>false</b> if it's queued.
185 */
186 public boolean start(Runnable runnable) {
187 // Pick up an available thread
188 for(int i=0;i<size();i++) {
189 if (pool[i].isFree()) {
190 pool[i].setRunnable(runnable);
191 return true;
192 }
193 }
194 // Queue the thread
195 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Queueing "+runnable);
196 queue.put(runnable);
197 return false;
198 }
199
200 /**
201 * Return <b>true</b> if the Runnable is associated with any pooled thread.
202 * @return <b>true</b> if the Runnable is associated with any pooled thread.
203 */
204 public synchronized boolean isAlive(Runnable r) {
205 for(int i=0;i<pool.length;i++) {
206 if (r == pool[i].getRunnable()) return true;
207 }
208 return false;
209 }
210
211 /**
212 * Unconditionally terminates all the threads in the pool.
213 * Free threads simply exit. Threads associated to a Runnable
214 * are interrupt()ed.
215 */
216 public void terminate() {
217 shutdown=true;
218 for(int i=0;i<pool.length;i++) {
219 if (verbose) System.out.println("{"+Thread.currentThread()+"} "+"Interrupting "+pool[i].getName());
220 pool[i].interrupt();
221 }
222 }
223 }