Ch10. マルチスレッド
マルチスレッド
マルチスレッドの実現方法は2通り。
- Threadクラスを継承したクラスを作って、並行処理されたい内容を
run
メソッドに実装する。 - Runnableインターフェースを実装して、並行処理させたい内容を
run
メソッドに実装する。
Runnableインターフェースは関数型インターフェースなので、ラムダで実装することが可能。
import java.util.Arrays; public class IntroduceThread { public static void main(String[] args){ MyThread thread1 = new MyThread(); MyAnotherThread thread2 = new MyAnotherThread(); thread1.start(); thread2.run(); new Thread(()-> Arrays.asList(11,12,13,14,15,16,17,18,19).stream().forEach(System.out::print)).run(); new Thread(()-> Arrays.asList(41,42,43,44,45,46,47,48,49).stream().forEach(System.out::print)).start(); //'313233343536373839212223242526272811121314151617181929414243444546474849' } } class MyThread extends Thread{ public void run(){ Arrays.asList(21,22,23,24,25,26,27,28,29).stream().forEach(System.out::print); } } class MyAnotherThread implements Runnable{ public void run(){ Arrays.asList(31,32,33,34,35,36,37,38,39).stream().forEach(System.out::print); } }
スレッドの制御
interrrupt
は他のスレッドに対する割り込みを行ってくれる。
割り込みが行われたスレッドは【InteruputedException】例外を吐くが、それでも実行を再開してくれる。
public class ThreadTrial { public static void main(String[] args){ Thread ThreadA = new Thread(() -> { System.out.println("ThreadA 開始"); try{ Thread.sleep(5000); } catch(InterruptedException e){ System.out.println("A: 割り込まれました"); } System.out.println("ThreadA 終了"); }); ThreadA.start(); try{ System.out.println("mainThread 開始"); Thread.sleep(2000); ThreadA.interrupt(); }catch (InterruptedException e){ System.out.println("main: 割り込まれました"); } System.out.println("mainThread 終了"); } }
mainThread 開始 ThreadA 開始 mainThread 終了 A: 割り込まれました ThreadA 終了 Process finished with exit code 0
排他制御と同期制御
排他制御
複数のスレッドが同じリソース(オブジェクト)を利用する場合、リソース同士の競合が発生しないようにオブジェクトにロックをかけて使用中のスレッドだけが占有して使えるようにしたい。 このような制御を排他制御という。
synchronized
キーワードを付けることで、そのメソッドは同時に一つのスレッドからしか呼び出せなくなる。
class Share{ private int a = 0; private String b; public synchronized void set(){ a++; b = "String"; System.out.println("a : " + a + " b: " + b); } public synchronized void print(){ a--; b = null; System.out.println("a : " + a + " b: " + b); } } class ThreadA extends Thread{ private Share share; public ThreadA(Share share){ this.share = share; } public void run(){ for(int i = 0; i < 5; i++){ this.share.set(); } } } class ThreadB extends Thread{ private Share share; public ThreadB(Share share){ this.share = share; } public void run(){ for(int i = 0; i < 5; i++){ this.share.print(); } } } public class ThreadContoll { public static void main(String[] args){ Share share = new Share(); ThreadA threadA = new ThreadA(share); ThreadB threadB = new ThreadB(share); threadA.start(); threadB.start(); } }
a : 1 b: String a : 2 b: String a : 3 b: String a : 4 b: String a : 5 b: String a : 4 b: null a : 3 b: null a : 2 b: null a : 1 b: null a : 0 b: null
下記はsyncronized
キーワードを設定しなかった場合の表示である。
a : 0 b: null a : 0 b: null a : 1 b: String a : 0 b: null a : 0 b: null a : -1 b: null a : -2 b: null a : 1 b: String a : -1 b: String a : 0 b: String
同期制御
syncronized
キーワードを利用しても、そのメソッドが同じスレッドから2回以上呼び出される可能性は排除できない(現に同じメソッドから連続して5回呼び出されている)。
そこで、指定したオブジェクトの利用を待つ待機中の別のスレッドに対して、スレッドの再開を許すnotify()
メソッドと、別のメソッドがそのオブジェクトの利用を完了するまで待つwait()
メソッドが存在する。
class Share{ private int a = 0; private String b; public synchronized void set(){ while(a != 0) { try { wait(); } catch (InterruptedException e) { System.out.print(e); } } a++; b = "String"; System.out.println("a : " + a + " b: " + b); notify(); } public synchronized void print(){ while (b == null){ try{ wait(); }catch (InterruptedException e){ System.out.println(e); } } a--; b = null; System.out.println("a : " + a + " b: " + b); notify(); } } class ThreadA extends Thread{ private Share share; public ThreadA(Share share){ this.share = share; } public void run(){ for(int i = 0; i < 5; i++){ this.share.set(); } } } class ThreadB extends Thread{ private Share share; public ThreadB(Share share){ this.share = share; } public void run(){ for(int i = 0; i < 5; i++){ this.share.print(); } } } public class ThreadContoll { public static void main(String[] args){ Share share = new Share(); ThreadA threadA = new ThreadA(share); ThreadB threadB = new ThreadB(share); threadA.start(); threadB.start(); } }
a : 1 b: String a : 0 b: null a : 1 b: String a : 0 b: null a : 1 b: String a : 0 b: null a : 1 b: String a : 0 b: null a : 1 b: String a : 0 b: null
notify()
を利用することで各スレッドがオブジェクトを利用するたびに、待機中の他のスレッドに対してオブジェクトの利用を許可する。
そのため、オブジェクトは交互に利用されるようになる。
ただし、notify()
にしてもnotifyAll()
にしても、どのスレッドにそのオブジェクトを利用させるか指定できるわけではない。
また、syncronized
キーワードを指定していないメソッドに対してnoitify
を利用することは出来ない(例外となる)。
ライブロックとデッドロック
後述
コレクションAPIでの並列処理
並列処理に対応していないコレクションAPIを使って、拡張For文内でコレクションの中身を変更しようとすると、即座にConcurrentModificationException
がスローされる。
これは拡張forの中で「フェイルファースト・イテレータ」と呼ばれ、単一スレッド内でもイテレータの反復処理中にコレクションの中身を変更をする処理があれば処理が中断される。
複数スレッド内から一つのコレクションオブジェクトを利用したい場合はjava.util.concurrent
パッケージを利用する。
BlockingDeque
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class Blocking { public static void main(String[] args){ BlockingQueue<Double> bQueue = new LinkedBlockingDeque<>(2); new Thread(() -> { while(true){ try{ bQueue.offer(Math.random(), 2, TimeUnit.SECONDS); System.out.println("Queue size: " + bQueue.size()); }catch (InterruptedException e){ System.out.println(e); } } }).start(); new Thread(() -> { while (true){ try{ double number = bQueue.poll(1, TimeUnit.SECONDS); System.out.println("poll: " + number); }catch (InterruptedException e){ System.out.println(e); } } }).start(); } }
上記の例ではQueueの要素を追加する際、Queueに空きがなければ指定された秒だけofferを待つ・Queueが空っぽなら指定された秒だけpollを待つ、といった処理を実現している。
ConcurrentMap
Mapにもマルチスレッドに対応したクラスが用意されている。
Mapであるキーとバリューの組み合わせを取得したいとき、「まずキーの存在を確認(containsKey)し、存在すれば取得する(get)」ような処理が一般的に行われる。 しかし、「キーの存在確認」から「取得」までに別のスレッドが割り込んでMapを変更してしまう可能性がある。
そこで、「キーの存在確認」から「取得」までを一つのロックで実施し、割り込みを防ぐのがputIfAbsent()
である。
ConcurrentMap (Java Platform SE 8)
CopyOnWriteArrayList
ArrayListにもマルチスレッディングに対応したクラスが存在する。
import java.util.ArrayList; import java.util.Iterator; public class ConcMap { public static void main(String[] args){ ArrayList<String> list = new ArrayList<String>(); list.add("A"); list.add("B"); list.add("C"); new Thread(() -> { Iterator itr = list.iterator(); while(itr.hasNext()){ System.out.println("ThreadA : " + itr.next()); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } } }).start(); try{ Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } list.add("d"); list.add("e"); } }
ThreadA : A Exception in thread "Thread-0" java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1043) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:997) at ConcMap.lambda$main$0(ConcMap.java:14) at java.base/java.lang.Thread.run(Thread.java:834)
上記は通常のArrayListを複数のスレッドで操作したもの。 ラムダで生成したスレッドで取得したイテレータは「フェイルファースト・イテレータ」なので、mainスレッドでコレクションの中身を変更しようとして例外が発生している。
//ArrayList<String> list = new ArrayList<String>(); CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
上記のように並行制御に対応したArrayListを使用することで例外の発生は回避できる。
ただしその違いは、ラムダで生成したスレッドで取得したイテレータが「フェイルファースト・イテレータ」ではなくなっただけである。 スレッドの外側で発生したコレクションの変更に対応できている訳ではない。
ThreadA : A ThreadA : B ThreadA : C Process finished with exit code 0
Executor Framework
複数のスレッドの処理を一元管理してくれるスレッドマネージャクラスである。 シングルトンなので、必要に応じてクラスメソッドを呼び変えてインスタンス化する。
newSingleThreadExecutor
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecTest { public static void main(String[] args){ ExecutorService service = null; try{ service = Executors.newSingleThreadExecutor(); System.out.println("service.execnute()"); for(int i = 0; i < 3; i++) { service.execute(() -> { for (int a = 0; a < 5; a++) { try { Thread.sleep(500); System.out.print("*"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("thread task"); }); } }finally { service.shutdown(); System.out.println("service.shutdown()"); } } }
service.execnute() service.shutdown() *****thread task *****thread task *****thread task
常に一つのスレッドでタスクを処理するよう管理するスレッドマネージャ。 ループで3つのスレッドを生成しているが、初めに生成したスレッドの完了を待って残り2つのスレッドを休止状態としている。
またshutdown()
によってスレッドマネージャが終了しても、実行(休止)中のスレッドは生き残り続ける。
submitメソッド
(これはExecutorSerivceのインスタンスメソッドである)
execute
ではなくsubmit
メソッドを介してスレッドを開始すると、そのスレッドの完了状況をFuture
クラスのインスタンスとして取得することが出来る。
import java.util.concurrent.*; public class SubmitTest { public static void main(String[] args){ ExecutorService service = null; try{ service = Executors.newSingleThreadExecutor(); Future<?> result = service.submit(() -> System.out.println("submitで実行")); System.out.println(result.get()); Future<String> result2 = service.submit(() -> { try{ Thread.sleep(2000); //throw new RuntimeException(); }catch (InterruptedException e){ }}, "おわったよ"); System.out.println(result2.get()); }catch(InterruptedException | ExecutionException e){ }finally { service.shutdown(); } } }
submitで実行 null おわったよ
特徴的なのは、実行結果がtrueやfalseで返却されるわけではなく、スレッドと一緒に自分で指定する点である。
scheduleメソッド
(これはExecutorSerivceのインスタンスメソッドである)
スレッドに実行の遅延時間や、定期実行などを指定する。 引数には(後述する)Callableが利用できず、Runnableにする必要がある点に注意する。
import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduleTest { public static void main (String args[]){ ScheduledExecutorService service = null; try{ service = Executors.newSingleThreadScheduledExecutor(); Runnable task = () -> System.out.println(new Date()); service.scheduleWithFixedDelay(task, 2, 2, TimeUnit.SECONDS); //Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } } }
Callable
上記の例では、スレッドが完了したかどうかは把握できたが、スレッドがどのような結果を得たかは把握できなかった。
これを解消するのがCallable
である。Runnable
と同じくスレッドを扱うオブジェクトとして機能する。
Future<Date> result = service.submit(() -> new Date()); //'Sun Apr 18 10:38:37 JST 2021'
newCashedThreadPool
いままでのスレッドマネージャは単一のスレッドしか管理できなかった(複数スレッドを作る場合でも、実行は逐一だった)。 複数のスレッドを準備しておき、同時並行で実行するスレッドマネージャが存在する。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args){ ExecutorService service = null; try{ //service = Executors.newCachedThreadPool(); service = Executors.newFixedThreadPool(3); Runnable task = () -> { String name = Thread.currentThread().getName(); System.out.println(name + " : start"); try{ Thread.sleep(5000); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(name + " : end"); }; for(int i = 0; i < 5; i++){ service.execute(task); } }finally { if(service != null)service.shutdown(); } } }
pool-1-thread-1 : start pool-1-thread-3 : start pool-1-thread-2 : start pool-1-thread-1 : end pool-1-thread-2 : end pool-1-thread-3 : end pool-1-thread-2 : start pool-1-thread-1 : start pool-1-thread-2 : end pool-1-thread-1 : end
newFixedThreadPoolのスレッドマネージャを利用したことで、指定された個数のスレッドを再利用して処理を完了させているのがわかる。 5回の実行に3つのスレッドが生成され、thread-3のみ2回だけ利用されている。
CyclicBarrier
CyclicBarrierを用いることで、同じ処理を持つ複数のスレッドが特定の処理ポイントを通過するのを待つことが出来る(スレッドを跨いだ処理の同期が出来る)。 実行したいスレッドに対して、CyclicBarrierのオブジェクトを渡して利用する。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicTest { void a(){System.out.println("a "); } void b(){System.out.println("b "); } void c(){System.out.println("c "); } void exec(CyclicBarrier c1, CyclicBarrier c2){ try{ a(); c1.await(); b(); c2.await(); c(); }catch (Exception e){ } } public static void main(String[] args){ ExecutorService service = null; try{ service = Executors.newFixedThreadPool(3); CyclicBarrier c1 = new CyclicBarrier(3); CyclicBarrier c2 = new CyclicBarrier(3, () -> System.out.println("task")); for(int i = 0; i < 3; i++){ service.execute(() -> new CyclicTest().exec(c1,c2)); } }finally { service.shutdown(); } } }
a a a b b b task c c c
CyclicBarrierは、バリアポイントで待機するスレッド数を指定して、スレッド化する処理に渡すことで利用する。
スレッド化する処理の中で、await()
を呼ぶことでX本のスレッドの完了を待つことが出来る。
CyclicBarrierを利用しないと以下のようになる。
a a b a b c c b c
三つのスレッドが同期ポイントを持たず、それぞれのタイミングで開始して終了する。
アトミック
putIfAbsent
メソッドに代表されるように「その変数を参照して、確認したのちに変数に対して変更を加えたい」という処理は翌発生する。
参照から更新までの間に更新が発生して「ファントムリード」のような事象が発生し得る。
参照から更新までに値が変更されないことを保証できる変数オブジェクトがJavaで用意されておりjava.util.concurrent.atomic
で利用できる。
import java.util.concurrent.atomic.AtomicInteger; public class AtomicTest { private final AtomicInteger var = new AtomicInteger(0); public static void main(String[] args){ new AtomicTest(); } public AtomicTest(){ for(int i = 0; i < 5; i++){ System.out.println(var.getAndIncrement()); } System.out.println("Result: " + var); } }
パラレルストリーム
この章で扱わなくてもStreamAPIの章で扱っても良い内容。
StreamAPIでの中間操作・終端操作をマルチスレッドで行ってくれるもの。 パラレルストリームに対して、通常のストリームのことを「シーケンシャルストリーム」と呼ぶ。
import java.util.Arrays; import java.util.List; import java.util.stream.IntStream; import java.util.stream.Stream; public class ParallelTest { public static void main(String[] args){ List<String> data = Arrays.asList("千代田線", "浅草線", "半蔵門線"); Stream<String> pStream1 = data.parallelStream(); Stream<String> nStream1 = data.stream(); System.out.println("isPararell : " + pStream1.isParallel());//'isPararell : true' System.out.println("isPararell : " + nStream1.isParallel());//'isPararell : false' IntStream pStream2 = IntStream.range(0,10).parallel(); pStream2.forEach(System.out::print);//'6581429037' System.out.println(); IntStream.range(0,10).forEach(System.out::print);//'0123456789' } }
最後のintStreamのパラレル化からもわかるように、パラレル化されたStreamではどの処理から実行されるか保証されない。
ストリームの要素が少ない場合、処理をパラレル化することに対するオーバーヘッドがによって、パラレル化の時間短縮化の効果はキャンセルされてしまう。 パラレル化によってパフォーマンス改善を図る場合は要素数の多いストリームに限るべきである。
forEachOrdered
パラレル化しても実行順序を保証するメソッドとしてforEachOrdered
が用意されている。パフォーマンスは下がるが、シーケンシャルStreamと同じような挙動を得ることが出来る。
import java.util.Arrays; import java.util.Locale; public class ParallelOrderTest { public static void main(String[] args){ Arrays.asList("a", "b", "c", "d").parallelStream().forEachOrdered(System.out::print);//'abcd' System.out.println(); Arrays.asList("a", "b", "c", "d").parallelStream().forEach(System.out::print);//'cdba' Arrays.asList("a", "b", "c", "d").parallelStream().map(s -> s.toUpperCase(Locale.ROOT)).peek(System.out::print).forEachOrdered(System.out::print);//'CBADABCD' } }
パラレルストリームのまま中間操作に入った場合、順序が保証されないまま中間操作が実施されるも、最後はforEachOrderd
で元の順番に順序を入れ替えて処理を終了する。
findAny()とfindFirst()
どちらも処理に時間がかかるが、シーケンシャルStreamと同じように動作する。
reduce()とcollect()
教科書ではパラレルStreamを利用する際にcollect
やreduce
を利用するときは引数の最後に部分的な途中の集約処理が必要と書かれているが、今のところなくても期待通り動作している。
import java.util.Arrays; import java.util.stream.Collectors; public class ParallelReduce { public static void main(String[] args){ Integer result = Arrays.asList(1,2,3,4,5,6,7,8,9).parallelStream().reduce(0, (sum, a) -> sum + a); System.out.println(result); Integer result2 = Arrays.asList(1,2,3,4,5,6,7,8,9).parallelStream().collect(Collectors.summingInt(t -> t)); System.out.println(result2); } }
groupingByConccurent()とConcurrentMap()
どちらも通常のシーケンシャルStreamと同じ動きである。
ただし、返却されるMapオブジェクトは並列処理に対応したオブジェクトConcurrentHashMap
になる。
Fork/Joinフレームワーク
Fork/Joinフレームワークは大量の計算処理を小さなスレッドに分割して並列で計算する仕組み。 具体的には既にみた「スレッドプール」の仕組みを使い、分割された小さな単位の計算を各スレッドにあてがっていく。