#! /usr/bin/env python
# -*- coding: ISO-8859-15 -*-
#
# pkpgcounter : a generic Page Description Language parser
#
# (c) 2003, 2004, 2005, 2006 Jerome Alet
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# $Id$
#

"""This module implements a page counter for PCL3/4/5 documents."""

import sys
import os
import mmap
from struct import unpack

import pdlparser
import pjl

FORMFEED = chr(12)
ESCAPE = chr(27)

class Parser(pdlparser.PDLParser) :
    """A parser for PCL3, PCL4, PCL5 documents.

       The input is scanned byte by byte.  Each byte is dispatched through
       256-entry tables of callables : one table for plain bytes, one for
       the byte following ESC, and one per supported ESC<char> prefix.

       NOTE(review) : the code compares mmap items with one character
       strings and calls ord() on them, so it presumably expects Python 2
       byte strings -- confirm before running it on another interpreter.
    """
    totiffcommand = 'pcl6 -sDEVICE=pdfwrite -dPARANOIDSAFER -dNOPAUSE -dBATCH -dQUIET -sOutputFile=- - | gs -sDEVICE=tiff24nc -dPARANOIDSAFER -dNOPAUSE -dBATCH -dQUIET -r%(dpi)i -sOutputFile="%(fname)s" -'

    mediasizes = {  # ESC&l####A
                    0 : "Default",
                    1 : "Executive",
                    2 : "Letter",
                    3 : "Legal",
                    6 : "Ledger",
                    25 : "A5",
                    26 : "A4",
                    27 : "A3",
                    45 : "JB5",
                    46 : "JB4",
                    71 : "HagakiPostcard",
                    72 : "OufukuHagakiPostcard",
                    80 : "MonarchEnvelope",
                    81 : "COM10Envelope",
                    90 : "DLEnvelope",
                    91 : "C5Envelope",
                    100 : "B5Envelope",
                    101 : "Custom",
                 }

    mediasources = { # ESC&l####H
                     0 : "Default",
                     1 : "Main",
                     2 : "Manual",
                     3 : "ManualEnvelope",
                     4 : "Alternate",
                     5 : "OptionalLarge",
                     6 : "EnvelopeFeeder",
                     7 : "Auto",
                     8 : "Tray1",
                   }

    orientations = { # ESC&l####O
                     0 : "Portrait",
                     1 : "Landscape",
                     2 : "ReversePortrait",
                     3 : "ReverseLandscape",
                   }

    mediatypes = { # ESC&l####M
                     0 : "Plain",
                     1 : "Bond",
                     2 : "Special",
                     3 : "Glossy",
                     4 : "Transparent",
                 }

    def isValid(self) :
        """Returns True if data is PCL3/4/5, else False.

           The decision is based on well known signatures looked up in
           self.firstblock (and self.lastblock for the ESC*rbC case).
           These attributes are not set in this module ; presumably the
           base class provides them.
        """
        firstblock = self.firstblock
        if firstblock.startswith("\033E\033") or \
           (firstblock.startswith("\033*rbC") and (not self.lastblock[-3:] == "\f\033@")) or \
           firstblock.startswith("\033%8\033") or \
           (firstblock.find("\033%-12345X") != -1) or \
           (firstblock.find("@PJL ENTER LANGUAGE=PCL\012\015\033") != -1) or \
           (firstblock.startswith(chr(0xcd)+chr(0xca)) and ("\033E\033" in firstblock)) :
            # The last test must be a real membership test : find()
            # returns -1 (which is true) when the substring is absent.
            self.logdebug("DEBUG: Input file is in the PCL3/4/5 format.")
            return True
        else :
            return False

    def setPageDict(self, pages, number, attribute, value) :
        """Sets an attribute of a page, creating the page's dictionary if needed.

           pages is a mapping of page number to per-page dictionary.  When
           there is no entry yet for number, one is created with default
           settings before attribute is set to value.
        """
        dic = pages.setdefault(number, { "copies" : 1,
                                         "mediasource" : "Main",
                                         "mediasize" : "Default",
                                         "mediatype" : "Plain",
                                         "orientation" : "Portrait",
                                         "escaped" : "",
                                         "duplex": 0})
        dic[attribute] = value

    def readByte(self) :
        """Reads a byte from the input stream.

           Returns the integer value of the byte at the current position
           and moves the position one byte forward.  Indexing past the end
           of the mapped file raises IndexError, which getJobSize() uses
           to detect the end of the input.
        """
        tag = ord(self.minfile[self.pos])
        self.pos += 1
        self.logdebug("BYTE %02x" % tag)
        return tag

    def endPage(self) :
        """Handles the FF marker : one more page was seen."""
        self.logdebug("FORMFEED %i" % self.pagecount)
        self.pagecount += 1

    def escPercent(self) :
        """Handles the ESC% sequence.

           Only ESC%-12345X is recognized : it is logged and skipped.
        """
        if self.minfile[self.pos : self.pos+7] == r"-12345X" :
            self.logdebug("Generic ESCAPE sequence at %08x" % self.pos)
            self.pos += 7

    def handleTag(self, tagtable) :
        """Reads the next byte and calls the handler tagtable has for it."""
        tagtable[self.readByte()]()

    def escape(self) :
        """Handles the ESC character."""
        self.logdebug("ESCAPE")
        self.handleTag(self.esctags)

    def escAmp(self) :
        """Handles the ESC& sequence."""
        self.logdebug("AMP")
        self.handleTag(self.escamptags)

    def escStar(self) :
        """Handles the ESC* sequence."""
        self.logdebug("STAR")
        self.handleTag(self.escstartags)

    def escLeftPar(self) :
        """Handles the ESC( sequence."""
        self.logdebug("LEFTPAR")
        self.handleTag(self.escleftpartags)

    def escRightPar(self) :
        """Handles the ESC) sequence."""
        self.logdebug("RIGHTPAR")
        self.handleTag(self.escrightpartags)

    def escE(self) :
        """Handles the ESCE sequence."""
        self.logdebug("RESET")
        self.resets += 1

    def isTerminator(self, char) :
        """Returns True if char terminates a parameterized escape sequence.

           Per the HP PCL5 reference, the termination character of a
           parameterized sequence lies in the ASCII range 64-94 ('@' to
           '^'), while the lowercase range is used to combine several
           parameters in a single sequence.
        """
        return (char is not None) and ("@" <= char <= "^")

    def escAmpl(self) :
        """Handles the ESC&l sequence.

           Records media source, media size, orientation, media type and
           number of copies.  Parsing stops at the next ESC, at a
           parameter without numeric value, or right after the
           termination character.
        """
        self.logdebug("l")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                return
            if end in ('h', 'H') :
                mediasource = self.mediasources.get(value, str(value))
                self.mediasourcesvalues.append(mediasource)
                self.logdebug("MEDIASOURCE %s" % mediasource)
            elif end in ('a', 'A') :
                mediasize = self.mediasizes.get(value, str(value))
                self.mediasizesvalues.append(mediasize)
                self.logdebug("MEDIASIZE %s" % mediasize)
            elif end in ('o', 'O') :
                orientation = self.orientations.get(value, str(value))
                self.orientationsvalues.append(orientation)
                self.logdebug("ORIENTATION %s" % orientation)
            elif end in ('m', 'M') :
                mediatype = self.mediatypes.get(value, str(value))
                self.mediatypesvalues.append(mediatype)
                self.logdebug("MEDIATYPE %s" % mediatype)
            elif end == 'X' :
                self.copies.append(value)
                self.logdebug("COPIES %i" % value)
            elif end == 'L' :
                self.logdebug("ESC&l%iL" % value)
            if self.isTerminator(end) :
                # The sequence is over : what follows (a form feed for
                # example) must go through the main dispatch table
                # instead of being eaten as a parameter.
                return

    def escStarb(self) :
        """Handles the ESC*b sequence.

           For V, W, v and w the numeric value is the size of a data
           block which follows, and which is skipped.
        """
        self.logdebug("b")
        while 1 :
            (value, end) = self.getInteger()
            self.logdebug("%s === %s" % (value, end))
            if (end is None) and (value is None) :
                return
            if end in ('V', 'W', 'v', 'w') :
                skip = value or 0
                if skip > 0 :
                    # Never move backwards : a negative size would make
                    # us parse the same bytes again, possibly forever.
                    self.pos += skip
                self.logdebug("SKIPTO %08x" % self.pos)
            if self.isTerminator(end) :
                return

    def escStaro(self) :
        """Handles the ESC*o sequence (only logs ESC*o#M)."""
        self.logdebug("o")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                return
            if end == 'M' :
                self.logdebug("ESC*o%iM" % value)
            if self.isTerminator(end) :
                return

    def escStarp(self) :
        """Handles the ESC*p sequence (only logs ESC*p#X and ESC*p#Y)."""
        self.logdebug("p")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                return
            if end in ('X', 'Y') :
                self.logdebug("ESC*p%i%s" % (value, end))
            if self.isTerminator(end) :
                return

    def escStarr(self) :
        """Handles the ESC*r sequence.

           Detects the PCL3 ESC*rbC signature, and records the start
           (ESC*r#A with # between 0 and 3) and the end (ESC*rC) of
           raster graphics.
        """
        self.logdebug("r")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                if end is None :
                    return
                elif end == 'b' :
                    if self.minfile[self.pos] == 'C' :
                        self.logdebug("Looks like it's PCL3.")
                        self.ispcl3 = True
                        self.pos += 1
                        return  # the 'C' we just skipped ended the sequence
                elif end == 'C' :
                    self.logdebug("EndGFX")
                    if not self.startgfx :
                        self.logdebug("EndGFX found before StartGFX, ignored.")
                    else :
                        self.endgfx.append(1)
                # Without a numeric value there is nothing to format :
                # the branches below must not be tried.
            elif end == 'A' and (0 <= value <= 3) :
                self.logdebug("StartGFX %i" % value)
                self.startgfx.append(value)
            elif end == 'U' :
                self.logdebug("ESC*r%iU" % value)
            elif end == 'S' :
                self.logdebug("ESC*r%iS" % value)
            if self.isTerminator(end) :
                return

    def escStart(self) :
        """Handles the ESC*t sequence (only logs ESC*t#R)."""
        self.logdebug("t")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                return
            if end == 'R' :
                self.logdebug("ESC*t%iR" % value)
            if self.isTerminator(end) :
                return

    def escAmpu(self) :
        """Handles the ESC&u sequence (only logs ESC&u#D)."""
        self.logdebug("u")
        while 1 :
            (value, end) = self.getInteger()
            if value is None :
                return
            if end == 'D' :
                self.logdebug("ESC&u%iD" % value)
            if self.isTerminator(end) :
                return

    def getInteger(self) :
        """Returns an integer value and the end character.

           Reads bytes up to and including the first one which is neither
           a digit nor a minus sign.  Returns a (value, end) tuple :

             - (None, None) if ESC is met ; ESC is then left in the
               stream for the caller's caller to handle.
             - (None, end) if no digit was read before end.
             - (signed value, end) otherwise.
        """
        sign = 1
        value = None
        while 1 :
            char = chr(self.readByte())
            if char == ESCAPE :
                self.pos -= 1 # Adjust position
                return (None, None)
            if char == '-' :
                sign = -1
            elif not char.isdigit() :
                if value is not None :
                    return (sign*value, char)
                else :
                    return (value, char)
            else :
                value = ((value or 0) * 10) + int(char)

    def getJobSize(self) :
        """Count pages in a PCL5 document.

           Should also work for PCL3 and PCL4 documents.

           Algorithm from pclcount
           (c) 2003, by Eduardo Gielamo Oliveira & Rodolfo Broco Manin
           published under the terms of the GNU General Public Licence v2.

           Backported from C to Python by Jerome Alet, then enhanced
           with more PCL tags detected. I think all the necessary PCL tags
           are recognized to correctly handle PCL5 files wrt their number
           of pages. The documentation used for this was :

           HP PCL/PJL Reference Set
           PCL5 Printer Language Technical Quick Reference Guide
           http://h20000.www2.hp.com/bc/docs/support/SupportManual/bpl13205/bpl13205.pdf

           Returns the number of form feeds met while scanning the file
           behind self.infile, which must have a real file descriptor
           because it is memory mapped.  Resets, copies, media values and
           raster graphics markers are collected and logged but do not
           influence the returned value.
        """
        infileno = self.infile.fileno()
        self.minfile = mmap.mmap(infileno, os.fstat(infileno)[6], prot=mmap.PROT_READ, flags=mmap.MAP_SHARED)
        self.ispcl3 = False
        self.pagecount = 0
        self.resets = 0
        self.copies = []
        self.mediasourcesvalues = []
        self.mediasizesvalues = []
        self.orientationsvalues = []
        self.mediatypesvalues = []
        self.startgfx = []
        self.endgfx = []

        # PCL sequences of interest.  Not all of them are handled by the
        # dispatch tables below.
        #
        #     *b###y#m###v###w... -> PCL3 raster graphics
        #     *b###W -> Start of a raster data row/block
        #     *b###V -> Start of a raster data plane
        #     *c###W -> Start of a user defined pattern
        #     *i###W -> Start of a viewing illuminant block
        #     *l###W -> Start of a color lookup table
        #     *m###W -> Start of a download dither matrix block
        #     *v###W -> Start of a configure image data block
        #     *r1A -> Start Gfx
        #     (s###W -> Start of a characters description block
        #     )s###W -> Start of a fonts description block
        #     (f###W -> Start of a symbol set block
        #     &b###W -> Start of configuration data block
        #     &l###X -> Number of copies for current page
        #     &n###W -> Starts an alphanumeric string ID block
        #     &p###X -> Start of a non printable characters block
        #     &a2G -> Back side when duplex mode as generated by rastertohp
        #     *g###W -> Needed for planes in PCL3 output
        #     &l###H (or only 0 ?) -> Eject if NumPlanes > 1, as generated by rastertohp. Also defines mediasource
        #     &l###A -> mediasize
        #     &l###O -> orientation
        #     &l###M -> mediatype
        #     *t###R -> gfx resolution

        # Plain bytes : only FF and ESC do something.
        tags = [ lambda : None ] * 256
        tags[ord(FORMFEED)] = self.endPage
        tags[ord(ESCAPE)] = self.escape

        # Byte following ESC.
        self.esctags = [ lambda : None ] * 256
        self.esctags[ord('%')] = self.escPercent
        self.esctags[ord('*')] = self.escStar
        self.esctags[ord('&')] = self.escAmp
        self.esctags[ord('(')] = self.escLeftPar
        self.esctags[ord(')')] = self.escRightPar
        self.esctags[ord('E')] = self.escE

        # Byte following ESC&
        self.escamptags = [ lambda : None ] * 256
        self.escamptags[ord('l')] = self.escAmpl
        self.escamptags[ord('u')] = self.escAmpu

        # Byte following ESC*
        self.escstartags = [ lambda : None ] * 256
        self.escstartags[ord('b')] = self.escStarb
        self.escstartags[ord('o')] = self.escStaro
        self.escstartags[ord('p')] = self.escStarp
        self.escstartags[ord('r')] = self.escStarr
        self.escstartags[ord('t')] = self.escStart

        # Bytes following ESC( and ESC) : nothing is handled yet, but the
        # tables must exist because escLeftPar() and escRightPar() use
        # them as soon as such a sequence is met.
        self.escleftpartags = [ lambda : None ] * 256
        self.escrightpartags = [ lambda : None ] * 256

        self.pos = 0
        try :
            try :
                while 1 :
                    tag = self.readByte()
                    self.logdebug("%08x ===> %02x" % (self.pos-1, tag))
                    tags[tag]()
            except IndexError : # EOF ?
                pass
        finally :
            self.minfile.close()

        self.logdebug("Pagecount : \t\t%i" % self.pagecount)
        self.logdebug("Resets : \t\t%i" % self.resets)
        self.logdebug("Copies : \t\t%s" % self.copies)
        self.logdebug("MediaTypes : \t\t%s" % self.mediatypesvalues)
        self.logdebug("MediaSizes : \t\t%s" % self.mediasizesvalues)
        self.logdebug("MediaSources : \t\t%s" % self.mediasourcesvalues)
        self.logdebug("Orientations : \t\t%s" % self.orientationsvalues)
        self.logdebug("StartGfx : \t\t%s" % len(self.startgfx))
        self.logdebug("EndGfx : \t\t%s" % len(self.endgfx))

        return self.pagecount

def test() :
    """Test function.

       Parses each file named on the command line ("-" meaning standard
       input) in debug mode and prints the total number of pages.
    """
    if (len(sys.argv) < 2) or ((not sys.stdin.isatty()) and ("-" not in sys.argv[1:])) :
        sys.argv.append("-")
    totalsize = 0
    for arg in sys.argv[1:] :
        if arg == "-" :
            infile = sys.stdin
            mustclose = 0
        else :
            infile = open(arg, "rb")
            mustclose = 1
        try :
            try :
                parser = Parser(infile, debug=1)
                totalsize += parser.getJobSize()
            except pdlparser.PDLParserError :
                # sys.exc_info() is used instead of the "except X, msg"
                # form so that this file can be parsed by any interpreter.
                msg = sys.exc_info()[1]
                sys.stderr.write("ERROR: %s\n" % msg)
                sys.stderr.flush()
        finally :
            # Close the file even if an unexpected exception escapes.
            if mustclose :
                infile.close()
    sys.stdout.write("%s\n" % totalsize)

if __name__ == "__main__" :
    test()