* Added preliminary HTTPD construct
[project/luci.git] / libs / httpd / luasrc / copas.lua
1 -------------------------------------------------------------------------------
2 -- Copas - Coroutine Oriented Portable Asynchronous Services
3 --
4 -- Offers a dispatcher and socket operations based on coroutines.
5 -- Usage:
6 -- copas.addserver(server, handler, timeout)
7 -- copas.addthread(thread, ...) Create a new coroutine thread and run it with args
8 -- copas.loop(timeout) - listens infinetely
9 -- copas.step(timeout) - executes one listening step
10 -- copas.receive(pattern or number) - receives data from a socket
11 -- copas.settimeout(client, time) if time=0 copas.receive(bufferSize) - receives partial data from a socket were data<=bufferSize
12 -- copas.send - sends data through a socket
13 -- copas.wrap - wraps a LuaSocket socket with Copas methods
14 -- copas.connect - blocks only the thread until connection completes
15 -- copas.flush - *deprecated* do nothing
16 --
17 -- Authors: Andre Carregal and Javier Guerra
18 -- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
19 -- Thomas Harning Jr. and Gary NG
20 --
21 -- Copyright 2005 - Kepler Project (www.keplerproject.org)
22 --
23 -- $Id: copas.lua,v 1.31 2008/05/19 18:57:13 carregal Exp $
24 -------------------------------------------------------------------------------
25 local socket = require "socket"
26
27 require"luci.util"
28 local copcall = luci.util.copcall
29
30
31 local WATCH_DOG_TIMEOUT = 120
32
33 -- Redefines LuaSocket functions with coroutine safe versions
34 -- (this allows the use of socket.http from within copas)
35 local function statusHandler(status, ...)
36 if status then return ... end
37 return nil, ...
38 end
39 function socket.protect(func)
40 return function (...)
41 return statusHandler(copcall(func, ...))
42 end
43 end
44
45 function socket.newtry(finalizer)
46 return function (...)
47 local status = (...) or false
48 if (status==false)then
49 copcall(finalizer, select(2, ...) )
50 error((select(2, ...)), 0)
51 end
52 return ...
53 end
54 end
55 -- end of LuaSocket redefinitions
56
57
58 module ("copas", package.seeall)
59
60 -- Meta information is public even if begining with an "_"
61 _COPYRIGHT = "Copyright (C) 2005 Kepler Project"
62 _DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
63 _VERSION = "Copas 1.1.3"
64
65 -------------------------------------------------------------------------------
66 -- Simple set implementation based on LuaSocket's tinyirc.lua example
67 -- adds a FIFO queue for each value in the set
68 -------------------------------------------------------------------------------
69 local function newset()
70 local reverse = {}
71 local set = {}
72 local q = {}
73 setmetatable(set, { __index = {
74 insert = function(set, value)
75 if not reverse[value] then
76 set[#set + 1] = value
77 reverse[value] = #set
78 end
79 end,
80
81 remove = function(set, value)
82 local index = reverse[value]
83 if index then
84 reverse[value] = nil
85 local top = set[#set]
86 set[#set] = nil
87 if top ~= value then
88 reverse[top] = index
89 set[index] = top
90 end
91 end
92 end,
93
94 push = function (set, key, itm)
95 local qKey = q[key]
96 if qKey == nil then
97 q[key] = {itm}
98 else
99 qKey[#qKey + 1] = itm
100 end
101 end,
102
103 pop = function (set, key)
104 local t = q[key]
105 if t ~= nil then
106 local ret = table.remove (t, 1)
107 if t[1] == nil then
108 q[key] = nil
109 end
110 return ret
111 end
112 end
113 }})
114 return set
115 end
116
117 local _servers = newset() -- servers being handled
118 local _reading_log = {}
119 local _writing_log = {}
120
121 local _reading = newset() -- sockets currently being read
122 local _writing = newset() -- sockets currently being written
123
124 -------------------------------------------------------------------------------
125 -- Coroutine based socket I/O functions.
126 -------------------------------------------------------------------------------
127 -- reads a pattern from a client and yields to the reading set on timeouts
128 function receive(client, pattern, part)
129 local s, err
130 pattern = pattern or "*l"
131 repeat
132 s, err, part = client:receive(pattern, part)
133 if s or err ~= "timeout" then
134 _reading_log[client] = nil
135 return s, err, part
136 end
137 _reading_log[client] = os.time()
138 coroutine.yield(client, _reading)
139 until false
140 end
141
142 -- same as above but with special treatment when reading chunks,
143 -- unblocks on any data received.
144 function receivePartial(client, pattern)
145 local s, err, part
146 pattern = pattern or "*l"
147 repeat
148 s, err, part = client:receive(pattern)
149 if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
150 err ~= "timeout" then
151 _reading_log[client] = nil
152 return s, err, part
153 end
154 _reading_log[client] = os.time()
155 coroutine.yield(client, _reading)
156 until false
157 end
158
159 -- sends data to a client. The operation is buffered and
160 -- yields to the writing set on timeouts
161 function send(client,data, from, to)
162 local s, err,sent
163 from = from or 1
164 local lastIndex = from - 1
165
166 repeat
167 s, err, lastIndex = client:send(data, lastIndex + 1, to)
168 -- adds extra corrotine swap
169 -- garantees that high throuput dont take other threads to starvation
170 if (math.random(100) > 90) then
171 _writing_log[client] = os.time()
172 coroutine.yield(client, _writing)
173 end
174 if s or err ~= "timeout" then
175 _writing_log[client] = nil
176 return s, err,lastIndex
177 end
178 _writing_log[client] = os.time()
179 coroutine.yield(client, _writing)
180 until false
181 end
182
183 -- waits until connection is completed
184 function connect(skt,host, port)
185 skt:settimeout(0)
186 local ret,err = skt:connect (host, port)
187 if ret or err ~= "timeout" then
188 return ret, err
189 end
190 _writing_log[skt] = os.time()
191 coroutine.yield(skt, _writing)
192 ret,err = skt:connect (host, port)
193 _writing_log[skt] = nil
194 if (err=="already connected") then
195 return 1
196 end
197 return ret, err
198 end
199
200 -- flushes a client write buffer (deprecated)
201 function flush(client)
202 end
203
204 -- wraps a socket to use Copas methods (send, receive, flush and settimeout)
205 local _skt_mt = {__index = {
206 send = function (self, data, from, to)
207 return send (self.socket, data, from, to)
208 end,
209
210 receive = function (self, pattern)
211 if (self.timeout==0) then
212 return receivePartial(self.socket, pattern)
213 end
214 return receive (self.socket, pattern)
215 end,
216
217 flush = function (self)
218 return flush (self.socket)
219 end,
220
221 settimeout = function (self,time)
222 self.timeout=time
223 return
224 end,
225 }}
226
227 function wrap (skt)
228 return setmetatable ({socket = skt}, _skt_mt)
229 end
230
231 --------------------------------------------------
232 -- Error handling
233 --------------------------------------------------
234
235 local _errhandlers = {} -- error handler per coroutine
236
237 function setErrorHandler (err)
238 local co = coroutine.running()
239 if co then
240 _errhandlers [co] = err
241 end
242 end
243
244 local function _deferror (msg, co, skt)
245 print (msg, co, skt)
246 end
247
248 -------------------------------------------------------------------------------
249 -- Thread handling
250 -------------------------------------------------------------------------------
251
252 local function _doTick (co, skt, ...)
253 if not co then return end
254
255 local ok, res, new_q = coroutine.resume(co, skt, ...)
256
257 if ok and res and new_q then
258 new_q:insert (res)
259 new_q:push (res, co)
260 else
261 if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end
262 if skt then skt:close() end
263 _errhandlers [co] = nil
264 end
265 end
266
267 -- accepts a connection on socket input
268 local function _accept(input, handler)
269 local client = input:accept()
270 if client then
271 client:settimeout(0)
272 local co = coroutine.create(handler)
273 _doTick (co, client)
274 --_reading:insert(client)
275 end
276 return client
277 end
278
279 -- handle threads on a queue
280 local function _tickRead (skt)
281 _doTick (_reading:pop (skt), skt)
282 end
283
284 local function _tickWrite (skt)
285 _doTick (_writing:pop (skt), skt)
286 end
287
288 -------------------------------------------------------------------------------
289 -- Adds a server/handler pair to Copas dispatcher
290 -------------------------------------------------------------------------------
291 function addserver(server, handler, timeout)
292 server:settimeout(timeout or 0.1)
293 _servers[server] = handler
294 _reading:insert(server)
295 end
296
297 -------------------------------------------------------------------------------
298 -- Adds an new courotine thread to Copas dispatcher
299 -------------------------------------------------------------------------------
300 function addthread(thread, ...)
301 local co = coroutine.create(thread)
302 _doTick (co, nil, ...)
303 end
304
305 -------------------------------------------------------------------------------
306 -- tasks registering
307 -------------------------------------------------------------------------------
308
309 local _tasks = {}
310
311 local function addtaskRead (tsk)
312 -- lets tasks call the default _tick()
313 tsk.def_tick = _tickRead
314
315 _tasks [tsk] = true
316 end
317
318 local function addtaskWrite (tsk)
319 -- lets tasks call the default _tick()
320 tsk.def_tick = _tickWrite
321
322 _tasks [tsk] = true
323 end
324
325 local function tasks ()
326 return next, _tasks
327 end
328
329 -------------------------------------------------------------------------------
330 -- main tasks: manage readable and writable socket sets
331 -------------------------------------------------------------------------------
332 -- a task to check ready to read events
333 local _readable_t = {
334 events = function(self)
335 local i = 0
336 return function ()
337 i = i + 1
338 return self._evs [i]
339 end
340 end,
341
342 tick = function (self, input)
343 local handler = _servers[input]
344 if handler then
345 input = _accept(input, handler)
346 else
347 _reading:remove (input)
348 self.def_tick (input)
349 end
350 end
351 }
352
353 addtaskRead (_readable_t)
354
355
356 -- a task to check ready to write events
357 local _writable_t = {
358 events = function (self)
359 local i = 0
360 return function ()
361 i = i+1
362 return self._evs [i]
363 end
364 end,
365
366 tick = function (self, output)
367 _writing:remove (output)
368 self.def_tick (output)
369 end
370 }
371
372 addtaskWrite (_writable_t)
373
374 local last_cleansing = 0
375 local function _select (timeout)
376 local err
377 local readable={}
378 local writable={}
379 local r={}
380 local w={}
381 local now = os.time()
382 local duration = os.difftime
383
384
385 _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout)
386 local r_evs, w_evs = _readable_t._evs, _writable_t._evs
387
388 if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
389 last_cleansing = now
390 for k,v in pairs(_reading_log) do
391 if not r_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
392 _reading_log[k] = nil
393 r_evs[#r_evs + 1] = k
394 r_evs[k] = #r_evs
395 end
396 end
397
398 for k,v in pairs(_writing_log) do
399 if not w_evs[k] and duration(now, v) > WATCH_DOG_TIMEOUT then
400 _writing_log[k] = nil
401 w_evs[#w_evs + 1] = k
402 w_evs[k] = #w_evs
403 end
404 end
405 end
406
407 if err == "timeout" and #r_evs + #w_evs > 0 then return nil
408 else return err end
409 end
410
411
412 -------------------------------------------------------------------------------
413 -- Dispatcher loop step.
414 -- Listen to client requests and handles them
415 -------------------------------------------------------------------------------
416 function step(timeout)
417 local err = _select (timeout)
418 if err == "timeout" then return end
419
420 if err then
421 error(err)
422 end
423
424 for tsk in tasks() do
425 for ev in tsk:events () do
426 tsk:tick (ev)
427 end
428 end
429 end
430
431 -------------------------------------------------------------------------------
432 -- Dispatcher endless loop.
433 -- Listen to client requests and handles them forever
434 -------------------------------------------------------------------------------
435 function loop(timeout)
436 while true do
437 step(timeout)
438 end
439 end