Java SE 8のストリームAPIの正しい使い方──ラムダ式とともに導入された新APIで、並列処理の実装はどう変わるのか?

Oracle Java & Developers編集部
2014-11-28 11:00:00
  • このエントリーをはてなブックマークに追加

ストリームAPIを使った並列処理

 ストリームAPIの強みとしては、並列処理の実装が容易に行えることが挙げられます。ストリームの処理を並列化するには、次のようにパイプラインの途中でメソッドparallelを挿入するだけでよいのです。これだけで、メソッドforEachの処理が並列に行われるようになります。

【リスト17:メソッドparallelを挿入してストリーム処理を並列化する】

IntStream.range(10, 20) .parallel() .forEach(i -> System.out.print(i + " "));

 なお、並列化する場合の第一の注意点は、要素の処理の順番が保証されなくなるということです。上のコードを実行すると、出力結果は次のような具合になり、最初の要素の並び順とは関係なくメソッドprintが実行されていることがわかります。

【リスト18:リスト17のコードの実行結果(要素の順序は保証されない)】

12 13 14 10 11 18 19 17 15 16

 メソッドparallelを使うほかに、Collectionに用意されたメソッドparallelStreamを使うことでも、並列化されたストリームを実装できます。使い方は、メソッドstreamによるストリームの取得を、次のようにメソッドparallelStreamに置き換えるだけです。

【リスト19:メソッドparallelStreamの使用例】

void robocallEligibleDrivers() { list.parallelStream() .filter(p -> p.getAge() >= 16) .map(p -> p.getHomePhoneNumber()) .forEach(num -> { robocall(num); }); }

 ストリームAPIにおける並列化のサポートは、繰り返し処理の実装に大きな影響を与えるものです。これまではfor文などで記述していた繰り返し処理をストリームに置き換えることで、簡単に並列処理として実装できるからです。

誤ったストリームAPIの使い方

 ただし、繰り返し処理を闇雲にストリーム化すればよいというものではありません。特に並列処理については、誤った使い方をすると逆にパフォーマンスを低下させたり、間違った結果を返してしまったりすることがあります。以下に、ストリームの誤った使い方の例をいくつかご覧いただきましょう。ここでは、次のような繰り返し処理を例にとります。

【リスト20:繰り返し処理の例】

long sum = 0L; for (long i = 1L; i <= 1000000L; i++) sum += i; System.out.println(sum);

 これは1から100万までの数字を順番に足し合わせる処理で、実行結果は「500000500000」になります。これと同じ処理をストリームAPIで行おうとする場合、素直に書けば次のようになるでしょう。

【リスト21:リスト20の繰り返し処理をストリームAPIを使って
素直に書いた例(コンパイル・エラーになる)】

long sum = 0L; LongStream.rangeClosed(1L, 1000000L) .forEach(i -> sum += i); System.out.println(sum);

 このコードは、「ラムダ式から参照されるローカル変数は、finalまたは事実上のfinalである必要があります」というコンパイル・エラーになるはずです。ラムダ式の中からは外側のローカル変数を参照することができますが、値を書き換えることは禁止されています。また、参照できるローカル変数は、final宣言されているか、初期化以外で値を書き換えていない変数に限られます。したがって、リスト21の例のようにsumの値をメソッドforEachに渡すラムダ式の中で書き換えるのは誤りなのです。

 この制限を回避する代替案としては、次のように要素が1つの配列を使う方法が考えられます。配列の要素であれば、finalの制限を回避してラムダ式の中で書き換えることができるのです。

【リスト22:リスト21のコードを配列を使って書き直した例】

long sum[] = new long[] { 0L }; LongStream.rangeClosed(1L, 1000000L) .forEach(i -> sum[0] += i); System.out.println(sum[0]);

 このコードは正しい結果を出力してくれます。しかし、このようなストリームの使い方はお勧めしません。この実装では、ストリームAPIの最大のメリットとも言える並列化の恩恵を受けられないからです。

 では、この処理を並列化するには、どうしたらよいでしょうか。それには、次のようにパイプラインの途中にメソッドparallelを挿入します。

【リスト23:リスト22のコードにメソッドparallelを挿入した例】

long sum[] = new long[] { 0L }; LongStream.rangeClosed(1L, 1000000L) .parallel() .forEach(i -> sum[0] += i); System.out.println(sum[0]);

 ただし、このコードでは、実行する度に異なる計算結果が出力されてしまうでしょう。「+=」はアトミックな処理ではないため、並列化することでsum[0]の値が同時に書き換えられて結果が狂う可能性があるのです。

 並列処理による同時アクセスを防止したい場合、次のようにAtomicLongを使う方法が考えられます。AtomicLongであれば、sumの単一性が保証されるため、同時に書き換えられて計算結果が狂う心配はありません。

【リスト24:リスト23をAtomicLongを使って書き直した例】

AtomicLong sum = new AtomicLong(0L); LongStream.rangeClosed(1L, 1000000L) .parallel() .forEach(i -> sum.addAndGet(i)); System.out.println(sum.get());

 このコードは、狙いどおり正しい計算結果を出してくれるはずです。しかし、これもお勧めできない方法です。sumをAtomicLongにしたということは、ストリームにおいて1つの要素の処理中に、他の要素が待ち状態になるということにほかなりません。それでは並列化する意味がないばかりか、並列化によるオーバーヘッドで通常よりもパフォーマンスが落ちてしまうからです。

並列計算を効率化する分割計算処理「リダクション」を活用する

 実は、例題としているような「並列で計算を行って最終的に1つの値に集約する処理」を実装するには、もっと優れた方法があります。それが「リダクション」です。リダクションは、大規模データ処理の分野ではMap & Reduceとして知られている手法です。リダクションでは、下図のように全体の計算を細かな部分計算に分割し、それを個別に処理したうえで最終解に向けて集約していきます。


 リダクションには、それぞれの部分計算の間の依存性を極力排除することによって効果的に並列実行できるというメリットがあります。その一方で、各部分計算がどの順序で行われるのかは保証されないということに注意してください。

 ストリームAPIには、リダクションのための終端操作が標準で用意されています。最も一般化されたものがメソッドreduceで、このメソッドを使うと、行いたい集約処理を引数としてラムダ式で渡すことができます。次に示すコードは、2つの値を受け取り、それらを足し合わせるという集約処理の例です。ストリームの各要素が足し合わされていき、最終的にすべての合計が求められます。ただし、メソッドreduceは戻り値がOptional型なので、結果の値はそこからメソッドgetなどで取り出す必要があります。

【リスト25:リスト24をメソッドreduceを使って書き直した例】

OptionalLong sum = LongStream.rangeClosed(1L, 1000000L) .parallel() .reduce((i, j) -> i + j); System.out.println(sum.getAsLong());

 合計を求めるだけなら、もっと簡単に使えるメソッドが用意されています。次の例で使用しているメソッドsumは、ストリームの全要素の値の合計を求める終端操作です。メソッドsumはIntStreamなどのプリミティブ型のストリームに用意されており、戻り値として、その型の値がそのまま返されます。

【リスト26:リスト25をメソッドsumを使って書き直した例】

long sum = LongStream.rangeClosed(1L, 1000000L) .parallel() .sum(); System.out.println(sum);

ストリームをさらに便利にする「コレクタ」

 最後に「コレクタ」を紹介しておきましょう。コレクタは、ストリームの要素を集約して1つのオブジェクトを取得したいというケースで便利な機能です。例えば、ストリームの処理の結果を文字列やコレクションのオブジェクトとして取り出したり、特定の値をキーにしてグループ化したりといった操作が行えます。コレクタも並列処理が可能であり、スレッドセーフでないデータ構造でも使うことができます。

 コレクタは、メソッドcollectで使用します。このメソッドは、クラスCollectorsに用意された集約処理のためのメソッドの呼び出しとセットで使います。次に示すのは、ストリームの結果をListオブジェクトとして取り出す例です。

【リスト27:コレクタを使い、ストリームの結果を
Listオブジェクトとして取り出す例】

List<String> wordList = …略… List result = wordList.parallelStream() .map(s -> s.toUpperCase()) .collect(Collectors.toList());

 また、次に示すのは、各要素を「 * 」を区切り文字にして連結し、文字列化する例です。メソッドjoiningは、ストリームの要素を見つけた順番に文字列連結する終端操作の機能を提供します。

【リスト28:コレクタを使い、各要素を「 * 」を区切り文字にして
文字列化する例】

List<String> wordList = ... String result = list.parallelStream() .map(s -> s.toUpperCase()) .collect(Collectors.joining(" * "));

 以上、Java SE 8でラムダ式の導入に伴って追加されたストリームAPIについて解説しました。今回紹介したストリームAPIやコレクションAPIの拡張を見てもわかるように、ラムダ式の登場は、Javaライブラリの設計を大きく変えました。特にストリームAPIは、今後アプリケーションの作り方に多大な影響を与えるでしょう。これからのJava開発では、これらの新機能に対する正しい理解が不可欠になります。この記事も参考にして、一刻も早くJava SE 8に慣れ親しんでください。

>> 前回の記事はこちら

【関連記事】

PaaSといっても、何を検討すればよいのか、違いがわからない…という方へ、Oracle Cloudの概要セミナーを開催!

オラクルのパブリッククラウドの成り立ちや代表的なサービス、価格体系等のクラウドサービスの基本情報から、具体的な活用事例までご紹介します。

  • コメント(1件)
#1 勘違いかもしれませんが   2015-08-09 10:41:23
中間操作のための主要なメソッドの説明で
マップとフィルタの内容が入れ替わっているように思います