/Users/lyon/j4p/src/classUtils/pack/util/ThreadBlockRunner.java

1    package classUtils.pack.util; 
2     
3    import java.util.Random; 
4     
/**
 * Pairs a thread with the options that govern how it is scheduled:
 * currently a single flag telling whether the thread must wait for the
 * preceding one.
 */
class ThreadDescriptor implements Cloneable {

    private boolean synchronizeWithPrevious;
    private Thread thread;

    /**
     * Describes a thread that is not synchronized with its predecessor.
     * @param t the thread to describe
     */
    ThreadDescriptor(Thread t) {
        this(t, false);
    }

    /**
     * Describes a thread with an explicit synchronization flag.
     * @param t the thread to describe
     * @param synchronizeWithPrevious the initial value of the flag
     */
    ThreadDescriptor(Thread t, boolean synchronizeWithPrevious) {
        this.synchronizeWithPrevious = synchronizeWithPrevious;
        this.thread = t;
    }

    /**
     * Wraps every element of the given array in a descriptor whose
     * synchronization flag is off.
     * @param threads the threads to wrap
     * @return a new array, of the same length, holding one descriptor per thread
     */
    public static ThreadDescriptor[] toDefaultDescriptorArray(Thread[] threads) {
        final int total = threads.length;
        ThreadDescriptor[] descriptors = new ThreadDescriptor[total];
        int slot = 0;
        while (slot < total) {
            descriptors[slot] = new ThreadDescriptor(threads[slot]);
            slot++;
        }
        return descriptors;
    }

    /**
     * Produces a new descriptor referring to the same thread and carrying
     * the same flag value; the thread itself is not copied.
     * @return the copy, as an Object
     */
    public Object clone() {
        ThreadDescriptor copy = new ThreadDescriptor(thread);
        copy.synchronizeWithPrevious = synchronizeWithPrevious;
        return copy;
    }

    /**
     * Returns the thread.
     * @return Thread
     */
    public Thread getThread() {
        return this.thread;
    }

    /**
     * Sets the thread.
     * @param thread The thread to set
     */
    public void setThread(Thread thread) {
        this.thread = thread;
    }

    /**
     * Returns the synchronizeWithPrevious flag.
     * @return boolean
     */
    public boolean isSynchronizeWithPrevious() {
        return this.synchronizeWithPrevious;
    }

    /**
     * Sets the synchronizeWithPrevious flag.
     * @param synchronizeWithPrevious The value to set
     */
    public void setSynchronizeWithPrevious(boolean synchronizeWithPrevious) {
        this.synchronizeWithPrevious = synchronizeWithPrevious;
    }

}
67    
68   /** 
69    * Runs a block of threads within an array, with a maximum number of concurrent threads, 
70    * and returning only when all the threads in the block have finished. 
71    * <p> 
72    * Optionally, each thread (but the first in each block) can be synchronized with the previous one, that is, 
73    * it waits to start for the previous one is terminated. 
74    *  
75    * @author cris sadun 
76    * @version 2.0 
77    */ 
78   public class ThreadBlockRunner { 
79        
80       private long sleepTime=1000L; 
81       private ThreadDescriptor []threadDescriptors; 
82       private int maxThreads; 
83    
84       /** 
85        * Create a runner for the given array of threads, which will run a maximum of maxThreads 
86        * threads concurrently 
87        * @param threads the array of threads to run 
88        * @param maxThreads the maximum number of concurrent arrays to run 
89        */  
90       public ThreadBlockRunner(Thread []threads, int maxThreads) { 
91           this.threadDescriptors=ThreadDescriptor.toDefaultDescriptorArray(threads); 
92           this.maxThreads=maxThreads; 
93           if (maxThreads < 1) throw new IllegalArgumentException("Invalid maxThreads value:"+maxThreads); 
94       } 
95        
96       private int findThreadIndex(Thread t) { 
97           for(int i=0;i<threadDescriptors.length;i++) { 
98               if (threadDescriptors[i].getThread() == t) return i; 
99           } 
100          throw new IllegalArgumentException("Programming error: thread "+t.getName()+" is not controlled by the this ThreadBlockRunner object"); 
101      } 
102       
103      /** 
104       * Set a certain thread in the threads passed at construction to be synchronized with the previous one.  
105       * <p> If at execution time the thread results the first in a block, the synchronization will be ignored. 
106       * @param thread one of the threads passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}. 
107       * @param value if <b>true</b>, the thread will wait until the previous one has terminated before starting. 
108       */ 
109      public void setSynchronizedWithPrevious(Thread thread, boolean value) { 
110          setSynchronizedWithPrevious(findThreadIndex(thread), value); 
111           
112      } 
113       
114      /** 
115       * Return whether or not a thread is synchronized with the previous one (see  
116       * {@link #setSynchronizedWithPrevious(java.lang.Thread, boolean) setSynchronizeWithPrevious()}). 
117       * @param thread one of the threads passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}. 
118       * @return whether or not the thread is synchronized with the previous one. 
119       */ 
120      public boolean isSynchronizedWithPrevious(Thread thread) { 
121          return isSynchronizedWithPrevious(findThreadIndex(thread)); 
122      } 
123       
124      /** 
125       * Set a the i-th thread (realtive to the thread array passed at construction) to be synchronized with the  
126       * previous one.  
127       * <p> If at execution time the thread results the first in a block, the synchronization will be ignored. 
128       * @param i a valid index in the thread array passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}. 
129       * @param value if <b>true</b>, the thread will wait until the previous one has terminated before starting. 
130       */ 
131      public void setSynchronizedWithPrevious(int i, boolean value) { 
132          if (i<0 || i>=threadDescriptors.length) throw new IllegalArgumentException("Programming error: invalid thread index"); 
133          threadDescriptors[i].setSynchronizeWithPrevious(value); 
134      } 
135   
136      /** 
137       * Return whether or not a thread is synchronized with the previous one (see  
138       * {@link #setSynchronizedWithPrevious(java.lang.Thread, boolean) setSynchronizeWithPrevious()}). 
139       * @param i a valid index in the thread array passed at {@link #ThreadBlockRunner(java.lang.Thread[], int) construction}. 
140       * @return whether or not the thread in the i-th position is synchronized with the previous one. 
141       */ 
142      public boolean isSynchronizedWithPrevious(int i) { 
143          if (i<0 || i>=threadDescriptors.length) throw new IllegalArgumentException("Programming error: invalid thread index"); 
144          return threadDescriptors[i].isSynchronizeWithPrevious(); 
145      } 
146       
147       
148       
149      /** 
150       * Run a block of consecutive threads from the array passed at construction. 
151       * <p> 
152       * If any thread's "synchronizeWithPrevious" flag has been set, the thread waits for  
153       * the previous thread to terminate before starting (the first thread flag state is ignored). 
154       *  
155       * @param blockStart the index of the first thread to run, inclusive 
156       * @param blockEnd the index of the last thread to run, exclusive 
157       */ 
158      public void runBlock(int blockStart, int blockEnd) { 
159          if(blockStart<0 || blockEnd>threadDescriptors.length)  
160              throw new IllegalArgumentException(threadDescriptors.length+" threads in the array. ("+blockStart+","+ 
161                                                  blockEnd+") is an invalid range"); 
162           
163          /* 
164           ThreadDescriptor [] threadDescriptors2=new ThreadDescriptor[threadDescriptors.length]; 
165           System.arraycopy(threadDescriptors,0,threadDescriptors2,0,threadDescriptors.length); 
166          */ 
167          ThreadDescriptor [] threadDescriptors2=new ThreadDescriptor[threadDescriptors.length]; 
168          for(int i=0;i<threadDescriptors2.length;i++) { 
169              threadDescriptors2[i]=(ThreadDescriptor)threadDescriptors[i].clone(); 
170          } 
171          int count=0;                                        
172          int waitingForPreviousCount=0; 
173          boolean waitingForPrevious[] = new boolean[threadDescriptors2.length]; 
174          boolean started[] = new boolean[threadDescriptors2.length]; 
175          do { 
176              for(int i=blockStart;i<blockEnd;i++) { 
177                  if (threadDescriptors2[i].getThread()==null) { 
178                      //System.out.println(threadDescriptors[i].getThread().getName()+" has already terminated, skipping"); 
179                      continue; 
180                  } 
181                   
182                  // If the first entry is supposed to be synchronized, ignore the fact: 
183               
184                  if (i>blockStart && 
185                      threadDescriptors2[i].isSynchronizeWithPrevious()) { 
186                      if (waitingForPrevious[i]==false) { 
187                          waitingForPrevious[i]=true; 
188                          waitingForPreviousCount++; 
189                          // Do not start the thread - yet 
190                          //System.out.println("Not starting "+threadDescriptors2[i].getThread().getName()+" since it must wait for previous to finish"); 
191                      } else { 
192                          if (! started[i]) { 
193                              // Start the thread only if the previous is dead 
194                              //System.out.print("Checking if "+threadDescriptors[i-1].getThread().getName()+" has finished.."); 
195                              if (started[i-1] && ! threadDescriptors[i-1].getThread().isAlive()) { 
196                                  // Do not reset waitingForPrevious flag, to avoid restarts 
197                                  waitingForPreviousCount--; 
198                                  //System.out.println("Starting "+threadDescriptors2[i].getThread().getName()+" since previous has finished."); 
199                                  threadDescriptors2[i].getThread().start(); 
200                                  started[i]=true; 
201                                  count++; 
202                              } else { 
203                                  //System.out.println("Not starting "+threadDescriptors2[i].getThread().getName()+" since previous has not finished yet"); 
204                              } 
205                          } else  { 
206                              //System.out.println("Thread "+threadDescriptors[i].getThread().getName()+" has already started, skipping"); 
207                          } 
208                      }    
209                           
210                  } else { 
211                      if (! started[i]) { 
212                          started[i]=true; 
213                          Thread tr=threadDescriptors2[i].getThread(); 
214                          //System.out.println("Starting "+tr.getName()); 
215                          tr.start(); 
216                          count++; 
217                      } 
218                  } 
219                   
220                  if (count == maxThreads) { 
221                      count=waitForOneToFinish(threadDescriptors2, started, blockStart, i+1); 
222                      //assert(count >= 0); 
223                      //assert(count <= maxThreads); 
224                  } 
225              }                                     
226              //System.out.println("("+waitingForPreviousCount+" threads waiting for a previous one to finish)"); 
227               
228              if (waitingForPreviousCount>0)  { 
229                  try { 
230                      Thread.sleep(sleepTime); 
231                  } catch (InterruptedException e) { 
232                      e.printStackTrace(); 
233                  } 
234              } 
235               
236          } while (waitingForPreviousCount>0); 
237           
238          waitForAllFinished(blockStart, blockEnd); 
239      } 
240       
241      /** 
242       * Wait for at least one thread in the given range to finish 
243       * @return the number of alive threads in the given range 
244       */ 
245      private int waitForOneToFinish(ThreadDescriptor[] threads, boolean [] started, int start, int end) { 
246          int count, aliveCount; 
247          do { 
248              count=0; 
249              aliveCount=0; 
250              for(int i=start;i<end;i++) { 
251                  if (!started[i]) continue; 
252                  if (threads[i].getThread()!=null) { 
253                      if (! threads[i].getThread().isAlive()) { 
254                          threads[i].setThread(null); 
255                          count++;  
256                      } else { aliveCount++; } 
257                  }  
258              } 
259              if (count==0) 
260                  try { 
261                      Thread.sleep(sleepTime); 
262                  } catch (InterruptedException e) { 
263                      e.printStackTrace(); 
264                  } 
265          } while(count==0); 
266          return aliveCount; 
267      } 
268       
269      /** 
270       *  Wait for all the threads within the given range in the "threads" member to be finished. 
271       */ 
272      private void waitForAllFinished(int start, int end) { 
273          int count; 
274          do { 
275              count=0; 
276              for(int i=start;i<end;i++) { 
277                  if (threadDescriptors[i].getThread() == null) continue; 
278                  if (threadDescriptors[i].getThread().isAlive()) count++;  
279              } 
280              if (count>0) 
281                  try { 
282                      Thread.sleep(sleepTime); 
283                  } catch (InterruptedException e) { 
284                      e.printStackTrace(); 
285                  } 
286          } while(count>0); 
287      } 
288       
289      private static Random testRandom = new Random(); 
290      private static Object lock = new Object(); 
291   
292      private static class TestThread extends Thread { 
293           
294          int count; 
295           
296          TestThread(int count) { 
297              super("thread-"+count); 
298              this.count=count; 
299          } 
300   
301          public void run() { 
302              long l=20L; 
303              if (count % 2 == 0) l=1000L; 
304              synchronized(lock) { 
305                  System.out.println(getName()+" started"); 
306                  System.out.flush(); 
307              } 
308              try { 
309                  sleep(l); 
310              } catch(InterruptedException e) { 
311                  e.printStackTrace(); 
312              } 
313              synchronized(lock) { 
314                  System.out.println(getName()+" finished"); 
315                  System.out.println(getName()+" -------------------"); 
316                  System.out.flush(); 
317              } 
318          } 
319      } 
320       
321      /** 
322       * A test method.  
323       */ 
324      public static void main2(String args[]) { 
325          Thread [] threads = new Thread[3]; 
326          Random random = new Random(); 
327          for(int i=0;i<threads.length;i++) { 
328              threads[i]=new ThreadBlockRunner.TestThread(i); 
329          } 
330           
331          //threads[0]=null; 
332          ThreadBlockRunner tbr = new ThreadBlockRunner(threads, 3); 
333          for(int i=0;i<threads.length;i++) { 
334              if (i==1) { 
335                  tbr.setSynchronizedWithPrevious(i, true); 
336              } 
337          } 
338           
339          tbr.runBlock(0,3); 
340          synchronized(lock) { 
341              System.out.println("********************"); 
342          } 
343      } 
344       
345      public static void main(String args[]) { 
346          Thread [] threads = new Thread[10]; 
347          Random random = new Random(); 
348          for(int i=0;i<threads.length;i++) { 
349              threads[i]=new ThreadBlockRunner.TestThread(i); 
350          } 
351           
352          //threads[0]=null; 
353          ThreadBlockRunner tbr = new ThreadBlockRunner(threads, 3); 
354          for(int i=0;i<threads.length;i++) { 
355              //if (i>=5 && i<=8) { 
356              if (i>0 && random.nextBoolean()) { 
357                  System.out.println("thread "+i+" waits for previous one to finish"); 
358                  tbr.setSynchronizedWithPrevious(i, true); 
359              } 
360          } 
361           
362           
363          tbr.runBlock(0,6); 
364          synchronized(lock) { 
365              System.out.println("********************"); 
366          } 
367          tbr.runBlock(6, threads.length); 
368      } 
369  } 
370