Fiber

Fiber 提供一種協力並行的機制。

內容切換

Fiber 執行使用者提供的區塊。執行期間,區塊可能會呼叫 Fiber.yieldFiber.transfer 切換到另一個 fiber。Fiber#resume 用於從呼叫 Fiber.yield 的點繼續執行。

#!/usr/bin/env ruby

puts "1: Start program."

f = Fiber.new do
  puts "3: Entered fiber."
  Fiber.yield
  puts "5: Resumed fiber."
end

puts "2: Resume fiber first time."
f.resume

puts "4: Resume fiber second time."
f.resume

puts "6: Finished."

這個程式示範 fiber 的流程控制。

排程器

排程器介面用於攔截封鎖操作。典型的實作會是像 EventMachineAsync 這類寶石的包裝器。此設計提供事件迴圈實作與應用程式程式碼之間的關注點分離。它也允許分層排程器執行儀器化。

要設定目前執行緒的排程器

Fiber.set_scheduler(MyScheduler.new)

當執行緒結束時,會隱含呼叫 set_scheduler

Fiber.set_scheduler(nil)

設計

排程器介面旨在成為使用者程式碼與封鎖操作之間的無意見輕量級層級。排程器掛鉤應避免轉譯或轉換引數或傳回值。理想情況下,使用者程式碼的確切相同引數會直接提供給排程器掛鉤,不作任何變更。

介面

這是您需要實作的介面。

class Scheduler
  # Wait for the specified process ID to exit.
  # This hook is optional.
  # @parameter pid [Integer] The process ID to wait for.
  # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
  # @returns [Process::Status] A process status instance.
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # Wait for the given io readiness to match the specified events within
  # the specified timeout.
  # @parameter event [Integer] A bit mask of `IO::READABLE`,
  #   `IO::WRITABLE` and `IO::PRIORITY`.
  # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
  # @returns [Integer] The subset of events that are ready.
  def io_wait(io, events, timeout)
  end

  # Read from the given io into the specified buffer.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to read from.
  # @parameter buffer [IO::Buffer] The buffer to read into.
  # @parameter length [Integer] The minimum amount to read.
  def io_read(io, buffer, length)
  end

  # Write from the given buffer into the specified IO.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to write to.
  # @parameter buffer [IO::Buffer] The buffer to write from.
  # @parameter length [Integer] The minimum amount to write.
  def io_write(io, buffer, length)
  end

  # Sleep the current task for the specified duration, or forever if not
  # specified.
  # @parameter duration [Numeric] The amount of time to sleep in seconds.
  def kernel_sleep(duration = nil)
  end

  # Execute the given block. If the block execution exceeds the given timeout,
  # the specified exception `klass` will be raised. Typically, only non-blocking
  # methods which enter the scheduler will raise such exceptions.
  # @parameter duration [Integer] The amount of time to wait, after which an exception will be raised.
  # @parameter klass [Class] The exception class to raise.
  # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
  # @yields {...} The user code to execute.
  def timeout_after(duration, klass, *arguments, &block)
  end

  # Resolve hostname to an array of IP addresses.
  # This hook is optional.
  # @parameter hostname [String] Example: "www.ruby-lang.org".
  # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
  def address_resolve(hostname)
  end

  # Block the calling fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
  # @returns [Boolean] Whether the blocking operation was successful or not.
  def block(blocker, timeout = nil)
  end

  # Unblock the specified fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter fiber [Fiber] The fiber to unblock.
  # @reentrant Thread safe.
  def unblock(blocker, fiber)
  end

  # Intercept the creation of a non-blocking fiber.
  # @returns [Fiber]
  def fiber(&block)
    Fiber.new(blocking: false, &block)
  end

  # Invoked when the thread exits.
  def close
    self.run
  end

  def run
    # Implement event loop here.
  end
end

未來可能會引入其他掛鉤,我們將使用功能偵測來啟用這些掛鉤。

非封鎖執行

排程器掛鉤只會在特殊非封鎖執行脈絡中使用。非封鎖執行脈絡會引入非決定性,因為排程器掛鉤的執行可能會在您的程式中引入脈絡切換點。

光纖

光纖可用於建立非封鎖執行脈絡。

Fiber.new do
  puts Fiber.current.blocking? # false

  # May invoke `Fiber.scheduler&.io_wait`.
  io.read(...)

  # May invoke `Fiber.scheduler&.io_wait`.
  io.write(...)

  # Will invoke `Fiber.scheduler&.kernel_sleep`.
  sleep(n)
end.resume

我們也引入一個新的方法,簡化這些非封鎖光纖的建立

Fiber.schedule do
  puts Fiber.current.blocking? # false
end

此方法的目的是允許排程器在內部決定何時啟動光纖,以及是否使用對稱或非對稱光纖的政策。

您也可以建立封鎖執行脈絡

Fiber.new(blocking: true) do
  # Won't use the scheduler:
  sleep(n)
end

不過,除非您要實作排程器,否則通常應避免這樣做。

IO

預設情況下,I/O 是非封鎖的。並非所有作業系統都支援非封鎖 I/O。Windows 是個著名的範例,其中 socket I/O 可以是非封鎖的,但管線 I/O 是封鎖的。只要排程器且目前執行緒是非封鎖的,操作就會呼叫排程器。

互斥鎖

Mutex 類別可用於非封鎖脈絡,而且是光纖特定的。

條件變數

ConditionVariable 類別可用於非封鎖脈絡,而且是光纖特定的。

佇列 / 大小佇列

QueueSizedQueue 類別可用於非封鎖脈絡,而且是光纖特定的。

Thread

Thread#join 操作可以在非封鎖的環境中使用,並且是特定於纖維的。