ヽ(´・肉・`)ノログ

How do we fighting without fighting?

すごいE本をElixirでやる(41)

第 13 章 並行アプリケーションを設計する - 13.7 監視を追加する から

並行アプリケーションを設計する

13.7 監視を追加する

このアプリケーションをより安定させるために、第12章で実装したような 「restarter」を実装すべきでしょう

Sup というモジュール名の restarter を作る.

defmodule Sup do
  def start(mod, args) do
    spawn(__MODULE__, :init, [{mod, args}])
  end

  def start_link(mod, args) do
    spawn_link(__MODULE__, :init, [{mod, args}])
  end

  def init({mod, args}) do
    Process.flag(:trap_exit, true)
    loop({mod, :start_link, args})
  end

  def loop({m, f, a}) do
    pid = apply(m, f, a)
    receive do
      {:EXIT, _From, :shutdown} ->
        exit(:shutdown) # will kill the child too
      {:EXIT, pid, reason} ->
        IO.puts("Process #{inspect pid} exited for reason #{reason}")
        loop({m, f, a})
    end
  end
end

defmodule Event do
  defmodule State, do: defstruct server: nil, name: "", to_go: 0

  def start(event_name, delay) do
    spawn(__MODULE__, :init, [self, event_name, delay])
  end

  def start_link(event_name, delay) do
    spawn_link(__MODULE__, :init, [self, event_name, delay])
  end

  # event's innerds
  def init(server, event_name, naive_date_time) do
    loop(%State{server: server, name: event_name, to_go: time_to_go(naive_date_time)})
  end

  def cancel(pid) do
    # Monitor in case the process is already dead.
    ref = Process.monitor(pid)
    send(pid, {self, ref, :cancel})
    receive do
      {ref, :ok} ->
        Process.demonitor(ref, [:flush])
        :ok
      {:DOWN, ref, :process, pid, _reason} ->
        :ok
    end
  end

  def time_to_go(time_out=%NaiveDateTime{}) do
    now =  :calendar.now_to_local_time(:os.timestamp)
    to_go = :calendar.datetime_to_gregorian_seconds(NaiveDateTime.to_erl(time_out)) - :calendar.datetime_to_gregorian_seconds(now)
    secs = case to_go do
             x when 0 < x -> x
             x when x <= 0 -> 0
           end
    normalize(secs)
  end

  # Because Erlang is limited to about 49 days (49*24*60*60*1000) in
  # milliseconds, the following function is used.
  def normalize(n) do
    limit = 49*24*60*60
    [rem(n, limit) | List.duplicate(limit, div(n, limit))]
  end

  # Loop uses a list for times in order to go around the ~49 days limit
  # on timeouts.
  def loop(state = %State{server: server, to_go: [t|next]}) do
    receive do
      {server, ref, :cancel} ->
        send(server, {ref, :ok})
      after t * 1000 ->
        case next do
          [] -> send(server, {:done, state.name})
          [_x] -> loop(%{state | to_go: next})
        end
    end
  end
end

defmodule EvSrv do
  defmodule State, do: defstruct events: %{}, clients: %{}
  defmodule Event, do: defstruct name: "", description: "", pid: nil, time_out: ~N[1970-01-01 00:00:00]

  def start do
    pid = spawn(__MODULE__, :init, [])
    Process.register(pid, __MODULE__)
    pid
  end

  def start_link do
    pid = spawn_link(__MODULE__, :init, [])
    Process.register(pid, __MODULE__)
    pid
  end

  def terminate do
    send(__MODULE__, :shutdown)
  end

  def subscribe(pid) do
    ref = Process.monitor(Process.whereis(__MODULE__))
    send(__MODULE__, {self, ref, {:subscribe, pid}})
    receive do
      {ref, :ok} ->
        {:ok, ref}
      {:DOWN, ref, :process, _pid, reason} ->
        {:error, reason}
    after 5000 ->
      {:error, :timeout}
    end
  end

  def add_event(name, description, time_out) do
    ref = make_ref
    send(__MODULE__, {self, ref, {:add, name, description, time_out}})
    receive do
      {ref, msg} ->
        msg
      after 5000 ->
        {:error, :timeout}
    end
  end

  def cancel(name) do
    ref = make_ref
    send(__MODULE__, {self, ref, {:cancel, name}})
    receive do
      {ref, :ok} ->
        :ok
      after 5000 ->
        {:error, :timeout}
    end
  end

  def listen(delay) do
    receive do
      m = {:done, name, description} ->
      [m | listen(0)]
    after delay*1000 ->
      []
    end
  end

  def init do
    # Loading events from a static file could be done here.
    # You would need to pass an argument to init telling where the
    # resource to find the events is. Then load it from here.
    # Another option is to just pass the events straight to the server
    # through this function.
    loop(%State{})
  end

  def loop(state=%State{}) do
    receive do
      {pid, msg_ref, {:subscribe, client}} ->
        ref = Process.monitor(client)
        new_clients = Map.put_new(state.clients, ref, client)
        send(pid, {msg_ref, :ok})
        loop(%{state | clients: new_clients})
      {pid, msg_ref, {:add, name, description, time_out}} ->
        event_pid = Elixir.Event.start_link(name, time_out)
        new_events = Map.put_new(state.events, name, %EvSrv.Event{name: name,
                                                                  description: description,
                                                                  pid: event_pid,
                                                                  time_out: time_out})
        send(pid, {msg_ref, :ok})
        loop(%{state | events: new_events})
      {pid, msg_ref, {:cancel, name}} ->
        events = case Map.fetch(state.events, name) do
                   {:ok, e} ->
                     Elixir.Event.cancel(e.pid)
                     Map.delete(state.events, name)
                   :error ->
                     state.events
                 end
        send(pid, {msg_ref, :ok})
        loop(%{state | events: events})
      {:done, name} ->
        case Map.fetch(state.events, name) do
          {:ok, e} ->
            send_to_clients(state.clients, {:done, e.name, e.description})
            new_events = Map.delete(state.events, name)
            loop(%{state | events: new_events})
          :error ->
            # This may happen if we cancel an event and
            # it fires at the same time.
            loop(state)
        end
      :shutdown ->
        exit(:shutdown)
      {:DOWN, ref, :process, _pid, _reason} ->
        loop(%{state | clients: Map.delete(state.clients, ref)})
      :code_change ->
        :do_something
      unknown ->
        IO.puts("Unknown message: #{inspect unknown}")
        loop(state)
    end
  end

  def send_to_clients(clients, msg) do
    Map.values(clients) |> Enum.each(&(send(&1, msg)))
  end
end

それっぽいファイル構造を作っていないので,上記のコードを app.ex という名前で保存して,iex から import_file(“app.ex”) で読み込む.

Eshell V8.1  (abort with ^G)
Interactive Elixir (1.3.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import_file("app.ex")
import_file("app.ex")
{:module, EvSrv,
 <<70, 79, 82, 49, 0, 0, 32, 32, 66, 69, 65, 77, 69, 120, 68, 99, 0, 0, 2, 95,
   131, 104, 2, 100, 0, 14, 101, 108, 105, 120, 105, 114, 95, 100, 111, 99, 115,
   95, 118, 49, 108, 0, 0, 0, 4, 104, 2, ...>>, {:send_to_clients, 2}}
iex(2)> sup_pid = Sup.start(EvSrv, [])
sup_pid = Sup.start(EvSrv, [])
#PID<0.105.0>
iex(3)> Process.whereis(EvSrv)
Process.whereis(EvSrv)
#PID<0.106.0>
iex(4)> Process.exit(Process.whereis(EvSrv), :die)
Process.exit(Process.whereis(EvSrv), :die)
Process #PID<0.106.0> exited for reason die
true
iex(5)> Process.exit(Process.whereis(EvSrv), :die)
Process.exit(Process.whereis(EvSrv), :die)
Process #PID<0.110.0> exited for reason die
true
iex(6)> Process.exit(sup_pid, :shutdown)
Process.exit(sup_pid, :shutdown)
true
iex(7)> Process.whereis(EvSrv)
Process.whereis(EvSrv)
nil
iex(8)>

iex(4)iex(5) で子プロセスを複数回 exit しても復活している. iex(6)sup_pid を exit すると, iex(7) で子プロセスもいなくなっている.

うまく動いているようだ.

13.8 名前空間(あるいは名前のない空間)

Erlang はフラットなモジュール構造をしている (階層がない)ので、アプリケーションで衝突が起きることがしばしばあります。

Elixir は入れ子のモジュールを扱えるので,ほとんどの場合アプリケーション名を一番大きなモジュール名として,その中に様々な個別のモジュールを設定する.(例: Foo.Bar.Worker

ちなみに Elixir でトップレベルのモジュールを明示的に指定するには接頭辞 Elixir をつけるとよい.

どのような衝突も、code:clash/0 関数でテストできます。

知らなかった.Elixir でのラッパーを探してみたが存在しないようだ.

このエントリーをはてなブックマークに追加