Do you want to pick up from where you left of?
Take me there

GenStage

Σε αυτό το μάθημα θα δούμε πιο αναλυτικά το GenStage, τι ρόλο παίζει και πως θα το αξιοποιήσουμε στις εφαρμογές μας.

Εισαγωγή

Τι είναι το GenStage; Από την επίσημη τεκμηρίωση, είναι “ένας προσδιορισμός και μια επεξεργαστική ροή για την Elixir”, αλλά τι σημαίνει αυτό για εμάς;

Σημαίνει ότι το GenStage μας παρέχει έναν τρόπο να ορίζουμε έναν αγωγό εργασίας που εκτελείται από ανεξάρτητα βήματα (ή στάδια) σε ξεχωριστές διεργασίες· Αν έχετε δουλέψει με αγωγούς στο παρελθόν τότε μερικές από τις έννοιες θα σας είναι γνώριμες.

Για να κατανοήσετε καλύτερα πως δουλεύει, ας σκεφτούμε μια απλή ροή παραγωγού-καταναλωτή:

[A] -> [B] -> [C]

Σε αυτό το παράδειγμα έχουμε τρία στάδια: το A είναι ένας παραγωγός, το B είναι ένας παραγωγός-καταναλωτής και το C είναι ένας καταναλωτής. Το A παράγει μια τιμή η οποία καταναλώνεται από το B, το B κάνει κάποια εργασία και επιστρέφει μια νέα τιμή η οποία λαμβάνεται από τον καταναλωτή μας C· Ο ρόλος του σταδίου μας είναι σημαντικός, όπως θα δούμε στον επόμενο τομέα.

Παρόλο που το παράδειγμά μας είναι 1-προς-1 παραγωγός-προς-καταναλωτή, είναι δυνατό να έχουμε πολλαπλούς παραγωγούς και πολλαπλούς καταναλωτές σε οποιοδήποτε στάδιο.

Για να παρουσιάσουμε καλύτερα αυτές τις έννοιες θα κατασκευάσουμε έναν αγωγό με το GenStage, αλλά πρώτα ας εξερευνήσουμε περισσότερο τους ρόλους στους οποίους βασίζεται το GenStage.

Καταναλωτές και Παραγωγοί

Όπως έχουμε διαβάσει, ο ρόλος που δίνουμε στο στάδιό μας είναι σημαντικός. Οι προδιαγραφές του GenStage αναγνωρίζουν τρεις ρόλους:

Παρατηρείτε ότι οι παραγωγοί μας περιμένουν για ζήτηση; Με το GenStage οι καταναλωτές στέλνουν αιτήσεις προς τα πάνω και επεξεργάζονται τα δεδομένα από τον παραγωγό. Αυτό διευκολύνει ένα μηχανισμό γνωστό ως αντίθλιψη. Η αντίθλιψη βάζει το βάρος στον παραγωγό ώστε να μην πιέζει υπερβολικά όταν οι καταναλωτές είναι απασχολημένοι.

Τώρα που καλύψαμε τους ρόλους στο GenStage, ας ξεκινήσουμε με την εφαρμογή μας.

Ξεκινώντας

Σε αυτό το παράδειγμα θα κατασκευάσουμε μια εφαρμογή GenStage που στέλνει αριθμούς, ταξινομεί τους ζυγούς και τέλος τους τυπώνει.

Στην εφαρμογή μας, θα χρησιμοποιήσουμε και τους τρεις ρόλους GenStage. Ο παραγωγός μας θα είναι υπεύθυνος για το μέτρημα και την αποστολή των αριθμών. Θα χρησιμοποιήσουμε ένα παραγωγό-καταναλωτή για να φιλτράρουμε τους ζυγούς αριθμούς και αργότερα να ανταποκρινόμαστε στη ζήτηση από κάτω. Τέλος θα χτίσουμε ένα καταναλωτή για να εμφανίσουμε τους εναπομείναντες αριθμούς.

Θα ξεκινήσουμε δημιουργώντας ένα project με ένα δέντρο παρακολούθησης:

mix new genstage_example --sup
cd genstage_example

Ας αναβαθμίσουμε τις εξαρτήσεις μας στο mix.exs για να συμπεριλάβουμε το gen_stage:

defp deps do
  [
    {:gen_stage, "~> 1.0.0"},
  ]
end

Θα πρέπει να κατεβάσουμε τις εξαρτήσεις μας και να τις συντάξουμε πριν προχωρήσουμε:

mix do deps.get, compile

Τώρα είμαστε έτοιμοι να χτίστουμε τον παραγωγό μας!

Παραγωγός

Το πρώτο βήμα της εφαρμογής GenStage μας είναι να δημιουργήσουμε τον παραγωγό μας. Όπως συζητήσαμε πριν, θέλουμε να δημιουργήσουμε ένα παραγωγό που στέλνει μια συνεχή ροή αριθμών. Ας δημιουργήσουμε το αρχείο παραγωγού:

touch lib/genstage_example/producer.ex

Τώρα μπορούμε να προσθέσουμε τον κώδικα:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

Τα δύο πιο σημαντικά μέρη που πρέπει να σημειώσουμε εδώ είναι οι init/1 και handle_demand/2. Στην init/1 ορίζουμε την αρχική κατάσταση όπως θα κάναμε στους GenServers μας αλλά το πιο σημαντικό είναι ότι μας βάζουμε την ταμπέλα του παραγωγού. Η απάντηση της συνάρτησής μας init/1 είναι αυτή στην οποία στηρίζεται το GenStage για να ταξινομήσει την διεργασία μας.

Η συνάρτηση handle_demand/2 είναι εκεί που ορίζεται το μεγαλύτερο μέρος του παραγωγού μας. Πρέπει να υλοποιηθεί από όλους τους GenStage παραγωγούς. Εδώ επιστρέφουμε το σετ των αριθμών που ζητούνται από τους καταναλωτές μας και αυξάνουμε το μετρητή μας. Η ζήτηση από τους καταναλωτές, η μεταβλητή demand στον κώδικα από πάνω, είναι ένας ακέραιος που αναπαριστά τον αριθμό των συμβάντων που μπορούν να χειριστούν· με προκαθορισμένο το 1000.

Παραγωγός Καταναλωτής

Τώρα που έχουμε έναν παραγωγό-γεννήτρια αριθμών ας μεταβούμε στον παραγωγό-καταναλωτή μας. Θα θέλουμε να ζητήσουμε αριθμούς από τον παραγωγό μας, να αφαιρέσουμε τους μονούς και να απαντήσουμε στη ζήτηση.

touch lib/genstage_example/producer_consumer.ex

Ας αναβαθμίσουμε το αρχείο μας για να δείχνει όπως ο κώδικας στο παράδειγμα:

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

Όπως ίσως παρατηρήσατε, με τον παραγωγό-καταναλωτή μας παρουσιάζουμε μια νέα συνάρτηση, την handle_events/3 και μια νέα επιλογή στην init/1. Με την επιλογή subscribe_to καθοδηγούμε το GenStage να μας βάλει σε επικοινωνία με ένα συγκεκριμμένο παραγωγό.

Η μέθοδος handle_events/3 είναι η κινητήριος δύναμή μας, εκεί όπου λαμβάνουμε τα εισερχόμενα συμβάντα, τα επεξεργαζόμαστε και επιστρέφουμε το μεταμορφωμένο σετ. Όπως θα δούμε οι καταναλωτές υλοποιούνται με περίπου τον ίδιο τρόπο αλλά η σημαντική διαφορά είναι στο τι επιστρέφει η μέθοδος handle_events/3 και στο πως χρησιμοποιείται. Όταν μαρκάρουμε την διεργασία μας σαν παραγωγό-καταναλωτή το δεύτερο όρισμα της τούπλας μας, η numbers σε αυτή την περίπτωση, χρησιμοποιείται για να ανταποκριθεί στη ζήτηση προς τα κάτω. Στους καταναλωτές αυτή η τιμή απορρίπτεται.

Καταναλωτής

Τελευταίο αλλά εξίσου σημαντικό έχουμε τον καταναλωτή μας. Ας ξεκινήσουμε:

touch lib/genstage_example/consumer.ex

Από τη στιγμή που οι καταναλωτές και οι παραγωγοί-καταναλωτές είναι τόσο όμοιοι ο κώδικάς μας δεν θα δείχνει τόσο διαφορετικός:

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

Όπως είδαμε στον προηγούμενο τομέα ο καταναλωτής μας δεν στέλνει συμβάντα, έτσι η δεύτερη τιμή στην τούπλα μας θα απορριφθεί.

Συνδεση

Τώρα που έχουμε τον παραγωγό μας, τον παραγωγό-καταναλωτή και τον καταναλωτή έτοιμους είμαστε έτοιμοι να τους συνδέσουμε.

Ας ξεκινήσουμε ανοίγοντας το lib/genstage_example/application.ex και προσθέτοντας τις διεργασίες μας στο δέντρο παρακολούθησης:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    {GenstageExample.Producer, 0},
    {GenstageExample.ProducerConsumer, []},
    {GenstageExample.Consumer, []}
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

Αν όλα είναι σωστά, μπορούμε να τρέξουμε το project και θα πρέπει να δούμε τα πάντα να δουλέυουν:

$ mix run --no-halt
{#PID<0.109.0>, 0, :state_doesnt_matter}
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

Τα καταφέραμε! Όπως αναμέναμε η εφαρμογή μας στέλνει μόνο ζυγούς αριθμούς και το κάνει γρήγορα.

Σε αυτό το σημείο έχουμε έναν αγωγό που λειτουργεί. Υπαρχει ένας παραγωγός που στέλνει αριθμούς, ένας παραγωγός καταναλωτής που διαγράφει τους μονούς αριθμούς, και ένας καταναλωτής που τα εμφανίζει όλα αυτά και συνεχίζει τη ροή.

Πολλαπλοί Παραγωγοί ή Καταναλωτές

Αναφέραμε στην εισαγωγή ότι θα μπορούσαμε να έχουμε πάνω από ένα παραγωγούς ή καταναλωτές. Ας ρίξουμε μια ματιά σε αυτά.

Αν εξετάσουμε την έξοδο της IO.inspect/1 από το παράδειγμά μας θα δούμε ότι κάθε συμβάν το χειρίζεται μια μοναδική PID. Ας κάνουμε κάποιες αλλαγές για πολλαπλούς εργάτες αλλάζοντας το lib/genstage_example/application.ex:

children = [
  {GenstageExample.Producer, 0},
  {GenstageExample.ProducerConsumer, []},
  %{
    id: 1,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
  %{
    id: 2,
    start: {GenstageExample.Consumer, :start_link, [[]]}
  },
]

Τώρα που ορίσαμε δύο καταναλωτές ας δούμε τι παίρνουμε αν τρέξουμε τώρα την εφαρμογή μας:

$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

Όπως θα δείτε τώρα έχουμε πολλαπλά PID, απλά προσθέτοντας μια γραμμή κώδικα και δίνοντας ID στους καταναλωτές μας.

Περιπτώσεις Χρήσης

Τώρα που καλύψαμε το GenStage και χτίσαμε την πρώτη μας δοκιμαστική εφαρμογή, ποιές είναι μερικές από τις πραγματικές περιπτώσεις χρήσης για το GenStage;

Αυτές είναι μόλις μερικές από τις δυνατότητες για το GenStage.

Έπιασες λάθος ή θέλεις να συνεισφέρεις στο μάθημα; Επεξεργαστείτε αυτό το μάθημα στο GitHub!