Deferred の作り方

Twisted Documentation の日本語訳

  1. クラスの概要
  2. Deferred がコードの非同期化を自動的にやってくれるわけではありません
  3. 高度な処理チェーン制御
  4. 同期的な関数から Deferred を返す
  5. ブロックが発生するコードと Twisted の結合
  6. 注意事項

Deferred とは、関数を呼び出したときにその結果データがまだ得られていないということを通知するオブジェクトです。Deferred を返す関数は、データを取得できたときそれを処理するコールバックをあらかじめ Deferred に登録してから返します。

この文書は、Deferred を返す関数の書き方、Deferrd オブジェクトの構築方法について解説します。Deferred を用いたプロブラムは、データの到着待ちが必要なときもブロックさせず即座にリターンし、データが到着したらコールバックが起動されるようになっています。

なおこの文書は Twisted で用いられる非同期モデル関数が返す Deferred の使い方について理解していることが前提になっています。

クラスの概要

Deferred を作成し、コールバックやエラーバックを起動するという観点で記述した API の解説です。Deferred クラスの docstring の代用品ではなく、使用する上でのガイドラインとしてのなるように意図して書かれたものです。

Deferred を返す関数を呼び出す側の観点からの概要は Deferred の使い方に書かれています。

基本的なコールバック処理メソッド

Deferred がコードの非同期化を自動的にやってくれるわけではありません

Deferred はコードがブロックしないような処理を自動的にしてくれるわけではありません。

次の関数がその例です。

from twisted.internet import defer
    
TARGET = 10000

def largeFibonnaciNumber():
    # この関数が返す Deferred を作成
    d = defer.Deferred()

    # 10000項のフィボナッチ数を計算する

    first = 0
    second = 1

    for i in xrange(TARGET - 1):
        new = first + second
        first = second
        second = new
        if i % 100 == 0:
            print "進行状況: %d番目のフィボナッチ数" % i

    # Deferred の callback に答えを渡す
    d.callback(second)

    # 答えを登録した Deferred を返す
    return d

import time

timeBefore = time.time()

# call the function and get our Deferred
d = largeFibonnaciNumber()

timeAfter = time.time()

print "largeFibonnaciNumber 呼び出しにかかった時間合計: %0.3f 秒" %
      (timeAfter - timeBefore)

# 答えを表示するためのコールバックを追加

def printNumber(number):
    print "フィボナッチ数%d番目は %d" % (TARGET, number)

print "コールバック追加中"

d.addCallback(printNumber)

実行してみると largeFibonnaciNumber が Deferred を返しているにも関わらず次のような結果となります。

この関数は計算を完了するまでリターンしません。つまり処理が完了するまでブロックしており非同期関数としては機能していません。Deferred は何でも非ブロック化してくれる魔法の杖ではありません。非同期関数がコールバックを関連付けて返す通知として用いられるのが Deferred であり、 Deferred を返すからといってその関数が非同期的に処理をしているとは限らないのです。

高度な処理チェーン制御

同期的な関数から Deferred を返す

同期的な関数から Deferred を返したい場合もあります。よくあるのは、 Deferred を返す別バージョンの関数を作成したときの API 互換性チェック、関数を作成するにあたって非同期にするかどうかを判定したいときなどです。

Deferred リファレンス において、同期的な関数を扱う例として次のコードを使用しました。

def synchronousIsValidUser(user):
    '''
    Return true if user is a valid user, false otherwise
    '''
    return user in ["Alice", "Angus", "Agnes"]

maybeDeferred を使うことにより、 同期的な結果を呼び出し側で Defrred にラップすることも可能です。しかし defer.succeed を使って関数が Deferred を返すようにした方が、API の互換性という点で望ましいでしょう。

from twisted.internet import defer

def immediateIsValidUser(user):
    '''
    ユーザの認証に成功すれば True、そうでないときは False を Deferred
    に結果としてセットして返す
    '''
    
    result = user in ["Alice", "Angus", "Agnes"]
    
    # result を引数にセットし、コールバック実行済みの Deferred として返す
    return defer.succeed(result)

同様にエラーバック実行済みの Deferred を返す場合のメソッドとしては defer.fail があります。

ブロックが発生するコードと Twisted の結合

サードパーテイのライブラリには長い時間ブロックしてしまう関数がたくさんありますが、そうした関数を呼び出す必要に迫られることもあるでしょう。残念ながら、意図して非同期関数を書かない限り自動的に非同期化する方法はありません。Twisted を使う場合、自分で書くコードは非同期的なものにできますが、サードパーティの関数については、自分で同等のものを書き直す以外方法がないのです。

このような場合のために、Twisted はブロックするコードを別スレッドで実行することにより、アプリケーション全体のブロックを防ぐ方法を提供しています。 twisted.internet.threads.deferToThread 関数はブロッキング関数を実行するためのスレッドを用意します。この関数は Deferred を返し、スレッドが完了した時点で Deferred を有効にします。

たとえば先に紹介した largeFibonnaciNumber を仮に(Deferred ではなく計算結果を返す)サードパーティのライブラリ関数であり、部分的な改変ができないと想定してみましょう。次の例は関数を別スレッドで実行します。先の例とは異なり、ブログラム全体がブロックすることはありません。

def largeFibonnaciNumber():
    """
    長時間ブロックする関数の例
    TARGET 番目のフィボナッチ数を求める
    """
    TARGET = 10000

    first = 0
    second = 1

    for i in xrange(TARGET - 1):
        new = first + second
        first = second
        second = new

    return second

from twisted.internet import threads, reactor

def fibonacciCallback(result):
    """
    largeFibonnaciNumber 関数の結果を画面出力するコールバック
    """
    print "largeFibonnaciNumber result =", result
    # make sure the reactor stops after the callback chain finishes,
    # just so that this example terminates
    reactor.stop()

def run():
    """
    largeFibonnaciNumber を別スレッドで実行、コールバックを追加した後、
    さらにいくつかの処理をおこなう
    """
    # largeFibonnaciNumber の結果とともに呼び出される Deferred を得る
    d = threads.deferToThread(largeFibonnaciNumber)
    # 結果を出力するコールバックを追加
    d.addCallback(fibonacciCallback)
    # largeFibonnaciNumber のスレッドがものすごく早く完了しない限り
    # 以下の内容が出力されるはず
    print "コールバック追加後の1行目"
    print "コールバック追加後の2行目"

if __name__ == '__main__':
    run()
    reactor.run()

注意事項

Deferred はコールバック登録の標準的方法を提供することにより、非同期的な処理の記述をきわめてシンプルなものにします。しかしその利用にあたっては、若干難解で分かりにくいルールに従わなくてはなりません。ただしこれは Deferred を使って新しいシステムを書く場合のことであり、既存のシステムで生成された Deferred にコールバックを追加するアプリケーションを書く場合は問題にならないことですが、知っておいて損はないでしょう。

Deferred を有効にできるのは1度だけ

Deferred は1度限りのものです。Deferred.callback または Deferred.errback を呼び出せるのは1度だけです。呼び出し後の Deferred に新たなコールバックを追加すると、処理チェーンはその都度伸びていきます。

同期的なコールバックの実行

Deferred が既に結果を受け取った段階になってから addCallback を実行した場合、コールバックが同期的に実行される可能性があります。つまりコールバック追加後、即座に実行されるのです。コールバックが何らかの状態を変更する場合、すべてのコールバックの追加が完了するまでチェーンの実行を止めておきたいことがあります。このような場合は pauseunpause を使用すると、大量のコールバックを登録しなければならないときも登録が完了するまで Deferred の処理チェーンを止めておくことができます。

これらのメソッドは充分注意して使ってください。Deferred を pause で停止した場合、その再開は必ずあなた自身の責任となります。使いやすさを損なわないように、 Deferred の停止と再開は必ずコールバックを追加する関数内で実施し、 callbackerrback を呼び出す側のコードでは絶対に処理しないようにしましょう。