Show
Ignore:
Timestamp:
05/17/04 13:46:06 (20 years ago)
Author:
jalet
Message:

First try at cupspykota's main loop rewrite

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • pykota/trunk/bin/cupspykota

    r1467 r1478  
    2424# 
    2525# $Log$ 
     26# Revision 1.43  2004/05/17 11:46:05  jalet 
     27# First try at cupspykota's main loop rewrite 
     28# 
    2629# Revision 1.42  2004/05/10 11:22:28  jalet 
    2730# Typo 
     
    171174import sys 
    172175import os 
     176import fcntl 
    173177import popen2 
    174178import cStringIO 
     
    184188from pykota.requester import PyKotaRequesterError 
    185189     
    186 class PyKotaPopen3(popen2.Popen3) : 
     190class PyKotaPopen4(popen2.Popen4) : 
    187191    """Our own class to execute real backends. 
    188192     
     
    190194       native popen2.Popen3 would not be feasible. 
    191195    """ 
    192     def __init__(self, cmd, capturestderr=0, bufsize=-1, arg0=None) : 
     196    def __init__(self, cmd, bufsize=-1, arg0=None) : 
    193197        self.arg0 = arg0 
    194         popen2.Popen3.__init__(self, cmd, capturestderr, bufsize) 
     198        popen2.Popen4.__init__(self, cmd, bufsize) 
    195199         
    196200    def _run_child(self, cmd): 
     
    291295             
    292296        return retcode     
    293                      
     297                
     298    def setNonBlocking(self, fno) : 
     299        """Sets a file handle to be non-blocking.""" 
     300        flags = fcntl.fcntl(fno, fcntl.F_GETFL, 0) 
     301        fcntl.fcntl(fno, fcntl.F_SETFL, flags | os.O_NONBLOCK) 
     302 
    294303    def unregisterFileNo(self, pollobj, fileno) :                 
    295304        """Removes a file handle from the polling object.""" 
     
    301310            self.logdebug("File number %s unregistered from polling object." % fileno) 
    302311             
     312    def formatFileEvent(self, fd, mask, ins, outs) :         
     313        """Formats file debug info.""" 
     314        try : 
     315            name = ins.get(fd, outs.get(fd))["name"] 
     316        except KeyError :     
     317            self.logdebug("File %s not found in %s or %s" % (fd, repr(ins), repr(outs))) 
     318        else :     
     319            maskval = [] 
     320            if mask & select.POLLIN : 
     321                maskval.append("POLLIN") 
     322            if mask & select.POLLOUT : 
     323                maskval.append("POLLOUT") 
     324            if mask & select.POLLPRI : 
     325                maskval.append("POLLPRI") 
     326            if mask & select.POLLERR : 
     327                maskval.append("POLLERR") 
     328            if mask & select.POLLHUP : 
     329                maskval.append("POLLHUP") 
     330            if mask & select.POLLNVAL : 
     331                maskval.append("POLLNVAL") 
     332            return "%s (%s)" % (name, " | ".join(maskval)) 
     333         
    303334    def handleData(self) :                     
    304335        """Pass the job's data to the real backend.""" 
    305         # Now it becomes tricky... 
    306         # We must pass the unmodified job to the original backend 
    307         # First ensure that we have a file object as input 
    308         mustclose = 0     
    309         if self.inputfile is not None :     
    310             if hasattr(self.inputfile, "read") : 
    311                 infile = self.inputfile 
    312             else :     
    313                 infile = open(self.inputfile, "rb") 
    314             mustclose = 1 
    315         else :     
    316             infile = sys.stdin 
    317              
    318336        # Find the real backend pathname     
    319337        realbackend = os.path.join(os.path.split(sys.argv[0])[0], self.originalbackend) 
     
    321339        # And launch it 
    322340        self.logdebug("Starting real backend %s with args %s" % (realbackend, " ".join(['"%s"' % a for a in ([os.environ["DEVICE_URI"]] + sys.argv[1:])]))) 
    323         subprocess = PyKotaPopen3([realbackend] + sys.argv[1:], capturestderr=1, bufsize=0, arg0=os.environ["DEVICE_URI"]) 
     341        subprocess = PyKotaPopen4([realbackend] + sys.argv[1:], bufsize=0, arg0=os.environ["DEVICE_URI"]) 
    324342         
    325343        # Save file descriptors, we will need them later. 
    326         stdoutfno = sys.stdout.fileno() 
    327344        stderrfno = sys.stderr.fileno() 
    328345        fromcfno = subprocess.fromchild.fileno() 
    329         tocfno = subprocess.tochild.fileno() 
    330         cerrfno = subprocess.childerr.fileno() 
     346        self.setNonBlocking(fromcfno) 
    331347         
    332348        # We will have to be careful when dealing with I/O  
     
    334350        pollster = select.poll() 
    335351        pollster.register(fromcfno, select.POLLIN | select.POLLPRI) 
    336         pollster.register(cerrfno, select.POLLIN | select.POLLPRI) 
    337         pollster.register(stdoutfno, select.POLLOUT) 
    338         pollster.register(stderrfno, select.POLLOUT) 
    339         pollster.register(tocfno, select.POLLOUT) 
    340          
    341         # Initialize our buffers 
    342         indata = "" 
    343         outdata = "" 
    344         errdata = "" 
    345         endinput = endoutput = enderr = 0 
    346         inputclosed = outputclosed = errclosed = 0 
    347          
     352         
     353        instreams = { \ 
     354                      fromcfno : { "file" : subprocess.fromchild, "out" : stderrfno, "done" : 0, "name" : "real backend's stdout+stderr" },\ 
     355                    } 
     356                       
     357        outstreams = { \ 
     358                       stderrfno : { "file" : sys.stderr, "done" : 0, "in" : fromcfno, "name" : "stderr" }, \ 
     359                     } 
     360                        
    348361        if self.preserveinputfile is None : 
    349             # this is not a real file, we read the job's data 
    350             # from stdin  
    351             infno = infile.fileno() 
    352             pollster.register(infno, select.POLLIN | select.POLLPRI) 
     362            # this is not a real file, we read the job's data  
     363            # from stdin and send it on our stdout 
     364            tocfno = subprocess.tochild.fileno() 
     365            stdinfno = sys.stdin.fileno() 
     366            self.setNonBlocking(stdinfno) 
     367            pollster.register(stdinfno, select.POLLIN | select.POLLPRI) 
     368            instreams.update({ stdinfno : { "file": sys.stdin, "out" : tocfno, "done" : 0, "name" : "stdin" }}) 
     369            outstreams.update({ tocfno : { "file" : subprocess.tochild, "done" : 0, "in" : stdinfno, "name" : "real backend's stdin" }}) 
    353370        else :     
    354371            # job's data is in a file, no need to pass the data 
    355372            # to the real backend 
    356             self.logdebug("Job's data is in %s" % self.preserveinputfile) 
    357             infno = None 
    358             endinput = 1 
    359          
     373            self.logdebug("Job's data is in file %s" % self.preserveinputfile) 
     374             
    360375        killed = 0 
     376        status = -1 
    361377        self.logdebug("Entering streams polling loop...") 
    362         status = -1 
    363378        while status == -1 : 
    364             # First check if original backend is still alive 
    365             status = subprocess.poll() 
    366              
    367             # Now if we got SIGTERM, we have  
    368             # to kill -TERM the original backend 
    369             if self.gotSigTerm and not killed : 
    370                 try : 
     379            # Catches IOErrors caused by interrupted system calls 
     380            try : 
     381                # First check if original backend is still alive 
     382                status = subprocess.poll() 
     383                 
     384                # Now if we got SIGTERM, we have  
     385                # to kill -TERM the original backend 
     386                if self.gotSigTerm and not killed : 
    371387                    os.kill(subprocess.pid, signal.SIGTERM) 
    372388                    self.logger.log_message(_("SIGTERM was sent to real backend %s (pid: %s)") % (realbackend, subprocess.pid), "info") 
    373389                    killed = 1 
    374                 except : # ignore if process was already killed. 
    375                     pass 
    376              
    377             # In any case, deal with any remaining I/O 
    378             availablefds = pollster.poll(5000) 
    379             for (fd, mask) in availablefds : 
    380                 # self.logdebug("file: %i    mask: %04x" % (fd, mask)) 
    381                 try : 
    382                     if mask & select.POLLOUT : 
    383                         # We can write 
    384                         if fd == tocfno : 
    385                             if indata : 
    386                                 os.write(fd, indata)     
     390                 
     391                # In any case, deal with any remaining I/O 
     392                availablefds = pollster.poll(5000) 
     393                if not availablefds : 
     394                    self.logdebug("Nothing to do, sleeping a bit...") 
     395                    time.sleep(0.01) # nothing to do, give time to CPU 
     396                else :     
     397                    for (fd, mask) in availablefds : 
     398                        # self.logdebug(self.formatFileEvent(fd, mask, instreams, outstreams)) 
     399                        try : 
     400                            if mask & (select.POLLIN | select.POLLPRI) :      
     401                                # We have something to read 
    387402                                try : 
    388                                     os.fsync(fd) 
    389                                 except OSError : 
    390                                     pass 
    391                                 indata = "" 
    392                             if endinput :     
    393                                 self.unregisterFileNo(pollster, tocfno)         
    394                                 self.logdebug("Closing real backend's stdin.") 
    395                                 os.close(tocfno) 
    396                                 inputclosed = 1 
    397                         elif fd == stdoutfno : 
    398                             if outdata : 
    399                                 os.write(fd, outdata) 
     403                                    fobj = instreams[fd] 
     404                                except KeyError :     
     405                                    self.logdebug("READ : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) 
     406                                else :     
     407                                    data = fobj["file"].read() 
     408                                    if not data : 
     409                                        self.logdebug("No more data to read on %s (read returned nothing)" % fobj["name"]) 
     410                                        if not fobj["done"] : 
     411                                            self.unregisterFileNo(pollster, fd) 
     412                                            fobj["done"] = 1 
     413                                    else :     
     414                                        # self.logdebug("%s -- DATA[%i] <= : %s ..." % (self.formatFileEvent(fd, mask, instreams, outstreams), len(data), data[:50])) 
     415                                        fout = outstreams[fobj["out"]]["file"] 
     416                                        fout.write(data) 
     417                                        fout.flush() 
     418                                             
     419                            if mask & (select.POLLHUP | select.POLLERR) : 
     420                                # Some pipe has no more datas so we don't 
     421                                # want to continue to poll this file 
     422                                toclose = None 
    400423                                try : 
    401                                     os.fsync(fd) 
    402                                 except OSError :     
    403                                     pass 
    404                                 outdata = "" 
    405                             if endoutput :     
    406                                 self.unregisterFileNo(pollster, stdoutfno)         
    407                                 outputclosed = 1 
    408                         elif fd == stderrfno : 
    409                             if errdata : 
    410                                 os.write(fd, errdata) 
    411                                 try : 
    412                                     os.fsync(fd) 
    413                                 except OSError :     
    414                                     pass 
    415                                 errdata = "" 
    416                             if enderr :     
    417                                 self.unregisterFileNo(pollster, stderrfno)         
    418                                 errclosed = 1 
    419                     if (mask & select.POLLIN) or (mask & select.POLLPRI) :      
    420                         # We have something to read 
    421                         try : 
    422                             data = os.read(fd, 256 * 1024) 
    423                         except OSError, msg :     
    424                             self.logdebug("Error while reading file %s : %s" % (fd, msg)) 
    425                         else : 
    426                             if fd == infno : 
    427                                 indata += data 
    428                                 if not data :    # If yes, then no more input data 
    429                                     self.unregisterFileNo(pollster, infno) 
    430                                     self.logdebug("Input data ends.") 
    431                                     endinput = 1 # this happens with real files. 
    432                             elif fd == fromcfno : 
    433                                 outdata += data 
    434                             elif fd == cerrfno :     
    435                                 errdata += data 
    436                     if (mask & select.POLLHUP) or (mask & select.POLLERR) : 
    437                         # I've never seen POLLERR myself, but this probably 
    438                         # can't hurt to treat an error condition just like  
    439                         # an EOF. 
    440                         #  
    441                         # Some standard I/O stream has no more datas 
    442                         self.unregisterFileNo(pollster, fd) 
    443                         if fd == infno : 
    444                             # Here we are in the case where the input file is stdin. 
    445                             # which has no more data to be read. 
    446                             self.logdebug("Input data ends.") 
    447                             endinput = 1 
    448                         elif fd == fromcfno :     
    449                             # This should never happen, since 
    450                             # CUPS backends don't send anything on their 
    451                             # standard output. 
    452                             # We are no more interested in this file descriptor         
    453                             self.logdebug("Closing real backend's stdout.") 
    454                             os.close(fromcfno) 
    455                             endoutput = 1 
    456                         elif fd == cerrfno :     
    457                             # Original CUPS backend has finished  
    458                             # to write informations on its standard error. 
    459                             # We are no more interested in this file descriptor        . 
    460                             self.logdebug("Closing real backend's stderr.") 
    461                             os.close(cerrfno) 
    462                             enderr = 1 
    463                 except IOError :             
    464                     pass # we got signalled during an I/O it seems 
    465             if killed or (inputclosed and outputclosed and errclosed) : 
    466                 break 
     424                                    fobj = instreams[fd] 
     425                                    if fobj["name"] == "stdin" : 
     426                                        toclose = outstreams[fobj["out"]] 
     427                                    self.logdebug("No more data to read from %s (POLLUP or POLLERR received)" % fobj["name"]) 
     428                                except KeyError :     
     429                                    fobj = outstreams[fd] 
     430                                    if fobj["name"] == "stderr" : 
     431                                        toclose = instreams[fobj["in"]] 
     432                                    self.logdebug("No more data to write to %s (POLLUP or POLLERR received)" % fobj["name"]) 
     433                                     
     434                                if not fobj["done"] : 
     435                                    self.unregisterFileNo(pollster, fd) 
     436                                    fobj["done"] = 1 
     437                                    if toclose is not None : 
     438                                        self.logdebug("Closing %s" % toclose["name"]) 
     439                                        toclose["file"].close() 
     440                                         
     441                            if mask & select.POLLNVAL :             
     442                                self.logdebug("CLOSED : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) 
     443                                 
     444                        except IOError, msg :                 
     445                            self.logdebug("IOError : %s -- %s" % (msg, self.formatFileEvent(fd, mask, instreams, outstreams))) 
     446                            time.sleep(0.01) # give some time to the CPU 
     447            except IOError, msg : 
     448                self.logdebug("IOError : %s" % msg) 
     449                time.sleep(0.01) # give some time to the CPU 
    467450                 
    468         # We must close the real backend's input stream 
    469         if killed and not inputclosed : 
    470             self.logdebug("Forcing close of real backend's stdin.") 
    471             os.close(tocfno) 
    472          
    473         # Input file was a real file, we have to close it.     
    474         if mustclose : 
    475             infile.close() 
    476              
    477451        self.logdebug("Exiting streams polling loop...") 
    478              
    479         # Check exit code of original CUPS backend.     
    480         if status == -1 : 
    481             # we exited the loop before the real backend exited 
    482             # now we have to wait for it to finish and get its status 
    483             status = subprocess.wait() 
     452        status = subprocess.wait()      # just in case 
    484453        if os.WIFEXITED(status) : 
    485454            retcode = os.WEXITSTATUS(status) 
     
    489458        else :     
    490459            retcode = self.removeJob() 
     460        self.logdebug("Real backend exited with status %s" % status)     
    491461        return retcode     
    492462