LastaFluteで非同期処理 (Async)

LastaFluteの特徴の一つです。

外部通信などは非同期で

戻ってこないと悲しい

メール送信など外部通信が入る処理は、あまり画面のスレッドで実行するのではなく、別のスレッドに投げて画面のレスポンスはすぐに戻してあげたいと思うことがたくさんがあります。

外部通信は、物理的な要因で突如遅くなったり戻ってこなかったりする可能性があります。 そのとき、画面を待っているユーザーにとってはあまりよいことではありませんし、 場合によってはトランザクション貼ったDBのコネクションを保持したままのケースもあるでしょう。 リクエスト数が多くなって負荷がかかって、そういったスレッドがたくさん発生すると、DBのコネクションが枯渇してサイトにアクセスできなくなる可能性があります。

それを回避するために、外部通信の処理は非同期で実行したいと考えることがあります。

async側のエラーを許容できる?

ただし、同期でなくなるということは、外部通信の処理でエラーが発生しても呼び出し元のスレッドはそれを検知することができないので、 非同期側のエラー処理はしっかりトレースしやすい形にしておく必要がありますし、それが許容されるケースに限ります。

実装の仕方

async()のコールバックが非同期

AsyncManager の async() メソッドの引数で指定したコールバックが非同期になります。

e.g. Actionクラスで AsyncManager を DI して async() で非同期処理 @Java
@Resource
private AsyncManager asyncManager;

@Execute
public HtmlResponse index() {
    asyncManager.async(() -> {
        // ここが非同期 (メール送信とか)
    });
    // 呼び出しスレッドは先に進む
    ...
}

非同期処理でトランザクションは?

もし、非同期処理で新たにトランザクションを発行したいときは、TransactionStage の requiresNew() を async() の中で使います。

e.g. 非同期処理でトランザクション @Java
@Resource
private AsyncManager asyncManager;

@Resource
private TransactionStage transactionStage;

@Execute(...)
public void index() {
    asyncManager.async(() -> {
        transactionStage.requiresNew(tx -> {
            // 更新処理とか
        });
    });
}

非同期処理の最初はトランザクションかかってないので required() でも同じですが、required() だと "あれっ、外側の引き継いでるのかな?...ん?" と迷いやすいので、このケースでは迷いのない requiresNew() をオススメしています。

もろもろ引き継ぎ

スレッドキャッシュを引き継ぎ

あまり知られていない機能ですが(積極的に使うものでもない)、LastaFluteが提供するスレッドキャッシュが非同期スレッドにも引き継がれます。 Mapの内容がまるごとコピーされると考えていいです。

AccessContextを引き継ぎ

非同期処理でDBに検索したり更新したりすることもあるでしょう。 DBFlute の AccessContext が引き継がれるので、トランザクションさえ発行すれば更新処理が可能です。

ただ、AccessContext がコールバック形式の Provider として設定されている場合 (LastaFluteのデフォルトではそうなっている) は、Provider 自体が引き継がれるのではなく、async呼び出し時点の Provider の provideXxx() の戻り値が固定的にコピーされます。 Provider を複数スレッドで共有すると、そのコールバックがスレッドセーフになっていないとおかしなことになる可能性があるので、そのようにしています。 Provider形式だとしても、呼ばれるたびに違う値が戻ってくるような作り方をすることはまずないと考えられ、かつ、 共通カラムのトレース値なのでそれが問題になることがないという割り切りです。

CallbackContextは選択式

CallbackContextは、デフォルトでは引き継がず選択式になっています。

async()の引数の FunctionInterface のデフォルトメソッドで、何を引き継ぐかをoptionで指定できます。 例えば、BehaviorCommandHook を引き継ぎたいときは、ConcurrentAsyncOption#inheritBehaviorCommandHook()を指定します。

e.g. 非同期処理でBehaviorCommandHookを引き継ぐ @Java
asyncManager.async(new ConcurrentAsyncCall() {
    public void callback() { // この場合、Lambdaは使えない
        sea();
    }

    @Override
    public ConcurrentAsyncOption option() {
        return new ConcurrentAsyncOption().inheritBehaviorCommandHook();
    }
});

ただ、SqlStringFilterに関しては、何も指定しなくても非同期処理としてデフォルトの filter が登録されます。それにより、async側のSQLでも、呼び出し側のActionクラスなどの情報が埋め込まれます。

CallbackContextをデフォルトで引き継いでいないのは、コールバックの実装がスレッドセーフになっているかどうかが不明だからです。 そこはアプリに判断してもらう必要があるということで選択式になっていますし、CallbackContextの処理は非同期では不要であるケースもありえると考えました。 (SQLの呼び出し情報だけは欲しいので、デフォルトの実装をいれている)

スレッドはプールされている

非同期処理のためのスレッドは、TomcatやJettyとは別のスレッドプールで管理されています。

デフォルトでは、2+10スレッド

デフォルトでは、最大12スレッド (Primary と Secondary の両方使うなら22スレッド、詳しくは後述) が同時に存在する可能性があります。 そのうちの2スレッドは、非同期スレッドの待機用のスレッドで、残りの10スレッドが実際に非同期処理を実行するスレッドです。

まったく非同期処理のリクエストがない場合は、10スレッドは破棄されます。 一方で待機用の2スレッドは、固定で存続します。ただし、アプリ起動時点から一度も非同期処理のリクエストがなければ、待機スレッドも作成されません。 つまり、待機スレッドは初めてのリクエストのときに初期化されるようになっています。

スレッド待ちでも呼び出し側は待たない

リクエスト数が多くスレッドが足りない状態でさらにリクエストが来た場合、別の非同期処理が使っているスレッドの戻りを待つことになりますが、 ただ、async()を呼び出したスレッドは待ちません。待機スレッドが代わりにスレッドの戻りを待ちます。 (待機スレッド自体のスレッドプールの Queue の)

Primary, Secondary選べる

非同期処理用のスレッドプールは二つ用意されており、デフォルトではすべて Secondary のスレッドプールで実行されます。業務上優先度の高いものがあれば、Primaryのスレッドプールを使うことができます。

ConcurrentAsyncOption#asPrimary()でtrueを戻すと、Primaryでの実行になります。

e.g. 非同期処理をPrimaryのスレッドで実行 @Java
asyncManager.async(new ConcurrentAsyncCall() {
    public void callback() { // この場合、Lambdaは使えない
        sea();
    }

    @Override
    public boolean asPrimary() {
        return true;
    }
});

例えば、業務上の優先度が低いリクエストがスレッドをすべて占有していると、すぐに実行したい優先度の高いリクエストが待つことになります。 そこで、優先度の高いリクエストであることがあらかじめわかっているのであれば、Primaryのスレッドプールを使うようにすることで、スレッド待ちのリスクを減らすことができます。 ただ、スレッドを割り当てた後、PrimaryのスレッドがSecondaryのスレッドよりも先に実行されるとは限りません。 割り込み機能は実装されていません。(将来的な拡張の余地はあり)

相変わらずログが気合い入ってる

非同期処理でもログは頑張っています。

非同期でエラーが発生したら?

非同期処理の中で例外が発生したときに、スレッドの 業務的な状態 がスタックトレースと共にエラーログとして記録されるようにしています。

e.g. asyncの中で例外が発生したときのログ @Log
... ERROR ... - Failed to callback the asynchronous process.
/= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =: /login/
  callbackInterface=...logic.SeaLogic$$Lambda$11/1919928341@3daeb6a1
  ; requestPath=/login/
  ; entryMethod=...web.login.SigninAction#index()
  ; userBean={userId=1, sync=2018/03/27 19:26:38}@144e2f8b
  ; accessContext=AccessContext:{localDateTimeProvider=org.docksidestage.app.base.MaihamaBaseAction$1$$Lambda$4/2105624763@3b76c7e6, user=M:-1,DCK,SigninAction}
  ; callbackContext=CallbackContext:{behaviorCommandHook=null, sqlFireHook=null, sqlLogHandler=null, sqlResultHandler=null, sqlStringFilter=org.dbflute.bhv.proposal.callback.SimpleTraceableSqlStringFilter@1d1c4db}
  ; sqlCount={total=3, selectCB=3, entityUpdate=0, queryUpdate=0, outsideSql=0, procedure=0}
= = = = = = = = = =/ [00m3s385ms] #2caaa834
java.lang.IllegalStateException: sealand
	at org.dbflute.maihama....(...java:98)
	at org.dbflute.maihama....(...java:432)
	at org.dbflute.maihama....(...java:24)
...
  • async()を呼び出したリクエストのURL
  • async()を呼び出したリクエストのActionクラスとメソッド
  • そのときのログインユーザー
  • フレームワークの ThreadLocal の状態
  • その非同期処理の中での、SQLの発行回数
  • その非同期処理の中での、メールの送信回数
  • その非同期処理が始まってから落ちるまでの処理時間

非同期を示すハッシュタグ

非同期処理の開始と終了のデバッグログで、#async でログがでるようになっています。

e.g. asyncの呼び出しログのハッシュタグ @Log
#flow #async ...Running asynchronous call as secondary@221a3fa4
...
(非同期の処理が実行される)
...
#flow #async ...Finishing asynchronous call as secondary@221a3fa4: 00m03s056ms

as secondaryであれば、Secondaryのスレッドプールを利用しての実行となります。そのときのスレッドプールのハッシュ値も表示されます。 これは、アプリ起動時に初期化されるスレッドプールのハッシュ値とリンクしています。

終わったとき、その非同期処理がどのくらいの時間かかったのかも表示されます。

SQLの中に呼び出しActionの情報

非同期処理で発行したSQLにも、async()を呼び出したリクエストに対応するActionクラスが組み込まれます。 加えて、その async() のコールバックのクラスも表示されます。

例えば、以下は、SigninActionのindex()メソッドがリクエストされて、そこから AbcLogic を経由して async() が呼び出され、その非同期処理で検索されたことがわかります。

e.g. asyncの呼び出しログのハッシュタグ @Log
select dfloc.MEMBER_ID as MEMBER_ID, dfloc.MEMBER_NAME as ...
  from MEMBER dfloc
 where dfloc.MEMBER_ID = 3
-- ...login.SigninAction#index(): (via SeaLogic$$Lambda$26/1161854698)

AssistantDirectorで拡張

もろもろの設定を、AssistantDirectorで拡張できます。

FwCoreDirection#directAsync()にて、CallbackContextの引き継ぎ設定や、スレッドプールのサイズを指定することができます。