* libs/httpd: Introduced keep-alive and pipelining support
[project/luci.git] / libs / httpd / luasrc / httpd.lua
index 6524bc1de631d937ccb35f42025a18f7f79226a3..a9b1ccbb4cb18bd1771c77e7a99dea2027b6abdc 100644 (file)
@@ -27,13 +27,59 @@ function Socket(ip, port)
        return sock, err
 end
 
+Thread = luci.util.class()
+
+function Thread.__init__(self, socket, func)
+       self.socket  = socket
+       self.routine = coroutine.create(func)
+       self.stamp   = os.time()
+       self.waiting = false
+end
+
+function Thread.getidletime(self)
+       return os.difftime(os.time(), self.stamp)
+end
+
+function Thread.iswaiting(self)
+       return self.waiting
+end
+
+function Thread.receive(self, ...)
+       local chunk, err, part
+       self.waiting = true
+       
+       repeat
+               coroutine.yield()
+               chunk, err, part = self.socket:receive(...)
+       until err ~= "timeout"
+       
+       self.waiting = false
+       return chunk, err, part
+end
+
+function Thread.resume(self, ...)
+       return coroutine.resume(self.routine, self, ...)
+end
+
+function Thread.status(self)
+       return coroutine.status(self.routine)
+end
+
+function Thread.touch(self)
+       self.stamp = os.time()
+end
 
 Daemon = luci.util.class()
 
 function Daemon.__init__(self, threadlimit, timeout)
        self.reading = {}
-       self.running = {}
+       self.threads = {}
        self.handler = {}
+       self.waiting = {}
+       self.threadc = 0
+       
+       setmetatable(self.waiting, {__mode = "v"})
+       
        self.debug   = false
        self.threadlimit = threadlimit
        self.timeout = timeout or 0.1
@@ -58,10 +104,7 @@ end
 
 function Daemon.step(self)     
        local input, output, err = socket.select( self.reading, nil, 0 )
-
-       if err == "timeout" and #self.running == 0 then
-               socket.sleep(self.timeout)
-       end
+       local working = false
 
        -- accept new connections
        for i, connection in ipairs(input) do
@@ -70,19 +113,18 @@ function Daemon.step(self)
                
                if sock then
                        -- check capacity
-                       if not self.threadlimit or #self.running < self.threadlimit then
+                       if not self.threadlimit or self.threadc < self.threadlimit then
                                
                                if self.debug then
                                        self:dprint("Accepted incoming connection from " .. sock:getpeername())
                                end
-       
-                               table.insert( self.running, {
-                                       coroutine.create( self.handler[connection].clhandler ),
-                                       sock
-                               } )
+                               
+                               local t = Thread(sock, self.handler[connection].clhandler)
+                               self.threads[sock] = t
+                               self.threadc = self.threadc + 1
        
                                if self.debug then
-                                       self:dprint("Created " .. tostring(self.running[#self.running][1]))
+                                       self:dprint("Created " .. tostring(t))
                                end
        
                        -- reject client
@@ -101,27 +143,62 @@ function Daemon.step(self)
        end
 
        -- create client handler
-       for i, client in ipairs( self.running ) do
+       for sock, thread in pairs( self.threads ) do
 
                -- reap dead clients
-               if coroutine.status( client[1] ) == "dead" then
+               if thread:status() == "dead" then
                        if self.debug then
-                               self:dprint("Completed " .. tostring(client[1]))
+                               self:dprint("Completed " .. tostring(thread))
                        end
-                       table.remove( self.running, i )
-               else
+                       sock:close()
+                       self.threadc = self.threadc - 1
+                       self.threads[sock] = nil
+               -- resume working threads
+               elseif not thread:iswaiting() then
                        if self.debug then
-                               self:dprint("Resuming " .. tostring(client[1]))
+                               self:dprint("Resuming " .. tostring(thread))
                        end
 
-                       local stat, err = coroutine.resume( client[1], client[2] )
+                       local stat, err = thread:resume()
+                       if stat then
+                               thread:touch()
+                               if not thread:iswaiting() then
+                                       working = true
+                               else
+                                       table.insert(self.waiting, sock)
+                               end
+                       end
                        
                        if self.debug then
-                               self:dprint(tostring(client[1]) .. " returned")
+                               self:dprint(tostring(thread) .. " returned")
                                if not stat then
-                                       self:dprint("Error in " .. tostring(client[1]) .. " " .. err)
+                                       self:dprint("Error in " .. tostring(thread) .. " " .. err)
+                               end
+                       end
+               end
+       end
+       
+       -- check for data on waiting threads
+       input, output, err = socket.select( self.waiting, nil, 0 )
+       
+       for i, sock in ipairs(input) do         
+               self.threads[sock]:resume()
+               self.threads[sock]:touch()
+               
+               if not self.threads[sock]:iswaiting() then
+                       for i, s in ipairs(self.waiting) do
+                               if s == sock then
+                                       table.remove(self.waiting, i)
+                                       break
                                end
                        end
+                       if not working then
+                               working = true
+                       end
                end
        end
+       
+       if err == "timeout" and not working then
+               socket.sleep(self.timeout)
+       end
 end