この記事ではjavaの並列処理としてForkJoinPoolとCallableを試してみます。他にもjavaにはRunnableを使ったものなどありますが、ForkJoinPoolとCallableで十分に足りると思うのでここでは割愛いたします。
ForkJoinPoolを使った並列処理
まずはForkJoinPoolを使った並列処理のやり方は以下です。
listの中に文字列がリスト型で入っておりparallelStreamと組み合わせることでlistから一件ずつ要素取り出し、map内で処理させるところを並列化しています。
ForkJoinPoolの引数にスレッド数としてlist.size()を入れているので、要素の数だけスレッドが立ち上がります。そのことはコンソールログからもわかります。
(parallelStreamだけの並列化だとCPUのコア数分しかスレッドが立たない。)
Main.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public class Main { public static void main(String... args) throws ExecutionException, InterruptedException { List<String> list = new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m")); ForkJoinPool forkJoinPool = new ForkJoinPool(list.size()); List<String> returns = forkJoinPool.submit(() -> list.parallelStream().map(i -> myPrint(i)).collect(Collectors.toList())).get(); forkJoinPool.shutdown(); } private static String myPrint(String string) { System.out.println(Thread.currentThread().getName() + ": word = " + string); return string; } } |
コンソールログ
1 2 3 4 5 6 7 8 9 10 11 12 13 |
ForkJoinPool-1-worker-3: word = i ForkJoinPool-1-worker-19: word = h ForkJoinPool-1-worker-11: word = e ForkJoinPool-1-worker-7: word = m ForkJoinPool-1-worker-31: word = c ForkJoinPool-1-worker-5: word = d ForkJoinPool-1-worker-9: word = b ForkJoinPool-1-worker-25: word = f ForkJoinPool-1-worker-27: word = g ForkJoinPool-1-worker-23: word = l ForkJoinPool-1-worker-13: word = a ForkJoinPool-1-worker-21: word = j ForkJoinPool-1-worker-17: word = k |
試しに、ForkJoinPoolのスレッドの数を1としてみると、スレッドが1つしか立ち上がらず、一件ずつ1つのスレッドで処理されることになります。コンソールログからもそのことはわかります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class Main { public static void main(String... args) throws ExecutionException, InterruptedException { List<String> list = new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m")); ForkJoinPool forkJoinPool = new ForkJoinPool(1); List<String> returns = forkJoinPool.submit(() -> list.parallelStream().map(i -> myPrint(i)).collect(Collectors.toList())).get(); forkJoinPool.shutdown(); } private static String myPrint(String string) { System.out.println(Thread.currentThread().getName() + ": word = " + string); return string; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
ForkJoinPool-1-worker-3: word = g ForkJoinPool-1-worker-3: word = h ForkJoinPool-1-worker-3: word = i ForkJoinPool-1-worker-3: word = l ForkJoinPool-1-worker-3: word = m ForkJoinPool-1-worker-3: word = j ForkJoinPool-1-worker-3: word = k ForkJoinPool-1-worker-3: word = d ForkJoinPool-1-worker-3: word = e ForkJoinPool-1-worker-3: word = f ForkJoinPool-1-worker-3: word = a ForkJoinPool-1-worker-3: word = b ForkJoinPool-1-worker-3: word = c |
Callableを使った並列処理
次にCallableを使った並列処理の実装方法は以下です。
こちらもlistの中に文字列がリスト型で入っておりfor文と組み合わせてexecutorService.submit内の処理がされるところを並列化しています。
Executors.newFixedThreadPoolの引数にスレッド数としてlist.size()を入れているので、ループの数だけスレッドが立ち上がります。そのことはコンソールログからもわかります。
Main.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; public class Main { public static void main(String... args) throws ExecutionException, InterruptedException { List<String> list = new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m")); ExecutorService executorService = Executors.newFixedThreadPool(list.size()); for (String i : list) { Future<String> future = executorService.submit(new Callable<String>() { @Override public String call() throws InterruptedException { System.out.println(Thread.currentThread().getName() + ": word = " + i); return i; } }); } executorService.shutdown(); } } |
コンソールログ
1 2 3 4 5 6 7 8 9 10 11 12 13 |
pool-1-thread-13: word = m pool-1-thread-12: word = l pool-1-thread-7: word = g pool-1-thread-5: word = e pool-1-thread-11: word = k pool-1-thread-1: word = a pool-1-thread-6: word = f pool-1-thread-8: word = h pool-1-thread-4: word = d pool-1-thread-10: word = j pool-1-thread-9: word = i pool-1-thread-2: word = b pool-1-thread-3: word = c |
試しに、Executors.newFixedThreadPoolのスレッドの数を1としてみると、スレッドが1つしか立ち上がらず、一件ずつ1つのスレッドで処理されることになります。コンソールログからもそのことはわかります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class Main { public static void main(String... args) throws ExecutionException, InterruptedException { List<String> list = new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m")); ExecutorService executorService = Executors.newFixedThreadPool(1); for (String i : list) { Future<String> future = executorService.submit(new Callable<String>() { @Override public String call() throws InterruptedException { System.out.println(Thread.currentThread().getName() + ": word = " + i); return i; } }); } executorService.shutdown(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
pool-1-thread-1: word = a pool-1-thread-1: word = b pool-1-thread-1: word = c pool-1-thread-1: word = d pool-1-thread-1: word = e pool-1-thread-1: word = f pool-1-thread-1: word = g pool-1-thread-1: word = h pool-1-thread-1: word = i pool-1-thread-1: word = j pool-1-thread-1: word = k pool-1-thread-1: word = l pool-1-thread-1: word = m |