GenStage
В этом уроке мы поближе рассмотрим GenStage, узнаем, какую роль он выполняет, и как мы можем использовать его в наших приложениях.
Вступление
Так что же такое GenStage? В официальной документации написано, что это “спецификация и поток вычислений для Elixir”, но что это означает для нас?
Это означает, что GenStage предоставляет нам возможность определить конвейер задач, который будет выполняться независимыми шагами (или этапами) в отдельных процессах. Если вы уже работали с конвейерами, тогда некоторые из этих понятий могут быть вам знакомы.
Чтобы лучше понять, как это работает, давайте визуализируем простой поток производитель-потребитель:
[A] -> [B] -> [C]
В этом примере у нас три стадии: A
— производитель, B
— производитель-потребитель, и C
— потребитель.
A
производит значение, которое потребляет B
, B
выполняет какую-то работу и возвращает значение, которое отправляется нашему потребителю C
. Роль стадии важна, что мы увидим в следующей секции.
Пока в нашем примере производитель относится к потребителю один к одному, но можно одновременно иметь несколько производителей и множество потребителей на любой стадии.
Чтобы лучше проиллюстрировать эти понятия, мы построим конвейер с помощью GenStage, но сначала давайте рассмотрим чуть подробнее роли, на которые GenStage опирается.
Производитель и потребитель
Как мы уже рассмотрели, роль, которую мы даем нашей стадии, важна. Спецификация GenStage принимает три роли:
-
:producer
— Источник данных. Производители ждут требования от потребителей и реагируют, выполняя запрошенные действия. -
:producer_consumer
— И источник, и потребитель. Производитель-потребитель может как реагировать на требования от потребителей, так и запрашивать выполнение действий от производителей. -
:consumer
— Потребитель. Потребитель запрашивает и получает данные от производителя.
Заметили, что производители ожидают требования? Благодаря GenStage наши потребители посылают требование вверх по потоку и обрабатывают данные от производителя. Это облегчает использование механизма, который называется обратное давление. Обратное давление перекладывает ответственность на производителя, чтобы не перегружать потребителей, когда они заняты.
Теперь, когда мы рассмотрели роли в GenStage, займёмся нашим приложением.
Начало работы
В этом примере мы напишем GenStage приложение, которое генерирует числа, выбирает четные и в конце печатает их.
Для нашего приложения мы будем использовать все три GenStage роли. Производитель будет отвечать за подсчет и генерацию чисел. Мы будем использовать производителя-потребителя, чтобы отфильтровать только четные числа, а затем реагировать на запросы, которые приходят снизу потока. Последним мы добавим потребителя, который будет отображать результат.
Мы начнем с создания проекта с деревом надзора:
mix new genstage_example --sup
cd genstage_example
Давайте обновим наши зависимости в mix.exs
, добавив gen_stage
:
defp deps do
[
{:gen_stage, "~> 1.0.0"},
]
end
Мы должны скачать зависимости и скомпилировать их, прежде чем двигаться дальше:
mix do deps.get, compile
Теперь мы готовы к созданию нашего приложения!
Производитель
Первый шаг нашего GenStage приложения — это создание нашего производителя. Как мы обсуждали ранее, мы хотим создать производителя, который генерирует постоянный поток чисел. Давайте создадим файл для этого производителя:
touch lib/genstage_example/producer.ex
Теперь мы можем добавить код:
defmodule GenstageExample.Producer do
use GenStage
def start_link(initial \\ 0) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(counter), do: {:producer, counter}
def handle_demand(demand, state) do
events = Enum.to_list(state..(state + demand - 1))
{:noreply, events, state + demand}
end
end
Две важные части, которые нужно принять к сведению — init/1
и handle_demand/2
.
В init/1
мы устанавливаем начальное состояние, как мы делали в наших GenServer, но, что более важно, мы помечаем запускаемый процесс производителем.
На основе результата вызова функции init/1
GenStage классифицирует процесс.
В функции handle_demand/2
находится основная часть нашего производителя.
Она должна быть реализована всеми производителями GenStage.
Здесь мы возвращаем множество чисел, затребованных потребителями, и увеличиваем счетчик.
Требование от потребителей (demand
в нашем коде) представляется в виде целого числа, соответствующего числу событий, которые они могут обрабатывать, по умолчанию 1000.
Производитель-потребитель
Теперь, когда у нас есть производитель, генерирующий числа, перейдем к нашему производителю-потребителю. Мы хотим запрашивать числа от производителя, фильтровать нечетные и реагировать на требования.
touch lib/genstage_example/producer_consumer.ex
Давайте обновим наш файл, чтобы он выглядел как пример кода:
defmodule GenstageExample.ProducerConsumer do
use GenStage
require Integer
def start_link(_initial) do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
{:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
end
def handle_events(events, _from, state) do
numbers =
events
|> Enum.filter(&Integer.is_even/1)
{:noreply, numbers, state}
end
end
Вы могли заметить, что вместе с нашим производителем-потребителем мы ввели новую функцию handle_events/3
и опцию в init/1
.
С помощью опции subscribe_to
мы поручаем GenStage связать нас с конкретным производителем.
Метод handle_events/3
— наша рабочая лошадка, где мы получаем наши входящие события, обрабатываем их и возвращаем преобразованный набор.
Как мы увидим, потребители реализуются во многом таким же образом, но есть важное отличие — что именно handle_events/3
возвращает и как это используется.
Когда мы назначаем наш процесс потребителем, второй аргумент кортежа, numbers
в нашем случае, используется для удовлетворения требования потребителей ниже по потоку.
В потребителях это значение сбрасывается.
Потребитель
Остался последний по порядку, но не по важности процесс — потребитель. Начнём:
touch lib/genstage_example/consumer.ex
Поскольку потребитель и производитель-потребитель очень схожи, наш код не будет сильно отличаться:
defmodule GenstageExample.Consumer do
use GenStage
def start_link(_initial) do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
{:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect({self(), event, state})
end
# Так как мы потребители, мы не создаем события
{:noreply, [], state}
end
end
Как мы рассмотрели в предыдущем разделе, потребитель не создает события, так что второе значение в нашем кортеже будет отброшено.
Собираем все вместе
Теперь, когда мы создали производителя, производителя-потребителя и потребителя, мы готовы соединить это все вместе.
Давайте откроем файл lib/genstage_example/application.ex
и добавим наши новые процессы в дерево надзора:
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
{GenstageExample.Producer, 0},
{GenstageExample.ProducerConsumer, []},
{GenstageExample.Consumer, []}
]
opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children, opts)
end
Если все правильно, мы можем запустить наш проект и должны увидеть, что все работает:
$ mix run --no-halt
{#PID<0.109.0>, 0, :state_doesnt_matter}
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}
Мы сделали это! Как мы и ожидали, наше приложение пропускает только четные числа и делает это быстро.
На данный момент у нас есть рабочий конвейер. Производитель генерирует числа, производитель-потребитель отбрасывает нечетные числа и потребитель отображает все это и продолжает процесс.
Несколько производителей или потребителей
Мы уже упоминали во вступлении, что возможно реализовать более одного производителя или потребителя. Давайте взглянем на это.
Если мы рассмотрим вывод IO.inspect/1
из нашего приложения, мы увидим, что все события обрабатываются одним PID.
Давайте внесем некоторые корректировки для нескольких работников путем изменения lib/genstage_example/application.ex
:
children = [
{GenstageExample.Producer, 0},
{GenstageExample.ProducerConsumer, []},
%{
id: 1,
start: {GenstageExample.Consumer, :start_link, [[]]}
},
%{
id: 2,
start: {GenstageExample.Consumer, :start_link, [[]]}
},
]
Теперь, когда мы настроили двух потребителей, давайте посмотрим, что мы получим, если мы запустим наше приложение:
$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}
Как вы можете видеть, мы получили параллельную обработку в нескольких процессах, просто добавив строку кода и назначив нашим потребителям ID.
Сценарии использования
Только что мы рассмотрели GenStage и построили наше первое приложение. А в каких случаях можно реально использовать GenStage?
-
Конвейер преобразования данных — производителям необязательно быть простыми генераторами чисел. Мы могли бы создавать события из базы данных или даже из другого источника, например из Apache Kafka. Благодаря сочетанию производителей-потребителей и потребителей мы могли бы обрабатывать, сортировать, каталогизировать и хранить метрики как только они становятся доступными.
-
Очередь работ — поскольку события могут быть чем угодно, мы могли бы производить набор последовательных операций, которые должны быть завершены серией потребителей.
-
Обработка событий — по аналогии с конвейером данных, мы могли бы получать, обрабатывать, сортировать и принимать решения по событиям, переданным в режиме реального времени от наших источников.
Это всего лишь некоторые возможности использования GenStage.
Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!