Monday, January 31, 2011

Part 13: java.util.concurrent - Atomic Variables


In this article we will look at atomic variables, which let us write lock-free and wait-free algorithms. This was not possible in Java before version 5.0.

The two main points about atomic variables are:

   1. They help us write lock-free and wait-free algorithms

Under high contention (many threads fighting for the same lock), the JVM spends more time scheduling threads, managing contention, and maintaining queues of waiting threads, and less time doing real work.
This dramatically reduces the throughput of the process.
Problems with locking:
    1) A blocked thread cannot do anything else.
    2) If the blocked thread has high priority, it is stuck waiting for whichever thread holds the lock.
    3) Locks can cause deadlock.
    4) Suspending and resuming a blocked thread is a heavyweight operation, so throughput decreases.

We will soon see how to write lock-free algorithms using atomic variables.


   2. They implement a very lightweight mechanism: CAS


CAS (compare and swap):

Let’s take an example to understand the concept of CAS. Suppose we have one variable “i”, and we perform some calculation on “i” and store the result back into “i”. In a nutshell:
        i = someComplicatedComputation( i )
for “i” = 1,
        someComplicatedComputation(i) → 1234

In the CAS process the following happens:
        A memory location V is defined.
        A local variable A is defined.
        A local variable B is defined.

V holds the initial value of “i”. So
        V = i = 1
        A = V = 1
        B = result of the computation = 1234
        compare ( V , A )
        if both values are the same --> replace V with B's value.
        else someone has changed the value of V in the meantime, so repeat the whole process.

Suppose someone changes the value of “i”, and hence V, to 2:

             V = 2;
             A = V = 2
             B = result = 3246;
             compare ( V , A )
                        and so on...!!

This is a very lightweight process. The classes in the java.util.concurrent.atomic package implement this CAS technique.
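The V / A / B steps above can be sketched as a retry loop around AtomicInteger.compareAndSet. This is only a minimal illustration: the class name CasSketch and the compute method (a stand-in for someComplicatedComputation) are assumptions made for this example, not part of the original program.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSketch {
    // Hypothetical stand-in for someComplicatedComputation(i) from the text.
    static int compute(int i) {
        return i * 2 + 1;
    }

    // CAS retry loop: read V into A, compute B, and publish B only if
    // V still equals A; otherwise start over with the fresh value.
    static int update(AtomicInteger v) {
        while (true) {
            int a = v.get();            // A = current value of V
            int b = compute(a);         // B = result of the computation
            if (v.compareAndSet(a, b)) {
                return b;               // V was unchanged, so B is now stored
            }
            // Another thread changed V in the meantime: retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(1);
        System.out.println(update(v));              // 3
        System.out.println(v.compareAndSet(1, 99)); // false, because V is 3, not 1
    }
}
```

No thread ever blocks here. A thread that loses the race simply repeats the computation with the newer value.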



Example – Let's write a simple program that first increases a number by 1, then decreases it by 1, and then increases it again by 1, so the overall effect is an increase of 1. We run 4 threads that call the method concurrently and compare the behaviour of AtomicInteger with that of a plain int.

package com.jovialjava.blog.threads;

import java.util.concurrent.atomic.*;

public class AtomicVariableExample implements Runnable {
 AtomicInteger atomic_int_1 = new AtomicInteger();
 AtomicInteger atomic_int_2 = new AtomicInteger();
 int int_1;
 int int_2;
 // volatile so that the polling loop in main() sees updates made by the worker threads
 private static volatile int count = 0;

 
 public static void main(String[] args) {
  AtomicVariableExample pr = new AtomicVariableExample();
  new Thread(pr).start();// 1 0 1
  new Thread(pr).start();// 2 1 2
  new Thread(pr).start(); // 3 2 3
  new Thread(pr).start(); // 4 3 4
  while (true) {
   if (count == 4) {
    System.out.println(pr.atomic_int_1.get());
    System.out.println(pr.int_1);
    break;
   }
  }

 }

 public void run() {
  System.out.println("Inside run method...");
  doCalc();

 }

 private void doCalc() {
  try {
   atomic_int_2 = atomic_int_1;
   int_2 = int_1;
   atomic_int_2.incrementAndGet();
   int_2 = int_2 + 1;
   Thread.sleep(1000);
   atomic_int_2.decrementAndGet();
   int_2 = int_2-1;
   Thread.sleep(1000);
   atomic_int_2.incrementAndGet();
   int_2 = int_2 + 1;
   Thread.sleep(1000);
   atomic_int_1 = atomic_int_2;
   int_1 = int_2;
   synchronized (this) {
    count++;
   }
  } catch (InterruptedException e) {   
   e.printStackTrace();
  }
 }

}

Friday, January 28, 2011

Part 12: java.util.concurrent : SingleThreadPool Example


This article discusses a thread pool that uses a single thread to execute tasks. From Java 5.0 onwards, such a pool can be obtained from Executors using the following method:
public static ExecutorService newSingleThreadExecutor()
    Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
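The sequential guarantee described above can be observed with a small sketch. The class and method names here are made up for illustration, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderSketch {
    // Submits n tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            pool.execute(() -> order.add(id)); // queued behind earlier tasks
        }
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```

Because there is only one worker thread taking tasks from a FIFO queue, the tasks run one at a time in submission order.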



Example-
Suppose we have 100 properties files in an application. We have one thread that reads each properties file and returns its contents as a map.

Pseudo code – READER THREAD

ConfigReader implements Callable<Map<String, String>>
try{
    // Get the file name in the constructor
    // Check if the file exists
    // Read the file and return the map object
}catch(Exception e){
    // release all resources
    // return null
}

Main THREAD-
    // Get a single thread pool from Executors
try{
    // Get the list of all properties files in the directory
    // Create a READER THREAD for each file by passing the file name
    // Store the READER threads in a list
    // Submit all of them in one go and get the Map objects
}catch(Exception e){
    // release all resources
    // print the stack trace
}finally{
    // shut down the thread pool
}
package com.jovialjava.blog.threads;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


class SingleReader implements Callable  <  Properties  > {
   
  private String name = null; 
  public SingleReader(String name){
   this.name = name;  
  } 
  public Properties call(){
   try{
    File f = new File(name); 
    Properties prop = new Properties();
          if(f.exists() && f.canRead() && f.isFile()){
           FileInputStream in = new FileInputStream(f);
           try{
            prop.load(in);
           }finally{
            in.close(); // always release the file handle
           }
           return prop;
          }else{
           System.err.println("Please check about this file:["+ f.getAbsolutePath()+"]");
           return null;
          }   
   }catch(Exception e){
     e.printStackTrace();
     return null;
   }
  }
 }
public class SingleThreadPoolExample {
 public static String directory = "config";
   private static ExecutorService executorPool=null;
       
   public static void main(String...args){
    try{
     File dir = new File(directory);
     if(dir.isDirectory()){
       List < Callable < Properties >  >  fileList = new ArrayList < Callable < Properties >  > ();
       String[] files = dir.list();
       /**
        * Optimization - Single thread executor.
        */
       executorPool = Executors.newSingleThreadExecutor();
       
      for(String file : files){
        Callable < Properties >  reader = new SingleReader(dir.getAbsolutePath()+ File.separator+ file);
        fileList.add(reader);      
      }
      List < Future < Properties >  >  results = executorPool.invokeAll(fileList);
      /**
       * Check how many success and how many failure
       */
      int success =0, failure = 0;
      for(Future < Properties >  result : results){
       if(result.get() == null){
        failure++;
       }else{
        success++;
       }
      }
      System.out.println("Total number of files ["+ fileList.size()+"]");
      System.out.println("Success Count ["+ success+"]");
      System.out.println("Failure Count ["+ failure+"]");
     }else{
       throw new IllegalArgumentException("There is no such directory name -"+ directory);
     }
    }catch(Exception e){
      e.printStackTrace();
    }finally{
     if(executorPool!=null){
       executorPool.shutdown();
     }
     }
   }

}

Part 11: java.util.concurrent - CachedThreadPool Example


This article discusses a thread pool that reuses previously constructed threads when they are available. From Java 5.0 onwards, such a pool can be obtained from Executors using the following method:
public static ExecutorService newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
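As a rough sketch of the "many short-lived asynchronous tasks" case, the following runs a batch of tiny tasks on a cached pool and combines their results. The class name and the squaring workload are illustrative assumptions, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CachedPoolSketch {
    // Runs n tiny tasks (task i returns i * i) and adds up the results.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long sum = 0;
            for (Future<Long> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // otherwise idle threads linger until the 60 s timeout
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100)); // 328350
    }
}
```

The pool decides how many threads to create. The caller only supplies tasks, which is why no pool size appears anywhere in the sketch.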


Example-
Suppose we have 100 properties files in an application. We have one thread that reads each properties file and returns its contents as a map. We want to optimize the time to read all 100 properties files by reading them concurrently. Here, optimize means striking a good balance between CPU utilization and the total time consumed by the reading process.

Pseudo code – READER THREAD

ConfigReader implements Callable<Map<String, String>>
try{
    // Get the file name in the constructor
    // Check if the file exists
    // Read the file and return the map object
}catch(Exception e){
    // release all resources
    // return null
}

Main THREAD-
    // Get a cached thread pool from Executors
try{
    // Get the list of all properties files in the directory
    // Create a READER THREAD for each file by passing the file name
    // Store the READER threads in a list
    // Submit all of them in one go and get the Map objects
}catch(Exception e){
    // release all resources
    // print the stack trace
}finally{
    // shut down the thread pool
}
package com.jovialjava.blog.threads;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


class CachedReader implements Callable  <  Properties  > {
   
  private String name = null; 
  public CachedReader(String name){
   this.name = name;  
  } 
  public Properties call(){
   try{
    File f = new File(name); 
    Properties prop = new Properties();
          if(f.exists() && f.canRead() && f.isFile()){
           FileInputStream in = new FileInputStream(f);
           try{
            prop.load(in);
           }finally{
            in.close(); // always release the file handle
           }
           return prop;
          }else{
           System.err.println("Please check about this file:["+ f.getAbsolutePath()+"]");
           return null;
          }   
   }catch(Exception e){
     e.printStackTrace();
     return null;
   }
  }
 }
public class CachedThreadPoolExample {
 public static String directory = "config";
   private static ExecutorService executorPool=null;
   private static int MAX_THREADS = 20;
    
   public static void main(String...args){
    try{
     File dir = new File(directory);
     if(dir.isDirectory()){
       List < Callable < Properties >  >  fileList = new ArrayList < Callable < Properties >  > ();
       String[] files = dir.list();
       /**
        * Optimization - Reuse the the threads.
        */
       executorPool = Executors.newCachedThreadPool();
       
      for(String file : files){
        Callable < Properties >  reader = new CachedReader(dir.getAbsolutePath()+ File.separator+ file);
        fileList.add(reader);      
      }
      List < Future < Properties >  >  results = executorPool.invokeAll(fileList);
      /**
       * Check how many success and how many failure
       */
      int success =0, failure = 0;
      for(Future < Properties >  result : results){
       if(result.get() == null){
        failure++;
       }else{
        success++;
       }
      }
      System.out.println("Total number of files ["+ fileList.size()+"]");
      System.out.println("Success Count ["+ success+"]");
      System.out.println("Failure Count ["+ failure+"]");
     }else{
       throw new IllegalArgumentException("There is no such directory name -"+ directory);
     }
    }catch(Exception e){
      e.printStackTrace();
    }finally{
     if(executorPool!=null){
       executorPool.shutdown();
     }
     }
   }
}

Part 10: java.util.concurrent - ScheduledThreadPool Example


This article discusses a thread pool that can schedule tasks to run after a specified delay or at regular intervals. From Java 5.0 onwards, such a pool can be obtained from Executors using the following method:

public static ScheduledExecutorService 
       newScheduledThreadPool(int corePoolSize)
    Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

The return type of this method (the type of the thread pool) is ScheduledExecutorService. Some of the salient features of ScheduledExecutorService are:
  1.         Schedule a Callable or Runnable to run once with a fixed delay after submission
  2.         Schedule a Runnable to run periodically at a fixed rate
  3.         Schedule a Runnable to run periodically with a fixed delay between executions
  4.         Submission returns a ScheduledFuture handle which can be used to cancel the task
  5.         Like Timer, but supports pooling
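A few of the features listed above can be sketched as follows: a one-shot Callable with a delay, and a periodic Runnable that is cancelled through its handle. The class name, method names, and the 10 ms period are assumptions chosen for this illustration. The lambdas and method reference assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SchedulerSketch {
    // Runs a Callable once after a fixed delay and waits for its result.
    static String runOnce(long delayMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> handle =
                pool.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return handle.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Runs a Runnable at a fixed rate, then cancels it through the handle.
    // Returns true if the task fired `times` times and was then cancelled.
    static boolean runThenCancel(int times) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(times);
        try {
            ScheduledFuture<?> handle =
                pool.scheduleAtFixedRate(latch::countDown, 0, 10, TimeUnit.MILLISECONDS);
            boolean fired = latch.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // no further executions after this call
            return fired && handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(50));      // done
        System.out.println(runThenCancel(3)); // true
    }
}
```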


Example:
Suppose we are building a bank cheque processing system. Here is the process:
  1.  Every local branch collects cheques and creates a txt file containing the cheque info.
  2.  A service runs which copies the cheque txt file from the local branch to the main server.
  3.  A local service runs on the server which checks whether any file has been received and notifies the cheque clearing process.


We will build the “Local Service”, which checks for received files, and the “Copying process”, which copies files from the client machine to the server.
Pseudo Code-

// Define the Local and Copying service intervals in seconds
// Make a ScheduledThreadPool with pool size 2
try{
    // Make the Local Service thread
    // Make the Copying process thread
    // Schedule both threads to run at regular intervals
}catch(Exception e){
    // release all resources
}

LocalService Thread-
try{
    // Check if the directory exists
    // Check if any file exists in the directory
    // return status
}catch(Exception e){
    // Print necessary exception
}

Copying Process Thread-
try{
    // Check if a file exists on the remote server
    // Copy the file to the main server
}catch(Exception e){
    // Print necessary exception
}
package com.jovialjava.blog.threads;

import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


//LOCAL SERVICE THREAD
class LocalService implements Runnable{
 
 private String DIRECTORY = null;
 
 public LocalService(String DIRECTORY){
  this.DIRECTORY = DIRECTORY;  
 } 
 public void run(){
  try{
   File dir = new File(this.DIRECTORY);
   if(dir.isDirectory()){
    if(dir.list().length > 0){
     System.out.println("FILE EXISTS");
    }
   }else{
    System.err.println("NO SUCH DIRECTORY ["+dir.getAbsolutePath()+"] exists");    
   }      
  }catch(Exception e){
   e.printStackTrace();
  }
 }
}

//COPYING SERVICE THREAD
class CopyService implements Runnable{
 
 private String REMOTE_DIR = null;
 private String LOCAL_DIR = null;
 
 public CopyService(String remoteDir, String localDir){
  this.REMOTE_DIR = remoteDir;
  this.LOCAL_DIR = localDir;
 } 
 public void run(){
  try{
   File remote = new File(this.REMOTE_DIR);
   File local = new File(this.LOCAL_DIR);
   if(remote.isDirectory() && local.isDirectory()){
    if(remote.list().length > 0){
     System.out.println("REMOTE FILE FOUND, COPYING");
     //--- Call the file copying method.     
    }else{
     System.out.println("NO REMOTE FILE FOUND");     
    }    
   }else{
    System.err.println("PLEASE CHECK DIRECTORY ["+remote.getAbsolutePath()+" OR/AND"
      +local.getAbsolutePath()+"] existence");    
   }
  }catch(Exception e){
   e.printStackTrace();
  }
 }
}

public class ScheduledExample {

 private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
 private static final int LOCAL_INTERVAL = 5, COPY_INTERVAL= 2;
 private static final String REMOTE_DIR="REMOTE", LOCAL_DIR= "LOCAL";
 
 public static void main(String... args){
  Runnable localService = new LocalService(LOCAL_DIR);
  Runnable remoteService = new CopyService(REMOTE_DIR, LOCAL_DIR);
  try{
   executor.scheduleWithFixedDelay(localService, 0, LOCAL_INTERVAL, TimeUnit.SECONDS);
   executor.scheduleWithFixedDelay(remoteService,0, COPY_INTERVAL, TimeUnit.SECONDS);   
  }catch(Exception e){
   e.printStackTrace();
  }
 } 
}

Thursday, January 27, 2011

Part 9: java.util.concurrent : FixedThreadPool Example


This article discusses a thread pool with a fixed number of threads. From Java 5.0 onwards, such a pool can be obtained from Executors using the following method:
public static ExecutorService 
               newFixedThreadPool(int nThreads)
    Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
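The "at most nThreads threads will be active" guarantee can be checked with a small sketch that records the highest number of tasks running at the same moment. The class name, the 5 ms sleep, and the counters are assumptions made for this illustration, which uses Java 8 syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolSketch {
    // Runs nTasks short tasks on a pool of nThreads and returns the highest
    // number of tasks that were observed running simultaneously.
    static int peakConcurrency(int nThreads, int nTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < nTasks; i++) {
            tasks.add(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                Thread.sleep(5);                       // simulate a little work
                active.decrementAndGet();
                return null;
            });
        }
        try {
            pool.invokeAll(tasks); // extra tasks wait in the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Prints a value between 1 and 4, never more than the pool size.
        System.out.println(peakConcurrency(4, 20));
    }
}
```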

Example-
Suppose we have 100 properties files in an application. We have one thread that reads each properties file and returns its contents as a map. We want to optimize the time to read all 100 properties files by reading them concurrently. Here, optimize means striking a good balance between CPU utilization and the total time consumed by the reading process.

Pseudo code – READER THREAD

ConfigReader implements Callable<Map<String, String>>
try{
    // Get the file name in the constructor
    // Check if the file exists
    // Read the file and return the map object
}catch(Exception e){
    // release all resources
    // return null
}

Main THREAD-
    // Get a fixed thread pool from Executors
try{
    // Get the list of all properties files in the directory
    // Create a READER THREAD for each file by passing the file name
    // Store the READER threads in a list
    // Submit all of them in one go and get the Map objects
}catch(Exception e){
    // release all resources
    // print the stack trace
}finally{
    // shut down the thread pool
}






package com.jovialjava.blog.threads;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


class Reader implements Callable  <  Properties  > {
 
 private String name = null; 
 public Reader(String name){
  this.name = name;  
 } 
 public Properties call(){
  try{
   File f = new File(name); 
   Properties prop = new Properties();
         if(f.exists() && f.canRead() && f.isFile()){
         FileInputStream in = new FileInputStream(f);
         try{
            prop.load(in);
         }finally{
            in.close(); // always release the file handle
         }
         return prop;
         }else{
          System.err.println("Please check about this file:["+ f.getAbsolutePath()+"]");
          return null;
         }   
  }catch(Exception e){
   e.printStackTrace();
   return null;
  }
 }
}

public class FixedThreadPoolExample {
  public static String directory = "config";
  private static ExecutorService executorPool=null;
  
  public static void main(String...args){
   try{
    File dir = new File(directory);
    if(dir.isDirectory()){
     List < Callable < Properties >  >  fileList = new ArrayList < Callable < Properties >  > ();
     String[] files = dir.list();
     /**
      * Optimization - use roughly 20% of the number of files,
      * but at least 1 thread (newFixedThreadPool rejects 0).
      */
     executorPool = Executors.newFixedThreadPool(Math.max(1, files.length/5));
     
     for(String file : files){
      Callable < Properties >  reader = new Reader(dir.getAbsolutePath()+ File.separator+ file);
      fileList.add(reader);      
     }
     List < Future < Properties >  >  results = executorPool.invokeAll(fileList);
     /**
      * Check how many success and how many failure
      */
     int success =0, failure = 0;
     for(Future < Properties >  result : results){
      if(result.get() == null){
       failure++;
      }else{
       success++;
      }
     }
     System.out.println("Total number of files ["+ fileList.size()+"]");
     System.out.println("Success Count ["+ success+"]");
     System.out.println("Failure Count ["+ failure+"]");
    }else{
     throw new IllegalArgumentException("There is no such directory name -"+ directory);
    }
   }catch(Exception e){
    e.printStackTrace();
   }finally{
    if(executorPool!=null){
     executorPool.shutdown();
    }
   }
  } 
}

Wednesday, January 26, 2011

Part 8: java.util.concurrent - "Executors" Factory Threads Pool


In this article we will look at some of the thread pools available from the “Executors” factory:
public static ExecutorService 
                         newFixedThreadPool(int nThreads)
    Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

public static ScheduledExecutorService 
                    newScheduledThreadPool(int corePoolSize)
    Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

public static ExecutorService newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.



public static ExecutorService newSingleThreadExecutor()
    Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.


Tuesday, January 25, 2011

Part 7: java.util.concurrent - invokeAll via ExecutorService


Submit a Collection of Tasks via ExecutorService

In this part of the series we will talk about how to submit multiple tasks in one call through the ExecutorService interface.

As per the Java 6.0 docs:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException


In traditional Java, if we have to start multiple threads, we have to create the Thread objects and call
the start method on each one.
In Java 5.0 and above, we can put all the Callable objects in a collection and pass the collection to the ExecutorService to run.

For example, if we have 1000 tasks to run, we can create an array list and an ExecutorService object with a thread pool size of 20.

        // create an ExecutorService object with thread pool size = 20
        // create an array List with 1000 callable objects.
        // Release all threads by ExecutorService.

Some important points to note here are

  •  Because the ExecutorService has a thread pool of 20, it internally manages the queuing and execution of the 1000 tasks.
  •  invokeAll is a blocking method: the calling thread won’t proceed to the next line until all 1000 tasks are complete.
  •  All 1000 tasks must have the same return type < T >.
  •  Future.isDone() is true for each element of the returned list.
  •  A completed task could have terminated either normally or by throwing an exception. In both cases Future.isDone( ) returns true.
  •  The results of this method are undefined if the given collection is modified while the operation is in progress.
  •  It can throw the following exceptions:
       - InterruptedException - if interrupted while waiting, in which case unfinished tasks are cancelled.
       - NullPointerException - if tasks or any of its elements are null.
       - RejectedExecutionException - if any task cannot be scheduled for execution.
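The points about isDone() and failed tasks can be illustrated with a small sketch in which one of three tasks throws. The class name and the task bodies are hypothetical, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Returns {number of tasks that succeeded, number that threw}.
    static int[] countOutcomes() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        tasks.add(() -> 3);
        int ok = 0, failed = 0;
        try {
            // invokeAll blocks until every task has completed.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                if (!f.isDone()) {
                    throw new IllegalStateException("invokeAll returned an unfinished task");
                }
                try {
                    f.get();  // a task's exception is rethrown here, wrapped
                    ok++;
                } catch (ExecutionException e) {
                    failed++; // the task ended by throwing; isDone() was still true
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] { ok, failed };
    }

    public static void main(String[] args) {
        int[] r = countOutcomes();
        System.out.println(r[0] + " succeeded, " + r[1] + " failed"); // 2 succeeded, 1 failed
    }
}
```

Note that the exception thrown inside a task does not escape from invokeAll itself. It only surfaces when get() is called on that task's Future.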



EXAMPLE –
Imagine a scenario where one needs to create a txn file for each request coming into the system. The requirements are:
  •  The file name will be the client name.
  •  Almost 1000 clients could be connected at a time, so the system must be capable of creating 1000 files in one go.
  •  Creating one file should take less than 5 ms.
  •  Creating all the files should take less than 300 ms in total.


Let’s write the pseudo code for this case –

File creation Thread – implements Callable<Boolean>
// Get the file name and raw data in the constructor
try{
    // Create a File object
    // Create a PrintWriter object
    // Prepare the data and write it to the file
    // Close the PrintWriter
    // return TRUE
}catch(Exception e){
    // return FALSE
}


Main Parent Thread
try{
    // -- Loop over the whole client list --
    //    create a file creator task, passing the file name and raw data
    //    add the task object to an ArrayList
    // Pass the array list to the ExecutorService via invokeAll
    // Loop over the Future<Boolean> list and check how many tasks completed successfully
}catch(Exception e){
    // Take necessary action
}finally{
    // Shut down the ExecutorService
}




package com.jovialjava.blog.threads;

import java.io.File;
import java.io.PrintWriter;
import java.util.*;
import java.util.concurrent.*;


// FILE CREATOR THREAD
class FileTask implements Callable< Boolean >{
 
 private String name = null;
 private String data = null;
 public FileTask(String name, String data){
  this.name = name;
  this.data = data;
 }
 
 public Boolean call(){
  try{
   File file = new File(this.name);
   PrintWriter writer = new PrintWriter(file);
   writer.write(this.data);
   writer.close();
   return true;   
  }catch(Exception e){
   return false;
  }
 }
}

// MAIN THREAD
public class InvokeAllExample {
 
 private static final ExecutorService executorPool=Executors.newFixedThreadPool(20);
 private static final int NO_OF_CLIENT = 1000;
 private static final String FILE_EXT = ".txt";
 private static String TXN_DATA = "SOME RANDOM TXN DATA FOR CLIENT --> ";
 private static String DIRECTORY = "EXAMPLE" + File.separator;
 
 static{
  if(!new File(DIRECTORY).isDirectory()){
   new File(DIRECTORY).mkdir();
  }
 }
 
 public static void main(String[] args) {
  int success = 0;
  int failure = 0;
  /**
   * Lets assume we have 1000 clients connected and sending request at a time.
   */
  Collection< Callable< Boolean > > collection = new ArrayList< Callable< Boolean > >( );
  for(int i=0; i< NO_OF_CLIENT; i++){
   FileTask task = new FileTask(DIRECTORY+ Integer.toString(i)+ FILE_EXT, TXN_DATA+ i);
   collection.add(task);
  }
  long startTime = new Date().getTime();
  try {   
   List< Future< Boolean > > list = executorPool.invokeAll(collection);   
    for(Future< Boolean > fut : list){
     if(fut.get()){
      success++;
     }else{
      failure++;
     }
    }
   
   } catch (Exception e) {
     e.printStackTrace();
   }finally{
     System.out.println("TOTAL SUCCESS - "+ success);
     System.out.println("TOTAL FAILURE - "+ failure);
     System.out.println("Total time - " + (new Date().getTime() - startTime) + " ms");
     executorPool.shutdown();
   }
 } // End of Main

}