前回記事で紹介した通り、
「Durable Functions」にはいくつか機能がありましたが、
今回は、複数関数の並列実行を行う方法(ファンアウト/ファンイン)を試してみます。
前回記事
今回の記事はDurable Functionsの基礎的な作り方の部分は省きます。
基礎的な作り方に関しては下記ページで紹介してますのでご参照ください。
onarimonstudio.hatenablog.com
どういう機能を実装するのか
今回はメッセージを出力する関数を並列で実行します(ファンアウト/ファンイン)。
図にするとこんな感じです。
ファンアウト/ファンインは、複数の関数を並列に実行してすべてが完了するまで待機するパターンです。 複数の関数から返された結果に基づいて集計作業が行われる場合があります。
Durable Functions のファンアウト/ファンイン シナリオ - Azure | Microsoft Docs
実際に作ってみる
まずは準備
前回記事で作ったところまで用意します。
「FunctionParallel.cs」という名前でAzure関数を追加しています。
onarimonstudio.hatenablog.com
正直、並列処理の例としてはわかりづらいです。すみません。
並列処理を記述する
「FunctionParallel.cs」をまるっと書き換えます。 今回コードの中で並列処理の1つ目の関数のみ5秒のスリープの関数を仕込んでます。
using System.Collections.Generic; using System.Net.Http; using System.Threading.Tasks; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.Azure.WebJobs.Host; using Microsoft.Extensions.Logging; namespace DurableFuncAppFanoutFanin { public static class FunctionParallel { [FunctionName("FunctionParallel_HttpStart")] public static async Task<HttpResponseMessage> HttpStart( [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req, [OrchestrationClient]DurableOrchestrationClient starter, ILogger log) { // Function input comes from the request content. string instanceId = await starter.StartNewAsync("FunctionParallel", null); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } [FunctionName("FunctionParallel")] public static async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] DurableOrchestrationContext context) { var outputs = new List<string>(); outputs.Add(await context.CallActivityAsync<string>("FunctionParallel_Hello", "FunctionStart")); var tasks = new Task<string>[4]; tasks[0] = context.CallActivityAsync<string>("FunctionParallel_Hello", "onarimon"); tasks[1] = context.CallActivityAsync<string>("FunctionParallel_Hello", "programmer"); tasks[2] = context.CallActivityAsync<string>("FunctionParallel_Hello", "Hello"); tasks[3] = context.CallActivityAsync<string>("FunctionParallel_Hello", "World!!"); await Task.WhenAll(tasks); foreach (Task<string> task in tasks) { outputs.Add(task.Result); } outputs.Add(await context.CallActivityAsync<string>("FunctionParallel_Hello", "FunctionFinish")); return outputs; } [FunctionName("FunctionParallel_Hello")] public static string SayHello([ActivityTrigger] string name, ILogger log) { if (name == "onarimon") { System.Threading.Thread.Sleep(5000); } log.LogInformation($"{name}."); return $"{name}"; } } }
デバッグ実行してみる
デバッグ実行すると、前回同様statusQueryGetUri
が出力されるのでコピーして実行
出力結果のoutputの部分のみ抽出するとしっかりメッセージが出てることを確認できました。
output":["FunctionStart","onarimon","programmer","Hello","World!!","FunctionFinish"]
並列処理の関数を1つだけ遅延させたけど結局await Task.WhenAll(tasks);
の部分で並列処理の関数が全部終わるの待ってから結果出力してるから、
出てくる順番はコード通りなってますね。
まとめ
Durable Functionsで並列処理ができるようになると下記のようなメリットがあります。
今度はもっとまともな例で作れるように頑張ります。
通常の関数では、関数が複数のメッセージを 1 つのキューに送信することでファンアウトが行われます。 しかし、ファンインして戻すことはこれよりずっと難しくなります。 キューによってトリガーされる関数が終了し、関数の出力が格納される時間を追跡するように、コードを記述する必要があります。 Durable Functions 拡張機能は、比較的単純なコードでこのパターンを処理します。