Do you want to pick up from where you left of?
Take me there

GenStage

Nesta lição vamos examinar de perto o GenStage, para que serve e como podemos usá-lo em nossas aplicações.

Introdução

Então, o que é GenStage? De acordo com a documentação oficial, é uma “especificação e um fluxo computacional para o Elixir”, mas o que isso significa pra nós?

Significa que o GenStage nos fornece uma forma de definir um pipeline de trabalho a ser realizado por passos independentes (ou etapas) em processos separados; se você já trabalhou com pipelines anteriormente, então alguns desses conceitos devem ser familiares.

Para entender melhor como isso funciona, vamos visualizar um simples fluxo produtor-consumidor:

[A] -> [B] -> [C]

Neste exemplo temos três etapas: A um produtor, B um produtor-consumidor, e C um consumidor. A produz um valor que é consumido por B, B executa algum trabalho e retorna um novo valor que é recebido pelo nosso consumidor C; o papel da nossa etapa é importante como veremos na próxima seção.

Enquanto nosso exemplo é 1-para-1, produtor-para-consumidor, é possível que ambos tenham múltiplos produtores e múltiplos consumidores em qualquer etapa.

Para ilustrar melhor esses conceitos, vamos construir um pipeline com GenStage, mas antes vamos explorar os papéis que o GenStage depende um pouco mais.

Consumidores e Produtores

Conforme lemos, o papel que damos à nossa etapa é importante. A especificação do GenStage reconhece três papéis:

Notou que nossos produtores esperam por demanda? Com o GenStage nossos consumidores enviam demanda e processam os dados de nosso produtor. Isso facilita o mecanismo conhecido como back-pressure. Back-pressure coloca o ônus no produtor a não gerar sobrepressão quando consumidores estão ocupados.

Agora que cobrimos os papéis dentro do GenStage, vamos começar a nossa aplicação.

Começando

Neste exemplo construiremos uma aplicação GenStage que emite números, separa os números pares, e finalmente os imprime.

Para nossa aplicação usaremos todos os três papéis do GenStage. Nosso produtor será responsável por contar e emitir números. Usaremos um produtor-consumidor para filtrar somente os números pares e depois responder à demanda. Por último, vamos construir um consumidor para nos mostrar os números restantes.

Começaremos gerando um projeto com uma árvore de supervisão:

mix new genstage_example --sup
cd genstage_example

Vamos atualizar nossas dependências no mix.exs para incluir gen_stage:

defp deps do
  [
    {:gen_stage, "~> 1.0.0"}
  ]
end

Devemos buscar nossas dependências e compilar antes de avançar mais:

mix do deps.get, compile

Agora estamos prontos para construir nosso produtor!

Produtor

O primeiro passo da nossa aplicação GenStage é criar nosso produtor. Conforme falamos antes, queremos criar um produtor que emite um fluxo constante de números. Vamos criar o arquivo do nosso produtor:

touch lib/genstage_example/producer.ex

Agora podemos adicionar o código:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

As duas partes mais importantes para tomar nota aqui são init/1 e handle_demand/2. No init/1 definimos o estado inicial como fizemos em nossos GenServers, mas mais importante, nos rotulamos como produtores. A resposta da nossa função init/1 é o que o GenStage confia para classificar nossos processo.

A função handle_demand/2 é onde a maioria de nosso produtor está definida. Ela precisa ser implementada por todos os produtores GenStage. Aqui retornamos o conjunto de números demandados pelos nossos consumidores e incrementamos nosso contador. A demanda dos consumidores, demand no nosso código acima, é representada como um inteiro correspondendo ao número de eventos que eles podem tratar; seu padrão é 1000.

Produtor Consumidor

Agora que temos um produtor gerador de números, vamos ao nosso produtor-consumidor. Queremos solicitar números de nosso produtor, filtrar os ímpares, e responder à demanda.

touch lib/genstage_example/producer_consumer.ex

Vamos atualizar nosso arquivo para se parecer com o código de exemplo:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

Você deve ter notado com nosso produtor-consumidor que introduzimos uma nova opção no init/1 e uma nova função: handle_events/3. Com a opção subscribe_to, instruímos o GenStage a nos colocar em comunicação com um produtor específico.

A função handle_events/3 é nosso cavalo de batalha, onde recebemos nossos eventos de entrada, os processamos, e retornamos nosso conjunto modificado. Como veremos, consumidores são implementados de maneira muito semelhante, mas a diferença importante é que nossa função handle_events/3 retorna e como ela é usada. Quando rotulamos nosso processo um produtor_consumidor, o segundo argumento da nossa tupla — numbers no nosso caso — é usado para conhecer a demanda de consumidores. Em consumidores esse valor é descartado.

Consumidor

Por último, mas não menos importante, nós temos nosso consumidor. Vamos começar:

touch lib/genstage_example/consumer.ex

Uma vez que consumidores e produtores-consumidores são tão similares, nosso código não será muito diferente:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

Conforme abordamos na seção anterior, nosso consumidor não emite eventos, então o segundo valor em nossa tupla será descartado.

Colocando tudo junto

Agora que temos nosso produtor, produtor-consumidor, e consumidor construídos, estamos prontos para ligá-los todos juntos.

Vamos começar abrindo o lib/genstage_example/application.ex e adicionando nosso novo processo para a árvore de supervisores:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

Se tudo estiver certo, podemos executar nosso projeto e devemos ver tudo funcionando:

$ mix run --no-halt
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

Pronto! Como esperávamos, nossa aplicação apenas omite números pares e faz isso rapidamente.

Neste ponto temos um pipeline funcionando. Existe um produtor emitindo números, um produtor-consumidor descartando números ímpares, e um consumidor mostrando tudo isso e continuando o fluxo.

Múltiplos Produtores ou Consumidores

Mencionamos na introdução que era possível ter mais que um produtor ou consumidor. Vamos dar uma olhada nisso.

Se examinarmos a saída do IO.inspect/1 do nosso exemplo, vemos que todo evento é tratado por um único PID. Vamos fazer alguns ajustes para múltiplos workers modificando o lib/genstage_example/application.ex:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
]

Agora que configuramos dois consumidores, vamos ver o que obtemos se rodarmos nossa aplicação agora:

$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

Como você pode ver, agora nós temos múltiplos PIDs, simplesmente adicionando uma linha de código e dando IDs aos nossos consumidores.

Casos de Uso

Agora que cobrimos o GenStage e construímos nossa primeira aplicação exemplo, quais são alguns casos de uso reais do GenStage?

Estas são apenas algumas possibilidades para o GenStage.

Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!