GenStage

在本課程中,將仔細研究 GenStage、它的作用和如何在應用程式中利用。

簡介

那什麼是 GenStage?從官方文件來看,它是"Elixir 的規範 (specification) 和計算流程 (computational flow)",但這表示著什麼?

這表示著 GenStage 提供了一種方式,在一個單獨過程中通過獨立 steps (或 stages) 來定義管線 (pipeline) 的工作以便執行它;如果之前使用過管線,那麼其中一些概念應該不陌生。

為了更好地理解它的工作原理,現在將一個簡易的生產者-消費者 (producer-consumer) 流程可視化:

[A] -> [B] -> [C]

在這個範例中,有三個 stages:A 消費者 (producer),B 生產者-消費者 (producer-consumer) 與 C 消費者 (consumer)。 A 產生一個被 B 消耗的值,B 執行一些工作並回傳將被 consumer C 收到的新值;我們將在下一節中看到,stage 的角色很重要。

雖然範例是 1 對 1 的生產者-消費者,但在任何特定 stage 都可能有多個生產者和多個消費者。

為了更好地描繪這些概念,將使用 GenStage 打造一個管線 (pipeline),但首先探索一下 GenStage 會依賴頗多的這些角色。

消費者 (Consumers) 與生產者 (Producers)

正如所讀到的,我們賦予 stage 的角色非常重要。 GenStage 的規範中承認三種角色:

注意到生產者是 等待 需求了嗎?通過 GenStage,消費者是向上游發送需求並處理來自生產者的資料。 這有助於稱為背壓(back-pressure)的機制。 當消費者忙碌時,背壓機制使生產者不會造成超壓(over-pressure)。

現在已經介紹過 GenStage 中的角色,開始建立應用程式。

入門

在這個範例中,將建立一個產出數字的 GenStage 應用程式,能排序偶數後輸出。

在應用程式中,將使用全部三個 GenStage 角色。 生產者將負責計算和產出數字。 生產者-消費者則過濾出偶數,而後對下游需求做出回應。 最後建立消費者顯示剩餘數字。

讓我們從生成一個有 supervision 樹的專案開始:

mix new genstage_example --sup
cd genstage_example

現在更新 mix.exs 中的耦合性 (dependencies) 以包含 gen_stage

defp deps do
  [
    {:gen_stage, "~> 1.0.0"},
  ]
end

在進一步研究之前,應該先取得 (fetch) 耦合關係 (dependencies) 並進行編譯:

mix do deps.get, compile

現在準備好建立生產者了!

生產者

GenStage 應用程式的第一件事是建立生產者。 正如之前討論過的,我們想要建立一個能夠產出持續不斷數字流的生產者。 現在來建立生產者資料夾和檔案:

mkdir lib/genstage_example
touch lib/genstage_example/producer.ex

這時可以加入我們所需程式碼:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

這裡需要注意兩個最重要部分是 init/1handle_demand/2。 在 init/1 中,設置了初始狀態,就像在 GenServers 中做的那樣,但更重要的是,我們將自己標記為生產者。 init/1 函數的回傳值是 GenStage 對處理程序 (process) 進行分類的依據。

handle_demand/2 函數是生產者被定義的主要部份。 它必須由所有 GenStage 生產者實現。 在這裡,我們回傳消費者所需的一組數字,並累加我們的計數器(counter)。 來自消費者的需求,也就是上面程式碼中的 demand,被表示為一個能夠處理事件量的相對應整數;預設為 1000。

生產者 消費者 (Producer Consumer)

現在已經有了能產出數字的生產者,接著看生產者-消費者。 我們希望從生產者索取數字,濾除奇數,並能回應需求。

touch lib/genstage_example/producer_consumer.ex

現在更新檔案,使其看起來像範例內的程式碼:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

你可能已經注意到,在生產者-消費者裡,init/1 已經引入了一個新選項和一個新函數: handle_events/3。 通過 subscribe_to 選項,指示 GenStage 讓我們在特定生產者的對話中(communication)。

handle_events/3 函數是我們的主力,接收傳入事件、處理 (process) 它們並回傳轉換後的集合。 而消費者的實現方式也大致相同,但重要的區別在於 handle_events/3 函數的回傳值以及如何被使用的。 當將處理程序標記為 producer_consumer 時,在範例中是 tuple 的第二個引數 - numbers 是用於達到下游的消費者需求; 但在消費者中,這個值會被丟棄。

消費者

最後但並非最不重要的是消費者。 開始囉:

touch lib/genstage_example/consumer.ex

由於消費者和生產者-消費者如此相似,所以程式碼看起來不會有太大的不同:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

正如在前面章節中所介紹的,消費者不會產出事件,所以 tuple 中的第二個值將被丟棄。

將所有角色組合在一起

現在已經建立了生產者、生產者-消費者和消費者,我們已經準備好將所有東西串在一起了。

首先打開 lib/genstage_example/application.ex 並將新處理程序加入到 supervisor 樹:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

如果一切都是正確的,專案就可以執行,應該會看到所有東西都能正常運作:

$ mix run --no-halt
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

我們做到了!正如預期那樣,應用程式只會產出偶數,而且它做的非常

在此刻,我們有了一個工作管線(pipeline)。 生產者產出數字;生產者-消費者丟棄奇數;消費者則顯示以上這些並繼續流程。

多個生產者或消費者

在簡介中提到,有一個以上的生產者或消費者是可行的, 現在來看看這個。

如果檢查範例中的 IO.inspect/1 輸出,會看到每個事件都單獨由一個 PID 處理。 現在通過修改 lib/genstage_example/application.ex 對多個 worker 進行一些調整:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
]

現在已經配置了兩個消費者,看看如果執行應用程式會得到什麼:

$ mix run --no-halt
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.121.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
{#PID<0.120.0>, 8, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

正如所看到的,現在擁有多個 PID,只需簡單的加入一行程式碼並提供消費者 ID 即可。

使用案例

現在已經介紹了 GenStage 並建立了第一個範例應用程式,但 GenStage 有哪些 真實 案例?

而這些還只是 GenStage 的 一小部分 可能性。

Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!