Do you want to pick up from where you left of?
Take me there

GenStage

在本課程中,將仔細研究 GenStage、它的作用和如何在應用程式中利用。

簡介

那什麼是 GenStage?從官方文件來看,它是"Elixir 的規範 (specification) 和計算流程 (computational flow)",但這表示著什麼?

這表示著 GenStage 提供了一種方式,在一個單獨過程中通過獨立 steps (或 stages) 來定義管線 (pipeline) 的工作以便執行它;如果之前使用過管線,那麼其中一些概念應該不陌生。

為了更好地理解它的工作原理,現在將一個簡易的生產者-消費者 (producer-consumer) 流程可視化:

[A] -> [B] -> [C]

在這個範例中,有三個 stages:A 消費者 (producer),B 生產者-消費者 (producer-consumer) 與 C 消費者 (consumer)。 A 產生一個被 B 消耗的值,B 執行一些工作並回傳將被 consumer C 收到的新值;我們將在下一節中看到,stage 的角色很重要。

雖然範例是 1 對 1 的生產者-消費者,但在任何特定 stage 都可能有多個生產者和多個消費者。

為了更好地描繪這些概念,將使用 GenStage 打造一個管線 (pipeline),但首先探索一下 GenStage 會依賴頗多的這些角色。

消費者 (Consumers) 與生產者 (Producers)

正如所讀到的,我們賦予 stage 的角色非常重要。 GenStage 的規範中承認三種角色:

注意到生產者是 等待 需求了嗎?通過 GenStage,消費者是向上游發送需求並處理來自生產者的資料。 這有助於稱為背壓(back-pressure)的機制。 當消費者忙碌時,背壓機制使生產者不會造成超壓(over-pressure)。

現在已經介紹過 GenStage 中的角色,開始建立應用程式。

入門

在這個範例中,將建立一個產出數字的 GenStage 應用程式,能排序偶數後輸出。

在應用程式中,將使用全部三個 GenStage 角色。 生產者將負責計算和產出數字。 生產者-消費者則過濾出偶數,而後對下游需求做出回應。 最後建立消費者顯示剩餘數字。

讓我們從生成一個有 supervision 樹的專案開始:

mix new genstage_example --sup
cd genstage_example

現在更新 mix.exs 中的耦合性 (dependencies) 以包含 gen_stage

defp deps do
  [
    {:gen_stage, "~> 1.0.0"},
  ]
end

在進一步研究之前,應該先取得 (fetch) 耦合關係 (dependencies) 並進行編譯:

mix do deps.get, compile

現在準備好建立生產者了!

生產者

GenStage 應用程式的第一件事是建立生產者。 正如之前討論過的,我們想要建立一個能夠產出持續不斷數字流的生產者。 現在來建立生產者資料夾和檔案:

mkdir lib/genstage_example
touch lib/genstage_example/producer.ex

這時可以加入我們所需程式碼:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

這裡需要注意兩個最重要部分是 init/1handle_demand/2。 在 init/1 中,設置了初始狀態,就像在 GenServers 中做的那樣,但更重要的是,我們將自己標記為生產者。 init/1 函數的回傳值是 GenStage 對處理程序 (process) 進行分類的依據。

handle_demand/2 函數是生產者被定義的主要部份。 它必須由所有 GenStage 生產者實現。 在這裡,我們回傳消費者所需的一組數字,並累加我們的計數器(counter)。 來自消費者的需求,也就是上面程式碼中的 demand,被表示為一個能夠處理事件量的相對應整數;預設為 1000。

生產者 消費者 (Producer Consumer)

現在已經有了能產出數字的生產者,接著看生產者-消費者。 我們希望從生產者索取數字,濾除奇數,並能回應需求。

touch lib/genstage_example/producer_consumer.ex

現在更新檔案,使其看起來像範例內的程式碼:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

你可能已經注意到,在生產者-消費者裡,init/1 已經引入了一個新選項和一個新函數: handle_events/3。 通過 subscribe_to 選項,指示 GenStage 讓我們在特定生產者的對話中(communication)。

handle_events/3 函數是我們的主力,接收傳入事件、處理 (process) 它們並回傳轉換後的集合。 而消費者的實現方式也大致相同,但重要的區別在於 handle_events/3 函數的回傳值以及如何被使用的。 當將處理程序標記為 producer_consumer 時,在範例中是 tuple 的第二個引數 - numbers 是用於達到下游的消費者需求; 但在消費者中,這個值會被丟棄。

消費者

最後但並非最不重要的是消費者。 開始囉:

touch lib/genstage_example/consumer.ex

由於消費者和生產者-消費者如此相似,所以程式碼看起來不會有太大的不同:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

正如在前面章節中所介紹的,消費者不會產出事件,所以 tuple 中的第二個值將被丟棄。

將所有角色組合在一起

現在已經建立了生產者、生產者-消費者和消費者,我們已經準備好將所有東西串在一起了。

首先打開 lib/genstage_example/application.ex 並將新處理程序加入到 supervisor 樹:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

如果一切都是正確的,專案就可以執行,應該會看到所有東西都能正常運作:

$ mix run --no-halt
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

我們做到了!正如預期那樣,應用程式只會產出偶數,而且它做的非常

在此刻,我們有了一個工作管線(pipeline)。 生產者產出數字;生產者-消費者丟棄奇數;消費者則顯示以上這些並繼續流程。

多個生產者或消費者

在簡介中提到,有一個以上的生產者或消費者是可行的, 現在來看看這個。

如果檢查範例中的 IO.inspect/1 輸出,會看到每個事件都單獨由一個 PID 處理。 現在通過修改 lib/genstage_example/application.ex 對多個 worker 進行一些調整:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
]

現在已經配置了兩個消費者,看看如果執行應用程式會得到什麼:

$ mix run --no-halt
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.121.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
{#PID<0.120.0>, 8, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

正如所看到的,現在擁有多個 PID,只需簡單的加入一行程式碼並提供消費者 ID 即可。

使用案例

現在已經介紹了 GenStage 並建立了第一個範例應用程式,但 GenStage 有哪些 真實 案例?

而這些還只是 GenStage 的 一小部分 可能性。

Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!