ヽ(´・肉・`)ノログ

How do we fighting without fighting?

ElixirでTaskを使ってEchoServerを動かす

シンプルなEchoServerを作り,Taskを利用してライフサイクルを制御する.

それを通してTaskモジュールのドキュメントに書いてある主要な利用法

  1. async and await
  2. Supervised tasks
  3. Dynamically supervised tasks

のうち2と3についての理解を深める.

参考

EchoServerを起動する

defmodule EchoServer do
  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} = :gen_tcp.listen(port,
                                    [:binary, packet: :line, active: false, reuseaddr: true])
    IO.puts "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    serve(client)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

このファイルを echo_server-1.ex として保存し,iex から実行する.(この実行を行っているコンソールを以下”iexコンソール”と呼ぶ)

iex$ iex -r echo_server-1.ex
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> EchoServer.accept(4040)
Accepting connections on port 4040

次に別のコンソールから telnet で接続する.(この実行を行っているコンソールを以下”telnetコンソール”と呼ぶ) いくつか動かしてみて反応が返ってくることを確認したら telnet クライアントを抜ける. 僕の場合は ctrl + ] を押し, quit を入力, <Enter> を押すという手順でできたが,他のやり方の人もいるかもしれない.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
^]
telnet> quit
Connection closed.

telnetコンソールを抜けると,iexコンソールには以下のような表示が出ているだろう.

** (MatchError) no match of right hand side value: {:error, :closed}
    echo_server-1.ex:31: EchoServer.read_line/1
    echo_server-1.ex:24: EchoServer.serve/1
    echo_server-1.ex:18: EchoServer.loop_acceptor/1

それではこの状態でtelnetコンソールから再度接続してみるとどうなるだろうか.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello??????
^]
telnet> quit
Connection closed.

telnetは繋がるが返事がない.なぜならEchoサーバーはエラーによりクラッシュしてしまったためだ.これにより 2 つの問題が明らかになった.

  1. ある程度予測可能な処理がコードに含まれていない
  2. 何らかの原因でサーバーがクラッシュすると,それ以降処理が進められない

今回は説明の簡略化のために 2 についてだけ対策する.

余談だが 2 を対策しておけば 1 はいらないのだろうか?僕はそうは思わない. 今回のクライアントのクローズのような当然予測できるような処理については明記しておいた方が, コードの意図が明確になるし条件の考慮漏れが減るだろう.

これは Learn You Some Erlang for Great Good! の Note のところ, 『条件分岐に「それ以外」を使わず,考えられる全ての条件を明記する』という考え方に影響を受けている.

もちろんコードの複雑性の増加とトレードオフ(頻度が低くて複雑エラー処理を記述すると,見通しが悪くなる)ではあるが, 原則としてはわかっていて簡潔に処理を記述できることについては積極的に記述しておくつもりだ.

Echoサーバーがクラッシュしても,再び処理を行えるようにする

他のプロセスと何回もメッセージをやりとりしたり,状態を長い時間保持するといった比較的複雑な処理は, Elixir においては GenServer, GenEvent, Agent といったモジュールが担っている.

一方,他のプロセスとは数回しかやりとりしないような比較的単純な処理は Task というモジュールが担っている.

Task は数回とはいえプロセス間でやりとりするので,単なる spawn や spawn_link とは異なり,監視ツリーの対象として扱われるようになっている. 実質 Elixir のアプリケーション部分では spawnspawn_link, spawn_monitor といった生の関数を用いることはなく, Task で代用され Task.startTask.start_link に置き換えられると考えていい.

今回のEchoサーバーはプロセス間のメッセージがほとんど無いプロセスなのでTaskで監視する.

Task.start_link/3 は [モジュール名, 関数名,関数へ渡す引数] を受けとり, 別プロセスで呼び出し,そのプロセスを監視ツリーの一部として(=監視対象として)扱ってくれる.

以下のiexコンソールでの worker(Task, [EchoServer, :accept, [4040]]) の部分が実質それに当たる.

iex$ iex -r echo_server-1.ex
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [
kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import Supervisor.Spec
nil
iex(2)> children = [
...(2)>   worker(Task, [EchoServer, :accept, [4040]])
...(2)> ]
[{Task, {Task, :start_link, [EchoServer, :accept, [4040]]}, :permanent, 5000,
  :worker, [Task]}]
iex(3)> opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
[strategy: :one_for_one, name: EchoServer.Supervisor]
iex(4)> Supervisor.start_link(children, opts)
Accepting connections on port 4040
{:ok, #PID<0.73.0>}

それではtelnetコンソールから繋いでみよう.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
^]
telnet> quit
Connection closed.

1回目と同様に問題ない. ここでiexコンソールをみると以下のようになっており,少しエラーメッセージが異なる.

Accepting connections on port 4040
iex(5)>
22:03:27.255 [error] Task #PID<0.74.0> started from EchoServer.Supervisor termin
ating
** (MatchError) no match of right hand side value: {:error, :closed}
    echo_server-1.ex:31: EchoServer.read_line/1
    echo_server-1.ex:24: EchoServer.serve/1
    echo_server-1.ex:18: EchoServer.loop_acceptor/1
    (elixir) lib/task/supervised.ex:74: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: &EchoServer.accept/1
    Args: [4040]

それでは,1回目にうまくいかなかった,再接続は行えるだろうか. telnetコンソールから再度繋いでみる.(今回はクローズしない)

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
hello??????
hello??????

Echoが返ってきている. 一度クラッシュしてもSupervisorによって再起動が行われているため,クライアントから再接続が行えるのだ.

ここまではうまくいっている. それではもう一つtelnetコンソールを起動して(以下”telnet2コンソール”と呼ぶ)接続してみよう.

telnet2$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello??????

Echoが返ってこない. これはTCP接続を受けつけるプロセスと,接続した後にクライアントからリクエストを待ち受けるプロセスが同じなので, 一度接続してクライアントからのリクエスト待ちになってしまうと,新たなTCP接続を受けつけることができないせいだ.

複数のTCP接続を扱えるようにする

サーバーで複数の接続を扱えるようにするには,接続待ち受けのプロセスと,そこからspawnしてリクエストを処理する他の(複数)プロセスが必要になる.

serve(client) のところを,Taskを用いて別プロセスになるように書き換えてみよう.

defmodule EchoServer do
  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} = :gen_tcp.listen(port,
                                    [:binary, packet: :line, active: false, reuseaddr: true])
    IO.puts "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    # serve(client)
    Task.start_link(fn -> serve(client) end)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

このファイルを echo_server-2.ex として保存し,先程と同じように iex から実行する.

iex$ iex -r echo_server-2.ex
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import Supervisor.Spec
nil
iex(2)> children = [
...(2)>   worker(Task, [EchoServer, :accept, [4040]])
...(2)> ]
[{Task, {Task, :start_link, [EchoServer, :accept, [4040]]}, :permanent, 5000,
  :worker, [Task]}]
iex(3)> opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
[strategy: :one_for_one, name: EchoServer.Supervisor]
iex(4)> Supervisor.start_link(children, opts)
Accepting connections on port 4040
{:ok, #PID<0.73.0>}

まずはtelnetコンソールで接続する.複数接続を試すため,クローズはしない.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

次に,先程はうまくいかなかった,telnet2コンソールでの接続を試す.

telnet2$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

うまくできたようだ.それでは続けてtelnet2コンソールをクローズしてみよう.

telnet2$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
^]
telnet> quit
Connection closed.

このときtenletコンソールの方をみると,以下のように一緒に終了してしまっている.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
Connection closed by foreign host.

状況を整理すると,同時接続は意図通りにできた. しかし,意図とは反しtelnet2コンソールをクローズしたら,telnetコンソールも同時にクローズされてしまった. ということだ.

これは,接続待ち受けのプロセスがリクエストを処理する他のプロセスとlinkしており, リクエストの処理でエラーになると接続待ち受けのプロセスにまでエラーが伝わり, その結果,全体がエラーで終了しているためである.

1つのリクエストを処理するプロセスのエラーに接続を待ち受けるプロセスが影響を受けないようにする

こういった処理はErlangではsimple_one_for_oneという起動戦略を利用する.

もちろんElixirでもsimple_one_for_oneという起動戦略を使えるのだが, よくある一般的な処理なのでTaskにも同じやり方 - 監視プロセスと使い捨てワーカープロセスを監視ツリーの一部として利用する - が用意されている.

それを利用してコードを書き換えてみよう.

defmodule EchoServer do
  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} = :gen_tcp.listen(port,
                                    [:binary, packet: :line, active: false, reuseaddr: true])
    IO.puts "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    # serve(client)
    # Task.start_link(fn -> serve(client) end)
    {:ok, pid} = Task.Supervisor.start_child(EchoServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

EchoServer.TaskSupervisor の部分は, iexコンソールにて宣言時に利用する name: EchoServer.TaskSupervisor と同じになっていれば任意の名前が利用できる.

また :ok = :gen_tcp.controlling_process(client, pid) という行を足して, client ソケットの “controlling process” を,生成された子プロセスへと移している.

ソケットはデフォルトでは接続待ち受けプロセスのと結びつけられているため, この処理を入れておかなければ,接続待ち受けプロセスが落ちたときに,リクエストを処理している全ての子プロセスが落ちてしまうためである.

このファイルを echo_server-3.ex として保存し,iex から実行する.

iex$ iex -r echo_server-3.ex
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import Supervisor.Spec
nil
iex(2)> children = [
...(2)>   supervisor(Task.Supervisor, [[name: EchoServer.TaskSupervisor]]),
...(2)>   worker(Task, [EchoServer, :accept, [4040]])
...(2)> ]
[{Task.Supervisor,
  {Task.Supervisor, :start_link, [[name: EchoServer.TaskSupervisor]]},
  :permanent, :infinity, :supervisor, [Task.Supervisor]},
 {Task, {Task, :start_link, [EchoServer, :accept, [4040]]}, :permanent, 5000,
  :worker, [Task]}]
iex(3)> opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
[strategy: :one_for_one, name: EchoServer.Supervisor]
iex(4)> Supervisor.start_link(children, opts)
Accepting connections on port 4040
{:ok, #PID<0.74.0>}

telnetコンソールで接続

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

telnet2コンソールで接続,切断する.

telnet2$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
^]
telnet> quit
Connection closed.

このときにtelnetコンソールを見てみると,先ほどとは異なり終了していない. そこで,続けて入力してみると想定通りにきちんとエコーを返してくる.

telnet$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
alive?
alive?
ok
ok

うまく動作している.

まとめ

  1. Echoサーバーの実装を作った
  2. Echoサーバーがクラッシュしても再起動するように監視した worker(Task, [EchoServer, :accept, [4040]]) (実質 Task.start_link/3 )
  3. Echoサーバーに複数接続できるようにした Task.start_link(fn -> serve(client) end)
  4. Echoサーバーのどれかのリクエストがエラーになっても,他のリクエストに影響がないようにした Task.Supervisor.start_child(EchoServer.TaskSupervisor, fn -> serve(client) end)

処理(Worker)を作って,そのライフサイクルをTaskを使った別プロセスで管理する方法について知った.