Do you want to pick up from where you left of?
Take me there

GenStage

本课, 我们将学习 GenStage, 它的作用, 以及如何在我们的应用中使用它.

介绍

那么什么是 GenStage? 官方文档中写道, 它是”Elixir 的规格与计算流程”, 但对我们来说意味着什么?

这意味着, GenStage 为我们提供了定义一个管道操作的方式, 它是由多个独立的阶段(或 stage)组合起来的; 如果你之前使用过管道, 应该熟悉其中的一些概念.

为了更好地理解它的工作原理, 让我们来看一个简单的生产者-消费者流程:

[A] -> [B] -> [C]

在这个例子里, 我们有三个 stage: 一个生产者 A, 一个生产者-消费者 B, 以及一个消费者 C. A 生产一个 B 供消费的值, B 执行了一些工作并返回一个新的值, 提供给消费者 C; 我们将在下一节中看到这些角色, 它们十分重要.

虽然我们的例子是1对1的生产者和消费者, 但是在任何一个 stage 都可能拥有多个生产者和多个消费者.

为了更好地说明这些概念, 我们将使用 GenStage 来构建管道, 但首先让我们来探讨一下 GenStage 的角色.

消费者与生产者

正如我们所看到的, 我们赋予 stage 的角色很重要. GenStage 的规范中承认三种角色:

注意到我们的生产者是在 等待 需求了吗? 使用 GenStage, 我们的消费者向上游发送需求, 并处理来自生产者的数据. 这有助于称为背压的机制. 当消费者忙碌时, 背压使得生产者在消费者繁忙时不会承受过度的压力.

现在, 我们已经介绍了 GenStage 中的角色, 让我们开始编写的应用.

入门

在这个例子中, 我们将构建一个 GenStage 应用, 它将产生数字, 过滤掉偶数, 最后打印出剩下的数字.

在这个应用中我们将用到全部三种角色. 我们的生产者将负责计数和排放数字. 我们将使用一个生产者-消费者来筛选偶数, 并响应来自下游的需求. 最后, 我们将构建一个消费者来显示我们剩下的数字.

首先, 我们生成一个带有监控树的项目:

mix new genstage_example --sup
cd genstage_example

让我们在 mix.exs 文件的依赖列表中加入 gen_stage:

defp deps do
  [
    {:gen_stage, "~> 1.0.0"},
  ]
end

接着获取并编译依赖:

mix do deps.get, compile

现在我们准备好编写我们的生产者了!

生产者

GenStage 应用的第一步是创建我们的生产者. 正如我们之前讨论过的, 我们想要创建一个发出恒定数字流的生产者. 让我们来创建生产者的文件:

touch lib/genstage_example/producer.ex

现在添加代码:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

这里有两个重要的部分需要注意, init/1handle_demand/2. 在 init/1 中我们像 GenServer 一样设置了的初始状态, 更重要的是我们将其标注为了一个生产者. GenStage 根据 init/1 函数的返回值来区分进程的类型.

handle_demand/2 函数是生产者的主要部分, 也是所有 GenStage 生产者都必须实现的. 这里, 我们返回了消费者所需要的数字, 并增加了计数. 消费者发来的需求, 也就是上面代码中的demand, 是一个代表其所能处理的事件数量的整数; 默认值是 1000.

生产者-消费者

现在, 我们有了一个产生数字的生产者, 让我们看看生产者-消费者. 我们想要向生产者请求数字, 过滤掉奇数, 并响应需求.

touch lib/genstage_example/producer_consumer.ex

让我们在文件中写入如下代码:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

你可能已经注意到, 我们在生产者-消费者中, 为 init/1 增加了一个选项, 还增加了一个函数: handle_events/3. 通过 subscribe_to 选项, 我们让 GenStage 与指定的生产者进行通信.

handle_events/3 函数是我们的主力, 它接收事件, 处理它们, 并得到转换后的集合. 我们将在消费者中看到非常类似的实现, 但是最重要的区别在于 handle_events/3 函数的返回值. 在生产者-消费者中, 返回值元组的第二个参数 – 这里是 numbers – 将用于满足下游消费者的需求. 在消费者中, 这个值会被丢弃.

消费者

最后来让我们来看看同样重要的消费者:

touch lib/genstage_example/consumer.ex

由于消费者和生产者-消费者太相似了, 所以代码看起来没有什么区别:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

正如之前提到的, 我们的消费者不会生产事件, 所以元组的第二个参数会被抛弃.

把它们结合起来

现在我们有了生产者, 生产者-消费者和消费者, 我们已经准备好把所有东西捆绑在一起了.

首先, 打开 lib/genstage_example/application.ex 并添加我们的新进程到监控树:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

如果一切正确, 那么我们的项目就可以运行, 可以看到它能够工作:

$ mix run --no-halt
{#PID<0.109.0>, 0, :state_doesnt_matter}
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

我们做到了! 正如我们预期的那样, 程序只会产生偶数, 而且非常 .

这样, 我们就有了一条工作流. 一个生产者产生数字, 一个生产者-消费者过滤掉奇数, 然后一个消费者显示所有这些, 并让管道持续流动.

多个生产者或消费者

在简介里, 我们有提到可以同时有多个生产者或消费者. 让我们来看看是怎么一回事.

如果我们检查上面例子里 IO.inspect/1 的输出, 会发现所有的事件都是由同一个进程来处理的. 让我们修改 lib/genstage_example/application.ex 以配置多个 worker:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  }
]

现在, 我们有了两个消费者, 让我们来看一下现在运行应用的结果:

$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

如你所见, 现在有了多个 PID, 只需要简单地添加一行代码并指定消费者的 ID.

用例

现在, 我们已经了解了 GenStage, 并构建了我们的第一个示例应用, 那么 GenStage 有哪些 真实 的用例呢?

这些只是 GenStage 的 一小部分 可能性.

Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!