Show
Ignore:
Timestamp:
05/24/04 16:36:40 (20 years ago)
Author:
jalet
Message:

Revert to old polling loop. Will need optimisations

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • pykota/trunk/bin/cupspykota

    r1493 r1494  
    2424# 
    2525# $Log$ 
     26# Revision 1.48  2004/05/24 14:36:24  jalet 
     27# Revert to old polling loop. Will need optimisations 
     28# 
    2629# Revision 1.47  2004/05/24 11:59:46  jalet 
    2730# More robust (?) code 
     
    319322        except KeyError :     
    320323            self.logger.log_message(_("File number %s unregistered twice from polling object, ignored.") % fileno, "warn") 
     324        except :     
     325            self.logdebug("Error while unregistering file number %s from polling object." % fileno) 
    321326        else :     
    322327            self.logdebug("File number %s unregistered from polling object." % fileno) 
     
    356361        stderrfno = sys.stderr.fileno() 
    357362        fromcfno = subprocess.fromchild.fileno() 
    358         self.setNonBlocking(fromcfno) 
     363        tocfno = subprocess.tochild.fileno() 
    359364         
    360365        # We will have to be careful when dealing with I/O  
     
    362367        pollster = select.poll() 
    363368        pollster.register(fromcfno, select.POLLIN | select.POLLPRI) 
    364          
    365         instreams = { \ 
    366                       fromcfno : { "file" : subprocess.fromchild, "out" : stderrfno, "done" : 0, "name" : "real backend's stdout+stderr" },\ 
    367                     } 
    368                        
    369         outstreams = { \ 
    370                        stderrfno : { "file" : sys.stderr, "done" : 0, "in" : fromcfno, "name" : "stderr" }, \ 
    371                      } 
    372                         
     369        pollster.register(stderrfno, select.POLLOUT) 
     370        pollster.register(tocfno, select.POLLOUT) 
     371         
     372        # Initialize our buffers 
     373        indata = "" 
     374        outdata = "" 
     375        endinput = endoutput = 0 
     376        inputclosed = outputclosed = 0 
     377         
    373378        if self.preserveinputfile is None : 
    374             # this is not a real file, we read the job's data  
    375             # from stdin and send it on our stdout 
    376             tocfno = subprocess.tochild.fileno() 
    377             stdinfno = sys.stdin.fileno() 
    378             self.setNonBlocking(stdinfno) 
    379             pollster.register(stdinfno, select.POLLIN | select.POLLPRI) 
    380             instreams.update({ stdinfno : { "file": sys.stdin, "out" : tocfno, "done" : 0, "name" : "stdin" }}) 
    381             outstreams.update({ tocfno : { "file" : subprocess.tochild, "done" : 0, "in" : stdinfno, "name" : "real backend's stdin" }}) 
     379            # this is not a real file, we read the job's data 
     380            # from stdin  
     381            infno = self.jobdatastream.fileno() 
     382            self.jobdatastream.seek(0) 
     383            pollster.register(infno, select.POLLIN | select.POLLPRI) 
    382384        else :     
    383385            # job's data is in a file, no need to pass the data 
    384386            # to the real backend 
    385             self.logdebug("Job's data is in file %s" % self.preserveinputfile) 
    386              
    387         killed = 0 
    388         status = -1 
     387            self.logdebug("Job's data is in %s" % self.preserveinputfile) 
     388            infno = None 
     389            endinput = 1 
     390         
    389391        self.logdebug("Catching SIGTERM.") 
    390392        signal.signal(signal.SIGTERM, self.sigterm_handler) 
     393         
     394        killed = 0 
    391395        self.logdebug("Entering streams polling loop...") 
     396        status = -1 
    392397        while status == -1 : 
    393             # Catches IOErrors caused by interrupted system calls 
    394             try : 
    395                 # First check if original backend is still alive 
    396                 status = subprocess.poll() 
    397             except :     
    398                 self.logdebug("Interrupted Poll") 
    399                 time.sleep(0.01) # give some time to the CPU 
    400             else : 
    401                 # Now if we got SIGTERM, we have  
    402                 # to kill -TERM the original backend 
    403                 if self.gotSigTerm and not killed : 
     398            # First check if original backend is still alive 
     399            status = subprocess.poll() 
     400             
     401            # Now if we got SIGTERM, we have  
     402            # to kill -TERM the original backend 
     403            if self.gotSigTerm and not killed : 
     404                try : 
    404405                    os.kill(subprocess.pid, signal.SIGTERM) 
    405406                    self.logger.log_message(_("SIGTERM was sent to real backend %s (pid: %s)") % (realbackend, subprocess.pid), "info") 
    406407                    killed = 1 
     408                except : # ignore if process was already killed. 
     409                    pass 
     410             
     411            # In any case, deal with any remaining I/O 
     412            availablefds = pollster.poll(5000) 
     413            for (fd, mask) in availablefds : 
     414                # self.logdebug("file: %i    mask: %04x" % (fd, mask)) 
     415                try : 
     416                    if mask & select.POLLOUT : 
     417                        # We can write 
     418                        if fd == tocfno : 
     419                            if indata : 
     420                                os.write(fd, indata)     
     421                                try : 
     422                                    os.fsync(fd) 
     423                                except OSError : 
     424                                    pass 
     425                                indata = "" 
     426                            if endinput :     
     427                                self.unregisterFileNo(pollster, tocfno)         
     428                                self.logdebug("Closing real backend's stdin.") 
     429                                os.close(tocfno) 
     430                                inputclosed = 1 
     431                        elif fd == stderrfno : 
     432                            if outdata : 
     433                                os.write(fd, outdata) 
     434                                try : 
     435                                    os.fsync(fd) 
     436                                except OSError :     
     437                                    pass 
     438                                outdata = "" 
     439                            if endoutput :     
     440                                self.unregisterFileNo(pollster, stderrfno)         
     441                                outputclosed = 1 
     442                    if (mask & select.POLLIN) or (mask & select.POLLPRI) :      
     443                        # We have something to read 
     444                        try : 
     445                            data = os.read(fd, 256 * 1024) 
     446                        except OSError, msg :     
     447                            self.logdebug("Error while reading file %s : %s" % (fd, msg)) 
     448                        else : 
     449                            if fd == infno : 
     450                                indata += data 
     451                                if not data :    # If yes, then no more input data 
     452                                    self.unregisterFileNo(pollster, infno) 
     453                                    self.logdebug("Input data ends.") 
     454                                    endinput = 1 # this happens with real files. 
     455                            elif fd == fromcfno : 
     456                                outdata += data 
     457                    if (mask & select.POLLHUP) or (mask & select.POLLERR) : 
     458                        # I've never seen POLLERR myself, but this probably 
     459                        # can't hurt to treat an error condition just like  
     460                        # an EOF. 
     461                        #  
     462                        # Some standard I/O stream has no more datas 
     463                        self.unregisterFileNo(pollster, fd) 
     464                        if fd == infno : 
     465                            # Here we are in the case where the input file is stdin. 
     466                            # which has no more data to be read. 
     467                            self.logdebug("Input data ends.") 
     468                            endinput = 1 
     469                        elif fd == fromcfno :     
     470                            # We are no more interested in this file descriptor         
     471                            self.logdebug("Closing real backend's stdout+stderr.") 
     472                            os.close(fromcfno) 
     473                            endoutput = 1 
     474                except IOError :             
     475                    pass # we got signalled during an I/O it seems 
     476            if killed or (inputclosed and outputclosed) : 
     477                break 
    407478                 
    408                 # In any case, deal with any remaining I/O 
    409                 try : 
    410                     availablefds = pollster.poll(5000) 
    411                 except select.error : # we probably got a signal 
    412                     availablefds = []    
    413                 if not availablefds : 
    414                     self.logdebug("Nothing to do, sleeping a bit...") 
    415                     time.sleep(0.01) # nothing to do, give time to CPU 
    416                 else :     
    417                     for (fd, mask) in availablefds : 
    418                         # self.logdebug(self.formatFileEvent(fd, mask, instreams, outstreams)) 
    419                         if mask & (select.POLLIN | select.POLLPRI) :      
    420                             # We have something to read 
    421                             try : 
    422                                 fobj = instreams[fd] 
    423                             except KeyError :     
    424                                 self.logdebug("READ : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) 
    425                             else :     
    426                                 try : 
    427                                     data = fobj["file"].read() 
    428                                 except IOError, msg :     
    429                                     self.logdebug("Interrupted Read : %s" % msg) 
    430                                 else :     
    431                                     if not data : 
    432                                         self.logdebug("No more data to read on %s (read returned nothing)" % fobj["name"]) 
    433                                         if not fobj["done"] : 
    434                                             self.unregisterFileNo(pollster, fd) 
    435                                             fobj["done"] = 1 
    436                                     else :     
    437                                         # self.logdebug("%s -- DATA[%i] <= : %s ..." % (self.formatFileEvent(fd, mask, instreams, outstreams), len(data), data[:50])) 
    438                                         fout = outstreams[fobj["out"]]["file"] 
    439                                         try : 
    440                                             fout.write(data) 
    441                                         except IOError, msg :     
    442                                             self.logdebug("Interrupted Write : %s" % msg) 
    443                                         else :     
    444                                             try : 
    445                                                 fout.flush() 
    446                                             except IOError, msg :     
    447                                                 self.logdebug("Interrupted Flush : %s" % msg) 
    448                                          
    449                         if mask & (select.POLLHUP | select.POLLERR) : 
    450                             # Some pipe has no more datas so we don't 
    451                             # want to continue to poll this file 
    452                             toclose = None 
    453                             try : 
    454                                 fobj = instreams[fd] 
    455                                 if fobj["name"] == "stdin" : 
    456                                     toclose = outstreams[fobj["out"]] 
    457                                 self.logdebug("No more data to read from %s (POLLUP or POLLERR received)" % fobj["name"]) 
    458                             except KeyError :     
    459                                 fobj = outstreams[fd] 
    460                                 if fobj["name"] == "stderr" : 
    461                                     toclose = instreams[fobj["in"]] 
    462                                 self.logdebug("No more data to write to %s (POLLUP or POLLERR received)" % fobj["name"]) 
    463                                  
    464                             if not fobj["done"] : 
    465                                 self.unregisterFileNo(pollster, fd) 
    466                                 fobj["done"] = 1 
    467                                 if toclose is not None : 
    468                                     self.logdebug("Closing %s" % toclose["name"]) 
    469                                     try : 
    470                                         toclose["file"].close() 
    471                                     except :     
    472                                         self.logdebug("Interrupted Close") 
    473                                      
    474                         if mask & select.POLLNVAL :             
    475                             self.logdebug("CLOSED : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) 
    476                  
     479        # We must close the real backend's input stream 
     480        if killed and not inputclosed : 
     481            self.logdebug("Forcing close of real backend's stdin.") 
     482            os.close(tocfno) 
     483         
    477484        self.logdebug("Exiting streams polling loop...") 
    478485         
    479486        self.logdebug("Ignoring SIGTERM again.") 
    480487        signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    481          
    482         status = subprocess.wait()      # just in case 
     488             
     489        # Check exit code of original CUPS backend.     
     490        if status == -1 : 
     491            # we exited the loop before the real backend exited 
     492            # now we have to wait for it to finish and get its status 
     493            try : 
     494                status = subprocess.wait() 
     495            except OSError : # already dead     
     496                status = 0 
    483497        if os.WIFEXITED(status) : 
    484498            retcode = os.WEXITSTATUS(status) 
     
    488502        else :     
    489503            retcode = self.removeJob() 
    490         self.logdebug("Real backend exited with status %s" % status)     
    491504        return retcode     
    492505