/Users/lyon/j4p/src/classUtils/pack/util/ThreadBlockRunner.java
|
1 package classUtils.pack.util;
2
3 import java.util.Random;
4
/**
 * Pairs a thread with the scheduling information attached to it, namely
 * whether the thread has to wait for the one preceding it to terminate
 * before it may be started.
 */
class ThreadDescriptor implements Cloneable {

    private Thread thread;
    private boolean synchronizeWithPrevious;

    /**
     * Creates a descriptor for a thread that does not wait for its predecessor.
     *
     * @param t the thread to describe
     */
    ThreadDescriptor(Thread t) {
        this(t, false);
    }

    /**
     * Creates a descriptor for the given thread.
     *
     * @param t the thread to describe
     * @param synchronizeWithPrevious whether the thread waits for the previous one
     */
    ThreadDescriptor(Thread t, boolean synchronizeWithPrevious) {
        this.synchronizeWithPrevious = synchronizeWithPrevious;
        this.thread = t;
    }

    /**
     * Wraps every thread of the given array in a descriptor whose
     * "synchronize with previous" flag is off.
     *
     * @param threads the threads to wrap
     * @return an array of the same length, holding one descriptor per thread
     */
    public static ThreadDescriptor[] toDefaultDescriptorArray(Thread[] threads) {
        final int size = threads.length;
        ThreadDescriptor[] wrapped = new ThreadDescriptor[size];
        int slot = 0;
        while (slot < size) {
            wrapped[slot] = new ThreadDescriptor(threads[slot]);
            slot++;
        }
        return wrapped;
    }

    /**
     * Returns the thread.
     *
     * @return Thread
     */
    public Thread getThread() {
        return thread;
    }

    /**
     * Sets the thread.
     *
     * @param thread The thread to set
     */
    public void setThread(Thread thread) {
        this.thread = thread;
    }

    /**
     * Returns the synchronizeWithPrevious flag.
     *
     * @return boolean
     */
    public boolean isSynchronizeWithPrevious() {
        return synchronizeWithPrevious;
    }

    /**
     * Sets the synchronizeWithPrevious flag.
     *
     * @param synchronizeWithPrevious The synchronizeWithPrevious to set
     */
    public void setSynchronizeWithPrevious(boolean synchronizeWithPrevious) {
        this.synchronizeWithPrevious = synchronizeWithPrevious;
    }

    /**
     * Returns a new descriptor referring to the same thread object and
     * carrying the same flag value as this one.
     */
    public Object clone() {
        ThreadDescriptor copy = new ThreadDescriptor(thread);
        copy.synchronizeWithPrevious = synchronizeWithPrevious;
        return copy;
    }

}
67
68 /**
69 * Runs a block of threads within an array, with a maximum number of concurrent threads,
70 * and returning only when all the threads in the block have finished.
71 * <p>
72 * Optionally, each thread (but the first in each block) can be synchronized with the previous one, that is,
73 * it waits to start for the previous one is terminated.
74 *
75 * @author cris sadun
76 * @version 2.0
77 */
78 public class ThreadBlockRunner {
79
80 private long sleepTime=1000L;
81 private ThreadDescriptor []threadDescriptors;
82 private int maxThreads;
83
84 /**
85 * Create a runner for the given array of threads, which will run a maximum of maxThreads
86 * threads concurrently
87 * @param threads the array of threads to run
88 * @param maxThreads the maximum number of concurrent arrays to run
89 */
90 public ThreadBlockRunner(Thread []threads, int maxThreads) {
91 this.threadDescriptors=ThreadDescriptor.toDefaultDescriptorArray(threads);
92 this.maxThreads=maxThreads;
93 if (maxThreads < 1) throw new IllegalArgumentException("Invalid maxThreads value:"+maxThreads);
94 }
95
96 private int findThreadIndex(Thread t) {
97 for(int i=0;i<threadDescriptors.length;i++) {
98 if (threadDescriptors[i].getThread() == t) return i;
99 }
100 throw new IllegalArgumentException("Programming error: thread "+t.getName()+" is not controlled by the this ThreadBlockRunner object");
101 }
102
103 /**
104 * Set a certain thread in the threads passed at construction to be synchronized with the previous one.
105 * <p> If at execution time the thread results the first in a block, the synchronization will be ignored.
106 * @param thread one of the threads passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}.
107 * @param value if <b>true</b>, the thread will wait until the previous one has terminated before starting.
108 */
109 public void setSynchronizedWithPrevious(Thread thread, boolean value) {
110 setSynchronizedWithPrevious(findThreadIndex(thread), value);
111
112 }
113
114 /**
115 * Return whether or not a thread is synchronized with the previous one (see
116 * {@link #setSynchronizedWithPrevious(java.lang.Thread, boolean) setSynchronizeWithPrevious()}).
117 * @param thread one of the threads passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}.
118 * @return whether or not the thread is synchronized with the previous one.
119 */
120 public boolean isSynchronizedWithPrevious(Thread thread) {
121 return isSynchronizedWithPrevious(findThreadIndex(thread));
122 }
123
124 /**
125 * Set a the i-th thread (realtive to the thread array passed at construction) to be synchronized with the
126 * previous one.
127 * <p> If at execution time the thread results the first in a block, the synchronization will be ignored.
128 * @param i a valid index in the thread array passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}.
129 * @param value if <b>true</b>, the thread will wait until the previous one has terminated before starting.
130 */
131 public void setSynchronizedWithPrevious(int i, boolean value) {
132 if (i<0 || i>=threadDescriptors.length) throw new IllegalArgumentException("Programming error: invalid thread index");
133 threadDescriptors[i].setSynchronizeWithPrevious(value);
134 }
135
136 /**
137 * Return whether or not a thread is synchronized with the previous one (see
138 * {@link #setSynchronizedWithPrevious(java.lang.Thread, boolean) setSynchronizeWithPrevious()}).
139 * @param i a valid index in the thread array passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}.
140 * @return whether or not the thread in the i-th position is synchronized with the previous one.
141 */
142 public boolean isSynchronizedWithPrevious(int i) {
143 if (i<0 || i>=threadDescriptors.length) throw new IllegalArgumentException("Programming error: invalid thread index");
144 return threadDescriptors[i].isSynchronizeWithPrevious();
145 }
146
147
148
149 /**
150 * Run a block of consecutive threads from the array passed at construction.
151 * <p>
152 * If any thread's "synchronizeWithPrevious" flag has been set, the thread waits for
153 * the previous thread to terminate before starting (the first thread flag state is ignored).
154 *
155 * @param blockStart the index of the first thread to run, inclusive
156 * @param blockEnd the index of the last thread to run, exclusive
157 */
158 public void runBlock(int blockStart, int blockEnd) {
159 if(blockStart<0 || blockEnd>threadDescriptors.length)
160 throw new IllegalArgumentException(threadDescriptors.length+" threads in the array. ("+blockStart+","+
161 blockEnd+") is an invalid range");
162
163 /*
164 ThreadDescriptor [] threadDescriptors2=new ThreadDescriptor[threadDescriptors.length];
165 System.arraycopy(threadDescriptors,0,threadDescriptors2,0,threadDescriptors.length);
166 */
167 ThreadDescriptor [] threadDescriptors2=new ThreadDescriptor[threadDescriptors.length];
168 for(int i=0;i<threadDescriptors2.length;i++) {
169 threadDescriptors2[i]=(ThreadDescriptor)threadDescriptors[i].clone();
170 }
171 int count=0;
172 int waitingForPreviousCount=0;
173 boolean waitingForPrevious[] = new boolean[threadDescriptors2.length];
174 boolean started[] = new boolean[threadDescriptors2.length];
175 do {
176 for(int i=blockStart;i<blockEnd;i++) {
177 if (threadDescriptors2[i].getThread()==null) {
178 //System.out.println(threadDescriptors[i].getThread().getName()+" has already terminated, skipping");
179 continue;
180 }
181
182 // If the first entry is supposed to be synchronized, ignore the fact:
183
184 if (i>blockStart &&
185 threadDescriptors2[i].isSynchronizeWithPrevious()) {
186 if (waitingForPrevious[i]==false) {
187 waitingForPrevious[i]=true;
188 waitingForPreviousCount++;
189 // Do not start the thread - yet
190 //System.out.println("Not starting "+threadDescriptors2[i].getThread().getName()+" since it must wait for previous to finish");
191 } else {
192 if (! started[i]) {
193 // Start the thread only if the previous is dead
194 //System.out.print("Checking if "+threadDescriptors[i-1].getThread().getName()+" has finished..");
195 if (started[i-1] && ! threadDescriptors[i-1].getThread().isAlive()) {
196 // Do not reset waitingForPrevious flag, to avoid restarts
197 waitingForPreviousCount--;
198 //System.out.println("Starting "+threadDescriptors2[i].getThread().getName()+" since previous has finished.");
199 threadDescriptors2[i].getThread().start();
200 started[i]=true;
201 count++;
202 } else {
203 //System.out.println("Not starting "+threadDescriptors2[i].getThread().getName()+" since previous has not finished yet");
204 }
205 } else {
206 //System.out.println("Thread "+threadDescriptors[i].getThread().getName()+" has already started, skipping");
207 }
208 }
209
210 } else {
211 if (! started[i]) {
212 started[i]=true;
213 Thread tr=threadDescriptors2[i].getThread();
214 //System.out.println("Starting "+tr.getName());
215 tr.start();
216 count++;
217 }
218 }
219
220 if (count == maxThreads) {
221 count=waitForOneToFinish(threadDescriptors2, started, blockStart, i+1);
222 //assert(count >= 0);
223 //assert(count <= maxThreads);
224 }
225 }
226 //System.out.println("("+waitingForPreviousCount+" threads waiting for a previous one to finish)");
227
228 if (waitingForPreviousCount>0) {
229 try {
230 Thread.sleep(sleepTime);
231 } catch (InterruptedException e) {
232 e.printStackTrace();
233 }
234 }
235
236 } while (waitingForPreviousCount>0);
237
238 waitForAllFinished(blockStart, blockEnd);
239 }
240
241 /**
242 * Wait for at least one thread in the given range to finish
243 * @return the number of alive threads in the given range
244 */
245 private int waitForOneToFinish(ThreadDescriptor[] threads, boolean [] started, int start, int end) {
246 int count, aliveCount;
247 do {
248 count=0;
249 aliveCount=0;
250 for(int i=start;i<end;i++) {
251 if (!started[i]) continue;
252 if (threads[i].getThread()!=null) {
253 if (! threads[i].getThread().isAlive()) {
254 threads[i].setThread(null);
255 count++;
256 } else { aliveCount++; }
257 }
258 }
259 if (count==0)
260 try {
261 Thread.sleep(sleepTime);
262 } catch (InterruptedException e) {
263 e.printStackTrace();
264 }
265 } while(count==0);
266 return aliveCount;
267 }
268
269 /**
270 * Wait for all the threads within the given range in the "threads" member to be finished.
271 */
272 private void waitForAllFinished(int start, int end) {
273 int count;
274 do {
275 count=0;
276 for(int i=start;i<end;i++) {
277 if (threadDescriptors[i].getThread() == null) continue;
278 if (threadDescriptors[i].getThread().isAlive()) count++;
279 }
280 if (count>0)
281 try {
282 Thread.sleep(sleepTime);
283 } catch (InterruptedException e) {
284 e.printStackTrace();
285 }
286 } while(count>0);
287 }
288
289 private static Random testRandom = new Random();
290 private static Object lock = new Object();
291
292 private static class TestThread extends Thread {
293
294 int count;
295
296 TestThread(int count) {
297 super("thread-"+count);
298 this.count=count;
299 }
300
301 public void run() {
302 long l=20L;
303 if (count % 2 == 0) l=1000L;
304 synchronized(lock) {
305 System.out.println(getName()+" started");
306 System.out.flush();
307 }
308 try {
309 sleep(l);
310 } catch(InterruptedException e) {
311 e.printStackTrace();
312 }
313 synchronized(lock) {
314 System.out.println(getName()+" finished");
315 System.out.println(getName()+" -------------------");
316 System.out.flush();
317 }
318 }
319 }
320
321 /**
322 * A test method.
323 */
324 public static void main2(String args[]) {
325 Thread [] threads = new Thread[3];
326 Random random = new Random();
327 for(int i=0;i<threads.length;i++) {
328 threads[i]=new ThreadBlockRunner.TestThread(i);
329 }
330
331 //threads[0]=null;
332 ThreadBlockRunner tbr = new ThreadBlockRunner(threads, 3);
333 for(int i=0;i<threads.length;i++) {
334 if (i==1) {
335 tbr.setSynchronizedWithPrevious(i, true);
336 }
337 }
338
339 tbr.runBlock(0,3);
340 synchronized(lock) {
341 System.out.println("********************");
342 }
343 }
344
345 public static void main(String args[]) {
346 Thread [] threads = new Thread[10];
347 Random random = new Random();
348 for(int i=0;i<threads.length;i++) {
349 threads[i]=new ThreadBlockRunner.TestThread(i);
350 }
351
352 //threads[0]=null;
353 ThreadBlockRunner tbr = new ThreadBlockRunner(threads, 3);
354 for(int i=0;i<threads.length;i++) {
355 //if (i>=5 && i<=8) {
356 if (i>0 && random.nextBoolean()) {
357 System.out.println("thread "+i+" waits for previous one to finish");
358 tbr.setSynchronizedWithPrevious(i, true);
359 }
360 }
361
362
363 tbr.runBlock(0,6);
364 synchronized(lock) {
365 System.out.println("********************");
366 }
367 tbr.runBlock(6, threads.length);
368 }
369 }
370