การทำงานพร้อมกัน (Concurrency)

หนึ่งในจุดขายหลักของ Elixir คือการรองรับการทำงานพร้อมๆ กัน ซึ่งเป็นผลพลอยได้จากการนำ Erlang VM (BEAM) มาใช้งาน ส่งผลให้การจัดการการทำงานพร้อมกันใน Elixir นั้นง่ายกว่าที่คิดมากๆ

โดยรูปแบบการทำงานพร้อมๆ กันนั้น มีความจำเป็นต้องพึ่งพา actor ซึ่งเป็นโปรเซสที่สามารถสื่อสารกับโปรเซสอื่นๆ ได้โดยผ่านการส่งข้อความหากัน

ในบทเรียนนี้ เราจะมาลองเรียนรู้ถึงโมดูลที่เอาไว้ใช้จัดการเรื่องการทำงานพร้อมกัน ที่มาพร้อมกับ Elixir ซึ่งในบทต่อๆ ไป เราจะพูดถึงคุณลักษณะต่างๆ ของ OTP ที่นำเรื่องการทำงานพร้อมการมาใช้ด้วย

Table of Contents

โปรเซส (Processes)

โปรเซสต่างๆ ใน Erlang VM นั้นเบามากๆ และจะถูกนำไปประมวลผลในทุกๆ CPU ในเครื่อง ซึ่งตัวโปรเซสนั้นอาจจะดูเหมือน thread ที่มีอยู่ตามปกติ แต่จริงๆ แล้วมีความซับซ้อนน้อยกว่ากันมาก และไม่ใช่เรื่องแปลกเลยที่จะมีหลายพันโปรเซสทำงานพร้อมกันอยู่ในโปรแกรมเดียว

วิธีที่ง่ายที่สุดที่จะสร้างโปรเซสใหม่ คือการใช้คำสั่ง spawn ซึ่งสามารถใส่ฟังก์ชั่นแบบไม่มีชื่อหรือมีชื่อเข้าไปก็ได้ และเมื่อเราสร้างโปรเซสใหม่ขึ้นมา เราจะได้รับ process identifier หรือที่เรียกกันว่า PID กลับมา เพื่อที่จะได้เรียกมันได้ถูกภายในโปรแกรมของเรา

เราจะเริ่มจากการสร้างโมดูลใหม่ขึ้นมา และสร้างฟังก์ชั่นที่เราอยากจะเรียกใช้:

defmodule Example do
  def add(a, b) do
    IO.puts(a + b)
  end
end

iex> Example.add(2, 3)
5
:ok

ถ้าเราต้องการเรียกใช้ฟังก์ชั่นนี้แบบ asynchronous เราจะต้องใช้คำสั่ง spawn/3 แทน:

iex> spawn(Example, :add, [2, 3])
5
#PID<0.80.0>

การส่งข้อความหากัน (Message Passing)

เพื่อที่จะทำการสื่อสารต่างๆ โปรเซสจำเป็นจะต้องใช้การส่งข้อความหากัน ซึ่งต้องมีคำสั่งสองส่วนคือ send/2 และ receive

โดยที่ฟังก์ชั่น send/2 ทำให้เราสามารถส่งข้อความไปยัง PIDs ต่างๆ ได้ และสามารถรับข้อความที่ถูกส่งมาโดยใช้คำสั่ง receive เพื่อ match กับข้อความ โดยถ้าไม่เจอ match การทำงานก็จะดำเนินต่อไปเรื่อยๆ โดยไม่ต้องหยุดรอ

defmodule Example do
  def listen do
    receive do
      {:ok, "hello"} -> IO.puts("World")
    end

    listen
  end
end

iex> pid = spawn(Example, :listen, [])
#PID<0.108.0>

iex> send pid, {:ok, "hello"}
World
{:ok, "hello"}

iex> send pid, :ok
:ok

คุณอาจสังเกตว่าฟังก์ชั่น listen/0 นั้นเป็นแบบ recursive ซึ่งทำให้โปรเซสของเราสามารถรับข้อความได้หลายๆ ข้อความ เพราะถ้าไม่มี recursion โปรเซสของเราก็จะหยุดลงตั้งแต่ตอนที่รับข้อความแรกแล้ว

การเชื่อมโยงโปรเซสเข้าด้วยกัน (Process Linking)

ปัญหาหนึ่งของ spawn คือการที่ไม่รู้ว่าโปรเซสจะพังลงเมื่อไหร่ และถ้าต้องการจะรู้ เราต้องทำการเชื่อมโยงโปรเซสต่างๆ เข้าด้วยกันโดยใช้คำสั่ง spawn_link ซึ่งโปรเซสที่เชื่อมโยงกันจะได้รับการแจ้งเตือนเมื่ออีกโปรเซสหยุดการทำงานลง:

defmodule Example do
  def explode, do: exit(:kaboom)
end

iex> spawn(Example, :explode, [])
#PID<0.66.0>

iex> spawn_link(Example, :explode, [])
** (EXIT from #PID<0.57.0>) evaluator process exited with reason: :kaboom

ในบางครั้ง เราก็ไม่ต้องการให้โปรเซสที่เชื่อมโยงกันไปทำให้โปรเซสที่ทำงานอยู่พังลง เราจึงต้องดักฟังสัญญาณหยุดการทำงาน ซึ่งเราจะได้รับข้อความออกมาเป็น tuple แบบนี้: {:EXIT, from_pid, reason}

defmodule Example do
  def explode, do: exit(:kaboom)

  def run do
    Process.flag(:trap_exit, true)
    spawn_link(Example, :explode, [])

    receive do
      {:EXIT, from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end

iex> Example.run
Exit reason: kaboom
:ok

การติดตามดูสถานะของโปรเซส (Process Monitoring)

จะเกิดอะไรขึ้นถ้าหากเราไม่อยากที่จะเชื่อมโยงสองโปรเซสเข้าด้วยกัน แต่ก็ยังอยากที่จะได้รับการแจ้งเตือนอยู่ดีล่ะ

เพราะเหตุนี้ เราจึงสามารถติดตามสถานะของโปรเซสได้ ผ่านคำสั่ง spawn_monitor โดยขณะที่เรากำลังติดตามสถานะอยู่นั้น เราจะได้รับข้อความเมื่อโปรเซสของเราพังลง โดยที่โปรเซสหลักจะไม่พังตามไปด้วย และก็ไม่จำเป็นที่จะต้องไปดักฟังสัญญาณหยุดการทำงานด้วยเช่นกัน

defmodule Example do
  def explode, do: exit(:kaboom)

  def run do
    {pid, ref} = spawn_monitor(Example, :explode, [])

    receive do
      {:DOWN, ref, :process, from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end

iex> Example.run
Exit reason: kaboom
:ok

Agents

agent คือ abstraction ของโปรเซสที่ทำงานอยู่ในเบื้องหลัง ซึ่งมีการเก็บสถานะ (state) อยู่ในตัว โดยเราสามารถเข้าถึงมันจากโปรเซสอื่นๆ ภายในโปรแกรมของเรา และสถานะของ agent ถูกกำหนดให้เป็นค่าที่ถูกส่งกลับมาจากฟังก์ชั่นของเรา:

iex> {:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end)
{:ok, #PID<0.65.0>}

iex> Agent.update(agent, fn (state) -> state ++ [4, 5] end)
:ok

iex> Agent.get(agent, &(&1))
[1, 2, 3, 4, 5]

เมื่อเราตั้งชื่อให้ agent แล้ว เราสามารถเรียกมันด้วยชื่อได้เลย แทนที่จะเรียกด้วย PID:

iex> Agent.start_link(fn -> [1, 2, 3] end, name: Numbers)
{:ok, #PID<0.74.0>}

iex> Agent.get(Numbers, &(&1))
[1, 2, 3]

หน้าที่การทำงาน (Task)

task เป็นวิธีที่จะช่วยให้เราสามารถเรียกใช้ฟังก์ชั่นได้ในเบื้องหลัง และรับค่าที่ถูกส่งกลับออกมาได้ในภายหลัง ซึ่งมีความจำเป็นอย่างมากเมื่อเราต้องการที่จะจัดการกับการทำงานที่ใช้การประมวลผลเยอะๆ โดยที่ไม่ต้องไปหยุดการทำงานของโปรแกรมหลัก

defmodule Example do
  def double(x) do
    :timer.sleep(2000)
    x * 2
  end
end

iex> task = Task.async(Example, :double, [2000])
%Task{pid: #PID<0.111.0>, ref: #Reference<0.0.8.200>}

# Do some work

iex> Task.await(task)
4000
Caught a mistake or want to contribute to the lesson? Edit this lesson on GitHub!