h1

Example: Using Java Future, Callable, Executor and CountDownLatch

January 21, 2012

When you start using java.util.concurrent’s Future together with thread pools or the executor framework, the following questions may come to mind:

  • How do you perform a thread-join kind of operation?
  • How do you set a timeout on a future?
  • Can the futures notify back the submitting thread when they are done?

The example below shows how Future, Callable and CountDownLatch can be used together:

 
/**
 * Fans a batch of requests out to a shared thread pool and gathers whatever
 * finished within a fixed time budget.
 *
 * <p>The example combines three {@code java.util.concurrent} tools:
 * <ul>
 *   <li>{@link Callable} / {@link Future} - one task and one result handle per request;</li>
 *   <li>{@link CountDownLatch} - a "join with timeout" across all tasks of one batch;</li>
 *   <li>{@link ThreadPoolExecutor} - the pool that runs the tasks.</li>
 * </ul>
 *
 * <p>The actual service call is still a placeholder; see
 * {@code GetAndProcessResponse#getAndProcessResponse}.
 */
public class DataAccessor {

    /**
     * Pool shared by every {@code DataAccessor}: 10 worker threads and at most
     * 1000 queued tasks. The workers are daemon threads because nothing ever
     * shuts this static pool down; with non-daemon core threads the JVM could
     * never exit once the first task had been submitted.
     */
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10, 10, 1000, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1000),
            new DaemonThreadFactory());

    /** Longest time, in milliseconds, that one batch is waited for. */
    private final int timeout = 1000;

    /**
     * Submits one task per request, waits up to {@code timeout} milliseconds for
     * all of them, and merges the responses that completed successfully.
     *
     * <p>Tasks that are still running when the wait ends are cancelled, and tasks
     * that failed with an exception are skipped, so the result may cover only a
     * subset of {@code requests}. If the calling thread is interrupted while
     * waiting, the wait ends early, the interrupt flag is restored, and the
     * responses available at that moment are returned.
     *
     * @param requests the requests to send; must not be {@code null}, may be empty
     * @return the merged response, never {@code null}; it lists the answered
     *         requests in submission order
     * @throws NullPointerException if {@code requests} is {@code null}
     */
    public ProcessedResponse getDataFromService(List<String> requests) {
        if (requests == null) {
            throw new NullPointerException("requests");
        }

        final CountDownLatch latch = new CountDownLatch(requests.size());
        final List<SubmittedJob> submittedJobs = new ArrayList<SubmittedJob>(requests.size());
        for (String request : requests) {
            Future<ProcessedResponse> job =
                    executor.submit(new GetAndProcessResponse(request, latch));
            submittedJobs.add(new SubmittedJob(job, request));
        }

        // The "join": true means every task has left call(), false means the
        // time budget ran out (or we were interrupted) with tasks outstanding.
        boolean allFinished;
        try {
            allFinished = latch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            allFinished = false;
        }

        final List<ProcessedResponse> completed =
                new ArrayList<ProcessedResponse>(submittedJobs.size());
        for (SubmittedJob submitted : submittedJobs) {
            final Future<ProcessedResponse> job = submitted.getJob();

            // The latch is counted down inside call(), an instant BEFORE the
            // Future is marked done. After a successful await() isDone() may
            // therefore still be false for the last task; trusting it would
            // cancel and drop a finished job. Only consult isDone() when the
            // wait actually ended early.
            if (!allFinished && !job.isDone()) {
                job.cancel(true);
                continue;
            }

            try {
                completed.add(job.get());
            } catch (ExecutionException cause) {
                // The task itself threw. Deliberately best-effort: skip it and
                // keep the other responses. cause.getCause() holds the original
                // exception and submitted.getRequest() the request that failed.
            } catch (InterruptedException interrupted) {
                // Only reachable in the tiny window described above; keep the
                // interrupt visible to the caller and move on.
                Thread.currentThread().interrupt();
            }
        }
        return aggregateResponse(completed);
    }

    /**
     * Merges individual responses into a single response.
     *
     * @param responses the successfully completed responses, in submission order
     * @return one response covering the requests of all {@code responses}
     */
    private ProcessedResponse aggregateResponse(List<ProcessedResponse> responses) {
        final List<String> answered = new ArrayList<String>();
        for (ProcessedResponse response : responses) {
            answered.addAll(response.getRequests());
        }
        return new ProcessedResponse(answered);
    }

    /** Creates daemon worker threads so the static pool never blocks JVM exit. */
    private static final class DaemonThreadFactory
            implements java.util.concurrent.ThreadFactory {
        private final java.util.concurrent.ThreadFactory delegate =
                java.util.concurrent.Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable task) {
            final Thread worker = delegate.newThread(task);
            worker.setDaemon(true);
            return worker;
        }
    }

    /** Pairs a submitted task's {@link Future} with the request it was created for. */
    private static final class SubmittedJob {
        private final Future<ProcessedResponse> job;
        private final String request;

        SubmittedJob(final Future<ProcessedResponse> job, final String request) {
            this.job = job;
            this.request = request;
        }

        public Future<ProcessedResponse> getJob() {
            return job;
        }

        public String getRequest() {
            return request;
        }
    }

    /**
     * Result of processing one request, or the merge of several such results.
     * Public so that callers of {@link DataAccessor#getDataFromService} can
     * actually name and use the value they get back.
     */
    public static final class ProcessedResponse {
        private final List<String> requests;

        ProcessedResponse(final String request) {
            this.requests = new ArrayList<String>(1);
            this.requests.add(request);
        }

        ProcessedResponse(final List<String> requests) {
            this.requests = new ArrayList<String>(requests);
        }

        /**
         * @return a fresh copy of the requests this response covers
         */
        public List<String> getRequests() {
            return new ArrayList<String>(requests);
        }
    }

    /**
     * Task that handles a single request and always counts the latch down when
     * it leaves {@link #call()}, whether it succeeded or threw.
     */
    private static final class GetAndProcessResponse implements Callable<ProcessedResponse> {
        private final String request;
        private final CountDownLatch countDownLatch;

        GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
            this.request = request;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public ProcessedResponse call() {
            try {
                return getAndProcessResponse(this.request);
            } finally {
                // In finally so a failing task cannot leave the submitter
                // waiting for the full timeout.
                countDownLatch.countDown();
            }
        }

        private ProcessedResponse getAndProcessResponse(final String request) {
            // do the service call
            // ........
            return new ProcessedResponse(request);
        }
    }
}
Advertisement

One comment

  1. Amazing topic, helped with my security!! Thanks for that.



Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.