When you start using java.util.concurrent’s Future and the thread pools or executor framework, Following questions may come to one’s mind
- How to do thread join kind of operation ?
- How to set timeout for future?
- Can the futures notify back the submitting thread when they are done?
Example explaining usage for future, Callable and CountDownLatch together
public class DataAccessor {
private static ThreadPoolExecutor executor;
private int timeout = 1000;
static {
executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
}
public ProcessedResponse getDataFromService(List<String> requests) {
final CountDownLatch latch = new CountDownLatch(requests.size());
List<Future<SubmittedJob>> submittedJobs = new ArrayList<Future<SubmittedJob>>(
requests.size());
for (String request : requests) {
Future<SubmittedJob> job = executor.submit(new GetAndProcessResponse(request, latch));
submittedJobs.add(job);
}
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
// some of the jobs not done
}
for (Future<SubmittedJob> job : submittedJobs) {
try {
// before doing a get you may check if it is done
if (!job.isDone()) {
// cancel job
job.cancel(true);
continue;
}
ProcessedResponse response = job.get();
aggregateResponse(response);
} catch (ExecutionException cause) {
// exceptions occurred during execution
// we can get cause.getCause() and check instanceof
}
}
}
private void aggregateResponse(ProcessedResponse response) {
// aggregate response to a single response.
}
private class SubmittedJob {
final Future<ProcessedResponse> job;
public Future<ProcessedResponse> getJob() {
return job;
}
public String getRequest() {
return request;
}
final String request;
SubmittedJob(final Future<ProcessedResponse> job, final String request) {
this.job = job;
this.request = request;
}
}
private class ProcessedResponse {
ProcessedResponse(final String request) {
}
}
private class GetAndProcessResponse implements Callable<ProcessedResponse> {
private final String request;
private final CountDownLatch countDownLatch;
GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
this.request = request;
this.countDownLatch = countDownLatch;
}
ProcessedResponse call() {
try {
return getAndProcessResponse(this.request);
} finally {
countDownLatch.countDown();
}
}
private ProcessedResponse getAndProcessResponse(final String request) {
// do the service call
// ........
return (new ProcessedResponse(request));
}
}





