ヽ(´・肉・`)ノログ

How do we fighting without fighting?

すごいE本をElixirでやる(37)

第 13 章 並行アプリケーションを設計する - 13.5 イベントサーバ - メッセージを処理する から

13.5 イベントサーバ

メッセージを処理する

イベント追加

次に考えるべきメッセージはイベントを追加するときのものです。現状、エラーステータスが返せます。 ここで行う唯一の検証は、受け取ったタイムスタンプの確認です。 {{〔年〕,〔月〕,〔日〕}, {〔時〕,〔分〕,〔秒〕}}の形で受け取るのは簡単ですが、 閏年でないのに 2 月 29 日のイベントを受け取るなど、存在しない日付のイベントは絶対に受け付けないようにしないといけません。

NaiveDateTime.from_iso8601!/1 は valid な日付をチェックしてくれていそうだ. ただし,閏秒に対応するために秒が 60 になるのは ISO8601 の仕様で許容されているみたい.

iex(13)> ~N[2016-02-29 23:00:00]
~N[2016-02-29 23:00:00]
iex(14)> ~N[2016-02-29 24:00:00]
** (ArgumentError) cannot parse "2016-02-29 24:00:00" as naive date time, reason: :invalid_time
    (elixir) lib/calendar.ex:817: NaiveDateTime.from_iso8601!/1
    (elixir) expanding macro: Kernel.sigil_N/2
             iex:14: (file)
iex(14)> ~N[2015-02-29 23:00:00]
** (ArgumentError) cannot parse "2015-02-29 23:00:00" as naive date time, reason: :invalid_date
    (elixir) lib/calendar.ex:817: NaiveDateTime.from_iso8601!/1
    (elixir) expanding macro: Kernel.sigil_N/2
             iex:14: (file)
iex(14)> ~N[2016-02-29 23:00:60]
~N[2016-02-29 23:00:60]

クライアントで NaiveDateTime を作るときにエラーになり,サーバーに渡される Struct は valid なものだけになるので, サーバーでは valid な datetime かはチェックしない.

{pid, msg_ref, {:add, name, description, time_out}} ->
  event_pid = Event.start_link(name, time_out)
  new_events = Map.put_new(state.events, name, %EveSrv.Event{name: name,
                                                             description: description,
                                                             pid: event_pid,
                                                             time_out: time_out})
  send(pid, {msg_ref, :ok})
  loop(%{state | events: new_events})

こんな感じだろう.

イベントキャンセル

イベントがプロセスの state レコードに存在するかどうかを確認するだけです。 もし存在していたら、定義した event:cancel/1 関数を使ってイベントを殺し、 ok を送ります。 もし見つからなかったら、イベントが動作していないということであり、これはユーザが望んでいる状況なので、何も問題はないとユーザに伝えます。

{pid, msg_ref, {:cancel, name}} ->
  events = case Map.fetch(state.events, name) do
             {:ok, e} ->
               Event.cancel(e.pid)
               Map.delete(state.events, name)
             :error ->
               state.events
           end
  send(pid, {msg_ref, :ok})
  loop(%{state | events: events})

イベントのタイムアウト

サーバとイベント自身の間でやり取りされるメッセージの処理を書いていきましょう。 扱うメッセージは 2 つ、イベントのキャンセル(もう実装しました)と、イベントのタイムアウトです。 後者のメッセージは{done, Name}です。

{pid, msg_ref, {:cancel, name}} ->
  events = case Map.fetch(state.events, name) do
             {:ok, e} ->
               Event.cancel(e.pid)
               Map.delete(state.events, name)
             :error ->
               state.events
           end
  send(pid, {msg_ref, :ok})
  loop(%{state | events: events})

残り

あと残っているのは、クライアントが落ちた、シャットダウン、コードのアップグレードなど、さまざまなステー タスメッセージの扱いです。

最初のケース(shutdown)はかなり安直です。kill メッセージを受け取り、プロセスが死にます。

‘DOWN’ メッセージの動作もかなり単純です。これはクライアントが死んだことを意味しているので、state 内のクライアントのリストから削除します。

Unknown メッセージは、実際の製品アプリケーションではロギングモジュールを使うことになると思いますが、デバッグ目的で io:format/2 により表示させているだけです。

:shutdown ->
  exit(:shutdown)
{:DOWN, ref, :process, _pid, _reason} ->
  loop(%{state | clients: Map.delete(state.clients, ref)})
:code_change ->
  :do_something
:unknown ->
  IO.puts("Unknown message: #{inspect unknown}")
  loop(state)

これで :code_change 以外はひととおり実装したことになる.

defmodule EveSrv do
  defmodule State, do: defstruct events: %{}, clients: %{}
  defmodule Event, do: defstruct name: "", description: "", pid: nil, time_out: ~N[1970-01-01 00:00:00]

  def init do
    # Loading events from a static file could be done here.
    # You would need to pass an argument to init telling where the
    # resource to find the events is. Then load it from here.
    # Another option is to just pass the events straight to the server
    # through this function.
    loop(%State{})
  end

  def loop(state=%State{}) do
    receive do
      {pid, msg_ref, {:subscribe, client}} ->
        ref = Process.monitor(client)
        new_clients = Map.put_new(state.clients, ref, client)
        send(pid, {msg_ref, :ok})
        loop(%{state | clients: new_clients})
      {pid, msg_ref, {:add, name, description, time_out}} ->
        event_pid = Event.start_link(name, time_out)
        new_events = Map.put_new(state.events, name, %EveSrv.Event{name: name,
                                                                   description: description,
                                                                   pid: event_pid,
                                                                   time_out: time_out})
        send(pid, {msg_ref, :ok})
        loop(%{state | events: new_events})
      {pid, msg_ref, {:cancel, name}} ->
        events = case Map.fetch(state.events, name) do
                   {:ok, e} ->
                     Event.cancel(e.pid)
                     Map.delete(state.events, name)
                   :error ->
                     state.events
                 end
        send(pid, {msg_ref, :ok})
        loop(%{state | events: events})
      {:done, name} ->
        case Map.fetch(state.events, name) do
          {:ok, e} ->
            send_to_clients(state.clients, {:done, e.name, e.description})
            new_events = Map.delete(state.events, name)
            loop(%{state | events: new_events})
          :error ->
            # This may happen if we cancel an event and
            # it fires at the same time.
            loop(state)
        end
      :shutdown ->
        exit(:shutdown)
      {:DOWN, ref, :process, _pid, _reason} ->
        loop(%{state | clients: Map.delete(state.clients, ref)})
      :code_change ->
        :do_something
      :unknown ->
        IO.puts("Unknown message: #{inspect unknown}")
        loop(state)
    end
  end

  def send_to_clients(clients, msg) do
    Map.values(clients) |> Enum.each(&(send(&1, msg)))
  end
end