Sunday, August 15, 2021

Java 8: Streams API: Normal streams vs parallel streams

By default, every stream operation runs sequentially unless parallel execution is explicitly requested. The output of a sequential pipeline is predictable: for an ordered source such as a List or IntStream.range, elements are processed in their encounter order. For example, see "Total Sequential Time Taken for IntStream example" or "Total Sequential Time Taken for Employee Example" in the code below. In these examples, all elements are processed in the main thread only.
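The following minimal sketch checks this behaviour instead of just printing it. It records the order in which a sequential stream visits its elements and the names of the threads that run the lambda. The class and method names are illustrative and do not come from the post.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

public class SequentialOrderDemo {

    // Runs a sequential stream over [from, toExclusive) and records
    // the visit order plus every thread name that executed the lambda.
    static List<Integer> processSequentially(int from, int toExclusive, Set<String> threadNames) {
        List<Integer> seen = new ArrayList<>();   // plain ArrayList is fine: only one thread runs the lambda
        IntStream.range(from, toExclusive)        // sequential unless .parallel() is called
                 .forEach(i -> {
                     threadNames.add(Thread.currentThread().getName());
                     seen.add(i);
                 });
        return seen;
    }

    public static void main(String[] args) {
        Set<String> threads = new LinkedHashSet<>();
        List<Integer> seen = processSequentially(1, 11, threads);
        System.out.println("Order:   " + seen);     // values 1..10 in encounter order
        System.out.println("Threads: " + threads);  // only the calling thread, e.g. main
    }
}
```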

Parallel streams, on the other hand, let the same code run on several cores at once. Each part of the data is processed separately, and the partial results are then combined to produce the final output. In this case, the order in which elements are processed is NOT under our (the developer's) control, and it may change every time we run the program. In the examples below, the parallel runs use worker threads from the common ForkJoinPool in addition to the main thread. The Fork/Join framework (added in Java 7) is in charge of this parallel execution, i.e. splitting the data into subtasks, running them, and joining their results.
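Because processing order is not under our control, a parallel forEach may visit elements in any order. When the encounter order matters, forEachOrdered restores it, at the cost of some parallelism. The sketch below contrasts the two and prints the common pool's parallelism, which is typically one less than the number of available processors. Class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelOrderDemo {

    // forEach on a parallel stream: every element is visited exactly once,
    // but possibly out of order and on several threads.
    static List<Integer> unorderedVisit(int n, Set<String> threadNames) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>()); // shared across threads
        IntStream.rangeClosed(1, n).parallel().forEach(i -> {
            threadNames.add(Thread.currentThread().getName());               // threadNames must be thread-safe
            seen.add(i);
        });
        return seen;
    }

    // forEachOrdered on a parallel stream: elements are visited in encounter order.
    static List<Integer> orderedVisit(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).parallel().forEachOrdered(i -> seen.add(i));
        return seen;
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        System.out.println("forEach:        " + unorderedVisit(10, threads)); // order may vary per run
        System.out.println("forEachOrdered: " + orderedVisit(10));
        System.out.println("Threads used:   " + threads);  // usually main plus ForkJoinPool.commonPool-worker-N
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}
```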

There are two ways to make a stream execute in parallel: call .parallel() on an existing stream, or call .parallelStream() on a collection. The code below shows both. When I executed it, these were the times taken (in milliseconds) for each run:
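Both ways can be compared directly with BaseStream.isParallel(). In this minimal sketch the class and method names are illustrative. It also shows that the parallel/sequential flag applies to the whole pipeline, with the last call winning.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class TwoWaysToParallel {

    static boolean[] parallelFlags(List<String> names) {
        Stream<String> viaParallelStream = names.parallelStream();              // Collection.parallelStream()
        Stream<String> viaParallel = names.stream().parallel();                 // BaseStream.parallel()
        Stream<String> backToSequential = names.stream().parallel().sequential(); // last call wins
        return new boolean[] {
            viaParallelStream.isParallel(),
            viaParallel.isParallel(),
            backToSequential.isParallel()
        };
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Sonu", "Ganu", "Manu");
        System.out.println(Arrays.toString(parallelFlags(names)));  // [true, true, false]
    }
}
```

parallelStream() is only defined on Collection. For a primitive stream such as IntStream.range(1, 11), .parallel() is therefore the only option.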

Total Sequential Time Taken for IntStream example :9

Total Parallel Time Taken using .parallel() for IntStream example : 31


Total Sequential Time Taken for Employee Example:34

Total Parallel Time Taken using .parallelStream() for Employee example :13

Total Parallel Time Taken using .parallel() for Employee example :4


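As you can see, in the IntStream example, running the stream in parallel actually made it slower. The reason is that for small streams and simple operations, the cost of managing threads, splitting the source and merging the results can be higher than simply executing everything sequentially. In the Employee examples, the sequential run took longer than the .parallelStream() run, which in turn took longer than the .parallel() run. Keep in mind, though, that each of these is a single measurement over only 8 to 10 elements, and every element calls System.out.println, on which the threads have to take turns. With this little work, the numbers are most likely dominated by JVM warm-up (class loading and JIT compilation during the earlier runs) and by the start-up of the common pool's worker threads. They do not show that .parallel() is faster than .parallelStream(): both produce the same kind of parallel stream.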
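A slightly more careful comparison first warms the code up, then keeps the best of several runs timed with System.nanoTime(). The result is still rough: for serious measurements a harness such as JMH is the usual choice. The helper names below are hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

public class RoughTiming {

    // Results are written here so the JIT cannot discard the timed work as unused.
    static volatile long blackhole;

    // Runs the task a few times untimed (JIT warm-up), then returns the
    // fastest of several timed runs, in nanoseconds.
    static long bestOfNanos(LongSupplier task, int warmups, int runs) {
        for (int i = 0; i < warmups; i++) {
            blackhole = task.getAsLong();
        }
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();   // meant for elapsed time, unlike wall-clock LocalTime.now()
            blackhole = task.getAsLong();
            best = Math.min(best, System.nanoTime() - start);
        }
        return best;
    }

    static long sumSequential(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    static long sumParallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        for (long n : new long[] {10L, 10_000_000L}) {
            long seq = bestOfNanos(() -> sumSequential(n), 5, 10);
            long par = bestOfNanos(() -> sumParallel(n), 5, 10);
            System.out.printf("n=%,d  sequential=%,d ns  parallel=%,d ns%n", n, seq, par);
        }
    }
}
```

On a typical multi-core machine, you would expect the parallel version to lose for n = 10 and to pull ahead for large n. The actual numbers depend on the hardware and JVM.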


So the question is: when should we use parallel streams?

1. When there is an actual performance requirement: first measure performance with a sequential stream, and then consider a parallel stream as one possible optimization strategy.

2. When there is a large amount of data and/or an expensive computation per element, so that the work saved by running in parallel outweighs the cost of splitting and merging.
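To illustrate point 2, here is a sketch in which each element needs real CPU work. Trial-division prime testing is used only because it is CPU-heavy, and the class name is hypothetical. The source is IntStream.range, which splits cheaply into equal parts. Both versions must return the same count; only the elapsed time should differ.

```java
import java.util.stream.IntStream;

public class PrimeCount {

    // Deliberately simple trial division, so every element costs noticeable CPU time.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    static long countSequential(int limit) {
        return IntStream.range(0, limit).filter(PrimeCount::isPrime).count();
    }

    static long countParallel(int limit) {
        return IntStream.range(0, limit).parallel().filter(PrimeCount::isPrime).count();
    }

    public static void main(String[] args) {
        int limit = 2_000_000;
        long t0 = System.nanoTime();
        long seq = countSequential(limit);
        long t1 = System.nanoTime();
        long par = countParallel(limit);
        long t2 = System.nanoTime();
        System.out.println("Primes below " + limit + ": " + seq + " (sequential), " + par + " (parallel)");
        System.out.println("Sequential ms: " + (t1 - t0) / 1_000_000 + ", parallel ms: " + (t2 - t1) / 1_000_000);
    }
}
```

Sources such as arrays, ArrayList and IntStream.range split efficiently. Sources such as LinkedList or Stream.iterate split poorly and usually gain little from .parallel().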

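// Employee.java: this class is not shown in the original post. This is a minimal,
// hypothetical version reconstructed from the constructor comment and the
// getfName() call used below, so that the listing compiles.
package java8;

import java.util.List;

public class Employee {
	private final int eId;
	private final String fName;
	private final String lName;
	private final double salary;
	private final List<Integer> phNumbers;

	public Employee(int eId, String fName, String lName, double salary, List<Integer> phNumbers) {
		this.eId = eId;
		this.fName = fName;
		this.lName = lName;
		this.salary = salary;
		this.phNumbers = phNumbers;
	}

	public String getfName() {
		return fName;
	}

	@Override
	public String toString() {
		return "Employee[" + eId + ", " + fName + " " + lName + ", " + salary + ", " + phNumbers + "]";
	}
}
// EmployeeDB.java
package java8;

import java.util.Arrays;
import java.util.List;

public class EmployeeDB {
	// Returns a fixed list of 8 sample employees.
	public static List<Employee> getEmployeeList(){
		List<Employee> empList = Arrays.asList(
				//int eId, String fName, String lName, double salary, List<Integer> phNumbers
				new Employee(5, "Sonu", "Panjabi", 1000, Arrays.asList(1234, 2345)),
				new Employee(7, "Ganu", "Marathi", 1500, Arrays.asList(3456, 4567)),
				new Employee(8, "Manu", "Bangali", 2000, Arrays.asList(5678, 6789)),
				new Employee(4, "Tonu", "Bihari", 3000, Arrays.asList(7890, 8901)),
				new Employee(1, "Kanu", "Asami", 3500, Arrays.asList(9012, 1230)),
				new Employee(6, "Ranu", "Kashmiri", 4000, Arrays.asList(9876, 8765)),
				new Employee(2, "Janu", "Kannadi", 4500, Arrays.asList(7654, 6543)),
				new Employee(3, "Monu", "Gujrathi", 5000, Arrays.asList(5432, 4321))
				);
		return empList;
	}
}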
// SequentialAndParallelStream.java
package java8.sequentialparallelstream;

import java.time.Duration;
import java.time.LocalTime;
import java.util.stream.IntStream;

import java8.EmployeeDB;

public class SequentialAndParallelStream {

	public static void main(String[] args) {
		// Each section is timed once with the wall clock, so the numbers are only a rough indication.

		// 1. Sequential IntStream: all values are printed by the main thread, in order 1..10.
		IntStream seqStream = IntStream.range(1, 11);
		LocalTime startTime = LocalTime.now();
		seqStream.forEach(a-> System.out.println("Thread: "+Thread.currentThread().getName()+" value:"+a));
		LocalTime endTime = LocalTime.now();
		Duration totalTime = Duration.between(startTime, endTime);
		System.out.println("Total Sequential Time Taken for IntStream example :"+ totalTime.toMillis());

		// 2. Same range, made parallel: values are handled by the main thread and
		//    ForkJoinPool.commonPool() workers, and the print order can change on every run.
		IntStream parallelStream = IntStream.range(1, 11);
		startTime = LocalTime.now();
		parallelStream.parallel().forEach(a-> System.out.println("Thread: "+Thread.currentThread().getName()+" value:"+a));
		endTime = LocalTime.now();
		totalTime = Duration.between(startTime, endTime);
		System.out.println("Total Parallel Time Taken using .parallel() for IntStream example : "+ totalTime.toMillis());

		System.out.println("\n Let's check with employee example \n");

		// 3. Sequential stream of employees, sorted by first name and printed in that order.
		long start = System.currentTimeMillis();
		EmployeeDB.getEmployeeList().stream().sorted((e1, e2) -> e1.getfName().compareTo(e2.getfName()))
			.forEach(a ->System.out.println("Thread: "+Thread.currentThread().getName()+" value:"+a));
		long end = System.currentTimeMillis();
		long total = end - start;
		System.out.println("Total Sequential Time Taken for Employee Example:"+ total);

		// 4. Parallel stream via Collection.parallelStream(). The elements are sorted, but
		//    forEach does not respect encounter order on a parallel stream, so they may be
		//    printed out of order; forEachOrdered(...) would keep the sorted order.
		start = System.currentTimeMillis();
		EmployeeDB.getEmployeeList().parallelStream().sorted((e1, e2) -> e1.getfName().compareTo(e2.getfName()))
			.forEach(a ->System.out.println("Thread: "+Thread.currentThread().getName()+" value:"+a));
		end = System.currentTimeMillis();
		total = end - start;
		System.out.println("Total Parallel Time Taken using .parallelStream() for Employee example :"+ total);

		// 5. Parallel stream via stream().parallel(): equivalent to parallelStream() above.
		start = System.currentTimeMillis();
		EmployeeDB.getEmployeeList().stream().parallel().sorted((e1, e2) -> e1.getfName().compareTo(e2.getfName()))
			.forEach(a ->System.out.println("Thread: "+Thread.currentThread().getName()+" value:"+a));
		end = System.currentTimeMillis();
		total = end - start;
		System.out.println("Total Parallel Time Taken using .parallel() for Employee example :"+ total);
	}

}
