協程當作事件處理器

lua-users home
wiki

這是一個十分基本的監督程式 (或事件轉發器),與 Copas 和 ProgrammingInLua? 中的類似 (但沒有參考任何一款)。我是為了回應寄件清單上的問題而撰寫這份文件,並已嘗試提供完整的註解;文末有提供使用範例。我打算回答的問題是

要如何執行下列類似的工作...

result = block(self, event)
其中,block() 是將腳本內容 (例如,狀態) 和事件作為引數的 SWIG 封裝函式,且會封鎖 (讓出) 腳本,直到事件發生為止,接著將詳細資料複製到結果結構 (繼續執行) 中?

我的回答是,何不採用 Lua 撰寫?下方架構每秒可處理數十萬則事件轉發,因此改用 C 撰寫不太可能有顯著成效;CPU 時間會受到實際處理要求的成本支配。

local cocreate, resume, yield, costatus =
  coroutine.create, coroutine.resume, coroutine.yield, coroutine.status

local LOG = function(...)
  io.stderr:write(os.date"!%F %T ")
  io.stderr:write(...)
  io.stderr:write"\n"
end

-- This function expects to be a coroutine. When it is started,
-- it initializes itself, and then awaits further instructions.
-- Whenever it has nothing else to do it yields, unless it was
-- told to quit in which case it returns.

function supervisor(...)
  -- each event is a table of sets of coroutines;
  -- we assume we can handle the events in any order.
  local events = {}
  -- This table associates coroutines with names, mostly
  -- for debugging
  local name = {}
  -- to signal that we're done
  local done = false

  -- This function removes all references to a coro
  -- from all events, which we do when the coro dies.
  -- We could use weak tables to do the same thing.
  -- On the other hand, we might want to clean up other
  -- data associated with the coroutine. Anyway, this is easy:

  local function remove(coro)
    for _, coros in pairs(events) do
      coros[coro] = nil
    end
    name[coro] = nil
  end

  -- Convenience function to log tracebacks
  local function log_traceback(coro, msg, err)
    LOG("Coroutine ", name[coro], " ", msg, ":")
    local tb = debug.traceback(coro, err)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
  end

  -- The core routine, which handles the results of a
  -- coroutine.resume. First argument is the coro, rest
  -- are the return values from coro.resume(). The only
  -- thing we're expecting from the coro is a "waitfor"
  -- command, but other ones could be added...
  local function handle_resume(coro, ok, todo, ...)
    if costatus(coro) == "dead" then
      remove(coro) -- always get rid of it.
      -- log a traceback on error
      if not ok then log_traceback(coro, "failed with error", todo) end
    elseif todo ~= "waitfor" then
      -- todo should be "waitfor" and ... the event.
      -- If we had more verbs, we could use a case-table.
      log_traceback(coro, "unknown request "..tostring(todo),
                          "bad return value")
      remove(coro)
    else
      -- We don't care if the event doesn't exist (should we?)
      local q = events[...]
      if q == nil then q = {}; events[...] = q end
      -- We might want to tell the upper level that a new
      -- event needs to be recognized, though.
      -- if next(q) == nil then addevent((...)) end
      q[coro] = true
    end
  end 
      
  -- A table of actions; essentially a case statement
  local handler = {}

  -- do a clean exit (although actually there's no
  -- cleanup necessary, but there might be.)
  function handler.done()
    done = true
  end

  -- debugging: report on status
  function handler.status()
    -- invert the events table for nicer printing
    local n, e = {}, {}
    for evt, coros in pairs(events) do
      for coro in pairs(coros) do
        local who = name[coro]
        n[#n+1] = who
        e[who] = evt
      end
    end
    -- sort the names
    table.sort(n)
    -- and produce the report
    for _, who in ipairs(n) do
      LOG(who, " is waiting for ", tostring(e[who]))
    end
  end

  -- introduce a new actor (coroutine) into the system, and run
  -- it until it blocks
  function handler.introduce(who, func, ...)
    local coro = cocreate(func)
    name[coro] = who
    -- let it initialize itself
    return handle_resume(coro, resume(coro, ...))
  end

  -- send an event to whoever cares
  function handler.signal(what, ...)
    local q = events[what]
    if q and next(q) then
      for coro in pairs(q) do
        q[coro] = nil  -- handled
        handle_resume(coro, resume(coro, what, ...))
      end
      -- Maybe tell the top-level whether the event is
      -- still active?
      return next(q) ~= nil
    else
      -- No-one cares, sniff. "Log" the fact
      LOG("Event ", tostring(what), " dropped into the bit bucket")
    end
  end

  -- Set the __index meta for the handler table to avoid having
  -- to test for bad commands explicitly
  local function logargs(...)
    local t = {n = select("#", ...), ...}
    if t.n > 0 then
      for i = 1, t.n do
        local ti = t[i]
        t[i] = (type(ti) == "string" and "%q" or "%s")
               :format(tostring(ti))
      end
      LOG("..with arguments: ", table.concat(t, ", "))
    end
  end
  function handler:__index(what)
    LOG("Supervisor received unknown message ", what)
    return logargs
  end
  setmetatable(handler, handler)

  -- auxiliary function to handle a command, necessary to
  -- capture multiple returns from yield()
  local function handle(simonsays, ...)
    return handler[simonsays](...)
  end

  -- The main loop is a bit anti-climactic
  LOG"Starting up"
  local rv = handle(...)
  repeat 
    rv = handle(yield(rv))
  until done
  LOG"Shutting down"
end

------------------------------------------------------------------------------
--  Sample very simple processor
------------------------------------------------------------------------------

local function block(event) return yield("waitfor", event) end
local function process(n)
  for i = 1, n do
    local e, howmany = block"TICK"
    assert(e == "TICK" and howmany == i)
  end
end

------------------------------------------------------------------------------
-- Test driver in Lua
------------------------------------------------------------------------------

local super = cocreate(supervisor)

local function check(ok, rv)
  if not ok then
    LOG("supervisor crashed")
    local tb = debug.traceback(super, rv)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
    os.exit(1)
  end
  return rv
end

local function send(msg, ...) return check(resume(super, msg, ...)) end

local nprocess, nreps = tonumber(arg[1]) or 10, tonumber(arg[2] or 12)
for i = 1, tonumber(nprocess) do
  send("introduce", ("p%04i"):format(i), process, nreps)
end
local j = 1
while send("signal", "TICK", j) do
  j = j + 1
end
-- This should be empty
send"status"
send"done"
LOG("Endcount ", j)

定義 *監督程式*。 [1]?--DavidManura

這個定義來自一篇關於 Erlang 的論文,有點循環論證,但很好地總結其要點:「實作一種特定型態的監督程式-工作者程式設計風格,讓監督樹可以動態建構。」

換句話說,監督程式負責照顧和維護多個工作者,包括組織其工作負載、發送事件 (如本例所示) 和處理錯誤。

[Termite] 中還有更多範例 (PDF 文件,若有閱讀括弧的習慣,應可輕鬆閱讀)。

近期變更 · 喜好設定
編輯 · 歷程
最後編輯時間為 2007 年 3 月 2 日格林威治時間上午 5:23 (diff)