Concurrency vs paralelism
Concurrent vs parallel
A system is said to be concurrent
if it can support two or more actions in progress
at the same time.
A system is said to be parallel
if it can support two or more actions executing simultaneously
.
concurrent
programming is using only a core of the CPU
and executing several tasks during the same period time.
The tasks are in progress, but the scheduler is changing between doing a task during the time.
So the effect is looks like the tasks are doing at the same time, but it's not, are doing at the same time
because are in progress, but everytime only one task is executing in the core of the CPU
.
For instance: you have to eat the whole cake and you have to sing a song meanwhile.
You can't do the both task at the same time, or eat or sing, but you can bite a bit of cake and sing a little,
a keep on doing the same during the time until you have done the cake, and you have done to sing a song.
The tasks have a duration, an you can alternate the execution between them. This is concurrency
But what's happened is you have two cores of the CPU
, you can execute the task of eating a cake and the task of singing
at the same time. So this is parallelism
.
In the earlier example, you can execute at the same time
the eating task in a core,
and the singing task in the other at the same time
.
Another example of parallelism
is when you split a process in a set of tasks and every task is executed at the same time
in different cores of the CPU
.
Throughput
number of http requests or number I/O request per second. The amount of information you can process per second or per minute...
ExecutorService pattern
The executorService piece of code which is using a threadpool and queue to store the messages. You can create a task and send it to the executor. The executor has an event loop to execute the task in a concurrent way.
Sync vs Async, block vs unblocked
Mainly I/O has the problem of the latency and the throughput because they are blocking the resources until there's a response.
So your CPU is waiting without doing anything during a period of time.
We have to do all the I/O tasks async to increase the performance and reduce the latency.
blocking I/O request:
HTTPClient client = HTTPClient)()
String response = client.get("http:/ /www.mydata.com/data")
Synchronous = Blocking A synchronous code is always blocking. It will slow down vour application if it blocks it for a long time.
Asynchronous The code you write will be executed at some point in the future.
Asynchronous + Concurrent Running a blocking code in another thread is a way to avoid blocking the main thread of your application. How can you get the result from this other thread?
unblocking I/O request:
Concurrent programming
Sync Callable/Supplier functions
In this example you can use Callable
functions.
Callables
or suppliers
are executable tasks.
Callables
or suppliers
are like lambdas in kotlin.
Define 3 callable functions
Create a list of callables
Over the list, iterate using a stream, and over the stream do a map to execute the callable.
Everything is synchronous
ExecutorService, async
var executor: ExecutorService = Executors.newFixedThreadPool (4);
Over the callable functions
in earlier example, we can submit
the task to the executorService
.
To do that we can use the submit method.
Over every submit execution, you get a future
of the callable task.
A Future
is a promise.
So to get the result you have to do it one by one.
Using the Future.get()
the future is resolved or get an error.
To resolve the Future we can use get
or join
.
It's better join, because doesn't throw any exception.
Finally, at the end of the program, the executor has to be
shutdown`
CompletableFuture async
Completable Future is another way to do async requests.
The completableFuture implementation implements Future and CompletionStage
class CompletableFuture<T>: Future<T>, CompletionStage<T>
Over the earlier example, we can iterate over the list of suppliers,
and for everyone, we can add it to the CompletableFuture.supplierAsync()
to create a new instance of CompletableFuture
.
To resolve the completeFuture we can use get
or join
.
It's better join, because doesn't throw any exception.
Chain of Completable futures
- divide your processing in small I/O operations and chain them!
Example with executor
Example with completableFutures
Another example, the outcome of one future is the another future to pass to the next future.
Reminder: when you call a java method all the arguments are evaluated and resolved if they are Futures
Reminder: use composition over combination when we were using futures (thenCompose() over thenCombine())
// chained futures
CompletableFuture<Quotation> quotationCF =
CompletableFuture.supplyAsync( ( ) -> getQuotation ( )) ;
CompletableFuture<EmailInfos> infosCF =
quotationCF .thenApply (quotation -> email(quotation));
CompletableFuture<Boolean> doneCF =
infosCF.thenApply (emailInfos -> writeToDB(emailInfos));
doneCF .thenApply (done -> updateGUI (done)) ;
// to execute the futures, and block the main thread
doneCF.join()
// to get the results
Object results = doneCF.get()
Run three task in parallel, and get the first one in being ready: any0f
You can run three task, and anyOf
get the faster completed, but the result can change
in every execution. If the three task are completed at the same time, is returned one,
but if you run the same code another time, you can get another which is done.
Take care of this.
Supplier<Weather> w1 = () -> getWeatherA();
Supplier<Weather> w2 = () -> getWeatherB();
Supplier<Weather> w3 = () -> getWeatherC();
// create the completableFutures
CompletableFuture<Weather> cf1 = CompletableFuture.supplyAsync(w1);
CompletableFuture<Weather> cf3 = CompletableFuture.supplyAsync(w1);
CompletableFuture<Weather> cf4 = CompletableFuture.supplyAsync(w1);
// pass the completeFutures
// return Object because the futures can be of any type
CompletableFuture<Object> weatherCF = CompletableFuture.any0f(cf1, cf2, cf3) ; // return Object
// when there was a task ready, the completefuture is resolved for this task. The others are skipped
weatherCF.join();
Weather result = (Weather)weatherCF.get();
CompletableFuture‹Weather> taskA = CompletableFuture.supplyAsync(fetchWeatherA);
CompletableFuture<Weather> taskB = CompletableFuture.supplyAsync(fetchWeatherB);
CompletableFuture.any0f(taskA, taskB)
.thenAccept (System.out: :printIn)
•join ();
Run three task in parallel, and get the best result of them
// create suppliers or callables, lambdas
Supplier<Weather> w1 = () -> getWeatherA();
Supplier<Weather> w2 = () -> getWeatherB();
Supplier<Weather> w3 = () -> getWeatherC();
// crete the completeFutures
CompletableFuture<Quotation> cf1 = CompletableFuture.supplyAsync(q1);
CompletableFuture<Quotation> cf2 = CompletableFuture.supplyAsync(q2);
CompletableFuture<Quotation> cf3 = CompletableFuture.supplyAsync(q3);
// pass the completeFutures
CompletableFuture<Void> done = CompletableFuture.all0f(cf1, cf2, cf3);
// to join, we have to join one by one and then do the join the final future
Quotation bestQuotation =
done.thenApply ( v ->
Stream.of (cf1, cf2, cf3)
.map (CompletableFuture: :join) // join one by one to resolve them
.min (comparing (Quotation: :amount))
.orElseThrow ())
).join();
Composing Async Tasks: combining dependant tasks. Run two task in parallel, for creating the result of another object
first try, blocking for the response using a call method and blocking the main thread doing two get() calls
// create CompletableFutures from suppliers
var quotationCF = CompletableFuture.supplyAsync(() -> getQuotation ( )) ;
var weatherCF = CompletableFuture.supplyAsync(() -> getWeather ()) ;
// here the result is blocking for the response, which is not optimus
var travelPage = new TravelPage (quotationCF .get (), weatherCF. get ()) ;
another variant using thenCombine()
record class TravelPage(Quotation q, Weather w)
CompletableFuture<Weather> anyWeather =
CompletableFuture.any0f(weatherCFs.toArray(CompletableFuture[]::new))
.thenApply(o -> (Weather) o);
CompletableFuture<Quotation> bestQuotationCF =
all0fQuotations.thenApply( v ->
quotationCFs.stream()
.map (CompletableFuture: :join)
.min (Comparator.comparing (Quotation:: amount))
.orElseThrow()
);
CompletableFuture‹TravelPage> pageCompletableFuture = bestQuotationCF.thenCombine(anyWeather,TravelPage::new);
// CompletableFuture‹TravelPage> pageCompletableFuture = bestQuotationCF.thenCombine(anyWeather,(q, w) -> TravelPage(q, w));
pageCompletableFuture.thenAccept(Svstem.out::println).join();
second try, using allOf
method. Really complex, and you have to block the main thread doing a get() and join()
// create CompletableFutures from suppliers
var quotationCF = CompletableFuture.supplyAsync(() -> getQuotation ( )) ;
var weatherCF = CompletableFuture.supplyAsync(() -> getWeather ()) ;
// here the result is blocking for the response, which is not optimus
var travelPage = new TravelPage (quotationCF .get (), weatherCF. get ()) ;
// create another completefuture, but it's not needed in this example
CompletableFuture<Void> done = CompletableFuture.all0f(quotationCF, weatherCF);
// here we can use thenApply and after that, we have to wait to resolve the futures
and after that, to do join.
var travelPage =
done.thenApply (V ->
new TravelPage (quotationCF.get () , weatherCF.get ())
.join()
third try, chaining the futures, you have to block the main thread doing a get() and join()
// create CompletableFutures from suppliers
var quotationCF = CompletableFuture.supplyAsync(() -> getQuotation ( )) ;
var weatherCF = CompletableFuture.supplyAsync(() -> getWeather ()) ;
// here the result is blocking for the response, which is not optimus
var travelPage = new TravelPage (quotationCF .get (), weatherCF. get ()) ;
// create another completefuture, but it's not needed in this example
CompletableFuture<Void> done = CompletableFuture.all0f(quotationCF, weatherCF);
// here we can chain the futures and do a final join to resolve all of them
var travelPage =
quotationCF.thenApply ( quotation ->
new TravelPage (quotation, weatherCF .get ()
).join()) ;
fourth try, using thenCompose() method
// create CompletableFutures from suppliers
var quotationCF = CompletableFuture.supplyAsync(() -> getQuotation ( )) ;
var weatherCF = CompletableFuture.supplyAsync(() -> getWeather ()) ;
// here the result is blocking for the response, which is not optimus
var travelPage = new TravelPage (quotationCF .get (), weatherCF. get ()) ;
// create another completefuture, but it's not needed in this example
CompletableFuture<Void> done = CompletableFuture.all0f(quotationCF, weatherCF);
// here we are going to compose or chain the futures, only a block with the final join(),
//but in between is not blocking
TravelPage travelPage =
quotationCF. thenCompose( quotation ->
-> weatherCF.thenApply (
weather -> new TravelPage (quotation, weather))
).join();
Controlling your threads: join pool
Asynchronous tasks are executed in the Common Fork / Join pool
The number of threads is the number of CPU cores of the host machine
The executor service
has one queue of waiting list to serve the messages to the consumers. Every consumer is a handle by a thread.
The fork/join pool
has one queue of waiting list per every thread. And if the thread has consume all the messages in its queue or waiting list,
then starts stealing messages from another neighbour waiting list of another thread.
By default, a task is executed in the same thread as the one that created it
By convention, if the methods ends with the "async" keyword, the function passed is executed in a new thread on the join pool
otherwise, the same thread is keep on using it.
For instance supplyAsync
is executed in another thread of the join pool
CompletableFuture<Quotation> quotationCF =
CompletableFuture. supplyAsync( () -> getQuotation ( ));
meanwhile thenApply
is executed in the same thread.
CompletableFuture<Boolean> doneCF =
infosCF.thenApply (
emailInfos -> writeToDB(emailInfos)
);
Executor executor = Executors.newFixedThreadPool (4);
// example for our custom executor
CompletableFuture<Quotation> quotationCF =
CompletableFuture. supplyAsync (
() -> getQuotation (), executor);
}
// here the SwingUtilities: :invokeLater implements the Executor interface, so you can use it.
doneCF. thenApplyAsync(done -> updateGUI (done),
SwingUtilities: :invokeLater);
ExecutorService as argument of async method to control the threads in which task are executing
The async
methods accepts the Executor
interface, so we can pass then or an executorService
or a lambda
.
public interface Executor {
void execute(Runnable task);
}
In the earlier examples, we can create some executors and apply them to the supplyAsync
and thenApplyAsync
, as second parameter
ExecutorService quotationExecutor = Executors.newFixedThreadPool (4, quotationThreadFactory);
ExecutorService weatherExecutor = Executors.newFixedThreadPool (4, weatherThreadFactory);
ExecutorService minExecutor = Executors.newFixedThreadPool (1, minThreadFactory);
// pass the references to the async methods
//finally close the executors
quotationExecutor.shutdown ();
weatherExecutor.shutdown();
minExecutor.shutdown ();
we can pass from this execution
QB running in Thread[ForkJoinPool.commonPool-worker-5,5,main]
WB running in Thread[ForkJoinPool.commonPool-worker-2,5,main]
Q running in Thread [ForkJoinPool.commonPool-worker-6,5,main]
WC running in Thread[ForkJoinPool.commonPool-worker-3,5,main]
Q running in Thread[ForkJoinPool.commonPool-worker-4,5,main]
Allof then apply Thread[ForkJoinPool.commonPool-worker-3,5,main]
WA running in Thread[ForkJoinPool.commonPool-worker-1,5,main]
to this one, where all the executor are running
WB running in Thread[Weather-1,5,main]
QA running in Thread[Quotation-0,5,main]
QB running in Thread[Quotation-1,5,main]
WC running in Thread[Weather-2,5,main]
WA running in Thread[Weather-0,5,main]
Q running in Thread[Quotation-2,5,main]
Allof then apply Thread[ForkJoinPool.commonPool-worker-1,5, main]
TravelPage[quotation=Quotation[server=ServerA,amount=41],weather=Weather[server=ServerB,v
handling exceptions
How to deal with exceptions:
- exceptionally)
- whenComplete()
- handle()
We can use the exceptionally()
or handle (T, exception)
or whenOnComplete(T, exception)
Supplier<Weather> w = () -> getWeather () ;
CompletableFuture<Weather> cf =
CompletableFuture.supplyAsync(w)
.exceptionally(t -> new Weather (...));
// or handle
cf.handle ( (weather, exception) -> {
if (exception != null) {
logger.error (exception);
return new Weather (...)
} else{
return weather;
}
});
// or whenOnComplete
cf.whenComplete( (weather, exception) -> {
if (exception != null)
logger .error (exception);
});
wrap up
Creating completable future with supplier
and runnable
:
Creating completable future with allOf
and anyOf
:
Creating completable future supported tasks:
Creating completable future chaining tasks:
Creating completable future chaining tasks, and get both values :
Creating completable future chaining tasks, and get a value:
Conclusions
CompletionStage is interface CompletionFuture is implementation
All these things are created for I/O tasks, no for inmemory tasks. The CompletionStage API is useless for inmmemory tasks. The performance is worst. Decrease the performance and the throughput
One executorService is best, faster than having more executorServices.
Be carefully with the number of threads of every executorService.
Next steps:
- Loom project for java, similar to kotlin coroutines
references:
- https://github.com/JosePaumard
- https://www.youtube.com/user/JPaumard