篩選器和管道重載

lua-users home
Wiki

一個以協程為基礎的架構

在管道、呼叫鏈和符號標記上

作者:HartmutSchaefer

摘要

PIL 9.2 章(https://lua.dev.org.tw/pil/9.2.html)與 DiegoNehab 的一篇極為有趣的文章《篩選器、來源和汇率》激發了我對管道的實驗,並對符號標記議題有所思考,以及是什麼讓管道有別於我們在命令式程式設計中習慣的工作方式。在本文中,我將探討管道與鏈式函數或方法呼叫間的共通點和差異。接著,我將提出並不斷演進一組用於在 Lua 中管理管道的函數。最後,我將使用一些元編程獲取一個更類管道的符號標記。

資料轉換之道

電腦的許多應用都牽涉到將資料從一種表示形式轉換為另一種形式,其中可能包括與其他資料進行混搭與配對的各種形式。此類轉換在性質上自然會變得比較複雜或單純。

複雜的資料轉換(就像任何其他演算法)若能分解成更小的部分,每個部分執行單純而明確的任務,則較易於理解。假設這樣的部分還能以可重複使用的方式建立,形成一組建構模組,複雜的轉換便可輕易地「組裝」起來,這將產生額外的好處。

最簡單的組合方式可能是循序執行數個轉換,亦即使用每個轉換的結果,做為下一個轉換的輸入。符號標記與內部運作會根據我們工作的環境而異

假如你正在使用函數的話,你可能會寫下類似這樣的東西

result = transformation3( transformation2( transformation1( data ) ) )

這看來並不直覺,因為轉換會以與其應用順序相反的順序寫出。(如果轉換會額外取得參數,其符號標記只會變得更糟。)不過,符號標記並不會改變正在發生事情的本質,而且還能在不改變本質的情況下進行增強。如果你有一個函數組合機制,你可以寫下類似以下的內容,讓轉換以它們的應用順序出現

result = chain( transformation1, transformation2, transformation3 )( data )

(使用更多高階函數,你甚至還能用轉換來塞入額外的參數。)

再者,如果你獲得了編譯器或某些預處理器的協助(例如:將 | 算子轉換為使用右運算項呼叫左運算項,並將左運算項做為參數),你可以得到一個更直覺的符號標記,將你開始處理的資料放在鏈的開頭,並可能寫下類似這樣的內容

result = data | transformation1 | transformation2 | transformation3

而且,如果你喜歡的話,借助更多協助,你甚至可以放棄我們熟悉的指派符號標記,並且將結果接收器放在鏈的末尾,最後得到一個看起來十分像是 UNIX 管道的符號標記,顯示資料如何穿過一系列轉換,如下所示

data > transformation1 > transformation2 > transformation3 > result

或類似。不過請注意,這並不會改變底層的運作原理:一個簡單的包裝式函式呼叫順序!

在物件導向程式設計中,包括 C# 的延伸方法等機制,你可以使用串連式的方法呼叫免費取得上述的順序運算表示法

result = data.transformation1().transformation2().transformation3()

這個表示法中的括號也相當清楚地顯示,「資料」流經的轉換「管道」只不過是一系列函式呼叫。(順帶一提,OO 中可能轉換被做為方法,與其輸入資料類型綑綁在一起,這表示的並非限制,而只是一個實現細節。)

回到我們討論的核心議題,UNIX 為我們帶來了應用在資料上的一系列轉換的一個完全不同的概念:UNIX 管道。它們在盒子上就附有與我們前面創建的類似,直覺式的管道表示法(當時其實並沒有管道!)

data > transformation1 | transformation2 | transformation3 > result

我們很快就會看到管道與函式或方法呼叫鏈之間根本性的不同,但我們先來注意一下這三種機制的共通點,與表示法無關,它們都反映出同一個原則:將一系列轉換應用在某些資料上。

話雖如此,有一個面向從根本上區分了管道與函式或方法呼叫,正如我們前述所見它既不是表示法,也看不出來:管道以區塊處理資料。這適用於管線的個別階段(稱為篩選器),以及整體管線。任何階段僅會把實際需要用於產生下一個輸出的資料量保存在記憶體或其他暫存儲存體中,而且任何階段之間都不會組裝並儲存中間結果。

此項特性帶來了一個重要的區分。管道不僅能夠處理無限量的資料,它們甚至可以處理無限資料串流,例如感測器測量或類似資料,並在資料可用時產生結果。這無法使用函式或方法呼叫來完成,因為在處理它們之前無法事先收集無限序列的資料,而且在資料串流結束之前也無法有任何結果,換句話說就是永遠不會有任何結果。

此項差異導致管道內轉換之間連線的介面發生根本性差異。雖然函式或方法呼叫透過參數與傳回值進行通訊,但篩選器透過接收與發送資料區塊進行通訊。這個介面的重要方面在於輸入與輸出本質上是分離的,這表示並非每個輸入都需要產生輸出,每個輸出也不一定是由輸入造成的。(我們可以說一個篩選器是扁平化運算的一般化與其相反的運算,以及這兩者之間的混合體。)

因此,管道運作的方式與函式呼叫的同步性質有著根本上的不同,使之無法將管道實作為一個閱讀-轉換-轉換-轉換-寫入運算的簡單迴圈,如下所示

-- Constructs a pipe connecting the given functions. The pipe will read data
-- by calling its first parameter, then transform it by invoking all given
-- functions in sequence, and output the result by calling its second parameter.
-- This will continue until input is exhausted.
-- THIS IS NOT A REAL PIPE!
function pipe( ... )
    local filters = arg
    return function( input, output )
        for data in input do
            local current = data
            for _, filter in ipairs( filters ) do
                current = filter( current )
            end
            output( current )
        end
    end
end

p = pipe( transformation1, transformation2, transformation3 )
p( io.lines, print )

這樣一種實作僅適用於每一個輸入在任何階段都會轉換成完全一個輸出的最簡單轉換。這並非一般濾波器所為。(這不代表管線能夠做無法使用函式呼叫所做的轉換:在所有待轉換的資料都收集並整合成一個轉換時,它也可以使用一般的函式呼叫進行處理 - 差別大多數出在記憶體佔用空間。不過,對於無限資料串流來說的確有差異性。反之,管線的個別階段很可能會使用一般的函式來完成處理工作,這是很正常的。這是因為它們能夠自由收集任意資料作為具體函式呼叫的輸入。)

Diego 使用函式呼叫介面來實作濾波器,並透過使用特殊協定來解決輸入和濾波器輸出的非同步特性之間的阻抗不匹配,該協定允許進行非輸出和非輸入「濾波器」呼叫。雖然這不只會讓排程器變得複雜,更重要的是,這也會讓濾波器實作變得複雜,不過對於他使用框架鎖定處理的工作性質來說,這種方式非常好用。

在濾波器實作為個別處理程序時,更直接的濾波器演算法實作會變得可行,每個處理程序都有其自己的控制迴圈,透過呼叫系統函數來讀取輸入並寫入輸出。這是 UNIX 實作濾波器的途徑,而且在 Lua 中,可以使用協程程式達成相同的效果。所有的排程和狀態維護都交給執行時期處理,因此,濾波器寫作者只能專注於他們的演算法,並使用自然且簡易的標記。我們來看一下如何使用 Lua 實作管線和濾波器。

探討濾波器和管線

在我們開始之前,先來談談我使用的命名慣例,這樣您或許不會對下面程式碼中的變數名稱感到困惑:「p」字首表示函式參數,「l」字首表示區域變數,「s」字首表示靜態(模組全域)變數,「g」字首表示全域變數(這裡未使用)。函式名稱沒有字首(這並不代表函式不能儲存在必須有字首的變數中)。您可能會有自己的偏好,但是我從這個命名慣例中獲得幾個重要的優點

local lCondition = (pCondition == nil) or pCondition

下方提供的程式碼是針對 Lua 5.0 編寫的。

讓我們先定義一個概觀,然後再探討如何實作概觀,並了解我們可以怎麼做。

概觀

首先,我們想要能夠撰寫濾器並將其作為函式,它取得兩個函式作為參數,分別作為輸入和輸出。它們會依序執行,將所有輸入轉換成輸出。輸入會透過連續呼叫輸入函式取得,任何輸出都會透過呼叫輸出函式寫入。此外,我們不只想要傳遞字串,還要能夠在濾器之間傳遞任何類型的多個值。因此,濾器實作大致會遵循下列其中一個範本

function my_filter( pInput, pOutput )
    for ... in pInput do
        ...
        pOutput( ... )
        ...
    end

function my_filter( pInput, pOutput )
    while ... do
        ...
        ... = pInput()
        ...
        pOutput( ... )
        ...
    end

目前濾器不會傳回任何值。

其次,我們想要能夠將以這種方式實作的濾器串接成鏈(管線),將一個濾器的輸出提供給下一個濾器的輸入。第一個濾器的輸入和最後一個濾器的輸出會和管線的輸入和輸出相同,因此,管線本身會是濾器,並可以和其他的濾器或管線結合。我們想要編寫的程式碼看起來會像底下這樣,其中 transformation1、transformation2、transformation3 是依上方說明實作的濾器

p = pipe( transformation1, transformation2, transformation3 )
p( io.lines, print )

這個範例會從標準輸入讀取所有行,透過連續傳遞三個濾器將資料流進行轉換,然後將結果列印到主控台。

你可以看到,我們不需要任何泵就能讓這件事發生,這和 Diego 的解決方案相反!這很有趣,而且是因為我們的濾器和 Diego 的濾器相反,是主動實體,自己將輸入泵浦到輸出。(順帶一提,這是我們需要協程讓多個濾器同時運作的原因。)

實作管線

正如我們所說的,我們想要能夠在濾器之間傳遞任何類型的多個值。此外,也應該允許傳遞nil,因此我們定義當輸入/輸出交握傳輸的值第一個nil時,表示資料流已經結束。

要實作此例於 Lua 5.0,我們需要一個簡單的工具函式,用來執行與 Lua unpack() 相反的動作,因為當有 nil 出現時,單純用括弧包住呼叫函式,來收集回傳值,是無法達到我們的目的

-- Inverse of Lua unpack(). Returns all its arguments packed into an array.
function pack( ... )
    return arg
end

現在,讓我們來實作管道。下列函式建構一個管道。它接受多個過濾器,傳回一個表示鏈結的新過濾器。呼叫時,所回傳的函式將除了最後一個以外的所有過濾器包入 coroutine,並針對每個過濾器建立函式,用以繼續 coroutine,將過濾器的輸入當作輸入,將 coroutine.yield 當作輸出,並傳回 yield 的值(即過濾器的輸出)。此函式,一個產生器,用作鏈結中下一個過濾器的輸入,第一個過濾器獲得管道的輸入當作輸入。建構鏈結後,呼叫最後一個過濾器,其輸入是 自己的 輸入與管道的輸出,而它回傳的值即是由管道傳回(尾端呼叫)。到目前為止,我們不需要回傳的值,但我們接下來會看到這對我們來說有什麼幫助。

-- Creates a new filter acting as the concatenation of the given filters. A
-- filter is a function with two parameters, input and output, that obtains
-- its input by calling input and writes output by calling output. Optionally,
-- return values may be produced, the return values of the last filter being
-- returned by the pipe.
function pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lCurrentInput = pInput
        for lFilterPosition = 1, lFilterCount - 1 do
            local lCurrentFilter = coroutine.create( lFilters[ lFilterPosition ] )
            local lPreviousInput = lCurrentInput
            lCurrentInput = function()
                local lResult = pack( coroutine.resume( lCurrentFilter, lPreviousInput, coroutine.yield ) )
                local lSuccess = table.remove( lResult, 1 )
                if lSuccess then
                    if coroutine.status( lCurrentFilter ) == "suspended" then
                        return unpack( lResult )
                    end
                else
                    error( lResult[ 1 ] )
                end
            end
        end
        return lFilters[ lFilterCount ]( lCurrentInput, pOutput )
    end
end

關於上述程式碼的一些額外筆記:首先,在繼續 coroutine 時,我們傳入過濾器的輸入與輸出。只有在開始執行 coroutine 的第一次繼續時,才會強烈需要這麼做。之後,每次呼叫 output,這兩個引數都會回傳給過濾器。由於它們會被過濾器忽略,因此不會造成任何傷害,而且我們不需要實作特殊處理。其次,因為過濾器可能會 回傳 某個值(即使我們透過合約不允許它這麼做),我們必須在每次繼續後檢查 coroutine 是已 yield 還是已結束,來決定我們從 continue 中得到的結果是否必須是輸出。

另外要注意的是,pipe 函式不會自己建構過濾器鏈結,這會延後到過濾器實際執行時才會進行。否則,就不可能執行管道的次數超過一次,這樣也不夠符合過濾器的定義。

這個小函式是我們在 Lua 中執行管道的唯一需求!讓我們看看我們能夠用這個小函式做些什麼。

來源、匯入與封閉管道

首先,我們先針對篩選器以及資料來源及去向進行思考。結果,我們的篩選器介面支援兩種特殊的篩選器類型:一種是不用呼叫 input 參數就能產生它自己的 output 的篩選器,這種情況是來源。相反地,不呼叫 output 參數但會使用 input 的篩選器則稱為接收器。很顯然地,來源和接收器只能發生在管道的起始或結尾。以來源開頭的管道本身就是來源,以接收器結尾的則為接收器。以來源開頭且以接收器結尾的管道是一個特別的案例:它不會執行任何輸入或輸出,在呼叫時會將所有資料從來源大量傳送到接收器。我們稍後會探討這些管道,它們具備許多有趣的特性。首先,讓我們檢視來源和接收器的資料來源和去向。

來源和接收器各自的 input 和 output 參數可能會是什麼意思?在此,它們不需要用來取得輸入或寫入輸出,但可以用來向篩選器(來源或接收器)「描述」實際的資料來源或目的地,舉例來說,要讀取或寫入的檔案名稱,或是透過解析來提取資料的字串。包含這些來源或接收器的管道會將這些參數作為其本身的 input 和 output 參數,將它們不變地傳遞給來源和接收器。

現在,暫時假設我們的接收器是將收到的資料收集到一個表格中,或根據資料計算總計值(例如字數或平均值)的某項物件。在此,篩選器的傳回值便會發揮作用:接收器可以傳回值(甚至可能是多個值),而不用(或除了)使用其 output 參數。管道(在這種情況下也是接收器)會傳回這個結果,而不做任何修改。

現在,我們重新檢視同時是來源和接收器的管道。如前所述,它們不執行任何輸入或輸出呼叫。我們可以將此類管道稱為「封閉」管道,因為它們將其資料流保存在「內部」,而提供函式呼叫介面:它們使用參數和傳回「值」來傳達輸入和輸出。也就是說,它們實際上可以視為「常規函式」,其具有一個或兩個參數,而且可能會傳回結果!就像函式一樣,它們可以自由呼叫,每次呼叫都會將在 input 參數中以「一體」給予它們的資料透過其本身「泵送」,然後再將結果以「一體」傳回(不是作為傳回值,就是透過 output 參數放入指定位置)。請注意,與一般篩選器作業不同,連續資料間並不會保有狀態:每個輸入都會取得其各自的新管道實例,而當資料處理完畢並傳回結果時,該實例就會「死亡」。因此,封閉管道實際上會將管道變成函式!

現在,我們來看實際上如何使用管道。

使用管道

為了實驗,我們將定義一個簡單的資料來源,產生以 1 為起點的整數序列,直到達到作為輸入參數所給予的限制

-- Produce a sequence of integers
function seq( pLimit, pOutput )
    for lValue = 1, pLimit do
        pOutput( lValue )
    end
end

我們可以測試這項濾器,根據結果顯示(至少只要「印出」會提供我們足以用於對數值進行判斷的資訊),使用「任何」濾器或管線,輸入其輸入值並將「印出」作為輸出,就可以取得印在主控台上的輸出!酷吧?讓我們將數字序列來源的輸出印在主控台上

> seq( 4, print )
1
2
3
4

現在,我們將定義一個資料接收器,用來計算從輸入讀取的所有數值的平均值

-- Compute the average from a sequence of numbers
function avg( pInput )
    local lSum, lCount = 0, 0
    for lValue in pInput do
        lSum = lSum + lValue
        lCount = lCount + 1
    end
    return lSum / lCount
end

讓我們使用資料來源產生的資料來測試它,我們會為此建立一個管線

> average_of_sequence = pipe( seq, avg )
> return average_of_sequence( 4 )
2.5

現在,讓我們過濾一下,下列濾器會從輸入讀取數字,並將其平方寫到輸出中

-- Produce the squares of the input values
function square( pInput, pOutput )
    for lValue in pInput do
        pOutput( lValue * lValue )
    end
end

使用這個濾器,我們會建立一個傳遞前 n 個平方的序列的管線,再添加另外一個平方濾器,並取得產生 4 的冪序列的另一個管線,最後,我們從頭開始建立一個管線,來計算前 n 個 4 的冪的平均值

> seq_squares = pipe( seq, square )
> seq_squares( 4, print )
1
4
9
16
> seq_squares_squared = pipe( seq_squares, square )
> seq_squares_squared( 4, print )
1
16
81
256
> average_of_seq_of_powers = pipe( seq, square, square, avg )
> return average_of_seq_powers( 4 )
88.5

這個顯然運作良好,但是我們的平方濾器真的沒有那麼有趣:它並未利用已解耦的輸入和輸出的管線提供。使用本導言中以縮減管線建置為基礎,上述的轉換也會執行得很好。我們因此寫兩個更有趣的濾器,第一個只會輸出從輸入讀取的奇數,而第二個會將其輸入設定為成對

-- Let only odd numbers pass through
function odd( pInput, pOutput )
    for lValue in pInput do
        if math.mod( lValue, 2 ) == 1 then
            pOutput( lValue )
        end
    end
end

-- Collect input into pairs
function pair( pInput, pOutput )
    for lFirstValue in pInput do
        local lSecondValue = pInput()
        pOutput( lFirstValue, lSecondValue )
        if lSecondValue == nil then
            break
        end
    end
end

如果我們使用數值序列來源連接濾器,我們會發現它們的運作符合預期

> seq_odd = pipe( seq, odd )
> seq_odd( 4, print )
1
3
> seq_pairs = pipe( seq, pair )
> seq_pairs( 4, print )
1       2
3       4

現在我們不會為濾器建立範例,會呼叫比輸入更頻繁的輸出,因為我們稍後會在討論後續壓縮濾器時看到這個應用,現在先看看我們如何將我們的管線與現有的,或打算編寫的程式碼介面建立起來

管線與現有程式碼的介面

建立來源和接收器

你可能已經注意到,濾器和管線會將產生器作為第一個(輸入)參數,有一些函數會回傳產生器,例如:io.lines()string.gfind(),我們可以將其直接用於管線作為輸入,例如:我們可以使用管線 my_pipe 處理字串 data 中包含的所有數字

> my_pipe( string.gfind( data, "%d+" ), print )

不過,如果我們可以使用 string.gfind 製作一個資料來源,那會很好,這樣我們可以將其放在管線的最開始,然後撰寫

> my_pipe( data, print )

下列的實用程式函數可以協助我們完成這個動作,它會從回傳產生器的函數中建構一個資料來源,資料來源執行時,會呼叫所提供的函數,並將回傳產生器提供的所有資料寫到其輸出中

-- Wraps a function returning a generator as data source
function source( pFunction )
    return function( pInput, pOutput )
        local lInput = pFunction( pInput )
        while true do
            local lData = pack( lInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( unpack( lData ) )
        end
    end
end

針對我們的便利性,我們界定了以下函數,它封裝一個函數,接受多個引數和一些值,並包裝成只接受一個引數的函數,它利用這個引數呼叫提供的函數並接著這些值。由於 Lua 函式庫中的大多數函數都採用要作用的值(「this」值)作為第一個引數,這個函數在 Lua 提供了函數程式設計中部份函數應用會提供的功能

-- Wraps a function returning a generator as data source
function curry( pFunction, ... )
    return function( pParam )
        return pFunction( pParam, unpack( arg ) )
    end
end

現在,我們可以建構一個使用 string.gfind 搭配模式「%d+」的資料來源,以尋找在給定輸入參數的字串中包含的所有數字,並將這些數字寫入其輸出(作為字串)

> parse_numbers = source( curry( string.gfind, "%d+" ) )
> parse_numbers( "123 5 78 abc 12", print )
123
5
78
12

我們會在下一章節介紹另一個介面函數後,開始處理這個資料來源。

因此,在我們定義一個函數來建構資料來源(該函數會傳回一個產生器)後,如何從我們已有的內容建構資料接收器?相反的操作是什麼?

事實證明,我們無需建構任何東西:相反的操作是一個函數,它會接受一個產生器並傳回一個值。但這正是符合接收器過濾器介面(對接收器來說)的描述:任何接受產生器(以及可選擇性第二個引數)並(可選擇性)傳回一個值的函數,本身就是一個接收器 - 不需要建構一個!

透過函數傳遞值

我們接下來想要完成的是,從現有的函數建立過濾器。模式很簡單:讀取輸入、呼叫函數,然後將結果寫入輸出。以下函數達成了這一點

-- Wraps a function as filter
function pass( pFunction )
    return function( pInput, pOutput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( pFunction( unpack( lData ) ) )
        end
    end
end

有了這個函數,我們可以建立一個過濾器,它使用 tonumber() 將從數字剖析器傳遞的字串轉換為數字,所以我們可以用來計算其平均值。我們建構以下管道

> avg_from_string = pipe( parse_numbers, pass( tonumber ), avg )
> return avg_from_string( "123 5 78 abc 12" )
54.5

請注意,我們可以使用傳遞 pass() 給「已封存管道」將其作為子管道,並藉此傳遞資料。

以下函數是一個 pass 變體,它不會進行任何輸出,有效地消耗所有輸入,不傳回任何資訊

-- Wraps a function as data sink, consuming any input without any output or
-- return
function consume( pFunction )
    return function( pInput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pFunction( unpack( lData ) )
        end
    end
end

讀取和寫入管道

現在,讓我們看看是否可以管道介接自訂程式邏輯,以在迴圈中處理資料。有兩種端點可以讓管道介接:處理管道輸出的資料,以及將值輸入管道中。(主動輸入過濾器的輸入,並在同一個執行緒中讀取其輸出,可能不會有太大幫助,因為會造成緩衝和同步問題。因此,我們不會深入探討這一點。)

要透過迴圈來處理管線中輸出的資料,我們需要將管線變成產生器。以下函式會完成此事,將管線 (或任何篩選器) 包裝成可重複使用的函式,只要一個引數就會傳回產生器,提供管線輸出的 yield 格式輸出 (這個函式的特徵模式與 string.gfind()io.lines() 等函式相同)

-- Wraps a filter into a function with one argument returning a generator that
-- yields all values produced by the filter from the input given as argument.
function drain( pFilter )
    return function( pInput )
        local lGenerator = coroutine.create( pFilter )
        return function()
            local lResult = pack( coroutine.resume( lGenerator, pInput, coroutine.yield ) )
            local lSuccess = table.remove( lResult, 1 )
            if lSuccess then
                if coroutine.status( lGenerator ) == "suspended" then
                    return unpack( lResult )
                end
            else
                error( lResult[ 1 ] )
            end
        end
    end
end

現在我們可以寫下類似以下的程式碼

for ... in drain( pipe( ... ) )( ... ) do
    ...
end

我們最初提到要排出的管線必須是資料來源,但事實上,上述函式並未對這個觀念做任何假設,因此並不表示管線必須是資料來源。這端看我們打算給回傳函式什麼參數:如果我們打算給它一個產生器,這個管線就不必是資料來源。

現在,如果要說相反的繫結,也就是將資料接收器的管線變成函式,這樣我們就能重複呼叫這個函式,輸入資料至管線中,又該怎麼辦呢?我們可以用以下方式實做傳回 feeder 的函式 (為求簡潔,我們在此不回傳可重複使用的函式,像 drain() 那樣,而是一步將輸出參數繫結至管線中)

-- Wraps a pipe that must be a data sink into a function that feeds its
-- arguments into the pipe's input.
-- THIS FUNCTION DOES NOT WORK WITH THE PIPES WE USE HERE!
function feed( pPipe, pOutput )
    local lOutput = coroutine.create( pPipe )
    local lPush = function( ... )
        local lSuccess, lResult = coroutine.resume( lOutput, unpack( arg ) )
        if not lSuccess then
            error( lResult )
        end
        return coroutine.status( lOutput ) == "suspended"
    end
    lPush( function() return coroutine.yield() end, pOutput )
    return lPush
end

這個函式根據管線建立協同常式,並傳遞 coroutine.yield 作為輸入,接著執行該協同常式,直到它 yield (要求第一個輸入),並傳回包覆器,此包覆器可在每個呼叫時,延續執行協同常式,並傳遞引數給該協同常式。如果協同常式再 yield 一次,包覆器就會傳回 true,這表示協同常式會接受更多輸入。我們來使用它

> feeder = feed( square, print )
stdin:6: attempt to yield across metamethod/C-call boundary
stack traceback:
        [C]: in function `error'
        stdin:6: in function `lPush'
        stdin:10: in function `feed'
        stdin:1: in main chunk
        [C]: ?

糟糕!我們發現 Lua 的限制:Lua 不允許從 for 迴圈標頭呼叫的產生器 yield!如果我們重新寫入我們的 square 篩選器,我們可以解決這個限制

> function square2( pInput, pOutput )
>>     while true do
>>         local lValue = pInput()
>>         if lValue == nil then break end
>>         pOutput( lValue * lValue )
>>     end
>> end
> feeder = feed( square2, print )
> return feeder( 2 )
4
true

不過,還有另一個問題:feed 函式只會針對簡單篩選器作用,它不會作用於管線。原因在於我們在管線實做中,執行篩選器的協同常式在延續/yield 鏈中會「堆疊」在另一個協同常式上,因此接收到 coroutine.yield 作為輸入的篩選器已由其後方的篩選器延續執行,因此它會在那裡 yield 而不會 yield 至頂層呼叫管線的程式碼。我們無法取回處理器,因為管線在等待輸入,這與我們的意圖不合。

我們可以改善管線實做,這樣它就不會有這個問題:為了達成此事,我們將所有包含的篩選器都包覆成協同常式,而且它們 會在輸入 輸出時 yield,再透過呼叫輸入和輸出的主迴圈加以控制

-- A symmetric pipe implementation. Pipes of this sort can be resumed from
-- both ends but prevent the constituting filters to use 'for' loops for
-- reading input. Also, sources and sinks cannot use the input and output
-- parameters of the pipe.
function symmetric_pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lHandover
        local lInput = function()
            lHandover = nil
            coroutine.yield()
            return unpack( lHandover )
        end
        local lOutput = function( ... )
            lHandover = arg
            coroutine.yield()
        end
        local lProcessors = {}
        for _, lFilter in ipairs( lFilters ) do
            table.insert( lProcessors, coroutine.create( lFilter ) )
        end
        local lCurrentProcessor = lFilterCount
        while lCurrentProcessor <= lFilterCount do
            if not lProcessors[ lCurrentProcessor ] then
                error( "Requesting input from closed pipe" )
            end
            local lSuccess, lResult = coroutine.resume( lProcessors[ lCurrentProcessor ], lInput, lOutput )
            if not lSuccess then
                error( lResult )
            end
            if coroutine.status( lProcessors[ lCurrentProcessor ] ) == "suspended" then
                if lHandover == nil then
                    lCurrentProcessor = lCurrentProcessor - 1
                else
                    lCurrentProcessor = lCurrentProcessor + 1
                end
                if lCurrentProcessor == 0 then
                    lHandover = pack( pInput() )
                    lCurrentProcessor = 1
                elseif lCurrentProcessor > lFilterCount then
                    pOutput( unpack( lHandover ) )
                    lCurrentProcessor = lFilterCount
                end
            else
                lHandover = {}
                lProcessors[ lCurrentProcessor ] = nil
                lCurrentProcessor = lCurrentProcessor + 1
            end
        end
    end
end

這個管線的實作允許使用管線與 feed(),不過,這些管線有另一個問題:由於 所有 的過濾器現在都會在輸入時產生,它們會受到我們上面所看到的 Lua 限制,有效禁止我們在 任何 的過濾器中使用 for 迴圈。這是個太嚴重的限制,所以我們放棄這個優雅的實作,接受 沒辦法在迴圈中將資料提供給管線 這件事。如果我們想要寫出將資料提供給管線的程式碼,我們必須要使用 corourine.yieldcoroutine.wrap 將其包裝成一個產生器,或是將它寫為資料源,將其填入管線的前方,然後 執行 該管線。我們的管線必須在控制位置,迴圈控制執行沒有任何辦法!

上述 symmetric_pipe() 實作有另一個問題:它會禁止將管線的輸入和輸出參數用作資料源和接收器的參數,因為它們永遠不會讓它們看到。我們會因此失去不少彈性。

讀寫檔案

使用上面提到的 source() 函式,我們可以簡單包裝 io.lines 以取得一個資料源,該資料源會讀取檔案並且將其內容一行一行寫入其輸出。遺憾的是,標準函式庫中沒有我們可以用作檔案接收器的函式,我們必須使用 io.write 來撰寫我們自己的函式。為了對稱,我們因此也實作一個 來源 檔,使用 io.read。我們以一種讓我們可以將其用作來源(採用兩個參數)或傳回產生器的函式(使用一個參數呼叫時)的方式來實作它。這樣讓我們可以在考量之下使用它來建置管線以及呼叫管線

-- Returns a function that can be used as data source yielding the contents of
-- the file named in its input parameter, processed by the given formats, or
-- that can be called with a filename alone to return a generator yielding this
-- data. If no formats are given, line-by-line is assumed.
function filereader( ... )
    local lFormats = arg
    return function( pInput, pOutput )
        local lInput = io.open( pInput )
        local lOutput = pOutput or coroutine.yield
        local lFeeder = function()
            while true do
                local lData = pack( lInput:read( unpack( lFormats ) ) )
                if lData[ 1 ] ~= nil then
                    lOutput( unpack( lData ) )
                end
                if lData[ table.getn( lData ) ] == nil then
                    break
                end
            end
            lInput:close()
        end
        if pOutput then
            lFeeder()
        else
            return coroutine.wrap( lFeeder )
        end
    end
end

-- Returns a data sink that writes its input, optionally formatted by the
-- given format string using string.format(), to the file named in its output
-- parameter. If no format is given, the processing is analogous to print().
function filewriter( pFormat )
    return function( pInput, pOutput )
        local lOutput = io.open( pOutput, "w" )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            if pFormat then
                lOutput:write( string.format( pFormat, unpack( lData ) ) )
            else
                for lIndex = 1, table.getn( lData ) do
                    if lIndex > 1 then
                        lOutput:write( "\t" )
                    end
                    lOutput:write( tostring( lData[ lIndex ] ) )
                end
                lOutput:write( "\n" )
            end
        end
        lOutput:close()
    end
end

讓我們使用這兩個來逐行複製一個檔案

> copy = pipe( filereader(), filewriter() )
> copy( "data.in", "data.out" )

如果我們填入各種過濾器於讀取器與寫入器之間,我們可以在通過管線的列上執行我們想要的任何處理程序。一個簡單的範例工具像 grep。下列函式建置我們可以因此而使用的過濾器

-- Returns a filter that filters input by the given regexp
function grep( pPattern )
    return function( pInput, pOutput )
        for lData in pInput do
            if string.find( lData, pPattern ) then
                pOutput( lData )
            end
        end
    end
end

我們現在不會測試這個函式,而是撰寫一個更有趣的範例。你可能會注意到上面提到的函式完全沒有提到行。事實上,過濾器會從輸入是行或不是行中獨立。我們來寫另一個過濾器,將各行收集成段落,並且將這兩個函式放在一起,在段落而不是行上執行 grep(就像 grep -p)。

-- Filter collecting lines into paragraphs. The trailing newline is preserved.
function paragraphize( pInput, pOutput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            pOutput( table.concat( lBuffer, "\n" ) )
            lBuffer = {}
        end
    end
    if next( lBuffer ) then
        pOutput( table.concat( lBuffer, "\n" ) )
    end
end

> filter_foobar_paragraphs = pipe( filereader(), paragraphize, grep( "foobar" ), filewriter() )
> filter_foobar_paragraphs( "data.in", "data.out" )

如果我們喜歡,我們也可以不用檔案寫入器,將 print 提供給第二個管線參數,再提供輸出檔案名稱,讓結果列印到控制台。

一個特別有趣的檔案讀取器類型是一種可以讀取 PIL 第 10.1 節中所述的 Lua 資料檔案的檔案讀取器 (https://lua.dev.org.tw/pil/10.1.html):我們希望它可以將從檔案讀取的各個單元寫入輸出。下列函式可作為資料來源或回傳產生器的函式,其運作方式類似檔案讀取器。它會在一個空的沙盒中執行 Lua 檔案,利用一些元程式設計來提供執行的區塊呼叫函式以提交記錄(在 PIL 範例中為entry)。這是透過定義沙盒的__index元方法來完成,以使用函式提供區塊,該函式會輸出或產生記錄及其標籤名稱(在本例中為entry)。將標籤名稱也寫入輸出,讓我們可以使用傳遞超過一種資料種類(實體類型)的輸入檔案,舉例來說,如果我們希望將整個(足夠小的)關聯式資料庫的內容保留在一個資料檔案中,我們可以使用資料表名稱作為標籤名稱,並將所有資料表的內容寫入一個檔案。

-- Invocation with a single argument returns a generator yielding the records
-- from the named Lua file. When invoked with two arguments this function acts
-- as a data source yielding this data on output.
function luareader( pInput, pOutput )
    if pInput == nil then
        error( "bad argument #1 to `loadfile' (string expected, got nil)" )
    end
    local lInput, lError = loadfile( pInput )
    if not lInput then
        error( lError )
    end
    local lOutput = pOutput or coroutine.yield
    local lSandbox = {}
    setmetatable( lSandbox, {
        __index = function( pSandbox, pTagname )
            local lWorker = function( pRecord )
                lOutput( pRecord, pTagname )
            end
            pSandbox[ pTagname ] = lWorker
            return lWorker
        end
    } )
    setfenv( lInput, lSandbox )
    if pOutput then
        lInput()
    else
        return coroutine.wrap( lInput )
    end
end

如果我們針對 https://lua.dev.org.tw/pil/10.1.html 中的資料(僅一條記錄)來執行這項測試,我們會取得

> reader = luareader( "data.lua" )
> return reader()
table: 2001af18 entry
> return reader()
> 

現在讓我們來研究一下使用管線可以實現的更複雜的轉換。

扁平化和摺疊過濾器

您可能已經在思考,我們要如何從多個輸入來源提供資料給管線,以便管線輸入是這些來源輸出的串接。

如果您思考這個問題,您會了解到這個操作的核心不限於來源,並且可以在管線的任何階段執行:我們需要的是一種可以從每個輸入項目產生序列,並將所有這些序列的串接寫入輸出的過濾器。這基本上是一個扁平化操作。將這個一般化功能可用作過濾器,我們將不再需要建立像多來源那樣特別的東西:要串接多個輸入,我們只需要產生這些輸入的序列或描述它們的值,並套用過濾器,將每個元素展開為其包含的資料。我們最好的作法是呼叫函式,將元素視為參數並回傳產生器,來產生展開元素所產生的資料。

下列函式取得回傳產生器的函式(例如io.lines),並從中建構過濾器,讓過濾器對每個輸入呼叫這個函式,將產生器的輸出提供給它的輸出

-- Returns a filter that calls the given function for every input and feeds
-- the output of the returned generator into its output.
function expand( pFunction )
    return function( pInput, pOutput, ... )
        while true do
            local lInput = pack( pInput() )
            if lInput[ 1 ] == nil then
                break
            end
            local lGenerator = pFunction( unpack( lInput ) )
            while true do
                local lData = pack( lGenerator() )
                if lData[ 1 ] == nil then
                    break
                end
                pOutput( unpack( lData ) )
            end
        end
    end
end

假設我們有一個檔案包含一串檔名,每行一個,我們希望將這些檔案的內容逐行串接到一個輸出檔案中。以下是如何執行此動作

> concat_files = pipe( filereader(), expand( filereader() ), filewriter() )
> concat_files( "filelist.in", "data.out" )

順道一提,這個範例顯示了 filereader()所回傳函式的雙重性質背後的隱藏功能。

現在讓我們思考相反的運作:將流過管線的資料組裝成較大的單位。由於流過管線的資料本質上是平面的,組裝濾器必須自己知道(有時候是藉由查看資料)如何繪出界線,用以區分組裝到一個組合中的資料和組裝到下一個組合中的資料。與平面化情況中從一個組合切換到下一個組合時,沒有像輸入結束那樣的外部提示。

描述組裝演算法最簡單的方法,就是當作一個函數,這個函數將疊代器當成引數,從中提取所需資料量,然後傳回組合。下列函數可將此類函數包裝成摺疊濾器:

-- Filter that calls the given function with its input until it is exhausted,
-- feeding the returned values into its output.
function assemble( pFunction )
    return function( pInput, pOutput )
        local lTerminated = false
        local function lInput()
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                lTerminated = true
            end
            return unpack( lData )
        end
        repeat
            pOutput( pFunction( lInput ) )
        until lTerminated
    end
end

有這個函數後,我們可以提供一個較簡單的實作,用以將行收集至段落,而我們先前提早實作。

-- Collects a paragraph by pulling lines from a generator. The trailing newline
-- is preserved.
function collect_paragraph( pInput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            break
        end
    end
    return table.concat( lBuffer, "\n" )
end

paragraphize = assemble( collect_paragraph )

巢狀管線

處理複雜資料

在前一節,我們看到如何分解和組裝複合資料(本例中是檔案和段落)。這些複合資料的組成部分與複合資料本身一起傳遞在同一個線性管線中,於是形成了單一的線性轉換路徑。(這不正是管線的一切嗎?)關於這個場景,要注意兩個事實:

畢竟,在一般情況下,後者甚至是不可能的,因為中間的濾器可以依據其意願篡改分解的資料,模糊初始組合之間的界線。

現在,想像如果我們想處理具有異質結構的複雜資料,其中組成部分必須經過不同的轉換,並且我們也想使用管線處理這些轉換,那麼必須使用哪種拓撲結構,才不會只用一個線性管線。想到的拓撲結構,會是一組從一個節點發出並加入另一個節點的平行「子管線」。問題是,在這些節點中,甚至是節點之間,如何同步子管線的輸入和輸出?

顯然平行管線不應單獨執行「取用彼此的食物」,且所有管線運算的成果都必須組成所有管線輸出的成果。自然地,我們想到所有子管線在同一個節點(篩選器)中擁有輸入和輸出,而他們只取得一個輸入(由控制子管線的篩選器所讀取),並且只能產生一個輸出(由控制篩選器組裝和寫入輸出)。換句話說,子管線與篩選器的輸入和輸出分開,而輸入和輸出都由控制篩選器節點提供。實際上,子管線以函數的方式執行,取得參數並產生回傳值。如我們所知,這些管線必須從資料來源(取得輸入值)開始,並以資料匯(回傳結果值)結束。從篩選器節點的角度來看,這與呼叫函式沒有任何差別。

事實上,使用平行「子管線」處理複雜值的組成部分對將它們視為輸入/輸出的篩選器來說,不構成特殊的條件 - 這很像是呼叫函式。由於管線並沒有什麼特別之處,因此我不打算提供一個透過呼叫一組函式,從其他函式建立結構化值的功能。

然而,我們可以提供一個使用子管線(包含複雜值的)簡單範例。我們會使用 pass() 來執行密閉管線作為子管線,以處理流經管線的個別值。回想上面我們 average_of_seq_of_powers 的範例,用來計算 4 的前 n 次方數的平均值。我們現在想要知道這個值隨著 n 如何變化,並將每個 n 輸入我們現有的 average_of_seq_of_powers 管線中,使用它作為子管線,來建立相對應的順序。

> seq_of_average_of_seq_of_powers = pipe( seq, pass( average_of_seq_of_powers ) )
> seq_of_average_of_seq_of_powers( 4, print )
1
8.5
32.666666666667
88.5

這是可能的,因為 average_of_seq_of_powers密閉管線,因此像函式一樣運作。「傳遞」這個用語可能有點誤導,因為沒有存活的子管線會讀取輸入並寫入輸出,而是一種完全不同的通訊模式:每傳遞一個值就會建立管線的新執行個體並執行到結束。但是「傳遞」也是我們在呼叫函式時使用的用語,而且事實上這個值確實會「經過」子管線,經歷轉換,因此這個用語應該是合理的。

處理異質資料串流

除了我們在上一節討論過的內容外,我們可以想像一個使用平行管道的另一個場景,其中子管道對外管道仍然是管道,而不是偽裝成函式:資料串流可能在某個階段由不同類型的資料組成,這些資料必須經過不同的轉換,然後再加入主管道。必須有一些類似的交換器將資料路由到一組子管道的其中一個,而某些收集器從子管道的末端拉取資料的順序與對應的輸入提供順序相同。後者的條件強制收集資料的節點成為執行輸入路由的節點(因為沒有其他兩個節點可以使用的通訊管道進行同步)。與上述的複雜資料場景的差異在於子管道不需要在輸入/輸出行為中同步,甚至可以交錯輸入和輸出。

若要實作這種類型的處理,我們必須能夠在輸入時暫停管道,在資料可用時提供資料。我們無法使用這裡的管道類型來執行此操作,這需要對稱管道,所以我們放棄了這個想法。

儘管如此,出於好奇,讓我們詢問在這些前提條件下,是否有可能擁有真正的「管道義大利麵」,也就是說,由一個來源發出並會聚到一個接收器中的管道「網」。很明顯地,這樣的建構無法透過遞迴組合較不複雜的結構到較複雜的結構來組成,但我們必須透過單獨連接其輸入和輸出將濾鏡和子管道連接在一起。如果我們思考資料將如何透過這樣的網路傳遞,就會發現,我們需要的是推式通訊(呼叫函式以處理輸出),而不是我們的拉式通訊設計(要求產生器提供輸入)。(我們的拉式設計與交換器執行的輸出調度類型不相容,因為它無法檢查要路由的資料,而且需要根據已輸出的資料調度「輸入的要求」,也就是無法檢查輸入。)有了這樣的機制,我們甚至可以想像具備多個輸入和輸出的「處理網路」。這將會是一個有趣的專案,可以實作和實驗,看看它能對我們做些什麼。

處理符號

雖然透過實作管道追求的主要目標,是取得一個自然的符號來「實作」資料轉換(對於長度未定義的資料串流而言),然後可以組合成更複雜的轉換,但讓我們看看是否可以使用該符號來「使用」這些轉換。為此,我們將套用一些元程式設計。

呼叫管道的內嵌符號,也就是為單次使用建立一個管道,而且-沒有儲存在變數中-立即使用它,看起來有點笨拙。以下舉我們 grep 的工具程式為例(省略了 paragraphize

> pipe( filereader(), grep( "foobar" ), filewriter() )( "data.in", "data.out" )

這遠遠不如 UNIX 標記中用於管道、大致相當於

$ filereader "data.in" | grep "foobar" | filewriter "data.out"

在思考如何使用運算子標記來建構管道時,我們首先要注意,我們有兩個選擇。請記住,篩選器是一個函數,將產生器當作輸入參數,並將函數當作輸出參數。在手邊有產生器的情況下(這是我們要在過程中執行管道的時候),我們可以使用 coroutine.yield 來替換輸出,並將下一個篩選器封裝到協程中,如此一來就能產生另一個產生器,用篩選器所處理的輸入做為結果。然後我們可以將其當作下一個篩選器的輸入,並重複此程序,在最後一個篩選器後,產生一個產生器,用整個管道的輸出做為結果。我們可以在 for 迴圈中使用它,但若要執行管道,我們必須新增另一個結構來清空產生器,或更好地透過其他結構附加接收器,提供其輸出參數。這樣一來,輸出的標記將不同於用來提供輸入的標記。此外,如果我們使用這種方式建構產生器鏈,我們將重新實作 pipe() 所做的大部分事情,但無法使用此標記來產生管道,例如可以使用作為子管道。

因此,我採納了我們的第二種選擇:建構可重複使用的管道,並立即執行。

建構管道很容易。我們需要做的,就是將第一個篩選器封裝到一個代理中,也就是一個附加了 meta 表的表格,定義實作我們所需運算子的元方法。因為第一個篩選器通常是 source() 建立的資料來源,或我們的函式庫(例如 luareader)提供的資料來源,因此這個封裝大多可以隱藏起來。對於必須明確執行的案例,我們讓 bind() 函數可以公開看到

-- Metatable
local sPipeOperations

-- Binds its argument to the library by wrapping it into a proxy
function bind( pValue )
    local lProxy = { payload = pValue }
    setmetatable( lProxy, sPipeOperations )
    return lProxy
end

元表定義了 __mul 元方法,用來實作 * 運算子,以連接篩選器。我選擇使用乘法,而不是例如加法或串接,是因為串在一起的篩選器的效應確實是相乘,而不是相加。(你可以在上方的 seq_squares_squared 範例中,很清楚地看到這一點。)此外,星號不僅看起來顯著,更重要的是,它比大多數其他運算子約束更強,不使用括號就能將這些運算子套用在管道中。運算子也會傳回一個代理,讓我們可以串接它

sPipeOperations = {
    -- Chains a proxy with a function or other proxy
    __mul = function( pHeadProxy, pFilter )
        local lProxy = { head = pHeadProxy, payload = fncompile( pFilter ) }
        setmetatable( lProxy, sPipeOperations )
        return lProxy
    end,
    ...
}

為了能夠透明地呼叫封裝函數,我們也實作了 __call 元方法,使用給定的引數來執行代理所代表的函數。我們使用兩個內部工具函數來建構代理所代表的函數

-- Compiles a proxy to the function it represents
local function fncompile( pSpec )
    if type( pSpec ) == "function" then
        return pSpec
    else
        if pSpec.head then
            return fncompile( pipe( unpack( unchain( pSpec ) ) ) )
        else
            return fncompile( pSpec.payload )
        end
    end
end

-- Collects the payloads of a proxy chain into an array
local function unchain( pProxy )
    if not pProxy then return {} end
    local lResult = unchain( pProxy.head )
    table.insert( lResult, pProxy.payload )
    return lResult
end

sPipeOperations = {
    ...
    __call = function( pProxy, ... )
        return fncompile( pProxy )( unpack( arg ) )
    end,
    ...
}

實作 fncompile() 為函式,且在 pipe() 的結果中呼叫它,這表示我們希望能夠自由地合併及調整使用 * 或 pipe() 建構的串接線。為了達成這一點,pipe() 會像任何其他視函式接受函式做為引數,使用 fncompile() 轉換函式,在使用之前,並像任何建置篩選器的函式一樣,傳回一個繫結函式。(儘管我不會在此再次提供已變更函式。)

我們現在可以同時處理串接線,或呼叫將串接線視為參數的函式,而不必再使用 pipe(),而且如果資料來源或第一個篩選器也已繫結,也不必使用 bind()

> seq = bind( seq )
> average_of_seq_of_powers = seq * square * square * avg
> return average_of_seq_of_powers( 4 )
88.5
> filter_foobar_lines = filereader() * grep( "foobar" ) * filewriter()
> filter_foobar_lines( "data.in", "data.out" )

對於內聯呼叫,到目前為止我們仍沒有獲得太多成果,因為我們現在僅能從以上難看的表示法中移除四個字元:pipe。我們必須在串接線建構式表達式外保留括號,因為函式呼叫運算子比乘法更具結合力

> (filereader() * grep( "foobar" ) * filewriter())( "data.in", "data.out" )

因此,這還不是我們期望的結果。對於必須用兩個引數呼叫的串接線,在沒有使事情變得很複雜的情況下,我們可能再也不能輕鬆地執行任何動作。

不過,當串接線僅使用單一輸入引數呼叫並傳回其結果(亦即,做為具有單一引數的函式呼叫)時,我們可以採用串接線表示法輸入資料,並與轉換結果的指定同時組合。請記住我們以上面的 average_of_seq_of_powers 範例,我們將重新改寫如下

> result =  4 .. seq * square * square * avg

資料轉換(其結果放入程式變數以供日後處理)是一個常見案例,同時我們也已習慣在這種案例中使用指定表示法。故此表示法似乎是一個折衷方案,而且如果我們繼續調整表示法直到將結果接收者放在 UNIX 般的串接線右端,我們很可能也不會獲得更多收穫。

如果我們實作了 __concat 元方法,且 Lua 在任何一個運算元提供的情況下呼叫 .. 運算子,就能使用以上表示法

sPipeOperations = {
    ...
    __concat = function( pInput, pProxy )
        return fncompile( pProxy )( pInput )
    end
}

(請注意,由於 Lua 會將其結果轉換成布林值(這並不是我們需要的),因此我們無法透過實作 __lt 元方法來使用看起來比較漂亮的 > 運算子。)

現在讓我們回到 seq_of_average_of_seq_of_powers 次串接線範例來說明這個表示法的威力。由於我們被迫計算值,而不是寫入輸出的關係,因此我們將再加入一個平均值計算,取得長度從 1 到指定數字的序列的平均值(4 的次方)

> return 4 .. seq * pass( seq * square * square * avg ) * avg
32.666666666667

這就完成了我們的元程式設計之旅。在進行一些結論性思考之前,我們注意以下觀察得到:如果我們在使用之前繫結它,我們可以使用所實作的 .. 表示法呼叫接受一個引數的任何函式

> return 1 + "4" .. bind( tonumber )
5

而且,如果我們有手邊各種函式都已綁定,我們就可以使用串連運算子來構造整個呼叫鏈!不過,如果將其他運算子與上述範例相結合,可能會影響運算子的優先順序,因此我不建議這樣做。如果資料以表格的形式表示,表示這樣的呼叫鏈會有更好的表達方法(而且通常來說會這樣):我們可以替資料裝備一個元表,並定義一個 __call 元方法,讓它以給定的函式作為第一個引數,並帶有資料和剩餘引數,進行呼叫。最後的結果表示形式看起來像這樣(這裡沒有提供的 map 函式會建立一個表格,並使用它所提供的函式將來源表格中的值轉換後儲存在結果表格中的原始金鑰下)

t (map, tonumber) (map, square) (avg)

看起來這是個不錯的函式呼叫鏈,將表示數字的字串陣列 t 轉換為數字、平方並計算平均值。它會評估為

avg( map( map( t, tonumber ), square ) )

可讀性就差很多了。

請注意,上述函式 squareavg 並非我們之前定義的那兩個。那些函式是篩選條件,而且我們可以在管道中使用這些條件來計算相同的結果(只要我們有一個 values 函式,可以接受一個表格並傳回一個產生值的產生器即可)。然後,我們可以使用以下一種表示形式,第一個表示形式使用表格的 __call 元方法,而第二個表示形式使用管線代理的 __concat 元方法

t (source( values ) * pass( tonumber ) * square * avg)
t .. source( values ) * pass( tonumber ) * square * avg

請注意,雖然第二種表示形式看起來更美觀,但不能像第一個表示形式那樣當作陳述式使用 - Lua 需要我們對傳回的值執行某些動作。這通常不是問題,因為我們無論如何都想使用管道提供的結果作為傳回值,但如果表達式的結果完全在副作用中,這就會變成一個限制。不過,這種情況下表示形式可能會很缺乏提供輸出參數,而使用第一個表示形式則可以輕鬆地做到

t (source( values ) * pass( tonumber ) * square, print)

一些最後的考慮事項

我們已經在討論的尾聲了,讓我們針對已實作的機制加入一些最後的考量。

你可能會納悶,為什麼我們不需要在篩選條件介面中讓輸出函式傳回布林值來指示篩選條件輸出沒問題,更重要的是,會接受更多輸出。最後,我們實作的篩選條件是撰寫輸出的主動實體,而且一定有人會接收它。在 UNIX 中,如果一個篩選條件嘗試寫入下一個篩選條件的輸入,而下一個篩選條件已經完成執行並關閉其輸入以不接受任何更多資料,我們會看到「管道已關閉」錯誤。

我們不需要這個功能的原因是在我們的實作中,當要求輸入時,任何濾器都會被後方的濾器重啟。也就是說,只有當先前的輸出已處理完且新的輸出已接受(而且確實預期這樣的情況)時,它才能執行。因此絕不會出現濾器處理完輸入卻無法提供輸出的狀況:輸出絕不會失敗-我們永遠不會遇到 UNIX 中熟知的管道關閉錯誤!這是我們的拉取式通訊機制的直接結果。這種通訊機制讓我們的濾器介面產生了良好的對稱性:輸入函數不接受值並傳回資料,而輸出函數接受資料並傳回無值。

這帶來的副作用是沒有任何濾器保證會讀取輸入直到結束為止。因此無法察覺過早停止輸入處理。要實作這種檢查很容易-任何濾器都不能在它的前置濾器結束前結束-但可能不值得花費這個功夫。(不過如果檔案讀取器未用盡,但擲回錯誤並不會解決這個問題,因此可能還是會有開啟的檔案。)

如果我們使用推送式通訊機制來實作管道(讓濾器在輸入時產生,並重啟它們的輸出),我們會遇到另一個情況:當輸出傳回(從重啟呼叫到下一個濾器)時,它會知道這個濾器是否仍然存在,因此會接受更多輸入。它可以傳達這個訊息給呼叫的濾器,以便它能停止處理(或略過它,在下一次輸出時引發管道關閉錯誤)。接著,輸入/輸出介面將會失去對稱性,而且組成濾器實作的迴圈將不再只由輸入控制。看來,我們的直覺式拉取式實作從這個角度來看也是較佳的選擇,而且會產生更自然的實作。

再思考一下輸出無法失敗的情況,這似乎與實際狀況有所出入,例如傳輸給網路連線的輸出的確可能會失敗。因此,我們來仔細看看

傳輸線中除最後一個過濾器外,任何過濾器會得到 coroutine.yield 做為輸出。此呼叫的確永不失敗。(它會在協程再次恢復執行時回傳。)傳輸線中的最後一個過濾器應為資料接收區,因此它不會呼叫其輸出,表示對它來說輸出呼叫也不會失敗。那麼輸出錯誤從何而來?顯然來自資料接收區呼叫某些可能會失敗的函數。這表示資料接收區需要與交付資料的對象簽訂一份合約,允許傳輸輸出頻道的狀態和處理任何錯誤。或者,我們可以改為自己撰寫自訂資料接收區,在自訂程式迴圈中 drain() 傳輸線,將其輸出提供給某個頻道,並自己處理任何問題。這兩種情況都不需要任何過濾器處理任何傳輸錯誤。(不過,如果我們提供可能會失敗的函數作為輸出給未以接收區結尾的傳輸線,錯誤將會保持不明或拋出到執行傳輸線的程式碼中。)

輸入錯誤呢?在任何過濾器中發生的任何錯誤會透明地轉送 (在輸入鏈中重新拋出) 到資料接收區。因此,接收區或如果我們想要撰寫迴圈來讀取傳輸線尾端,也會處理 (或不處理) 輸入錯誤和輸出錯誤。

另一個有趣的觀察:想想在 單一執行緒中執行控制程式碼來讀寫傳輸線的輸入和輸出的情況。

為了解決它所造成的同步問題,我們可以決定透過使用回呼架構來解耦處理傳輸線兩端的程式碼:給予傳輸線一個 callback,它能透過此 callback 取得其輸入,從而讓控制程式碼僅管理傳輸線的輸出端。結果發現,這正是我們在本文中使用傳輸線的方法:callback 是給予傳輸線的輸入函數 (生成器)!這是 drain() 的情況。

從另一端來看,我們可以決定提供一個 呼叫轉接 給傳輸線,用於提供其輸出,並讓控制程式碼僅處理傳輸線的輸入端。我們的過濾器介面也是如此:我們提供作為輸出參數的,正是這樣的 呼叫轉接 函數。這是 feed() 的情況。

如果我們比較這兩種情況,我們會發現這兩種情況中,管道(依據我們所知,其本質上是如每個篩選器一般主動的)會被包成由函數表示的被動實體,僅對外部刺激(函數呼叫)產生反應:在此我們採用的拉取型管道實作中,從管道的輸出端拉取資料載入輸入端,如果我們以推播形式實作管道,則將資料載入管道的輸入端,讓篩選器根據輸入而非輸出而運作。(這會導致使用我們在此採用的產生器方法,會產生較不自然且複雜的解決方案,且無論如何會對篩選器實作中使用 for 迴圈進行限制。)

只要管道只是執行(也就是含有資料來源和資料匯),兩種管道實作之間就不會存在外部可見的差異。

在此再次最終檢視一下同一部份的「篩選器網路」:這事實上是一個從主動處理節點網路建構而成的事件驅動應用程式。它會對其輸入上發生的事件產生反應(來自外部的呼叫輸入函數來載入資料),將其轉換為其輸出上的事件(呼叫輸出函數來提供資料),並變更其內部狀態(甚至可能重新組態自身)。順道一提,除了我們考慮的篩選器之外,可能會存在各種處理節點!即使在實質上來說這是一種「呼叫轉送」架構,事件驅動應用程式通常會在「回呼」架構中看到。這很有趣,但並無矛盾,一切取決於從哪個觀點來檢視呼叫事件 - 被呼叫(應用程式)或呼叫(架構)程式碼。因此,當我們將「處理網路」連線至一些從外部來源(例如:滑鼠事件)提供其輸入的架構,並提供它用於輸出功能(例如:圖形例程)時,我們會自然而然提供輸入函數給予它作為「回呼」。


最近變更 · 喜好設定
編輯 · 歷程
最後編輯時間為 2012 年 3 月 28 日上午 8:00 GMT (差異)