Changeset 588 for tea4cups

Show
Ignore:
Timestamp:
03/14/05 20:40:46 (20 years ago)
Author:
jerome
Message:

Should be working file now

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • tea4cups/trunk/tea4cups

    r587 r588  
    3232import tempfile 
    3333import ConfigParser 
    34 import threading 
     34import select 
     35import signal 
     36import time 
    3537from struct import unpack 
    3638 
     
    224226    def __init__(self) : 
    225227        """Initializes the CUPS backend wrapper.""" 
     228        self.gotSigTerm = 0 
     229        signal.signal(signal.SIGTERM, signal.SIG_IGN) 
     230        signal.signal(signal.SIGPIPE, signal.SIG_IGN) 
    226231        self.MyName = "Tea4CUPS" 
    227232        self.myname = "tea4cups" 
     
    239244    def logInfo(self, message, level="info") :         
    240245        """Logs a message to CUPS' error_log file.""" 
    241         sys.stderr.write("%s: %s v%s (PID %i) : %s\n" % (level.upper(), self.MyName, version, os.getpid(), message)) 
     246        sys.stderr.write("%s: %s v%s (PID %i / JobId %s) : %s\n" % (level.upper(), self.MyName, version, os.getpid(), self.JobId, message)) 
    242247        sys.stderr.flush() 
    243248         
     
    308313        available = [] 
    309314        (directory, myname) = os.path.split(sys.argv[0]) 
     315        if not directory : 
     316            directory = "./" 
    310317        tmpdir = tempfile.gettempdir() 
    311318        lockfilename = os.path.join(tmpdir, "%s..LCK" % myname) 
     
    513520            os.remove(self.DataFile) 
    514521             
     522    def sigtermHandler(self, signum, frame) : 
     523        """Sets an attribute whenever SIGTERM is received.""" 
     524        self.gotSigTerm = 1 
     525        self.logInfo("SIGTERM received for Job %s." % self.JobId) 
     526         
    515527    def runBranches(self) :          
    516528        """Launches each tee defined for the current print queue.""" 
    517529        exitcode = 0 
     530        signal.signal(signal.SIGTERM, self.sigtermHandler) 
    518531        branches = self.enumTeeBranches(self.PrinterName) 
    519532        if self.isTrue(self.getPrintQueueOption(self.PrinterName, "serialize", ignore=1)) : 
     
    523536                exitcode = self.runOriginalBackend() 
    524537            for (branch, command) in branches.items() : 
     538                if self.gotSigTerm : 
     539                    break 
    525540                self.logDebug("Launching %s : %s" % (branch, command)) 
    526541                retcode = os.system(command) 
     
    537552                branches["Original backend"] = None     # Fakes a tee to launch one more child 
    538553            for (branch, command) in branches.items() : 
     554                if self.gotSigTerm : 
     555                    break 
    539556                pid = os.fork() 
    540557                if pid : 
     
    560577                    self.logInfo("Tee %s (PID %s) on printer %s didn't exit successfully." % (branch, childpid, self.PrinterName), "error") 
    561578                    exitcode = 1 
     579        signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    562580        return exitcode 
     581         
     582    def unregisterFileNo(self, pollobj, fileno) :                 
     583        """Removes a file handle from the polling object.""" 
     584        try : 
     585            pollobj.unregister(fileno) 
     586        except KeyError :     
     587            self.logInfo("File number %s unregistered twice from polling object, ignored." % fileno, "warn") 
     588        except :     
     589            self.logDebug("Error while unregistering file number %s from polling object." % fileno) 
     590        else :     
     591            self.logDebug("File number %s unregistered from polling object." % fileno) 
     592             
     593    def formatFileEvent(self, fd, mask) :         
     594        """Formats file debug info.""" 
     595        maskval = [] 
     596        if mask & select.POLLIN : 
     597            maskval.append("POLLIN") 
     598        if mask & select.POLLOUT : 
     599            maskval.append("POLLOUT") 
     600        if mask & select.POLLPRI : 
     601            maskval.append("POLLPRI") 
     602        if mask & select.POLLERR : 
     603            maskval.append("POLLERR") 
     604        if mask & select.POLLHUP : 
     605            maskval.append("POLLHUP") 
     606        if mask & select.POLLNVAL : 
     607            maskval.append("POLLNVAL") 
     608        return "%s (%s)" % (fd, " | ".join(maskval)) 
    563609         
    564610    def runOriginalBackend(self) :     
     
    568614        self.logDebug("Starting original backend %s with args %s" % (originalbackend, " ".join(['"%s"' % a for a in ([os.environ["DEVICE_URI"]] + arguments[1:])]))) 
    569615        subprocess = Popen4ForCUPS([originalbackend] + arguments[1:], bufsize=0, arg0=os.environ["DEVICE_URI"]) 
    570         rendezvous = threading.Event() 
     616         
     617        # Save file descriptors, we will need them later. 
     618        stderrfno = sys.stderr.fileno() 
     619        fromcfno = subprocess.fromchild.fileno() 
     620        tocfno = subprocess.tochild.fileno() 
     621         
     622        # We will have to be careful when dealing with I/O  
     623        # So we use a poll object to know when to read or write 
     624        pollster = select.poll() 
     625        pollster.register(fromcfno, select.POLLIN | select.POLLPRI) 
     626        pollster.register(stderrfno, select.POLLOUT) 
     627        pollster.register(tocfno, select.POLLOUT) 
     628         
     629        # Initialize our buffers 
     630        indata = "" 
     631        outdata = "" 
     632        endinput = endoutput = 0 
     633        inputclosed = outputclosed = 0 
     634        totaltochild = totalfromcups = 0 
     635        totalfromchild = totaltocups = 0 
     636         
    571637        if self.InputFile is None : 
    572            self.logDebug("Launching data thread.") 
    573            infile = open(self.DataFile, "rb") 
    574            iothread = threading.Thread(target = self.handleOriginalBackendIO, kwargs = { "parent" : threading.currentThread(), "event" : rendezvous, "inf": infile, "outf" : subprocess.tochild}) 
    575            iothread.start() 
    576             
    577         # here we only want to read its output and send it on our stderr 
    578         while (self.InputFile is not None) or iothread.isAlive() : # until all data transmitted to original backend 
    579             data = subprocess.fromchild.readline() 
    580             if (not data) or rendezvous.isSet() : 
    581                 break 
    582             sys.stderr.write(data)     
    583             sys.stderr.flush() 
    584              
    585         if not rendezvous.isSet() :     
    586             rendezvous.set()     
    587         if self.InputFile is None : 
    588             infile.close() 
    589             iothread.join() 
    590         subprocess.fromchild.close() 
    591         subprocess.tochild.close() 
    592         status = subprocess.wait() 
     638           # this is not a real file, we read the job's data 
     639            # from our temporary file which is a copy of stdin  
     640            inf = open(self.DataFile, "rb") 
     641            infno = inf.fileno() 
     642            pollster.register(infno, select.POLLIN | select.POLLPRI) 
     643        else :     
     644            # job's data is in a file, no need to pass the data 
     645            # to the original backend 
     646            self.logDebug("Job's data is in %s" % self.InputFile) 
     647            infno = None 
     648            endinput = 1 
     649         
     650        self.logDebug("Entering streams polling loop...") 
     651        MEGABYTE = 1024*1024 
     652        killed = 0 
     653        status = -1 
     654        while (status == -1) and (not killed) and not (inputclosed and outputclosed) : 
     655            # First check if original backend is still alive 
     656            status = subprocess.poll() 
     657             
     658            # Now if we got SIGTERM, we have  
     659            # to kill -TERM the original backend 
     660            if self.gotSigTerm and not killed : 
     661                try : 
     662                    os.kill(subprocess.pid, signal.SIGTERM) 
     663                except OSError, msg : # ignore but logs if process was already killed. 
     664                    self.logDebug("Error while sending signal to pid %s : %s" % (subprocess.pid, msg)) 
     665                else :     
     666                    self.logInfo(_("SIGTERM was sent to original backend %s (PID %s)") % (originalbackend, subprocess.pid)) 
     667                    killed = 1 
     668             
     669            # In any case, deal with any remaining I/O 
     670            try : 
     671                availablefds = pollster.poll(5000) 
     672            except select.error, msg :     
     673                self.logDebug("Interrupted poll : %s" % msg) 
     674                availablefds = [] 
     675            if not availablefds : 
     676                self.logDebug("Nothing to do, sleeping a bit...") 
     677                time.sleep(0.01) # give some time to the system 
     678            else : 
     679                for (fd, mask) in availablefds : 
     680                    try : 
     681                        if mask & select.POLLOUT : 
     682                            # We can write 
     683                            if fd == tocfno : 
     684                                if indata : 
     685                                    try : 
     686                                        nbwritten = os.write(fd, indata)     
     687                                    except (OSError, IOError), msg :     
     688                                        self.logDebug("Error while writing to original backend's stdin %s : %s" % (fd, msg)) 
     689                                    else :     
     690                                        if len(indata) != nbwritten : 
     691                                            self.logDebug("Short write to original backend's input !") 
     692                                        totaltochild += nbwritten     
     693                                        self.logDebug("%s bytes sent to original backend so far..." % totaltochild) 
     694                                        indata = indata[nbwritten:] 
     695                                else :         
     696                                    self.logDebug("No data to send to original backend yet, sleeping a bit...") 
     697                                    time.sleep(0.01) 
     698                                     
     699                                if endinput :     
     700                                    self.unregisterFileNo(pollster, tocfno)         
     701                                    self.logDebug("Closing original backend's stdin.") 
     702                                    os.close(tocfno) 
     703                                    inputclosed = 1 
     704                            elif fd == stderrfno : 
     705                                if outdata : 
     706                                    try : 
     707                                        nbwritten = os.write(fd, outdata) 
     708                                    except (OSError, IOError), msg :     
     709                                        self.logDebug("Error while writing to CUPS back channel (stderr) %s : %s" % (fd, msg)) 
     710                                    else : 
     711                                        if len(outdata) != nbwritten : 
     712                                            self.logDebug("Short write to stderr (CUPS) !") 
     713                                        totaltocups += nbwritten     
     714                                        self.logDebug("%s bytes sent back to CUPS so far..." % totaltocups) 
     715                                        outdata = outdata[nbwritten:] 
     716                                else :         
     717                                    # self.logDebug("No data to send back to CUPS yet, sleeping a bit...") # Uncommenting this fills your logs 
     718                                    time.sleep(0.01) # Give some time to the system, stderr is ALWAYS writeable it seems. 
     719                                     
     720                                if endoutput :     
     721                                    self.unregisterFileNo(pollster, stderrfno)         
     722                                    outputclosed = 1 
     723                            else :     
     724                                self.logDebug("Unexpected : %s - Sleeping a bit..." % self.formatFileEvent(fd, mask)) 
     725                                time.sleep(0.01) 
     726                                 
     727                        if mask & (select.POLLIN | select.POLLPRI) :      
     728                            # We have something to read 
     729                            try : 
     730                                data = os.read(fd, MEGABYTE) 
     731                            except (IOError, OSError), msg :     
     732                                self.logDebug("Error while reading file %s : %s" % (fd, msg)) 
     733                            else : 
     734                                if fd == infno : 
     735                                    if not data :    # If yes, then no more input data 
     736                                        self.unregisterFileNo(pollster, infno) 
     737                                        self.logDebug("Input data ends.") 
     738                                        endinput = 1 # this happens with real files. 
     739                                    else :     
     740                                        indata += data 
     741                                        totalfromcups += len(data) 
     742                                        self.logDebug("%s bytes read from CUPS so far..." % totalfromcups) 
     743                                elif fd == fromcfno : 
     744                                    if not data : 
     745                                        self.logDebug("No back channel data to read from original backend yet, sleeping a bit...") 
     746                                        time.sleep(0.01) 
     747                                    else : 
     748                                        outdata += data 
     749                                        totalfromchild += len(data) 
     750                                        self.logDebug("%s bytes read from original backend so far..." % totalfromchild) 
     751                                else :     
     752                                    self.logDebug("Unexpected : %s - Sleeping a bit..." % self.formatFileEvent(fd, mask)) 
     753                                    time.sleep(0.01) 
     754                                     
     755                        if mask & (select.POLLHUP | select.POLLERR) : 
     756                            # Treat POLLERR as an EOF. 
     757                            # Some standard I/O stream has no more datas 
     758                            self.unregisterFileNo(pollster, fd) 
     759                            if fd == infno : 
     760                                # Here we are in the case where the input file is stdin. 
     761                                # which has no more data to be read. 
     762                                self.logDebug("Input data ends.") 
     763                                endinput = 1 
     764                            elif fd == fromcfno :     
     765                                # We are no more interested in this file descriptor         
     766                                self.logDebug("Closing original backend's stdout+stderr.") 
     767                                os.close(fromcfno) 
     768                                endoutput = 1 
     769                            else :     
     770                                self.logDebug("Unexpected : %s - Sleeping a bit..." % self.formatFileEvent(fd, mask)) 
     771                                time.sleep(0.01) 
     772                                 
     773                        if mask & select.POLLNVAL :         
     774                            self.logDebug("File %s was closed. Unregistering from polling object." % fd) 
     775                            self.unregisterFileNo(pollster, fd) 
     776                    except IOError, msg :             
     777                        self.logDebug("Got an IOError : %s" % msg) # we got signalled during an I/O 
     778                 
     779        # We must close the original backend's input stream 
     780        if killed and not inputclosed : 
     781            self.logDebug("Forcing close of original backend's stdin.") 
     782            os.close(tocfno) 
     783         
     784        self.logDebug("Exiting streams polling loop...") 
     785         
     786        self.logDebug("input data's final length : %s" % len(indata)) 
     787        self.logDebug("back-channel data's final length : %s" % len(outdata)) 
     788         
     789        self.logDebug("Total bytes read from CUPS (job's datas) : %s" % totalfromcups) 
     790        self.logDebug("Total bytes sent to original backend (job's datas) : %s" % totaltochild) 
     791         
     792        self.logDebug("Total bytes read from original backend (back-channel datas) : %s" % totalfromchild) 
     793        self.logDebug("Total bytes sent back to CUPS (back-channel datas) : %s" % totaltocups) 
     794         
     795        # Check exit code of original CUPS backend.     
     796        if status == -1 : 
     797            # we exited the loop before the original backend exited 
     798            # now we have to wait for it to finish and get its status 
     799            self.logDebug("Waiting for original backend to exit...") 
     800            try : 
     801                status = subprocess.wait() 
     802            except OSError : # already dead : TODO : detect when abnormal 
     803                status = 0 
    593804        if os.WIFEXITED(status) : 
    594805            return os.WEXITSTATUS(status) 
     806        elif not killed :     
     807            self.logInfo("CUPS backend %s died abnormally." % originalbackend, "error") 
     808            return -1 
    595809        else :     
    596810            return 1 
    597          
    598     def handleOriginalBackendIO(self, parent, event, inf, outf) :     
    599         """Thread to handles the original backend's I/O.""" 
    600         dummy = 0 
    601         totalsent = 0 
    602         while 1 : 
    603             if not parent.isAlive() : 
    604                 self.logInfo("Parent died unexpectedly.", level = "warn") 
    605                 break 
    606             if event.isSet() :     
    607                 self.logInfo("Parent said work is finished.") 
    608                 break 
    609             try :     
    610                 data = inf.readline() 
    611                 if not data : 
    612                     break 
    613                 outf.write(data)     
    614                 totalsent += len(data) 
    615                 dummy += 1 
    616                 if not (dummy % 10) : 
    617                     self.logDebug("Sent %i bytes to original backend." % totalsent) 
    618                 try : 
    619                     outf.flush() 
    620                 except :     
    621                     pass 
    622             except IOError, msg : 
    623                 self.logInfo("I/O Error : %s" % msg, level = "warn") 
    624                 break 
    625         event.set()         
    626         sys.exit(0) 
    627811         
    628812if __name__ == "__main__" :