RPC は Nova の各種サービス(api, compute, scheduler 等)が相互に通信を行う為に使用する通信機構であり、分散システムである Nova の根幹となる部分です。 RPC 関連のコードは nova-2012/nova/rpc/ 配下にあります。
nova-2012.1
nova/
rpc/
__init__.py
amqp.py
common.py
impl_carrot.py Carrot ドライバ(AMQP 0.9)
impl_fake.py ダミードライバ
impl_kombu.py Kombu ドライバ(AMQP 0.9)
impl_qpid.py Apache Qpidドライバ(AMQP 0.10)
ドライバ | 説明 |
Carrotドライバ | 最古の RPC ドライバで RabbitMQ 専用です。途中若干の変遷 はありましたが Nova 初期公開時から使われています。 |
Kombu ドライバ | Diablo リリースで登場しました。RabbitMQ 以外にも RDBMS や Redis のような多彩なソフトウェアをメッセージキューと して利用出来ます。 |
Apache Qpid ドライバ | Apache Qpid 用のドライバです。 Essex リリースで新登場し ました。 |
ダミードライバ | ユニットテスト用であり、外部プログラム/ライブラリに依存 しない簡易通信を提供します。 |
では、ここでテスト用のダミードライバである impl_fake.py を覗いてみましょう。 (比較的行数が少なく読みやすいので)
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
def reply(self, reply=None, failure=None, ending=False):
class Consumer(object):
def __init__(self, topic, proxy):
def call(self, context, method, args, timeout):
def _inner():
class Connection(object):
def __init__(self):
def create_consumer(self, topic, proxy, fanout=False):
def close(self):
def consume_in_thread(self):
def create_connection(new=True):
def check_serialize(msg):
def multicall(context, topic, msg, timeout=None):
def call(context, topic, msg, timeout=None):
def cast(context, topic, msg):
def notify(context, topic, msg):
def cleanup():
def fanout_cast(context, topic, msg):
create_connection() でメッセージキューへの接続を行い、cast(), call(), multicast(), fanout_cast() 等で通信を送ります。
単にメッセージを送るだけなのに、多数のメソッドがあります。これらの違いは何でしょうか? docstring を引用してみます(一部 impl_kombu.py より)。
メソッド | 説明 |
call() | Sends a message on a topic and wait for a response. |
multicall() | Make a call that returns multiple times |
cast() | Sends a message on a topic without waiting for a response. |
fanout_cast() | Cast to all consumers of a topic |
上記の表の中で topic という単語が出てきますが、これは scheduler, compute, volume, network などのサービスの種類を示しています。
call(), multicall() は同期通信であり、送信したメッセージに対する応答メッセージを待ち受けます。一方、cast(), multicall() は非同期通信であり、メッセージを送信しても応答メッセージを待ち受けません。
Note
ダミードライバ(impl_fake.py)では非同期通信が実現できません。つまり、cast() == call() です。
また、call(), cast(), multicall() は通信相手が1つなのに対して、fanout_cast() は特定の topic を持つ全てのノードが通信相手になります。 更に、call(), multicall(), cast() では通信相手を特定する事もできますし、特定しない(topic だけ)事もできます。特定しない場合はその topic を持つサービスの1つが受け取ります。
RPC 呼出は各サービス用ディレクトリ中の api.py で行われます。
rpc.call(<コンテキスト>, <キュー>,
{"method": <メソッド名>, "args": <辞書型化された引数群>})
1つ例を挙げてみましょう。 VM インスタンスにボリュームをアタッチする操作(volume.api.API.attach())です。
volume/api.py(呼び側)
1 2 3 4 5 6 7 8 9 10 11 | class API(base.Base):
@wrap_check_policy
def attach(self, context, volume, instance_id, mountpoint):
host = volume['host']
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume['id'],
"instance_id": instance_id,
"mountpoint": mountpoint}})
|
上記の例では、6行目の self.db.queue_get_for() にて host で指定されたホスト上の volume サービスへのキューを取得し、7行目の rpc.call() で attach_volue メソッドを同期リクエストとして発行しています。 attach_volume メソッドの引数は volume_id=volume[‘id’], instance_id=instance_id, mountopoint=mountpoint となります。
volume/manager.py(呼ばれ側)
1 2 3 4 5 6 7 8 9 | class VolumeManager(manager.SchedulerDependentManager):
def attach_volume(self, context, volume_id, instance_id, mountpoint):
"""Updates db to show volume is attached"""
# TODO(vish): refactor this into a more general "reserve"
self.db.volume_attached(context,
volume_id,
instance_id,
mountpoint)
|