RPC:投げっぱなしと返事待ち

RPC は Nova の各種サービス(api, compute, scheduler 等)が相互に通信を行う為に使用する通信機構であり、分散システムである Nova の根幹となる部分です。 RPC 関連のコードは nova-2012/nova/rpc/ 配下にあります。

nova-2012.1
  nova/
    rpc/
      __init__.py
      amqp.py
      common.py
      impl_carrot.py  Carrot ドライバ(AMQP 0.9)
      impl_fake.py    ダミードライバ
      impl_kombu.py   Kombu ドライバ(AMQP 0.9)
      impl_qpid.py    Apache Qpidドライバ(AMQP 0.10)
ドライバ 説明
Carrotドライバ 最古の RPC ドライバで RabbitMQ 専用です。途中若干の変遷 はありましたが Nova 初期公開時から使われています。
Kombu ドライバ Diablo リリースで登場しました。RabbitMQ 以外にも RDBMS や Redis のような多彩なソフトウェアをメッセージキューと して利用出来ます。
Apache Qpid ドライバ Apache Qpid 用のドライバです。 Essex リリースで新登場し ました。
ダミードライバ ユニットテスト用であり、外部プログラム/ライブラリに依存 しない簡易通信を提供します。

では、ここでテスト用のダミードライバである impl_fake.py を覗いてみましょう。 (比較的行数が少なく読みやすいので)

class RpcContext(context.RequestContext):
    def __init__(self, *args, **kwargs):
    def reply(self, reply=None, failure=None, ending=False):
class Consumer(object):
    def __init__(self, topic, proxy):
    def call(self, context, method, args, timeout):
        def _inner():
class Connection(object):
    def __init__(self):
    def create_consumer(self, topic, proxy, fanout=False):
    def close(self):
    def consume_in_thread(self):
def create_connection(new=True):
def check_serialize(msg):
def multicall(context, topic, msg, timeout=None):
def call(context, topic, msg, timeout=None):
def cast(context, topic, msg):
def notify(context, topic, msg):
def cleanup():
def fanout_cast(context, topic, msg):

create_connection() でメッセージキューへの接続を行い、cast(), call(), multicast(), fanout_cast() 等で通信を送ります。

単にメッセージを送るだけなのに、多数のメソッドがあります。これらの違いは何でしょうか? docstring を引用してみます(一部 impl_kombu.py より)。

メソッド 説明
call() Sends a message on a topic and wait for a response.
multicall() Make a call that returns multiple times
cast() Sends a message on a topic without waiting for a response.
fanout_cast() Cast to all consumers of a topic

上記の表の中で topic という単語が出てきますが、これは scheduler, compute, volume, network などのサービスの種類を示しています。

call(), multicall() は同期通信であり、送信したメッセージに対する応答メッセージを待ち受けます。一方、cast(), multicall() は非同期通信であり、メッセージを送信しても応答メッセージを待ち受けません。

Note

ダミードライバ(impl_fake.py)では非同期通信が実現できません。つまり、cast() == call() です。

また、call(), cast(), multicall() は通信相手が1つなのに対して、fanout_cast() は特定の topic を持つ全てのノードが通信相手になります。 更に、call(), multicall(), cast() では通信相手を特定する事もできますし、特定しない(topic だけ)事もできます。特定しない場合はその topic を持つサービスの1つが受け取ります。

RPC 呼出

RPC 呼出は各サービス用ディレクトリ中の api.py で行われます。

rpc.call(<コンテキスト>, <キュー>,
         {"method": <メソッド名>, "args": <辞書型化された引数群>})

1つ例を挙げてみましょう。 VM インスタンスにボリュームをアタッチする操作(volume.api.API.attach())です。

volume/api.py(呼び側)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class API(base.Base):

    @wrap_check_policy
    def attach(self, context, volume, instance_id, mountpoint):
        host = volume['host']
        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
        return rpc.call(context, queue,
                        {"method": "attach_volume",
                         "args": {"volume_id": volume['id'],
                                  "instance_id": instance_id,
                                  "mountpoint": mountpoint}})

上記の例では、6行目の self.db.queue_get_for() にて host で指定されたホスト上の volume サービスへのキューを取得し、7行目の rpc.call() で attach_volue メソッドを同期リクエストとして発行しています。 attach_volume メソッドの引数は volume_id=volume[‘id’], instance_id=instance_id, mountopoint=mountpoint となります。

volume/manager.py(呼ばれ側)

1
2
3
4
5
6
7
8
9
class VolumeManager(manager.SchedulerDependentManager):

    def attach_volume(self, context, volume_id, instance_id, mountpoint):
        """Updates db to show volume is attached"""
        # TODO(vish): refactor this into a more general "reserve"
        self.db.volume_attached(context,
                                volume_id,
                                instance_id,
                                mountpoint)

Table Of Contents

Previous topic

各サービスのサーバ・クライアント機構

Next topic

肥大化するデータベース

This Page