類別 Fiber::Scheduler

這不是現有的類別,而是 Scheduler 物件應遵守的介面文件,才能用作 Fiber.scheduler 的引數,並處理非封鎖纖維。另請參閱 Fiber 類別文件中的「非封鎖纖維」區段,以了解一些概念的說明。

預期 Scheduler 的行為和用法如下

這樣一來,每個個別 Fiber 的程式碼都會透明地達成並行執行。

Scheduler 實作是由寶石提供的,例如 Async

掛鉤方法為

除非另有說明,否則掛鉤實作是強制性的:如果未實作,嘗試呼叫掛鉤的方法會失敗。為了提供向後相容性,未來掛鉤將會是選用的 (如果未實作,由於排程器是為舊版 Ruby 建立,需要此掛鉤的程式碼不會失敗,只會以封鎖方式運作)。

強烈建議排程器實作 fiber 方法,此方法是由 Fiber.schedule 委派。

排程器的範例玩具實作可以在 Ruby 的程式碼中找到,在 test/fiber/scheduler.rb

公開實例方法

address_resolve(hostname) → array_of_strings or nil 按一下以切換來源

由執行非反向 DNS 查詢的任何方法呼叫。最顯著的方法是 Addrinfo.getaddrinfo,但還有許多其他方法。

預期此方法會傳回一個字串陣列,對應於 hostname 解析到的 IP 位址,或在無法解析時傳回 nil

所有可能的呼叫站點的相當詳盡清單

VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    VALUE arguments[] = {
        hostname
    };

    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
block(blocker, timeout = nil) 按一下以切換來源

Thread.join 等方法和 Mutex 呼叫,表示目前的 Fiber 已封鎖,直到另行通知 (例如 unblock) 或直到 timeout 經過。

blocker 是我們正在等待的項目,僅供參考 (用於除錯和記錄)。其值沒有保證。

預期會傳回布林值,指定封鎖操作是否成功。

VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
close() 按一下以切換來源

在目前執行緒結束時呼叫。預期排程器會實作此方法,以允許所有等待的纖維完成執行。

建議的模式是在 close 方法中實作主要事件迴圈。

VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    VALUE result;

    // The reason for calling `scheduler_close` before calling `close` is for
    // legacy schedulers which implement `close` and expect the user to call
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
    // which should call `scheduler_close`. If it were to call `close`, it
    // would create an infinite loop.

    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    result = rb_check_funcall(scheduler, id_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    return Qnil;
}
fiber(&block)

實作 Fiber.schedule。預期此方法會立即在單獨的非封鎖纖維中執行給定的程式碼區塊,並傳回該 Fiber

建議的最小實作是

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
io_pread(io, buffer, from, length, offset) → 讀取長度或 -errno 按一下以切換來源

IO#preadIO::Buffer#pread 呼叫,從 io 的偏移量 from 讀取 length 位元組,並寫入指定 buffer (請參閱 IO::Buffer) 的指定 offset

此方法在語意上與 io_read 相同,但允許指定要讀取的偏移量,而且通常更適合在同一個檔案上執行非同步 IO

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
io_pwrite(io, buffer, from, length, offset) → 寫入長度或 -errno 按一下以切換來源

IO#pwriteIO::Buffer#pwrite 呼叫,將 length 位元組寫入 io 的偏移量 from,並寫入指定 buffer (請參閱 IO::Buffer) 的指定 offset

此方法在語意上與 io_write 相同,但允許指定要寫入的偏移量,而且通常更適合在同一個檔案上執行非同步 IO

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
io_read(io, buffer, length, offset) → 讀取長度或 -errno 按一下以切換來源

IO#read 或 IO#Buffer.read 呼叫,從 io 讀取 length 位元組,並寫入指定 buffer (請參閱 IO::Buffer) 的指定 offset

length 參數為「要讀取的最小長度」。如果 IO 緩衝區大小為 8KiB,但 length1024 (1KiB),則最多可能會讀取 8KiB,但至少會讀取 1KiB。一般來說,讀取的資料量少於 length 的唯一情況是讀取資料時發生錯誤。

指定 length 為 0 是有效的,表示嘗試至少讀取一次,並傳回任何可用的資料。

建議的實作應嘗試以非封鎖方式從 io 讀取,如果 io 尚未準備就緒,則呼叫 io_wait (這會將控制權讓給其他纖維)。

請參閱 IO::Buffer,以取得可用的資料傳回介面。

預期傳回讀取的位元組數,或在發生錯誤時傳回 -errno (與系統錯誤碼對應的負數)。

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
io_select(readables, writables, exceptables, timeout) 按一下以切換來源

IO.select 呼叫,以詢問指定的描述符是否在指定的 timeout 內準備好指定的事件。

預期傳回已準備好的 陣列 的 3 元組 IO

VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    VALUE arguments[] = {
        readables, writables, exceptables, timeout
    };

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
io_wait(io, events, timeout) 按一下以切換來源

IO#waitIO#wait_readableIO#wait_writable 呼叫,以詢問指定的描述符是否已準備好針對指定的事件,且在指定的 timeout 內。

eventsIO::READABLEIO::WRITABLEIO::PRIORITY 的位元遮罩。

建議的實作應註冊哪些 Fiber 正在等待哪些資源,並立即呼叫 Fiber.yield 以將控制權傳遞給其他 fiber。然後,在 close 方法中,排程器可能會將所有 I/O 資源傳送給正在等待它的 fiber。

預期傳回已準備好的事件子集。

VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
}
io_write(io, buffer, length, offset) → 已寫入長度或 -errno 按一下以切換來源

IO#writeIO::Buffer#write 呼叫,以從指定的 buffer (請參閱 IO::Buffer),在指定的 offset 處,將 length 位元組寫入 io

length 參數為「要寫入的最小長度」。如果 IO 緩衝區大小為 8KiB,但指定的 length 為 1024 (1KiB),則最多會寫入 8KiB,但至少會寫入 1KiB。一般來說,寫入的資料量少於 length 的唯一情況是寫入資料時發生錯誤。

指定 length 為 0 是有效的,表示嘗試至少寫入一次,盡可能多的資料。

建議的實作應嘗試以非封鎖方式寫入 io,如果 io 尚未準備好 (這會將控制權傳遞給其他 fiber),則呼叫 io_wait

請參閱 IO::Buffer,以取得可用的介面,以有效率的方式從緩衝區取得資料。

預期傳回已寫入的位元組數,或在發生錯誤的情況下,傳回 -errno (與系統錯誤碼對應的負數)。

此方法應視為實驗性質

VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
kernel_sleep(duration = nil) 按一下以切換來源

Kernel#sleep 和 Mutex#sleep 呼叫,預期提供以非封鎖方式休眠的實作。實作可能會在「哪個 fiber 等待到哪個時刻」的清單中註冊目前的 fiber,呼叫 Fiber.yield 以傳遞控制權,然後在 close 中,繼續等待時間已過的 fiber。

VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
process_wait(pid, flags) 按一下以切換來源

Process::Status.wait 呼叫,以等待指定的程序。請參閱該方法說明,以取得參數說明。

建議的最小實作

Thread.new do
  Process::Status.wait(pid, flags)
end.value

此掛鉤是選用的:如果它不存在於目前的排程器中,Process::Status.wait 會表現得像一個封鎖方法。

預期會傳回一個 Process::Status 實例。

VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    VALUE arguments[] = {
        PIDT2NUM(pid), RB_INT2NUM(flags)
    };

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
timeout_after(duration, exception_class, *exception_arguments, &block) → result of block 按一下以切換來源

Timeout.timeout 呼叫以在給定的 duration 內執行指定的 block。它也可以由排程器或使用者程式碼直接呼叫。

嘗試將給定 block 的執行時間限制在給定的 duration 內,如果可能的話。當非封鎖作業導致 block 的執行時間超過指定的 duration 時,該非封鎖作業應該透過引發使用給定的 exception_arguments 建構的指定 exception_class 來中斷。

一般執行逾時通常被認為是有風險的。此實作只會中斷非封鎖作業。這是設計使然的,因為預期非封鎖作業可能會因為各種難以預測的原因而失敗,所以應用程式在處理這些條件和暗示的逾時時應該已經很穩健。

然而,由於這個設計,如果 block 沒有呼叫任何非封鎖作業,將無法中斷它。如果你希望提供可預測的逾時點,請考慮加入 +sleep(0)+。

如果 block 成功執行,它的結果將會傳回。

例外狀況通常會使用 Fiber#raise 引發。

VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    VALUE arguments[] = {
        timeout, exception, message
    };

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
unblock(blocker, fiber) 按一下以切換來源

呼叫以喚醒先前使用 block 封鎖的 Fiber(例如,Mutex#lock 會呼叫 block,而 Mutex#unlock 會呼叫 unblock)。排程器應該使用 fiber 參數來了解哪個 fiber 已解除封鎖。

blocker 是等待的內容,但它僅供參考(用於偵錯和記錄),且不保證與 blockblocker 相同。

VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    VM_ASSERT(rb_obj_is_fiber(fiber));

    return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}