ヽ(´・肉・`)ノログ

How do we fighting without fighting?

すごいE本をElixirでやる(43)

第14章 OTPの紹介 - 14.2 基本的なサーバ - 呼び出しの汎用化 から

サーバーの役割である,メッセージが来るのを待ち続け,来たら捌くという部分の抽象化がよくわかる.

第14章OTPの紹介

14.2 基本的なサーバ

呼び出しの汎用化

ソースコードで最初に気がつくのは同期呼び出しがどれも非常に似ていることです

## Synchronous call
def order_cat(pid, name, color, description) do
  ref = Process.monitor(pid)
  send(pid, {self, ref, {:order, name, color, description}})
  receive do
    {ref, cat} ->
      Process.demonitor(ref, [:flush])
      cat
    {:DOWN, ref, :process, pid, reason} ->
      raise(reason)
  after 5000 ->
    raise(:timeout)
  end
end

## Synchronous call
def close_shop(pid) do
  ref = Process.monitor(pid)
  send(pid, {self, ref, :terminate})
  receive do
    {ref, :ok} ->
      Process.demonitor(ref, [:flush])
      :ok
    {:DOWN, ref, :process, pid, reason} ->
      raise(reason)
  after 5000 ->
    raise(:timeout)
  end
end

確かに似ている.切り出すとこのようになる.

defmodule MyServer do
  def call(pid, msg) do
    ref = Process.monitor(pid)
    send(pid, {self, ref, msg})
    receive do
      {ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply
      {:DOWN, ref, :process, pid, reason} ->
        raise(reason)
    after 5000 ->
      raise(:timeout)
    end
  end
end

MyServerを利用した,KittyServerはこうなる.

defmodule KittyServer do
  defmodule Cat, do: defstruct name: "", color: :green, description: ""

  ### Client API
  def start_link, do: spawn_link(&init/0)

  ## Synchronous call
  def order_cat(pid, name, color, description) do
    MyServer.call(pid, {name, color, description})
  end

  ## This call is asynchronous
  def return_cat(pid, cat = %Cat{}) do
    send(pid, {:return, cat})
    :ok
  end

  ## Synchronous call
  def close_shop(pid) do
    MyServer.call(pid, :terminate)
  end

  ### Server functions
  def init, do: loop([])

  def loop(cats) do
    receive do
      {pid, ref, {:order, name, color, description}} ->
        case cats do
          [] ->
            send(pid, {ref, make_cat(name, color, description)})
            loop(cats)
          [h|t] ->
            send(pid, {ref, h})
            loop(t)
        end
      {:return, cat = %Cat{}} ->
        loop([cat|cats])
      {pid, ref, :terminate} ->
        send(pid, {ref, :ok})
        terminate(cats)
      unknown ->
        ## Do some logging here too.
        IO.puts("Unknown message: #{inspect unknown}")
        loop(cats)
    end
  end

  defp make_cat(name, color, description), do: %Cat{name: name, color: color, description: description}
  defp terminate(cats) do
    for %Cat{name: name} <- cats, do: IO.puts("#{name} was set free.")
    :ok
  end
end

サーバループの汎用化

これまでに書いたプロセスにはいずれも、全メッセージがパターンマッチされるループがあったことに注目しましょう。 ちょっと微妙な箇所ですが、ここはパターンマッチをループ自身から切り離す必要があります。

こんな感じで書けばひとまず動くだろう.

def loop(module, state) do
  receive do
    message -> module.handle(message, state)
  end
end

def handle(message1, state), do: new_state1
def handle(message2, state), do: new_state2
# ...
def handle(messageN, state), do: new_stateN

私たちの汎用サーバの実装で、呼び出しが同期なのか非同期なのかを明確にできれば、きっと役立つでしょう。

なるほど.同期 sync と非同期 async の処理を同じ loop で扱えるようにする.

defmodule MyServer do
  def call(pid, msg) do
    ref = Process.monitor(pid)
    send(pid, {:sync, self, ref, msg}) # <- ここに :sync を追加
    receive do
      {ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply
      {:DOWN, ref, :process, pid, reason} ->
        raise(reason)
    after 5000 ->
      raise(:timeout)
    end
  end
end

MyServer へ非同期用の関数 cast も用意する.

def cast(pid, msg) do
  send(pid, {:async, msg})
  :ok
end

loop を非同期 :async にも対応させよう.

def loop(module, state) do
  receive do
    {:async, msg} ->
      loop(module, module.handle_cast(msg, state))
    {:sync, pid, ref, msg} ->
      loop(module, module.handle_call(msg, pid, ref, state)
  end
end

この loop の残念なところは、抽象化が漏れている点です。 my_server を使うプログラマは、同期メッセージを送ったり返信したりするとき、依然として参照について知っていなければなりません。

同期メッセージで「 pidref のことを意識してプログラミングしないといけない」という点についての話だ. from というグルーピングをして,そこに pidref を含めてしまおう.具体的にはタプル {} でひとまとめにする.

そうすると handle_call(msg, pid, ref, state)handle_call(msg, from, state) という形にすっきりさせることができる.

def loop(module, state) do
  receive do
    {:async, msg} ->
      loop(module, module.handle_cast(msg, state))
    {:sync, pid, ref, msg} ->
      loop(module, module.handle_call(msg, {pid, ref}, state)
  end
end

これでプログラマは、変数の内部について知る必要がなくなります。 その代わり、私たちのほうで、From に何が含まれているかを把握している関数を用意しておきます。

ここで書いているプログラマというのは,ライブラリ(MyServer)利用者ということだろう. 私たちというのは,ライブラリ(MyServer)製作者だな. From に含まれている情報をうまく扱う reply 関数を作る.

def reply({pid, ref}, reply) do
  send(pid, {ref, reply})
end

以上の変更を踏まえると MyServer はこうなる.

defmodule MyServer do
  def call(pid, msg) do
    ref = Process.monitor(pid)
    send(pid, {self, ref, msg})
    receive do
      {ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply
      {:DOWN, ref, :process, pid, reason} ->
        raise(reason)
    after 5000 ->
      raise(:timeout)
    end
  end

  def cast(pid, msg) do
    send(pid, {:async, msg})
    :ok
  end

  def reply({pid, ref}, reply) do
    send(pid, {ref, reply})
  end

  def loop(module, state) do
    receive do
      {:async, msg} ->
        loop(module, module.handle_cast(msg, state))
      {:sync, pid, ref, msg} ->
        loop(module, module.handle_call(msg, {pid, ref}, state)
    end
  end
end
このエントリーをはてなブックマークに追加