Fork me on GitHub

GenStage

Σε αυτό το μάθημα θα δούμε πιο αναλυτικά το GenStage, τι ρόλο παίζει και πως θα το αξιοποιήσουμε στις εφαρμογές μας.

Πίνακας περιεχομένων

Εισαγωγή

Τι είναι το GenStage; Από την επίσημη τεκμηρίωση, είναι “ένας προσδιοριμός και μια επεξεργαστική ροή για την Elixir”, αλλά τι σημαίνει αυτό για εμάς;

Σημαίνει ότι το GenStage μας παρέχει έναν τρόπο να ορίζουμε έναν αγωγό εργασίας που εκτελείτε από ανεξάρτητα βήματα (ή στάδια) σε ξεχωριστές διεργασίες. Αν έχετε δουλέψει με αγωγούς στο παρελθόν τότε μερικές από τις έννοιες θα σας είναι γνώριμες.

Για να κατανοήσετε καλύτερα πως δουλεύει, ας σκεφτούμε μια απλή ροή παραγωγού-καταναλωτή:

[A] -> [B] -> [C]

Σε αυτό το παράδειγμα έχουμε τρία στάδια: το A είναι ένας παραγωγός, το B είναι ένας παραγωγός-καταναλωτής και το C είναι ένας καταναλωτής. το A παράγει μια τιμή η οποία καταναλώνεται από το B, το B κάνει κάποια εργασία και επιστρέφει μια νέα τιμή η οποία λαμβάνεται από τον καταναλωτή μας C. Ο ρόλος του σταδίου μας είναι σημαντικός, όπως θα δούμε στον επόμενο τομέα.

Παρόλο που το παράδειγμά μας είανι 1-προς-1 παραγωγός-προς-καταναλωτή, είναι δυνατό να έχουμε πολλαπλούς παραγωγούς και πολλαπλούς καταναλωτές σε οποιοδήποτε στάδιο.

Για να παρουσιάσουμε καλύτερα αυτές τις έννοιες θα κατασκευάσουμε έναν αγωγό με το GenStage, αλλά πρώτα ας εξερευνήσουμε περισσότερο τους ρόλους στους οποίος βασίζεται το GenStage.

Καταναλωτές και Παραγωγοί

Όπως έχουμε διαβάσει, ο ρόλος που δίνουμε στο στάδιό μας είναι σημαντικός. Οι προδιαγραφές του GenStage αναγνωρίζουν τρεις ρόλους:

Παρατηρήτε ότι οι παραγωγοί μας περιμένουν για ζήτηση; Με το GenStage οι καταναλωτές στέλνουν αιτήσεις προς τα πάνω και επεξεργάζονται τα δεδομένα από τον παραγωγό. Αυτό διευκολύνει ένα μηχανισμό γνωστό ως αντίθλιψη. Η αντίθλιψη βάζει το βάρος στον παραγωγό ώστε να μην πιέζει υπερβολίκα όταν οι καταναλωτές είναι απασχολημένοι.

Τώρα που καλύψαμε τους ρόλους στο GenStage, ας ξεκινήσουμε με την εφαρμογή μας.

Ξεκινώντας

Σε αυτό το παράδειγμα θα κατασκευάσουμε μια εφαρμογή GenStage που στέλνει αριθμούς, ταξινομεί τους ζυγούς και τέλος τους τυπώνει.

Για την εφαρμογή μας θα χρησιμοποιήσουμε και τους τρεις ρόλους GenStage. Ο παραγωγός μας θα είναι υπέυθυνος για το μέτρημα και την αποστολή των αριθμών. Θα χρησιμοποιήσουμε ένα παραγωγό-καταναλωτή για να φιλτράρουμε τους ζυγούς αριθμούς και αργότερα να ανταποκρινόμαστε στη ζήτηση από κάτω. Τέλος θα χτίσουμε ένα καταναλωτή για να εμφανίσουμε τους εναπομείναντες αριθμούς.

Θα ξεκινήσουμε δημιουργώντας ένα project με ένα δέντρο παρακολούθησης:

$ mix new genstage_example --sup
$ cd genstage_example

Ας αναβαθμίσουμε τις εξαρτήσεις μας στο mix.exs για να συμπεριλάβουμε το gen_stage:

  defp deps do
    [
      {:gen_stage, "~> 0.7"},
    ]
  end

Θα πρέπει να κατεβάσουμε τις εξαρτήσεις μας και να τις συντάξουμε πριν προχωρήσουμε:

$ mix do deps.get, compile

Τώρα είμαστε έτοιμοι να χτίστουμε τον παραγωγό μας!

Παραγωγός

Το πρώτο βήμα της εφαρμογής GenStage μας είναι να δημιουργήσουμε τον παραγωγό μας. Όπως συζητήσαμε πριν θέλουμε να δημιουργήσουμε ένα παραγωγό που στέλνει μια συνεχής ροή αριθμών. Ας δημιουργήσουμε το αρχείο παραγωγού:

$ mkdir lib/genstage_example
$ touch lib/genstage_example/producer.ex

Τώρα μπορούμε να προσθέσουμε τον κώδικα:

defmodule GenstageExample.Producer do
  alias Experimental.GenStage

  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, (state + demand)}
  end
end

Τα δύο πιο σημαντικά μέρη που πρέπει να σημειώσουμε εδώ είναι οι init/1 και handle_demand/2. Στην init/1 ορίζουμε την αρχική κατάσταση όπως θα κάναμε στους GenServers μας αλλά το πιο σημαντικό είναι ότι μας βάζουμε την ταμπέλα του παραγωγού. Η απάντηση της συνάρτησης μας init/1 είναι αυτή στην οποία στηρίζεται το GenStage για να ταξινομήσει την διεργασία μας.

Η συνάρτηση handle_demand/2 είναι εκεί που βρίσκεται η πλειοψηφία των παραγωγών μας και πρέπει να υλοποιηθεί από όλους τους GenStage παραγωγούς. Εδώ επιστρέφουμε το σετ των αριθμών που ζητούντε από τους καταναλωτές μας και αυξάνουμε το μετρητή μας. Η ζήτηση από τους καταναλωτές, η μεταβλητή demand στον κώδικα από πάνω, είναι ένας ακέραιος που αναπαριστά τον αριθμό των συμβάντων που μπορούν να χειριστούν, με προκαθορισμένο το 1000.

Παραγωγός Καταναλωτής

Τώρα που έχουμε έναν παραγωγό-γεννήτρια αριθμών ας μεταβούμε στον παραγωγό-καταναλωτή μας. Θα θέλουμε να ζητήσουμε αριθμούς από τον παραγωγό μας, να αφαιρέσουμε τους μονούς και να απαντήσουμε στη ζήτηση.

$ touch lib/genstage_example/producer_consumer.ex

Ας αναβαθμίσουμε το αρχείο μας για να δείχνει όπως ο κώδικας στο παράδειγμα:

defmodule GenstageExample.ProducerConsumer  do
  alias Experimental.GenStage
  use GenStage

  require Integer

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

Όπως ίσως παρατηρήσατε, με τον παραγωγό-καταναλωτή μας παρουσιάζουμε μια νέα συνάρτηση, την handle_events/3 και μια ένα επιλογή στην init/1. Με την επιλογή subscribe_to καθοδηγούμε το GenStage να μας βάλει σε επικοινωνία με ένα συγκεκριμμένο παραγωγό.

Η μέθοδος handle_events/3 είναι η κινητήριος δύναμή μας, εκεί όπου λαμβάνουμε τα εισερχόμενα συμβάντα, τα επεξεργαζόμαστε και επιστρέφουμε το μεταμορφωμένο σετ. Όπως θα δούμε οι καταναλωτές υλοποιούνται με περίπου τον ίδιο τρόπο αλλά η σημαντική διαφορά είναι στο τι επιστρέφει η μέθοδος handle_events/3 και στο πως χρησιμοποιείται. Όταν μαρκάρουμε την διεργασία μας σαν παραγωγό-καταναλωτή το δεύτερο όρισμα της τούπλας μας, η numbers σε αυτή την περίπτωση, χρησιμοποιείται για να ανταποκριθεί στη ζήτηση προς τα κάτω. Στους καταναλωτές αυτή η τιμή απορρίπτεται.

Καταναλωτής

Τελευταίο αλλά εξίσου σημαντικό έχουμε τον καταναλωτή μας. Ας ξεκινήσουμε:

$ touch lib/genstage_example/consumer.ex

Από τη στιγμή που οι καταναλωτές και οι παραγωγοί καταναλωτές είναι τόσο όμοιοι ο κώδικάς μας δεν θα δείχνει τόσο διαφορετικός:

defmodule GenstageExample.Consumer do
  alias Experimental.GenStage
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect {self(), event, state}
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

Όπως είδαμε στον προηγούμενο τομέα ο καταναλωτής μας δεν στέλνει συμβάντα, έτσι η δεύτερη τιμή στην τούπλα μας θα απορριφθεί.

Συνδεση

Τώρα που έχουμε τον παραγωγό μας, τον παραγωγό-καταναλωτή και τον καταναλωτή έτοιμους είμαστε έτοιμοι να τους συνδέσουμε.

Ας ξεκινήσουμε ανοίγοντας το lib/genstage_example.ex και προσθέτοντας τις διεργασίες μας στο δέντρο παρακολούθησης:

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    worker(GenstageExample.Producer, [0]),
    worker(GenstageExample.ProducerConsumer, []),
    worker(GenstageExample.Consumer, []),
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

Αν όλα είναι σωστά, μπορούμε να τρέξουμε το project και θα πρέπει να δούμε τα πάντα να δουλέυουν:

$ mix run --no-halt
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}

Τα καταφέραμε! Όπως αναμέναμε η εφαρμογή μας στέλνει μόνο ζυγούς αριθμούς και το κάνει γρήγορα.

Σε αυτό το σημείο έχουμε ένα σωλήνα που λειτουργεί. Υπαρχει ένας παραγωγός που στέλνει αριθμούς, ένας παραγωγός καταναλωτής που διαγράφει τους μονούς αριθμούς, και ένας καταναλωτής που τα εμφανίζει όλα αυτά και συνεχίζει τη ροή. Αναφέραμε στην εισαγωγή ότι θα μπορούσαμε να έχουμε πάνω από ένα παραγωγούς ή καταναλωτές, οπότε ας ρίξουμε μια ματιά σε αυτά.

Αν εξετάσουμε την έξοδο της IO.inspect/1 από το παράδειγμά μας θα δούμε ότι κάθε συμβάν το χειρίζεται μια μοναδική PID. Ας κάνουμε κάποιες αλλαγές για πολλαπλούς εργάτες αλλάζοντας το lib/genstage_example.ex:

children = [
  worker(GenstageExample.Producer, [0]),
  worker(GenstageExample.ProducerConsumer, []),
  worker(GenstageExample.Consumer, [], id: 1),
  worker(GenstageExample.Consumer, [], id: 2),
]

Τώρα που ορίσαμε δύο καταναλωτές ας δούμε τι παίρνουμε αν τρέξουμε τώρα την εφαρμογή μας:

$ mix run --no-halt
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.121.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
{#PID<0.120.0>, 8, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}

Όπως θα δείτε τώρα έχουμε πολλαπλά PID, απλά προσθέτοντας μια γραμμή κώδικα και δίνοντας ID στους καταναλωτές μας.

Περιπτώσεις Χρήσης

Τώρα που καλύψαμε το GenStage και χτίσαμε το πρώτη μας δοκιμαστική εφαρμογή, ποιές είναι μερικές από τις πραγματικές περιπτώσεις χρήσης για το GenStage;

Αυτές είναι μόλιες μερικές από τις πιθανότητες για το GenStage.


Share This Page