Fork me on GitHub

OTP 并发

Some contents of this translation may be outdated.
Several major changes were applied to the original lesson since the last update.

我们已经看过了 Elixir 层的并发抽象机制,但有时候我们需要更多的控制,那就要了解 Elixir 底层的东西:OTP 行为(behaviors)。

这节课,我们主要讲两个东西:Genservers 和 GenEvents。

目录

GenServer

OTP server 是一个模块,包含了 Genserver 的主要行为,外加一系列的 callbacks。Genserver 最核心的内容是这样一个循环:每次迭代处理一个带有目标状态的请求。

为了演示 Genserver 的 API,我们将实现一个简单的 queue,来存储和获取值。

最开始,我们要先启动和初始化 Genserver,一般情况下,我们需要链接进程,所以还要使用 Genserver.start_link/3。传递给 Genserver 的参数包括:我们所在的模块,初始状态,以及其他 Genserver 参数。Genserver.init/1 的参数用来设置初始的状态,比如在下面的例子中,初始状态为 []:

defmodule SimpleQueue do
  use GenServer

  @doc """
  Start our queue and link it.  This is a helper method
  """
  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  @doc """
  GenServer.init/1 callback
  """
  def init(state), do: {:ok, state}
end

同步函数

有时候需要和 Genservers 进行同步的交互:调用一个函数,然后等待它的响应返回。要处理同步请求,我们需要实现 Genserver.handle_call/3 函数,接受的参数是:请求、调用者的 PID,初始的状态,期望的返回值是 {:reply, response, state} 三元组。

使用模式匹配,我们可以为不同的请求和状态定义不同的 callbacks,能够接受的所有返回值列表可以前往 Genserver.handle_call/3 文档处查看。

为了演示同步请求,我们添加这样的功能:返回现在队列的状态以及删除队列中的一个值:

defmodule SimpleQueue do
  use GenServer

  ### GenServer API

  @doc """
  GenServer.init/1 callback
  """
  def init(state), do: {:ok, state}

  @doc """
  GenServer.handle_call/3 callback
  """
  def handle_call(:dequeue, _from, [value|state]) do
    {:reply, value, state}
  end
  def handle_call(:dequeue, _from, []), do: {:reply, nil, []}

  def handle_call(:queue, _from, state), do: {:reply, state, state}

  ### Client API / Helper methods

  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def queue, do: GenServer.call(__MODULE__, :queue)
  def dequeue, do: GenServer.call(__MODULE__, :dequeue)
end

我们来测试一下 SimpleQueue 刚完成的 dequeue 功能:

iex> SimpleQueue.start_link([1, 2, 3])
{:ok, #PID<0.90.0>}
iex> SimpleQueue.dequeue
1
iex> SimpleQueue.dequeue
2
iex> SimpleQueue.queue
[3]

异步函数

handle_cast/2 是处理异步函数的, 这个函数和 handle_call/3 的用法一样,除了它不接受调用者作为参数而且没有返回值。

我们把 enqueue 功能设计成异步的:更新 queue 的内容,但并不阻塞当前程序的运行:

defmodule SimpleQueue do
  use GenServer

  ### GenServer API

  @doc """
  GenServer.init/1 callback
  """
  def init(state), do: {:ok, state}

  @doc """
  GenServer.handle_call/3 callback
  """
  def handle_call(:dequeue, _from, [value|state]) do
    {:reply, value, state}
  end
  def handle_call(:dequeue, _from, []), do: {:reply, nil, []}

  def handle_call(:queue, _from, state), do: {:reply, state, state}

  @doc """
  GenServer.handle_cast/2 callback
  """
  def handle_cast({:enqueue, value}, state) do
    {:noreply, state ++ [value]}
  end

  ### Client API / Helper methods

  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end
  def queue, do: GenServer.call(__MODULE__, :queue)
  def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
  def dequeue, do: GenServer.call(__MODULE__, :dequeue)
end

现在使用一下这个新功能:

iex> SimpleQueue.start_link([1, 2, 3])
{:ok, #PID<0.100.0>}
iex> SimpleQueue.queue
[1, 2, 3]
iex> SimpleQueue.enqueue(20)
:ok
iex> SimpleQueue.queue
[1, 2, 3, 20]

可以前往官方的 GenServer 文档了解更多的信息。

GenEvent

我们刚学习到:Genservers 是维护状态并能够同步和异步处理请求的进程,但什么是 GenEvent 呢?GenEvents 是事件管理器:接受进来的事件,并通知订阅事件的消费者。这种机制能让我们动态地添加和删除事件的处理函数。

处理事件

可以想象,GenEvents 最重要的 callbacks 就是 handle_event/2,它接受一个事件和处理器当前的状态,并返回元组{:ok, state}

为了演示 GenEvent 的功能,我们来创建两个处理函数:一个记录接收到的消息,另外一个把它持久化(逻辑上的):

defmodule LoggerHandler do
  use GenEvent

  def handle_event({:msg, msg}, messages) do
    IO.puts "Logging new message: #{msg}"
    {:ok, [msg|messages]}
  end
end

defmodule PersistenceHandler do
  use GenEvent

  def handle_event({:msg, msg}, state) do
    IO.puts "Persisting log message: #{msg}"

    # Save message

    {:ok, state}
  end
end

调用处理函数

除了 handle_event/2,GenEvents 还支持 handle_call/2 和其他的回调函数。使用 handle_call/2 可以处理特定的不同消息。

我们来更新 LoggerHandler,让它能够获取当前的消息日志:

defmodule LoggerHandler do
  use GenEvent

  def handle_event({:msg, msg}, messages) do
    IO.puts "Logging new message: #{msg}"
    {:ok, [msg|messages]}
  end

  def handle_call(:messages, messages) do
    {:ok, Enum.reverse(messages), messages}
  end
end

使用 GenEvents

处理函数都写好了,我们要熟悉一下 GenEvents 的函数。其中最重要的三个是:add_handler/3notify/2call/4,它们的功能分别是:添加处理函数,广播消息,和调用特定的处理函数。

把所有这些放到一起的话,我们的处理函数是这样使用的:

iex> {:ok, pid} = GenEvent.start_link([])
iex> GenEvent.add_handler(pid, LoggerHandler, [])
iex> GenEvent.add_handler(pid, PersistenceHandler, [])

iex> GenEvent.notify(pid, {:msg, "Hello World"})
Logging new message: Hello World
Persisting log message: Hello World

iex> GenEvent.call(pid, LoggerHandler, :messages)
["Hello World"]

阅读官方的 GenEvent 文档查看完整的回调函数列表以及 GenEvent 的所有功能。


分享本页面