Fiber 類別
Fiber 是在 Ruby 中實作輕量級協作並行處理的原語。基本上,它們是一種建立可以暫停和繼續的程式碼區塊的方法,很像執行緒。主要差別在於它們絕不會被搶佔,而且排程必須由程式設計人員執行,而不是 VM。
與其他無堆疊輕量級並行處理模型相反,每個 Fiber 都附帶一個堆疊。這使得 Fiber 能夠從 Fiber 區塊內的深度巢狀函式呼叫中暫停。請參閱 ruby(1) 手冊頁面以設定 Fiber 堆疊的大小。
建立 Fiber 時,它不會自動執行。相反地,必須使用 Fiber#resume
方法明確要求它執行。執行在 Fiber 內的程式碼可以透過呼叫 Fiber.yield
來放棄控制權,在這種情況下,它會將控制權讓回給呼叫方(Fiber#resume
的呼叫方)。
在讓出或終止時,Fiber
會傳回最後執行的表達式的值
例如
fiber = Fiber.new do Fiber.yield 1 2 end puts fiber.resume puts fiber.resume puts fiber.resume
會產生
1 2 FiberError: dead fiber called
Fiber#resume
方法接受任意數量的參數,如果這是第一次呼叫 resume
,則會將它們傳遞為區塊引數。否則,它們會是 Fiber.yield
呼叫的傳回值
範例
fiber = Fiber.new do |first| second = Fiber.yield first + 2 end puts fiber.resume 10 puts fiber.resume 1_000_000 puts fiber.resume "The fiber will be dead before I can cause trouble"
會產生
12 1000000 FiberError: dead fiber called
非封鎖 Fiber¶ ↑
非封鎖 Fiber 的概念是在 Ruby 3.0 中引入的。非封鎖 Fiber 在遇到通常會封鎖 Fiber 的作業(例如 sleep
或等待另一個程序或 I/O)時,會將控制權讓給其他 Fiber,並允許排程器在 Fiber 可以繼續執行時處理封鎖和喚醒(繼續)此 Fiber。
若要讓 Fiber
以非封鎖方式運作,必須在 Fiber.new
中使用 blocking: false
(預設值) 來建立,並且應使用 Fiber.set_scheduler
設定 Fiber.scheduler
。如果在目前執行緒中未設定 Fiber.scheduler
,則封鎖和非封鎖纖維的行為會相同。
Ruby 沒有提供排程器類別:預期由使用者實作並對應到 Fiber::Scheduler
。
另外也有 Fiber.schedule
方法,預期會立即以非封鎖方式執行指定的區塊。其實際實作由排程器決定。
公開類別方法
傳回由 key
識別的纖維儲存變數值。
key
必須是符號,且值由 Fiber#[]= 或 Fiber#store 設定。
另請參閱 Fiber::[]=
。
static VALUE rb_fiber_storage_aref(VALUE class, VALUE key) { Check_Type(key, T_SYMBOL); VALUE storage = fiber_storage_get(fiber_current(), FALSE); if (storage == Qnil) return Qnil; return rb_hash_aref(storage, key); }
將 value
指定給由 key
識別的纖維儲存變數。如果變數不存在,則會建立變數。
key
必須是 Symbol
,否則會引發 TypeError
。
另請參閱 Fiber::[]
。
static VALUE rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) { Check_Type(key, T_SYMBOL); VALUE storage = fiber_storage_get(fiber_current(), value != Qnil); if (storage == Qnil) return Qnil; if (value == Qnil) { return rb_hash_delete(storage, key); } else { return rb_hash_aset(storage, key, value); } }
強制纖維在區塊持續時間內封鎖。傳回區塊的結果。
有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。
VALUE rb_fiber_blocking(VALUE class) { VALUE fiber_value = rb_fiber_current(); rb_fiber_t *fiber = fiber_ptr(fiber_value); // If we are already blocking, this is essentially a no-op: if (fiber->blocking) { return rb_yield(fiber_value); } else { return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); } }
如果目前的纖維是非封鎖的,則傳回 false
。如果纖維是透過將 blocking: false
傳遞給 Fiber.new
或透過 Fiber.schedule
建立的,則 Fiber
為非封鎖的。
如果目前的 Fiber
為封鎖的,則此方法會傳回 1。未來的開發可能會允許傳回較大的整數。
請注意,即使方法傳回 false
,Fiber
僅在當前執行緒中設定 Fiber.scheduler
時才會表現不同。
有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。
static VALUE rb_fiber_s_blocking_p(VALUE klass) { rb_thread_t *thread = GET_THREAD(); unsigned blocking = thread->blocking; if (blocking == 0) return Qfalse; return INT2NUM(blocking); }
傳回當前 fiber。如果您不在 fiber 的內容中執行,此方法將會傳回根 fiber。
static VALUE rb_fiber_s_current(VALUE klass) { return rb_fiber_current(); }
傳回 Fiber
排程器,如果且僅當當前 fiber 為非封鎖時,此排程器會在當前執行緒中使用 Fiber.set_scheduler
最後設定。
static VALUE rb_fiber_current_scheduler(VALUE klass) { return rb_fiber_scheduler_current(); }
建立新的 Fiber
。最初,fiber 並未執行,且可以使用 resume
繼續執行。傳遞給第一個 resume
呼叫的引數將會傳遞給區塊
f = Fiber.new do |initial| current = initial loop do puts "current: #{current.inspect}" current = Fiber.yield end end f.resume(100) # prints: current: 100 f.resume(1, 2, 3) # prints: current: [1, 2, 3] f.resume # prints: current: nil # ... and so on ...
如果將 blocking: false
傳遞給 Fiber.new
,且當前執行緒已定義 Fiber.scheduler
,Fiber
將會變成非封鎖(請參閱類別文件中的「非封鎖 Fiber」區段)。
如果未指定 storage
,預設值會繼承當前 fiber 的儲存體副本。這與指定 storage: true
相同。
Fiber[:x] = 1 Fiber.new do Fiber[:x] # => 1 Fiber[:x] = 2 end.resume Fiber[:x] # => 1
如果指定的 storage
為 nil
,此函式將會延遲初始化內部儲存體,其會從一個空雜湊開始。
Fiber[:x] = "Hello World" Fiber.new(storage: nil) do Fiber[:x] # nil end
否則,指定的 storage
會用作新的 fiber 儲存體,且它必須是 Hash
的執行個體。
目前明確使用 storage: true
是實驗性質的,且未來可能會變更。
static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); }
預期此方法會在一個獨立的非封鎖 fiber 中立即執行提供的程式碼區塊。
puts "Go to sleep!" Fiber.set_scheduler(MyScheduler.new) Fiber.schedule do puts "Going to sleep" sleep(1) puts "I slept well" end puts "Wakey-wakey, sleepyhead"
假設 MyScheduler 已正確實作,此程式將產生
Go to sleep! Going to sleep Wakey-wakey, sleepyhead ...1 sec pause here... I slept well
…例如在 Fiber
內部的第一個封鎖操作(sleep(1)
)中,控制權讓渡給外部程式碼(主纖維),而 在該執行結束時,排程器會負責適當地繼續所有封鎖的纖維。
請注意,上述描述的行為是該方法預期的行為,實際行為取決於目前排程器對 Fiber::Scheduler#fiber
方法的實作。Ruby 沒有強制此方法以任何特定方式運作。
如果未設定排程器,此方法會引發 RuntimeError (No scheduler is available!)
。
static VALUE rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj) { return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p()); }
傳回 Fiber
排程器,這是最後一次使用 Fiber.set_scheduler
為目前執行緒設定的排程器。如果未設定排程器(這是預設值),傳回 nil
,且非封鎖纖維的行為與封鎖纖維相同。(有關排程器概念的詳細資訊,請參閱類別文件中的「非封鎖纖維」區段)。
static VALUE rb_fiber_s_scheduler(VALUE klass) { return rb_fiber_scheduler_get(); }
為目前執行緒設定 Fiber
排程器。如果設定了排程器,非封鎖纖維(由 Fiber.new
使用 blocking: false
或 Fiber.schedule
建立)會在潛在的封鎖操作中呼叫該排程器的掛鉤方法,且目前執行緒會在最後處理時呼叫排程器的 close
方法(允許排程器適當地管理所有未完成的纖維)。
scheduler
可以是對應於 Fiber::Scheduler
的任何類別的物件。其實作取決於使用者。
另請參閱類別文件中的「非封鎖纖維」區段。
static VALUE rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) { return rb_fiber_scheduler_set(scheduler); }
將控制權讓渡回繼續纖維的內容,同時傳遞傳遞給它的任何引數。當下一次呼叫 resume
時,纖維將在此點繼續處理。傳遞給下一個 resume
的任何引數都將是此 Fiber.yield
表達式評估為的值。
static VALUE rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass) { return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p()); }
公開實例方法
如果仍可繼續執行 (或傳遞至) 纖維,則傳回 true。在完成纖維區塊的執行後,此方法將永遠傳回 false
。
VALUE rb_fiber_alive_p(VALUE fiber_value) { return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value))); }
傳回纖維的目前執行堆疊。start
、count
和 end
允許僅選取 backtrace 的部分。
def level3 Fiber.yield end def level2 level3 end def level1 level2 end f = Fiber.new { level1 } # It is empty before the fiber started f.backtrace #=> [] f.resume f.backtrace #=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(1) # start from the item 1 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(2, 2) # start from item 2, take 2 #=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"] p f.backtrace(1..3) # take items from 1 to 3 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"] f.resume # It is nil after the fiber is finished f.backtrace #=> nil
static VALUE rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber) { return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec); }
類似於 backtrace
,但將執行堆疊的每一行傳回為 Thread::Backtrace::Location
。接受與 backtrace
相同的引數。
f = Fiber.new { Fiber.yield } f.resume loc = f.backtrace_locations.first loc.label #=> "yield" loc.path #=> "test.rb" loc.lineno #=> 1
static VALUE rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber) { return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec); }
如果 fiber
為封鎖,則傳回 true
,否則傳回 false
。如果 Fiber
是透過傳遞 blocking: false
給 Fiber.new
或透過 Fiber.schedule
建立的,則為非封鎖。
請注意,即使方法傳回 false
,纖維的行為也只有在目前的執行緒中設定 Fiber.scheduler
時才會有所不同。
有關詳細資訊,請參閱類別文件中的「非封鎖纖維」區段。
VALUE rb_fiber_blocking_p(VALUE fiber) { return RBOOL(fiber_ptr(fiber)->blocking); }
透過引發無法捕捉的例外狀況來終止纖維。它只會終止指定的纖維,而不會終止其他纖維,如果該纖維正在呼叫 resume
或 transfer
,則會傳回 nil
給另一個纖維。
Fiber#kill
僅在 Fiber.yield
中時才會中斷另一個纖維。如果在目前的纖維上呼叫,則會在 Fiber#kill
呼叫位置引發該例外狀況。
如果尚未啟動纖維,則直接轉換至終止狀態。
如果纖維已經終止,則不執行任何動作。
如果在屬於另一個執行緒的纖維上呼叫,則會引發 FiberError
。
static VALUE rb_fiber_m_kill(VALUE self) { rb_fiber_t *fiber = fiber_ptr(self); if (fiber->killed) return Qfalse; fiber->killed = 1; if (fiber->status == FIBER_CREATED) { fiber->status = FIBER_TERMINATED; } else if (fiber->status != FIBER_TERMINATED) { if (fiber_current() == fiber) { fiber_check_killed(fiber); } else { fiber_raise(fiber_ptr(self), Qnil); } } return self; }
在最後呼叫 Fiber.yield
的點上,在纖維中引發例外狀況。如果纖維尚未啟動或已執行完畢,則引發 FiberError
。如果纖維正在讓出,則會繼續執行。如果正在傳遞,則會傳遞進去。但如果正在繼續執行,則會引發 FiberError
。
如果沒有引數,則引發 RuntimeError
。如果只有一個 字串
引數,則引發 RuntimeError
,並將字串作為訊息。否則,第一個參數應該是 Exception
類別的名稱 (或在傳送 exception
訊息時會傳回 Exception
物件的物件)。第二個參數 (選用) 會設定與例外狀況關聯的訊息,第三個參數是回呼資訊的陣列。例外狀況會由 begin...end
區塊的 rescue
子句擷取。
如果呼叫屬於另一個 執行緒
的 Fiber
,則會引發 FiberError
。
static VALUE rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) { return rb_fiber_raise(self, argc, argv); }
從最後呼叫 Fiber.yield
的點上繼續執行纖維,或如果這是第一次呼叫 resume
,則開始執行。傳遞給 resume 的引數將會是 Fiber.yield
表達式的值,或如果這是第一次 resume
,則會作為區塊參數傳遞給纖維的區塊。
或者,當呼叫 resume 時,它會評估為傳遞給纖維區塊內下一個 Fiber.yield
陳述式的引數,或如果它在沒有任何 Fiber.yield
的情況下執行完畢,則評估為區塊值。
static VALUE rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber) { return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p()); }
傳回 fiber 的儲存雜湊的副本。此方法只能在 Fiber.current
上呼叫。
static VALUE rb_fiber_storage_get(VALUE self) { storage_access_must_be_from_same_fiber(self); VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE); if (storage == Qnil) { return Qnil; } else { return rb_obj_dup(storage); } }
設定 fiber 的儲存雜湊。此功能為實驗性質,未來可能會變更。此方法只能在 Fiber.current
上呼叫。
使用此方法時應小心,因為您可能會無意間清除重要的 fiber 儲存狀態。您應該偏好使用 Fiber::[]=
在儲存中指定特定金鑰。
您也可以使用 Fiber.new(storage: nil)
建立儲存空間為空的 fiber。
範例
while request = request_queue.pop # Reset the per-request state: Fiber.current.storage = nil handle_request(request) end
static VALUE rb_fiber_storage_set(VALUE self, VALUE value) { if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, "Fiber#storage= is experimental and may be removed in the future!"); } storage_access_must_be_from_same_fiber(self); fiber_storage_validate(value); fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); return value; }
static VALUE fiber_to_s(VALUE fiber_value) { const rb_fiber_t *fiber = fiber_ptr(fiber_value); const rb_proc_t *proc; char status_info[0x20]; if (fiber->resuming_fiber) { snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status)); } else { snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status)); } if (!rb_obj_is_proc(fiber->first_proc)) { VALUE str = rb_any_to_s(fiber_value); strlcat(status_info, ">", sizeof(status_info)); rb_str_set_len(str, RSTRING_LEN(str)-1); rb_str_cat_cstr(str, status_info); return str; } GetProcPtr(fiber->first_proc, proc); return rb_block_to_s(fiber_value, &proc->block, status_info); }
將控制權轉移至另一個 fiber,從其上次停止的位置繼續執行,或在之前未繼續執行時啟動。呼叫 fiber 將暫停,就像呼叫 Fiber.yield
一樣。
接收轉移呼叫的 fiber 將其視為繼續執行呼叫。傳遞至轉移的引數會視為傳遞至繼續執行的引數。
傳遞至 fiber 及從 fiber 傳出的兩種控制方式(一種是 resume
和 Fiber::yield
,另一種是 transfer
至 fiber 及從 fiber 傳出)無法自由混合。
-
如果 Fiber 的生命週期從轉移開始,它將永遠無法讓出或繼續執行控制權傳遞,只能結束或轉移回傳。(它仍然可以繼續執行允許繼續執行的其他 fiber。)
-
如果 Fiber 的生命週期從 resume 開始,它可以讓出或傳遞給另一個
Fiber
,但只能以與讓出方式相容的方式接收回控制權:如果它已傳遞,它只能被傳遞回來,如果它已讓出,它只能被重新啟動。在那之後,它可以再次傳遞或讓出。
如果違反這些規則,將會引發 FiberError
。
對於個別 Fiber
設計,讓出/重新啟動較容易使用(Fiber
只會讓出控制權,它不需要考慮控制權交給誰),而傳遞對於複雜情況較為靈活,允許建立依賴彼此的任意 Fiber 圖形。
範例
manager = nil # For local var to be visible inside worker block # This fiber would be started with transfer # It can't yield, and can't be resumed worker = Fiber.new { |work| puts "Worker: starts" puts "Worker: Performed #{work.inspect}, transferring back" # Fiber.yield # this would raise FiberError: attempt to yield on a not resumed fiber # manager.resume # this would raise FiberError: attempt to resume a resumed fiber (double resume) manager.transfer(work.capitalize) } # This fiber would be started with resume # It can yield or transfer, and can be transferred # back or resumed manager = Fiber.new { puts "Manager: starts" puts "Manager: transferring 'something' to worker" result = worker.transfer('something') puts "Manager: worker returned #{result.inspect}" # worker.resume # this would raise FiberError: attempt to resume a transferring fiber Fiber.yield # this is OK, the fiber transferred from and to, now it can yield puts "Manager: finished" } puts "Starting the manager" manager.resume puts "Resuming the manager" # manager.transfer # this would raise FiberError: attempt to transfer to a yielding fiber manager.resume
會產生
Starting the manager Manager: starts Manager: transferring 'something' to worker Worker: starts Worker: Performed "something", transferring back Manager: worker returned "Something" Resuming the manager Manager: finished
static VALUE rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self) { return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p()); }