364 | | |
365 | | instreams = { \ |
366 | | fromcfno : { "file" : subprocess.fromchild, "out" : stderrfno, "done" : 0, "name" : "real backend's stdout+stderr" },\ |
367 | | } |
368 | | |
369 | | outstreams = { \ |
370 | | stderrfno : { "file" : sys.stderr, "done" : 0, "in" : fromcfno, "name" : "stderr" }, \ |
371 | | } |
372 | | |
| 369 | pollster.register(stderrfno, select.POLLOUT) |
| 370 | pollster.register(tocfno, select.POLLOUT) |
| 371 | |
| 372 | # Initialize our buffers |
| 373 | indata = "" |
| 374 | outdata = "" |
| 375 | endinput = endoutput = 0 |
| 376 | inputclosed = outputclosed = 0 |
| 377 | |
374 | | # this is not a real file, we read the job's data |
375 | | # from stdin and send it on our stdout |
376 | | tocfno = subprocess.tochild.fileno() |
377 | | stdinfno = sys.stdin.fileno() |
378 | | self.setNonBlocking(stdinfno) |
379 | | pollster.register(stdinfno, select.POLLIN | select.POLLPRI) |
380 | | instreams.update({ stdinfno : { "file": sys.stdin, "out" : tocfno, "done" : 0, "name" : "stdin" }}) |
381 | | outstreams.update({ tocfno : { "file" : subprocess.tochild, "done" : 0, "in" : stdinfno, "name" : "real backend's stdin" }}) |
| 379 | # this is not a real file, we read the job's data |
| 380 | # from stdin |
| 381 | infno = self.jobdatastream.fileno() |
| 382 | self.jobdatastream.seek(0) |
| 383 | pollster.register(infno, select.POLLIN | select.POLLPRI) |
393 | | # Catches IOErrors caused by interrupted system calls |
394 | | try : |
395 | | # First check if original backend is still alive |
396 | | status = subprocess.poll() |
397 | | except : |
398 | | self.logdebug("Interrupted Poll") |
399 | | time.sleep(0.01) # give some time to the CPU |
400 | | else : |
401 | | # Now if we got SIGTERM, we have |
402 | | # to kill -TERM the original backend |
403 | | if self.gotSigTerm and not killed : |
| 398 | # First check if original backend is still alive |
| 399 | status = subprocess.poll() |
| 400 | |
| 401 | # Now if we got SIGTERM, we have |
| 402 | # to kill -TERM the original backend |
| 403 | if self.gotSigTerm and not killed : |
| 404 | try : |
| 408 | except : # ignore if process was already killed. |
| 409 | pass |
| 410 | |
| 411 | # In any case, deal with any remaining I/O |
| 412 | availablefds = pollster.poll(5000) |
| 413 | for (fd, mask) in availablefds : |
| 414 | # self.logdebug("file: %i mask: %04x" % (fd, mask)) |
| 415 | try : |
| 416 | if mask & select.POLLOUT : |
| 417 | # We can write |
| 418 | if fd == tocfno : |
| 419 | if indata : |
| 420 | os.write(fd, indata) |
| 421 | try : |
| 422 | os.fsync(fd) |
| 423 | except OSError : |
| 424 | pass |
| 425 | indata = "" |
| 426 | if endinput : |
| 427 | self.unregisterFileNo(pollster, tocfno) |
| 428 | self.logdebug("Closing real backend's stdin.") |
| 429 | os.close(tocfno) |
| 430 | inputclosed = 1 |
| 431 | elif fd == stderrfno : |
| 432 | if outdata : |
| 433 | os.write(fd, outdata) |
| 434 | try : |
| 435 | os.fsync(fd) |
| 436 | except OSError : |
| 437 | pass |
| 438 | outdata = "" |
| 439 | if endoutput : |
| 440 | self.unregisterFileNo(pollster, stderrfno) |
| 441 | outputclosed = 1 |
| 442 | if (mask & select.POLLIN) or (mask & select.POLLPRI) : |
| 443 | # We have something to read |
| 444 | try : |
| 445 | data = os.read(fd, 256 * 1024) |
| 446 | except OSError, msg : |
| 447 | self.logdebug("Error while reading file %s : %s" % (fd, msg)) |
| 448 | else : |
| 449 | if fd == infno : |
| 450 | indata += data |
| 451 | if not data : # If yes, then no more input data |
| 452 | self.unregisterFileNo(pollster, infno) |
| 453 | self.logdebug("Input data ends.") |
| 454 | endinput = 1 # this happens with real files. |
| 455 | elif fd == fromcfno : |
| 456 | outdata += data |
| 457 | if (mask & select.POLLHUP) or (mask & select.POLLERR) : |
| 458 | # I've never seen POLLERR myself, but this probably |
| 459 | # can't hurt to treat an error condition just like |
| 460 | # an EOF. |
| 461 | # |
| 462 | # Some standard I/O stream has no more datas |
| 463 | self.unregisterFileNo(pollster, fd) |
| 464 | if fd == infno : |
| 465 | # Here we are in the case where the input file is stdin. |
| 466 | # which has no more data to be read. |
| 467 | self.logdebug("Input data ends.") |
| 468 | endinput = 1 |
| 469 | elif fd == fromcfno : |
| 470 | # We are no more interested in this file descriptor |
| 471 | self.logdebug("Closing real backend's stdout+stderr.") |
| 472 | os.close(fromcfno) |
| 473 | endoutput = 1 |
| 474 | except IOError : |
| 475 | pass # we got signalled during an I/O it seems |
| 476 | if killed or (inputclosed and outputclosed) : |
| 477 | break |
408 | | # In any case, deal with any remaining I/O |
409 | | try : |
410 | | availablefds = pollster.poll(5000) |
411 | | except select.error : # we probably got a signal |
412 | | availablefds = [] |
413 | | if not availablefds : |
414 | | self.logdebug("Nothing to do, sleeping a bit...") |
415 | | time.sleep(0.01) # nothing to do, give time to CPU |
416 | | else : |
417 | | for (fd, mask) in availablefds : |
418 | | # self.logdebug(self.formatFileEvent(fd, mask, instreams, outstreams)) |
419 | | if mask & (select.POLLIN | select.POLLPRI) : |
420 | | # We have something to read |
421 | | try : |
422 | | fobj = instreams[fd] |
423 | | except KeyError : |
424 | | self.logdebug("READ : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) |
425 | | else : |
426 | | try : |
427 | | data = fobj["file"].read() |
428 | | except IOError, msg : |
429 | | self.logdebug("Interrupted Read : %s" % msg) |
430 | | else : |
431 | | if not data : |
432 | | self.logdebug("No more data to read on %s (read returned nothing)" % fobj["name"]) |
433 | | if not fobj["done"] : |
434 | | self.unregisterFileNo(pollster, fd) |
435 | | fobj["done"] = 1 |
436 | | else : |
437 | | # self.logdebug("%s -- DATA[%i] <= : %s ..." % (self.formatFileEvent(fd, mask, instreams, outstreams), len(data), data[:50])) |
438 | | fout = outstreams[fobj["out"]]["file"] |
439 | | try : |
440 | | fout.write(data) |
441 | | except IOError, msg : |
442 | | self.logdebug("Interrupted Write : %s" % msg) |
443 | | else : |
444 | | try : |
445 | | fout.flush() |
446 | | except IOError, msg : |
447 | | self.logdebug("Interrupted Flush : %s" % msg) |
448 | | |
449 | | if mask & (select.POLLHUP | select.POLLERR) : |
450 | | # Some pipe has no more datas so we don't |
451 | | # want to continue to poll this file |
452 | | toclose = None |
453 | | try : |
454 | | fobj = instreams[fd] |
455 | | if fobj["name"] == "stdin" : |
456 | | toclose = outstreams[fobj["out"]] |
457 | | self.logdebug("No more data to read from %s (POLLUP or POLLERR received)" % fobj["name"]) |
458 | | except KeyError : |
459 | | fobj = outstreams[fd] |
460 | | if fobj["name"] == "stderr" : |
461 | | toclose = instreams[fobj["in"]] |
462 | | self.logdebug("No more data to write to %s (POLLUP or POLLERR received)" % fobj["name"]) |
463 | | |
464 | | if not fobj["done"] : |
465 | | self.unregisterFileNo(pollster, fd) |
466 | | fobj["done"] = 1 |
467 | | if toclose is not None : |
468 | | self.logdebug("Closing %s" % toclose["name"]) |
469 | | try : |
470 | | toclose["file"].close() |
471 | | except : |
472 | | self.logdebug("Interrupted Close") |
473 | | |
474 | | if mask & select.POLLNVAL : |
475 | | self.logdebug("CLOSED : %s" % self.formatFileEvent(fd, mask, instreams, outstreams)) |
476 | | |
| 479 | # We must close the real backend's input stream |
| 480 | if killed and not inputclosed : |
| 481 | self.logdebug("Forcing close of real backend's stdin.") |
| 482 | os.close(tocfno) |
| 483 | |