Do you want to pick up from where you left of?
Take me there

GenStage

В этом уроке мы поближе рассмотрим GenStage, узнаем, какую роль он выполняет, и как мы можем использовать его в наших приложениях.

Вступление

Так что же такое GenStage? В официальной документации написано, что это “спецификация и поток вычислений для Elixir”, но что это означает для нас?

Это означает, что GenStage предоставляет нам возможность определить конвейер задач, который будет выполняться независимыми шагами (или этапами) в отдельных процессах. Если вы уже работали с конвейерами, тогда некоторые из этих понятий могут быть вам знакомы.

Чтобы лучше понять, как это работает, давайте визуализируем простой поток производитель-потребитель:

[A] -> [B] -> [C]

В этом примере у нас три стадии: A — производитель, B — производитель-потребитель, и C — потребитель. A производит значение, которое потребляет B, B выполняет какую-то работу и возвращает значение, которое отправляется нашему потребителю C. Роль стадии важна, что мы увидим в следующей секции.

Пока в нашем примере производитель относится к потребителю один к одному, но можно одновременно иметь несколько производителей и множество потребителей на любой стадии.

Чтобы лучше проиллюстрировать эти понятия, мы построим конвейер с помощью GenStage, но сначала давайте рассмотрим чуть подробнее роли, на которые GenStage опирается.

Производитель и потребитель

Как мы уже рассмотрели, роль, которую мы даем нашей стадии, важна. Спецификация GenStage принимает три роли:

Заметили, что производители ожидают требования? Благодаря GenStage наши потребители посылают требование вверх по потоку и обрабатывают данные от производителя. Это облегчает использование механизма, который называется обратное давление. Обратное давление перекладывает ответственность на производителя, чтобы не перегружать потребителей, когда они заняты.

Теперь, когда мы рассмотрели роли в GenStage, займёмся нашим приложением.

Начало работы

В этом примере мы напишем GenStage приложение, которое генерирует числа, выбирает четные и в конце печатает их.

Для нашего приложения мы будем использовать все три GenStage роли. Производитель будет отвечать за подсчет и генерацию чисел. Мы будем использовать производителя-потребителя, чтобы отфильтровать только четные числа, а затем реагировать на запросы, которые приходят снизу потока. Последним мы добавим потребителя, который будет отображать результат.

Мы начнем с создания проекта с деревом надзора:

mix new genstage_example --sup
cd genstage_example

Давайте обновим наши зависимости в mix.exs, добавив gen_stage:

defp deps do
  [
    {:gen_stage, "~> 1.0.0"},
  ]
end

Мы должны скачать зависимости и скомпилировать их, прежде чем двигаться дальше:

mix do deps.get, compile

Теперь мы готовы к созданию нашего приложения!

Производитель

Первый шаг нашего GenStage приложения — это создание нашего производителя. Как мы обсуждали ранее, мы хотим создать производителя, который генерирует постоянный поток чисел. Давайте создадим файл для этого производителя:

touch lib/genstage_example/producer.ex

Теперь мы можем добавить код:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

Две важные части, которые нужно принять к сведению — init/1 и handle_demand/2. В init/1 мы устанавливаем начальное состояние, как мы делали в наших GenServer, но, что более важно, мы помечаем запускаемый процесс производителем. На основе результата вызова функции init/1 GenStage классифицирует процесс.

В функции handle_demand/2 находится основная часть нашего производителя. Она должна быть реализована всеми производителями GenStage. Здесь мы возвращаем множество чисел, затребованных потребителями, и увеличиваем счетчик. Требование от потребителей (demand в нашем коде) представляется в виде целого числа, соответствующего числу событий, которые они могут обрабатывать, по умолчанию 1000.

Производитель-потребитель

Теперь, когда у нас есть производитель, генерирующий числа, перейдем к нашему производителю-потребителю. Мы хотим запрашивать числа от производителя, фильтровать нечетные и реагировать на требования.

touch lib/genstage_example/producer_consumer.ex

Давайте обновим наш файл, чтобы он выглядел как пример кода:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

Вы могли заметить, что вместе с нашим производителем-потребителем мы ввели новую функцию handle_events/3 и опцию в init/1. С помощью опции subscribe_to мы поручаем GenStage связать нас с конкретным производителем.

Метод handle_events/3 — наша рабочая лошадка, где мы получаем наши входящие события, обрабатываем их и возвращаем преобразованный набор. Как мы увидим, потребители реализуются во многом таким же образом, но есть важное отличие — что именно handle_events/3 возвращает и как это используется. Когда мы назначаем наш процесс потребителем, второй аргумент кортежа, numbers в нашем случае, используется для удовлетворения требования потребителей ниже по потоку. В потребителях это значение сбрасывается.

Потребитель

Остался последний по порядку, но не по важности процесс — потребитель. Начнём:

touch lib/genstage_example/consumer.ex

Поскольку потребитель и производитель-потребитель очень схожи, наш код не будет сильно отличаться:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # Так как мы потребители, мы не создаем события
    {:noreply, [], state}
  end
end

Как мы рассмотрели в предыдущем разделе, потребитель не создает события, так что второе значение в нашем кортеже будет отброшено.

Собираем все вместе

Теперь, когда мы создали производителя, производителя-потребителя и потребителя, мы готовы соединить это все вместе.

Давайте откроем файл lib/genstage_example/application.ex и добавим наши новые процессы в дерево надзора:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

Если все правильно, мы можем запустить наш проект и должны увидеть, что все работает:

$ mix run --no-halt
{#PID<0.109.0>, 0, :state_doesnt_matter}
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

Мы сделали это! Как мы и ожидали, наше приложение пропускает только четные числа и делает это быстро.

На данный момент у нас есть рабочий конвейер. Производитель генерирует числа, производитель-потребитель отбрасывает нечетные числа и потребитель отображает все это и продолжает процесс.

Несколько производителей или потребителей

Мы уже упоминали во вступлении, что возможно реализовать более одного производителя или потребителя. Давайте взглянем на это.

Если мы рассмотрим вывод IO.inspect/1 из нашего приложения, мы увидим, что все события обрабатываются одним PID. Давайте внесем некоторые корректировки для нескольких работников путем изменения lib/genstage_example/application.ex:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
]

Теперь, когда мы настроили двух потребителей, давайте посмотрим, что мы получим, если мы запустим наше приложение:

$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

Как вы можете видеть, мы получили параллельную обработку в нескольких процессах, просто добавив строку кода и назначив нашим потребителям ID.

Сценарии использования

Только что мы рассмотрели GenStage и построили наше первое приложение. А в каких случаях можно реально использовать GenStage?

Это всего лишь некоторые возможности использования GenStage.

Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!