Fiber 類別

Fiber 是在 Ruby 中實作輕量級協作並行處理的原語。基本上,它們是一種建立可以暫停和繼續的程式碼區塊的方法,很像執行緒。主要差別在於它們絕不會被搶佔,而且排程必須由程式設計人員執行,而不是 VM。

與其他無堆疊輕量級並行處理模型相反,每個 Fiber 都附帶一個堆疊。這使得 Fiber 能夠從 Fiber 區塊內的深度巢狀函式呼叫中暫停。請參閱 ruby(1) 手冊頁面以設定 Fiber 堆疊的大小。

建立 Fiber 時,它不會自動執行。相反地,必須使用 Fiber#resume 方法明確要求它執行。執行在 Fiber 內的程式碼可以透過呼叫 Fiber.yield 來放棄控制權,在這種情況下,它會將控制權讓回給呼叫方(Fiber#resume 的呼叫方)。

在讓出或終止時,Fiber 會傳回最後執行的表達式的值

例如

fiber = Fiber.new do
  Fiber.yield 1
  2
end

puts fiber.resume
puts fiber.resume
puts fiber.resume

會產生

1
2
FiberError: dead fiber called

Fiber#resume 方法接受任意數量的參數,如果這是第一次呼叫 resume,則會將它們傳遞為區塊引數。否則,它們會是 Fiber.yield 呼叫的傳回值

範例

fiber = Fiber.new do |first|
  second = Fiber.yield first + 2
end

puts fiber.resume 10
puts fiber.resume 1_000_000
puts fiber.resume "The fiber will be dead before I can cause trouble"

會產生

12
1000000
FiberError: dead fiber called

非封鎖 Fiber

非封鎖 Fiber 的概念是在 Ruby 3.0 中引入的。非封鎖 Fiber 在遇到通常會封鎖 Fiber 的作業(例如 sleep 或等待另一個程序或 I/O)時,會將控制權讓給其他 Fiber,並允許排程器在 Fiber 可以繼續執行時處理封鎖和喚醒(繼續)此 Fiber。

若要讓 Fiber 以非封鎖方式運作,必須在 Fiber.new 中使用 blocking: false (預設值) 來建立,並且應使用 Fiber.set_scheduler 設定 Fiber.scheduler。如果在目前執行緒中未設定 Fiber.scheduler,則封鎖和非封鎖纖維的行為會相同。

Ruby 沒有提供排程器類別:預期由使用者實作並對應到 Fiber::Scheduler

另外也有 Fiber.schedule 方法,預期會立即以非封鎖方式執行指定的區塊。其實際實作由排程器決定。

公開類別方法

Fiber[key] → value 按一下以切換來源

傳回由 key 識別的纖維儲存變數值。

key 必須是符號,且值由 Fiber#[]= 或 Fiber#store 設定。

另請參閱 Fiber::[]=

static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), FALSE);
    if (storage == Qnil) return Qnil;

    return rb_hash_aref(storage, key);
}
Fiber[key] = value 按一下以切換來源

value 指定給由 key 識別的纖維儲存變數。如果變數不存在,則會建立變數。

key 必須是 Symbol,否則會引發 TypeError

另請參閱 Fiber::[]

static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), value != Qnil);
    if (storage == Qnil) return Qnil;

    if (value == Qnil) {
        return rb_hash_delete(storage, key);
    }
    else {
        return rb_hash_aset(storage, key, value);
    }
}
blocking{|fiber| ...} → result 按一下以切換來源

強制纖維在區塊持續時間內封鎖。傳回區塊的結果。

有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。

VALUE
rb_fiber_blocking(VALUE class)
{
    VALUE fiber_value = rb_fiber_current();
    rb_fiber_t *fiber = fiber_ptr(fiber_value);

    // If we are already blocking, this is essentially a no-op:
    if (fiber->blocking) {
        return rb_yield(fiber_value);
    }
    else {
        return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value);
    }
}
blocking? → false 或 1 按一下以切換來源

如果目前的纖維是非封鎖的,則傳回 false。如果纖維是透過將 blocking: false 傳遞給 Fiber.new 或透過 Fiber.schedule 建立的,則 Fiber 為非封鎖的。

如果目前的 Fiber 為封鎖的,則此方法會傳回 1。未來的開發可能會允許傳回較大的整數。

請注意,即使方法傳回 falseFiber 僅在當前執行緒中設定 Fiber.scheduler 時才會表現不同。

有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。

static VALUE
rb_fiber_s_blocking_p(VALUE klass)
{
    rb_thread_t *thread = GET_THREAD();
    unsigned blocking = thread->blocking;

    if (blocking == 0)
        return Qfalse;

    return INT2NUM(blocking);
}
current → fiber 按一下以切換來源

傳回當前 fiber。如果您不在 fiber 的內容中執行,此方法將會傳回根 fiber。

static VALUE
rb_fiber_s_current(VALUE klass)
{
    return rb_fiber_current();
}
current_scheduler → obj 或 nil 按一下以切換來源

傳回 Fiber 排程器,如果且僅當當前 fiber 為非封鎖時,此排程器會在當前執行緒中使用 Fiber.set_scheduler 最後設定。

static VALUE
rb_fiber_current_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_current();
}
new(blocking: false, storage: true) { |*args| ... } → fiber 按一下以切換來源

建立新的 Fiber。最初,fiber 並未執行,且可以使用 resume 繼續執行。傳遞給第一個 resume 呼叫的引數將會傳遞給區塊

f = Fiber.new do |initial|
   current = initial
   loop do
     puts "current: #{current.inspect}"
     current = Fiber.yield
   end
end
f.resume(100)     # prints: current: 100
f.resume(1, 2, 3) # prints: current: [1, 2, 3]
f.resume          # prints: current: nil
# ... and so on ...

如果將 blocking: false 傳遞給 Fiber.new當前執行緒已定義 Fiber.schedulerFiber 將會變成非封鎖(請參閱類別文件中的「非封鎖 Fiber」區段)。

如果未指定 storage,預設值會繼承當前 fiber 的儲存體副本。這與指定 storage: true 相同。

Fiber[:x] = 1
Fiber.new do
  Fiber[:x] # => 1
  Fiber[:x] = 2
end.resume
Fiber[:x] # => 1

如果指定的 storagenil,此函式將會延遲初始化內部儲存體,其會從一個空雜湊開始。

Fiber[:x] = "Hello World"
Fiber.new(storage: nil) do
  Fiber[:x] # nil
end

否則,指定的 storage 會用作新的 fiber 儲存體,且它必須是 Hash 的執行個體。

目前明確使用 storage: true 是實驗性質的,且未來可能會變更。

static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
    return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
schedule { |*args| ... } → fiber 按一下以切換來源

預期此方法會在一個獨立的非封鎖 fiber 中立即執行提供的程式碼區塊。

puts "Go to sleep!"

Fiber.set_scheduler(MyScheduler.new)

Fiber.schedule do
  puts "Going to sleep"
  sleep(1)
  puts "I slept well"
end

puts "Wakey-wakey, sleepyhead"

假設 MyScheduler 已正確實作,此程式將產生

Go to sleep!
Going to sleep
Wakey-wakey, sleepyhead
...1 sec pause here...
I slept well

…例如在 Fiber 內部的第一個封鎖操作(sleep(1))中,控制權讓渡給外部程式碼(主纖維),而 在該執行結束時,排程器會負責適當地繼續所有封鎖的纖維。

請注意,上述描述的行為是該方法預期的行為,實際行為取決於目前排程器對 Fiber::Scheduler#fiber 方法的實作。Ruby 沒有強制此方法以任何特定方式運作。

如果未設定排程器,此方法會引發 RuntimeError (No scheduler is available!)

static VALUE
rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj)
{
    return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p());
}
scheduler → obj or nil 按一下以切換來源

傳回 Fiber 排程器,這是最後一次使用 Fiber.set_scheduler 為目前執行緒設定的排程器。如果未設定排程器(這是預設值),傳回 nil,且非封鎖纖維的行為與封鎖纖維相同。(有關排程器概念的詳細資訊,請參閱類別文件中的「非封鎖纖維」區段)。

static VALUE
rb_fiber_s_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_get();
}
set_scheduler(scheduler) → scheduler 按一下以切換來源

為目前執行緒設定 Fiber 排程器。如果設定了排程器,非封鎖纖維(由 Fiber.new 使用 blocking: falseFiber.schedule 建立)會在潛在的封鎖操作中呼叫該排程器的掛鉤方法,且目前執行緒會在最後處理時呼叫排程器的 close 方法(允許排程器適當地管理所有未完成的纖維)。

scheduler 可以是對應於 Fiber::Scheduler 的任何類別的物件。其實作取決於使用者。

另請參閱類別文件中的「非封鎖纖維」區段。

static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
    return rb_fiber_scheduler_set(scheduler);
}
yield(args, ...) → obj 按一下以切換來源

將控制權讓渡回繼續纖維的內容,同時傳遞傳遞給它的任何引數。當下一次呼叫 resume 時,纖維將在此點繼續處理。傳遞給下一個 resume 的任何引數都將是此 Fiber.yield 表達式評估為的值。

static VALUE
rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
{
    return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p());
}

公開實例方法

alive? → true 或 false 按一下以切換來源

如果仍可繼續執行 (或傳遞至) 纖維,則傳回 true。在完成纖維區塊的執行後,此方法將永遠傳回 false

VALUE
rb_fiber_alive_p(VALUE fiber_value)
{
    return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value)));
}
backtrace → 陣列 按一下以切換來源
backtrace(start) → 陣列
backtrace(start, count) → 陣列
backtrace(start..end) → 陣列

傳回纖維的目前執行堆疊。startcountend 允許僅選取 backtrace 的部分。

def level3
  Fiber.yield
end

def level2
  level3
end

def level1
  level2
end

f = Fiber.new { level1 }

# It is empty before the fiber started
f.backtrace
#=> []

f.resume

f.backtrace
#=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(1) # start from the item 1
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(2, 2) # start from item 2, take 2
#=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"]
p f.backtrace(1..3) # take items from 1 to 3
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"]

f.resume

# It is nil after the fiber is finished
f.backtrace
#=> nil
static VALUE
rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
backtrace_locations → 陣列 按一下以切換來源
backtrace_locations(start) → 陣列
backtrace_locations(start, count) → 陣列
backtrace_locations(start..end) → 陣列

類似於 backtrace,但將執行堆疊的每一行傳回為 Thread::Backtrace::Location。接受與 backtrace 相同的引數。

f = Fiber.new { Fiber.yield }
f.resume
loc = f.backtrace_locations.first
loc.label  #=> "yield"
loc.path   #=> "test.rb"
loc.lineno #=> 1
static VALUE
rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
blocking? → true 或 false 按一下以切換來源

如果 fiber 為封鎖,則傳回 true,否則傳回 false。如果 Fiber 是透過傳遞 blocking: falseFiber.new 或透過 Fiber.schedule 建立的,則為非封鎖。

請注意,即使方法傳回 false,纖維的行為也只有在目前的執行緒中設定 Fiber.scheduler 時才會有所不同。

有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。

VALUE
rb_fiber_blocking_p(VALUE fiber)
{
    return RBOOL(fiber_ptr(fiber)->blocking);
}
inspect()
別名為:to_s
kill → nil 按一下以切換來源

透過引發無法捕捉的例外狀況來終止纖維。它只會終止指定的纖維,而不會終止其他纖維,如果該纖維正在呼叫 resumetransfer,則會傳回 nil 給另一個纖維。

Fiber#kill 僅在 Fiber.yield 中時才會中斷另一個纖維。如果在目前的纖維上呼叫,則會在 Fiber#kill 呼叫位置引發該例外狀況。

如果尚未啟動纖維,則直接轉換至終止狀態。

如果纖維已經終止,則不執行任何動作。

如果在屬於另一個執行緒的纖維上呼叫,則會引發 FiberError

static VALUE
rb_fiber_m_kill(VALUE self)
{
    rb_fiber_t *fiber = fiber_ptr(self);

    if (fiber->killed) return Qfalse;
    fiber->killed = 1;

    if (fiber->status == FIBER_CREATED) {
        fiber->status = FIBER_TERMINATED;
    }
    else if (fiber->status != FIBER_TERMINATED) {
        if (fiber_current() == fiber) {
            fiber_check_killed(fiber);
        } else {
            fiber_raise(fiber_ptr(self), Qnil);
        }
    }

    return self;
}
raise → obj 按一下以切換來源
raise(字串) → obj
raise(例外狀況 [, 字串 [, 陣列]]) → obj

在最後呼叫 Fiber.yield 的點上,在纖維中引發例外狀況。如果纖維尚未啟動或已執行完畢,則引發 FiberError。如果纖維正在讓出,則會繼續執行。如果正在傳遞,則會傳遞進去。但如果正在繼續執行,則會引發 FiberError

如果沒有引數,則引發 RuntimeError。如果只有一個 字串 引數,則引發 RuntimeError,並將字串作為訊息。否則,第一個參數應該是 Exception 類別的名稱 (或在傳送 exception 訊息時會傳回 Exception 物件的物件)。第二個參數 (選用) 會設定與例外狀況關聯的訊息,第三個參數是回呼資訊的陣列。例外狀況會由 begin...end 區塊的 rescue 子句擷取。

如果呼叫屬於另一個 執行緒Fiber,則會引發 FiberError

static VALUE
rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_raise(self, argc, argv);
}
resume(args, ...) → obj 按一下以切換來源

從最後呼叫 Fiber.yield 的點上繼續執行纖維,或如果這是第一次呼叫 resume,則開始執行。傳遞給 resume 的引數將會是 Fiber.yield 表達式的值,或如果這是第一次 resume,則會作為區塊參數傳遞給纖維的區塊。

或者,當呼叫 resume 時,它會評估為傳遞給纖維區塊內下一個 Fiber.yield 陳述式的引數,或如果它在沒有任何 Fiber.yield 的情況下執行完畢,則評估為區塊值。

static VALUE
rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber)
{
    return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p());
}
storage → hash (dup) 按一下以切換來源

傳回 fiber 的儲存雜湊的副本。此方法只能在 Fiber.current 上呼叫。

static VALUE
rb_fiber_storage_get(VALUE self)
{
    storage_access_must_be_from_same_fiber(self);

    VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE);

    if (storage == Qnil) {
        return Qnil;
    }
    else {
        return rb_obj_dup(storage);
    }
}
storage = hash 按一下以切換來源

設定 fiber 的儲存雜湊。此功能為實驗性質,未來可能會變更。此方法只能在 Fiber.current 上呼叫。

使用此方法時應小心,因為您可能會無意間清除重要的 fiber 儲存狀態。您應該偏好使用 Fiber::[]= 在儲存中指定特定金鑰。

您也可以使用 Fiber.new(storage: nil) 建立儲存空間為空的 fiber。

範例

while request = request_queue.pop
  # Reset the per-request state:
  Fiber.current.storage = nil
  handle_request(request)
end
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
    if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
        rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
          "Fiber#storage= is experimental and may be removed in the future!");
    }

    storage_access_must_be_from_same_fiber(self);
    fiber_storage_validate(value);

    fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value);
    return value;
}
to_s() 按一下以切換來源
static VALUE
fiber_to_s(VALUE fiber_value)
{
    const rb_fiber_t *fiber = fiber_ptr(fiber_value);
    const rb_proc_t *proc;
    char status_info[0x20];

    if (fiber->resuming_fiber) {
        snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
    }
    else {
        snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));
    }

    if (!rb_obj_is_proc(fiber->first_proc)) {
        VALUE str = rb_any_to_s(fiber_value);
        strlcat(status_info, ">", sizeof(status_info));
        rb_str_set_len(str, RSTRING_LEN(str)-1);
        rb_str_cat_cstr(str, status_info);
        return str;
    }
    GetProcPtr(fiber->first_proc, proc);
    return rb_block_to_s(fiber_value, &proc->block, status_info);
}
別名為:inspect
transfer(args, ...) → obj 按一下以切換來源

將控制權轉移至另一個 fiber,從其上次停止的位置繼續執行,或在之前未繼續執行時啟動。呼叫 fiber 將暫停,就像呼叫 Fiber.yield 一樣。

接收轉移呼叫的 fiber 將其視為繼續執行呼叫。傳遞至轉移的引數會視為傳遞至繼續執行的引數。

傳遞至 fiber 及從 fiber 傳出的兩種控制方式(一種是 resumeFiber::yield,另一種是 transfer 至 fiber 及從 fiber 傳出)無法自由混合。

  • 如果 Fiber 的生命週期從轉移開始,它將永遠無法讓出或繼續執行控制權傳遞,只能結束或轉移回傳。(它仍然可以繼續執行允許繼續執行的其他 fiber。)

  • 如果 Fiber 的生命週期從 resume 開始,它可以讓出或傳遞給另一個 Fiber,但只能以與讓出方式相容的方式接收回控制權:如果它已傳遞,它只能被傳遞回來,如果它已讓出,它只能被重新啟動。在那之後,它可以再次傳遞或讓出。

如果違反這些規則,將會引發 FiberError

對於個別 Fiber 設計,讓出/重新啟動較容易使用(Fiber 只會讓出控制權,它不需要考慮控制權交給誰),而傳遞對於複雜情況較為靈活,允許建立依賴彼此的任意 Fiber 圖形。

範例

manager = nil # For local var to be visible inside worker block

# This fiber would be started with transfer
# It can't yield, and can't be resumed
worker = Fiber.new { |work|
  puts "Worker: starts"
  puts "Worker: Performed #{work.inspect}, transferring back"
  # Fiber.yield     # this would raise FiberError: attempt to yield on a not resumed fiber
  # manager.resume  # this would raise FiberError: attempt to resume a resumed fiber (double resume)
  manager.transfer(work.capitalize)
}

# This fiber would be started with resume
# It can yield or transfer, and can be transferred
# back or resumed
manager = Fiber.new {
  puts "Manager: starts"
  puts "Manager: transferring 'something' to worker"
  result = worker.transfer('something')
  puts "Manager: worker returned #{result.inspect}"
  # worker.resume    # this would raise FiberError: attempt to resume a transferring fiber
  Fiber.yield        # this is OK, the fiber transferred from and to, now it can yield
  puts "Manager: finished"
}

puts "Starting the manager"
manager.resume
puts "Resuming the manager"
# manager.transfer  # this would raise FiberError: attempt to transfer to a yielding fiber
manager.resume

會產生

Starting the manager
Manager: starts
Manager: transferring 'something' to worker
Worker: starts
Worker: Performed "something", transferring back
Manager: worker returned "Something"
Resuming the manager
Manager: finished
static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p());
}