luasocket学习ltn12中文版

更新时间:2024-04-20 13:11:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

Filters, sources and sinks: design, motivation and examples

or Functional programming for the rest of us

by DiegoNehab

应用程序有时有太多的信息需要在内存处理,即使现在已经有足够的内存,也会被迫把数据分成小部分处理。,以原子方式处理所有数据可能需要很长的时间,这妨碍了用户与应用程序交互。复杂的数据转换往往可以被定义为一系列简单的操作。几个不同的复杂的转换可以共享相同的更简单的操作,因此,定义一个统一的接口,把他们组合起来,是很好的处理方法。以下概念构成我们对这些问题的解决方案。

Filters是一些函数,接受连续块的输入,并产生连续的输出块。此外,连接所有的输出数据作为输入数据的级联filter应用的结果是相同的。因此,边界是无关紧要的:Filters必须处理由用户任意分割的输入数据。

chain是一个函数,将两个(或更多)的函数连接起来,但其接口和其他函数组件的接口没有什么区别。因此,链式Filters可用于任何一个可以使用原子Filter的地方。然而,其对数据的影响是其组成的filters的综合效果。请注意, chain可以连接其他的chain,创建任意复杂的操作,而像原子操作那样使用。

Filter可以被看作是通过它的数据流在网络内部的节点,并且可能根据不同的filter对数据进行转换。把这些节点链接起来,组成完整的网络数据流图。为此我们需要添加初始和最终的网络节点,分别是source和sink。具体来说,source是一个函数,每次调用时产生新的数据。相应的,sink函数是接收到的数据的最终目的地。source和sink可以使用chain和filter相连。

总的来数,Filter,chain,source和sink,都是被动的实体:他们需要被重复调用以便使系统运行起来。Pumps提供了推动数据流过网络的动力,从一个source到一个sink。

我们希望,通过例子使这些概念将变的清晰。在下面的章节中,我们从一个简单的接口开始,不断的完善这个接口,直到没有明显的缺点。我们做出的演化不是人为强制的:它按部就班,巩固了我们对这些概念的理解。

1 一个具体的例子

一些数据转换操作容易用Filter方式来实现,比如行结束的文本标准化,Base64和Quoted-Printable传输内容的编码,把文本切分成行,SMTP字节填充等等。

我们将要定义第一个filter接口,“行结束符标准化”。稍后我们讨论为什么实现起来并不容易。

假设我们得到一个文本,含有未知的行结束符,比如常见的Unix(LF)的Mac操作系统(CR)和DOS(CRLF)(包括可能的混合情况)。我们希望能够编写如下代码:

input = source.chain(source.file(io.stdin), normalize(\)) output = sink.file(io.stdout) pump(input, output)

这个程序从标准输入流中读取数据和行结束标记,用CRLF (MIME标准定义的标记)标准化,最后将结果发送到标准输出流。为此,我们使用一个标准输入:stdin,一个Filter:normalize,组成chain。然后执行pump调用,不断从stdin获取数据,并将其输送到sink:标准输出。

为了使讨论更具体,我们从讨论如何实现一个标准化Filter开始,这个normalize是一个函数,它用于创建一个filter功能。我们初步的filter接口如下:filter接收输入数据块,并返回处理的数据块。在没有更多的输入数据时,用户通知filter,用nil作为参数来调用它。filter返回最后的数据块。

虽然接口极其简单,实现似乎并不那么简单明确。任何遵守此接口的过滤器需要保持某种在调用之间的上下文。这是因为数据块可以越过标记一行的末尾之间的CR和LF字符。这个上下文存储的需求,促成了工厂函数的使用:每个工厂函数被调用时,返回一个filter,具有它自己的上下文,如此使我们可以在同一时间得到多个独立的filter。对于这个normalize filter,我们意识到,简单明了的解决方案是不够的(例如,在产生任何输出之前,把所有输入连接起来放入上下文中),我们将不得不寻找另一种方式。

我们将实现分为两部分:一个低级别的filter,和一个高级别的filter工厂。低级别的filter将在C实现,不会携带任何函数调用之间的上下文。高级别filter的工厂,在Lua中实现,filter工厂将创建并返回高级别的filter,它为低级别的filter提供上下文,但其内部细节对用户隔离。这样,我们利用C的效率优势,进行底层工作,利用lua的简单性做上下文记录。

1.1 Lua 部分的实现

function filter.cycle(low, ctx, extra) return function(chunk) local ret

-- ctx上下文只对low底层filter有用处,每次调用通过返回值更新 ret, ctx = low(ctx, chunk, extra)

-- 返回处理结果 nil表示处理完毕,没有更多数据了 return ret end end

2

function normalize(marker) return cycle(eol, 0, marker) end

normalize工厂函数只是简单地调用一个更加通用的名叫cycle的工厂。这个工厂接收一个低级别的filter,一个初始上下文和一些额外的值,并返回相应的高级别filter。高级别Filer使用一个新的数据块作为参数被调用,它调用低级别的过滤器,传递上一次的上下文,新的数据块和额外的参数。低级别的Filer生成处理过的数据块和一个新的上下文。最后,高级别Filer更新其内部上下文,返回处理过数据块给用户。低级别的过滤器完成了所有的工作。请注意,这个实现,利用了Lua5.0词法作用域规则,在函数调用之间存储本地上下文。

思考一下这个低级别的filter,我们注意到行结束标记标准化问题本身没有完美的解决方案。困难来自一个空行定义,它固有的模糊性。然而,以下的解决方案非常适用于任何情况的输入,以及混合输入的非空行。它作为一个很好的例子,如何实现一个低级别的过滤器。

这是我们所做的:CR和LF被认为是换行符的候选字符。我们认为这样得到得到一个结束标记:如果候选字符是单独的一个,或后面跟着一个不同的候选字符。也就是说,CRCR和LFLF被认为是两个行结束标记,但CR LF和LFCR的被认为只是一个结束标记。这个方法可以照顾到Mac OS,Mac OS X,VMS和Unix,DOS和MIME,以及其他可能更晦涩的约定的系统。

1.2 C部分实现

低级别的filter划分为两个简单的函数。内部函数完成实际的转换,他判断每一个输入字符,并决定了输出什么,以及如何修改上下文。上下文告知最后一个字符是否是候选字符,若是,是哪个候选字符。 #define candidate(c) (c == CR || c == LF)

static int process(int c, int last, const char *marker, luaL_Buffer *buffer) { if (candidate(c)) {

if (candidate(last)) {

//相同候选字符,添加一个标准化maker(CRLF)返回非换行符0 if (c == last) luaL_addstring(buffer, marker); return 0; } else {

//不同候选字符,添加一个标准化maker(CRLF)返回是当前换行符字符 luaL_addstring(buffer, marker); return c; } } else {

//不是候选字符,读入这个字符,返回非换行符0 luaL_putchar(buffer, c); return 0; }

3

}

内部函数 使用Lua的辅助库的缓冲接口,带来了效率和易用性。外部函数简单地使用Lua交互接口。它接收上下文和输入数据块(以及一个可选的行结束标记),并返回转换后的输出和新的上下文。 static int eol(lua_State *L) {

-- 上下文表示得到的最后一个换行符是什么,CR,LF,或者非换行符(用0表示) int ctx = luaL_checkint(L, 1); size_t isize = 0;

const char *input = luaL_optlstring(L, 2, NULL, &isize); const char *last = input + isize;

const char *marker = luaL_optstring(L, 3, CRLF); luaL_Buffer buffer;

luaL_buffinit(L, &buffer); if (!input) {

-- 输入块为nil,重置ctx为nil,返回上次的字符为0,表示非换行符 lua_pushnil(L);

lua_pushnumber(L, 0); return 2; }

while (input < last)

-- 逐个字符读入判断,进行标准化

-- ctx 表示上一次读入的字符,CF,LF,非换行符用0表示 ctx = process(*input++, ctx, marker, &buffer); luaL_pushresult(&buffer); lua_pushnumber(L, ctx); return 2; }

请注意,如果输入块是nil,操作将被视为完成。在这种情况下,循环将不会执行并且上下文被重置到初始状态。这使得过滤器可以被被无数次重用。这是一个不错的主意,如果可能就像这样写过滤器。

除了行结束标准化如上所示的过滤器,许多其他过滤器可以按照同样的思路实现。比如Base64和Quoted-Printable的内容传输编码,文本断行,SMTP字节填充等。具有挑战性的部分是决定用什么作为上下文。例如文本断行,它可能是留在当前行的字节数。比如Base64编码,它可能是在输入字节中以3个字节为单位进行分割的字节串。 2 链接起来

chain的概念引入后,Filer变得更加强大。假设你有一个Quoted-Printable编码的Filer,你要编码一些文字。根据要求,文本必须在编码前标准化为标准范式。简化任务的一个很好的接口是创建一个工厂,工厂创建一个组合过滤器,数据经过多个过滤器进行传递,同时还可以作为一种最初的简单过滤器来使用。 local function chain2(f1, f2) return function(chunk)

4

local ret = f2(f1(chunk)) if chunk then return ret

else return ret ... f2() end – 这里使用...有问题,看做是伪码 end end

function filter.chain(...) local f = arg[1]

for i = 2, table.getn(arg) do f = chain2(f, arg[i]) end return f end

local chain = filter.chain(normalize(\), encode(\)) while 1 do

local chunk = io.read(2048) io.write(chain(chunk))

if not chunk then break end end

chain的工厂是非常简单的。它的作用是返回一个函数,这个函数把数据传递通过所有的filter,并将结果返回给用户。它使用简单的辅助函数,知道如何链接两个过滤器。如果是最后的数据块在辅助函数中需要特别留意。这是因为,最后的数据块通知,必须通过两个filter反过来推动。有了chain工厂就很容易执行Quoted-Printable的转换,如上面的例子所示。 3 Sources, sinks, and pumps

正如我们在前言介绍中所说,到目前为止,我们介绍的filters的功能就像是一个在传输网络内部的节点。数据从一个节点流过另一个节点(或者更准确的说是从一个filter流入另一个filter),最终流出。把filters链接在一起就是把节点连接成网络。但是最终的节点呢?在网络的开始,我们也需要一个用于提供数据源的节点。在网络末端我们需要一个节点获得数据,这就是sink。 Sources

让我们从2个简单的soruce开始。第一个是空source:他简单的返回空数据,可能返回一个错误消息。第二个是文件source,以一块一块数据块的方式从一个文件中产生数据,结束时关闭文件句柄。 function source.empty(err) return function() return nil, err end end

function source.file(handle, io_err) if handle then

return function()

5

local chunk = handle:read(2048) if not chunk then handle:close() end return chunk end

else return source.empty(io_err or \) end end

一个source每次调用时返回的下一个数据块。当有没有更多的数据时返回nil。如果有错误,source可以返回nill和一个错误消息。 阿德里安发现,虽然不是刻意为之,source接口与Lua5.0的迭代器的思想兼容。也就是说,数据source可以很好地与循环结合使用。把文件源作为一个迭代器,重写第一个例子: local process = normalize(\) for chunk in source.file(io.stdin) do io.write(process(chunk)) end

io.write(process(nil))

注意filter最后一次调用获得最后处理过的数据块。循环结束时source返回了nil。因此我们需要在循环外最后再调用一次。 4 在调用之间保持状态

通常情况下,source在某些事件发生后,需要改变自己的行为。一个简单的例子是,一个文件source,当到达文件末端时,无论他被调用多少次都必须保证返回nil。从而避免读取超过文件末端。

实现这种source的方式是创建一个工厂,带有一个额外的状态变量。source可以通过词法作用域使用它。在检测到到达文件末端时,文件source可以设置自己的文件句柄为nil。此后所有的source被调用时都检查句柄是否有效,做出响应操作。

function source.file(handle, io_err) if handle then

return function()

if not handle then return nil end local chunk = handle:read(2048) if not chunk then handle:close() handle = nil end

return chunk end else

return source.empty(io_err or \) end end

另一个实现方式是改变source的接口,让它更灵活一些。允许source返回一个数据块之外的第二个值。如果返回的数据块是nil,额外的值告诉我们发生了什么。第二个nil表示没有数据了,source已经空了。其他的取值可以是一个错误信

6

息。另一方面,如果数据块不是nil,第二个返回值表示source是否需要更换。如果第二个返回值不是nil,继续使用同一个source,否则这就是另一个source,我需要获取剩下的数据。

这个额外的自由对于编写source函数的人来说是很好的。但是对使用它的用户来说有困难。幸运的是,下面的fancy source能够把它转化成简单的source,并不需要替换,使用下面这个工厂: function source.simplify(src) return function()

local chunk, err_or_new = src() --或者是新的src返回,否则返回同样的src -- 注意无论怎样src都返回一个src,要不是错误的src,要不是新的src,或者是原来的src

src = err_or_new or src – 更新自身

if not chunk then return nil, err_or_new – chunk为空,返回错误的或者新的src

else return chunk end end end

这个简化转换工厂允许我们编写一个fancy source,却使用简单source来实现。因此,接下来的函数只是产生一个简单source,接受source的函数认为他是一个简单source。

回到文件source的例子,接口的扩展带来了更优雅的实现。在没有数据时,这个新的source替换为一个空source,不需要反复检查句柄了。 function source.file(handle, io_err) if handle then

return source.simplify(function() local chunk = handle:read(2048) if not chunk then handle:close()

return \, source.empty() end

return chunk end) else

return source.empty(io_err or \) end end

如果我们使用coroutine协程,lua5.0的新特性,这个方法可以更强大。协程的资料比较少,就像一个词法作用域,协程起初看起来很怪,但是一旦你适应了这个概念,可以节省你的大量时间。我得承认使用协程来实现file source是多余的。下面来看如何实现一个链接source的工厂 function source.cat(...)

local co = coroutine.create(function() local i = 1

while i <= table.getn(arg) do local chunk, err = arg[i]()

7

if chunk then coroutine.yield(chunk) elseif err then return nil, err else i = i + 1 end end end)

return function()

return shift(coroutine.resume(co)) end end

这个工厂创建了2个函数,第一个是一个辅助函数,以协程的方式完成了所有的工作。它从一个source中读取一个数据块。如果数据块是nil,移动到下一个source,否则它调用yield返回数据块。第二个函数是source自身,只是调用resume执行辅助的协程,无论返回怎样的数据块都返回给用户(跳过了第一个返回值,这个值告诉我们协程是否已经终止)。思考一下,当你编写同样的函数而不是用协程,你就会体会到如此实现的简单性。在我们要把filter接口变得更加强大时,将要再次使用协程。 5 链接source

用filter链接一个source意味着什么?最有用的解释是链接source-filter后形成一个新的source。source产生数据在返回前传递给filter,这里的工厂例子: function source.chain(src, f)

return source.simplify(function() local chunk, err = src()

if not chunk then return f(nil), source.empty(err) else return f(chunk) end end) end

考虑一个函数,从一个source获得输入数据。通过链接一个简单的source跟着一个或者多个filter。这个相同的函数能够得到处理过的数据,无需考虑这个数据是通过背后的filter得到的。

6 sinks

正如我们定义了最初的source接口,同样我们定一个sink接口。把所有遵循接口的函数称为sink。下面2个简单的工厂返回了sink。table工厂创建了一个sink表,保存了所有获得的数据。这些数据之后能有效的链接成一个单一的string,调用table.concat库函数即可。另一个例子介绍了一个空sink,直接丢弃接受到的数据。

function sink.table(t) t = t or {}

local f = function(chunk, err)

if chunk then table.insert(t, chunk) end

8

return 1 end

return f, t end

local function null() return 1 end

function sink.null() return null end

sink 接受连续的数据块直到数据结束,通过返回一个nil块表示。错误通过额外的参数返回,在nil数据块之后跟随一个错误消息。如果一个sink检测到一个自身的错误,并且不准备被再次调用,应该返回nil,之后的错误消息是可选的。非nil的返回值表示source将会接受更多的数据。最后,正如source可以选择被替换,sink也可以,只要接口一致。同样的,实现一个sink.simplify工厂是很简单的。可以用它转换一个fancy sink变成一个简单的sink。

作为一个例子,创建一个source从标准输入读取数据,链接到一个标准化行结束转换的filter,使用一个sink把所有数据放到一个表中,最后打印出来。 local load = source.chain(source.file(io.stdin), normalize(\)) local store, t = sink.table() while 1 do

local chunk = load()

store(chunk)

if not chunk then break end end

print(table.concat(t))

有一次,正如我们用source和filter创建一个链接source-filter的工厂,很容易用sink和filter创建一个工厂产生新的sink。这个新的sink在调用原始的sink之前,通过filter处理所有的接受到的数据。如此实现: function sink.chain(f, snk) return function(chunk, err) local r, e = snk(f(chunk))

if not r then return nil, e end

if not chunk then return snk(nil, err) end return 1 end end 7 Pumps

在我们的例子中一直存在一个循环,到目前为止我们设计的东西都是被动式的,因此每次都有这个循环,Sources, sinks, filters:没有一个是靠它们自己工作的。把数

9

据从source推送到sink的工作是一个普遍的要求,我们将提供2个辅助函数来做这些工作。

function pump.step(src, snk) local chunk, src_err = src()

local ret, snk_err = snk(chunk, src_err)

return chunk and ret and not src_err and not snk_err, src_err or snk_err end

function pump.all(src, snk, step) step = step or pump.step while true do

local ret, err = step(src, snk)

if not ret then return not err, err end end end

pump.step 函数把一个数据块从source传递到sink。pump.all 函数调用一个可选的step函数,调用step函数推送所有的数据到sink。现在使用所有的东西,编写一个程序。使用Base64编码方式,从磁盘的二进制文件中读取数据保存到另一个文件中。

local load = source.chain(

source.file(io.open(\, \)), encode(\) )

local store = sink.chain( wrap(76),

sink.file(io.open(\, \)), )

pump.all(load, store)

我们切分的filter并不符合直觉,是人为故意的。作为一种选择,我们可以把Base64编码filter和行包裹filter链接起来,然后链接这个复合filter到文件source或者文件sink上,这无关紧要。

8 最后的重要变更

事实证明我们依然有一个问题,David Burgess编写了一个gzip filter,他发现解压filter把一个小的输入数据扩大为大量的数据。虽然我们希望可以忽视这个问题,但是得承认不可以。唯一的解决方式是允许filter返回部分结果,这也是我们选择的方法。在调用这个filter处理输入数据之后,用户不得不循环调用这个filter询问是否还有更多的输出。注意这些额外的调用不能把更多的数据发送到filter。 更加特殊的情况是,传递一个输入数据块到filter之后搜集第一个输出数据块。用户重复调用filter,传递空字符串,获得额外的输出数据块。当filter自身返回一个空串时,用户知道没有输出数据了,然后接着处理下一个输入数据块。最后,在用户传递一个nill告知filter没有更多的输入数据了,filter仍然会在一个数据块中返

10

回产生很多输出数据。用户不得不再次循环调用,这次每次传递的是参数nil。直到filter返回了nil用户才能确定处理完毕。

大多数的filter不需要这些额外的灵活性。很幸运,这个新的filter接口也容易实现。实际上在创建行终结转换filter的时候,就已经满足要求。另一方面,如果不使用协程,串联函数会变得更加复杂。我都不想实现它。如果你知道一个简单的不使用协程实现请一定告诉我! local function chain2(f1, f2)

local co = coroutine.create(function(chunk) while true do

local filtered1 = f1(chunk) local filtered2 = f2(filtered1) local done2 = filtered1 and \ while true do

if filtered2 == \ or filtered2 == nil then break end coroutine.yield(filtered2) filtered2 = f2(done2) end

if filtered1 == \ then chunk = coroutine.yield(filtered1) elseif filtered1 == nil then return nil else chunk = chunk and \ end end end)

return function(chunk)

local _, res = coroutine.resume(co, chunk) return res end end

串联source同样变得更加复杂。一个相似的解决方式还是协程。串联sink仍然简单。有趣的是这些修改对于filter的性能没有不好的影响,无需添加额外的弹性。有人对类似gzip的filter做了严格的优化,这也是为何我们保留了它们。 9 注意事项

我们在开发Luasocket2.0过程中的提出了这些概念,实现在模块LTN12中。结果是大大简化了Luasocket的实现,还变得更加强大。MIME模块特别整合进入了LTN12,提供了很多其他的filter。我们感觉到这些概念值得公开,即使是那些不关心luasocket的人。

另一个值得一提的是“标识filter”。假设你想提供一些反馈给用户,当一个文件下载传递到一个sink的时候。把一个“标识filter”和这个sink串联(这个filter简单的返回接受到的数据不改变数据)。你能够在这期间更新进度。这个原始的sink没有被改变。另一个有趣的主意是一个T形的sink:一个sink发送数据到2个其他的sink。总的来说,好像还有足够的空间讨论其他有趣的点子。

在这篇技术文件中,我们介绍了filters, sources, sinks, and pumps。对于数据处理来说这些都是有用的工具。source为数据获取提供了简单的抽象。sink为最后

11

的数据终点提供了抽象。filter为数据转换定义了接口。filter,source,sink 的chain串联提供了优雅的方式去从简单的转换创建任意复杂的数据转换。pumps让流程运行起来。 初学者说明 Simple sources

function source() -- we have data return chunk

-- we have an error return nil, err

-- no more data return nil end

Simple sinks

function(chunk, src_err) if chunk == nil then

-- no more data to process, we won't receive more chunks if src_err then

-- source reports an error, TBD what to do with chunk received up to now else

-- do something with concatenation of chunks, all went well end

return true -- or anything that evaluates to true elseif chunk == \ then

-- this is assumed to be without effect on the sink, but may -- not be if something different than raw text is processed

-- do nothing and return true to keep filters happy return true -- or anything that evaluates to true else

-- chunk has data, process/store it as appropriate return true -- or anything that evaluates to true end

-- in case of error return nil, err end Pumps

ret, err = pump.step(source, sink)

12

if ret == 1 then

-- all ok, continue pumping elseif err then

-- an error occured in the sink or source. If in both, the sink -- error is lost.

else -- ret == nil and err == nil -- done, nothing left to pump end

ret, err = pump.all(source, sink)

if ret == 1 then -- all OK, done elseif err then

-- an error occured else

-- impossible end

Filters not expanding data function filter(chunk)

-- first two cases are to maintain chaining logic that -- support expanding filters (see below) if chunk == nil then return nil

elseif chunk == \ then return \ else

-- process chunk and return filtered data return data end end

Fancy sources

The idea of fancy sources is to enable a source to indicate which other source contains the data from now on.

function source() -- we have data return chunk

-- we have an error return nil, err

13

-- no more data return nil, nil

-- no more data in current source, but use sourceB from now on -- (\ return \, sourceB end

Transforming a fancy source in a simple one: simple = source.simplify(fancy) Fancy sinks

The idea of fancy sinks is to enable a sink to indicate which other sink processes the data from now on.

function(chunk, src_err)

-- same as above (simple sink), except next sinkK sink -- to use indicated after true

return true, sinkK end

Transforming a fancy sink in a simple one:

simple = sink.simplify(fancy) Filters expanding data

function filter(chunk) if chunk == nil then

-- end of data. If some expanded data is still to be returned return partial_data

-- the chains will keep on calling the filter to get all of -- the partial data until the filter return nil return nil

elseif chunk == \ then

-- a previous filter may have finished returning expanded -- data, now it's our turn to expand the data and return it return partial_data

-- the chains will keep on calling the filter to get all of -- the partial data until the filter return \ return \ else

-- process chunk and return filtered data, potentially partial. -- In all cases, the filter is called again with \ return partial_data end

14

end

15

本文来源:https://www.bwwdw.com/article/g1tp.html

Top