root / pykota / trunk / pykota / ipp.py @ 2313

Revision 2313, 13.8 kB (checked in by jerome, 19 years ago)

Improved IPP parser.
Severity : Jamuel, you don't even need to give it a look ;-)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
RevLine 
[1901]1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
[2254]4# PyKota
[1901]5#
[2254]6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005 Jerome Alet <alet@librelogiciel.com>
[1901]9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
[2302]21# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
[1901]22#
23# $Id$
24#
[1972]25#
[1901]26
27import sys
[2311]28from struct import pack, unpack
[1901]29
[2254]30class IPPError(Exception):
31    """An exception for IPP related stuff."""
[1901]32    def __init__(self, message = ""):
33        self.message = message
34        Exception.__init__(self, message)
35    def __repr__(self):
36        return self.message
37    __str__ = __repr__
38
[2313]39class IPPRequest :
40    """A class for IPP requests.
[2254]41   
42       Usage :
43       
44         fp = open("/var/spool/cups/c00001", "rb")
[2313]45         message = IPPRequest(fp.read())
[2254]46         fp.close()
[2311]47         message.parse()
[2313]48         # print message.dump() # dumps an equivalent to the original IPP message
49         # print str(message)   # returns a string of text with the same content as below
50         print "IPP version : %s.%s" % message.version
51         print "IPP operation Id : 0x%04x" % message.operation_id
52         print "IPP request Id : 0x%08x" % message.request_id
53         for attrtype in message.attributes_types :
[2254]54             attrdict = getattr(message, "%s_attributes" % attrtype)
55             if attrdict :
56                 print "%s attributes :" % attrtype.title()
57                 for key in attrdict.keys() :
58                     print "  %s : %s" % (key, attrdict[key])
[2313]59         if message.data :           
60             print "IPP datas : ", repr(message.data)           
[2254]61    """
[2313]62    attributes_types = ("operation", "job", "printer", "unsupported", \
63                                     "subscription", "event_notification")
64    def __init__(self, data="", version=None, operation_id=None, \
65                                              request_id=None, debug=0) :
[2311]66        """Initializes an IPP Message object.
[2254]67       
68           Parameters :
69           
[2313]70             data : the complete IPP Message's content.
[2254]71             debug : a boolean value to output debug info on stderr.
72        """
73        self.debug = debug
[2313]74        self._data = data
[2311]75        self.parsed = 0
76       
[2313]77        # Initializes message
78        if version is not None :
79            try :
80                self.version = [int(p) for p in version.split(".")]
81            except AttributeError :
82                if len(version) == 2 : # 2-tuple
83                    self.version = version
84                else :   
85                    try :
86                        self.version = [int(p) for p in str(float(version)).split(".")]
87                    except :
88                        self.version = (1, 1) # default version number
[2311]89        self.operation_id = operation_id
90        self.request_id = request_id
[2313]91        self.data = ""
[2311]92       
[2313]93        # Initialize attributes mappings
[2273]94        for attrtype in self.attributes_types :
[2272]95            setattr(self, "%s_attributes" % attrtype, {})
[2313]96           
97        # Initialize tags   
98        self.tags = [ None ] * 256 # by default all tags reserved
[1901]99       
100        # Delimiter tags
[2311]101        self.tags[0x01] = "operation-attributes-tag"
102        self.tags[0x02] = "job-attributes-tag"
103        self.tags[0x03] = "end-of-attributes-tag"
104        self.tags[0x04] = "printer-attributes-tag"
105        self.tags[0x05] = "unsupported-attributes-tag"
[2313]106        self.tags[0x06] = "subscription-attributes-tag"
107        self.tags[0x07] = "event-notification-attributes-tag"
[1901]108       
109        # out of band values
110        self.tags[0x10] = "unsupported"
111        self.tags[0x11] = "reserved-for-future-default"
112        self.tags[0x12] = "unknown"
113        self.tags[0x13] = "no-value"
[2313]114        self.tags[0x15] = "not-settable"
115        self.tags[0x16] = "delete-attribute"
116        self.tags[0x17] = "admin-define"
117 
[1901]118        # integer values
119        self.tags[0x20] = "generic-integer"
120        self.tags[0x21] = "integer"
121        self.tags[0x22] = "boolean"
122        self.tags[0x23] = "enum"
123       
124        # octetString
125        self.tags[0x30] = "octetString-with-an-unspecified-format"
126        self.tags[0x31] = "dateTime"
127        self.tags[0x32] = "resolution"
128        self.tags[0x33] = "rangeOfInteger"
[2313]129        self.tags[0x34] = "begCollection" # TODO : find sample files for testing
[1901]130        self.tags[0x35] = "textWithLanguage"
131        self.tags[0x36] = "nameWithLanguage"
[2313]132        self.tags[0x37] = "endCollection"
[1901]133       
134        # character strings
[2313]135        self.tags[0x40] = "generic-character-string"
[1901]136        self.tags[0x41] = "textWithoutLanguage"
137        self.tags[0x42] = "nameWithoutLanguage"
138        self.tags[0x44] = "keyword"
139        self.tags[0x45] = "uri"
140        self.tags[0x46] = "uriScheme"
141        self.tags[0x47] = "charset"
142        self.tags[0x48] = "naturalLanguage"
143        self.tags[0x49] = "mimeMediaType"
[2313]144        self.tags[0x4a] = "memberAttrName"
[1901]145       
[2311]146        # Reverse mapping to generate IPP messages
147        self.dictags = {}
148        for i in range(len(self.tags)) :
149            value = self.tags[i]
150            if value is not None :
151                self.dictags[value] = i
[1901]152       
[2254]153    def printInfo(self, msg) :   
154        """Prints a debug message."""
155        if self.debug :
156            sys.stderr.write("%s\n" % msg)
157            sys.stderr.flush()
[1901]158           
[2311]159    def __str__(self) :       
160        """Returns the parsed IPP message in a readable form."""
161        if not self.parsed :
162            return ""
163        else :   
164            buffer = []
[2313]165            buffer.append("IPP version : %s.%s" % self.version)
166            buffer.append("IPP operation Id : 0x%04x" % self.operation_id)
167            buffer.append("IPP request Id : 0x%08x" % self.request_id)
[2311]168            for attrtype in self.attributes_types :
169                attrdict = getattr(self, "%s_attributes" % attrtype)
170                if attrdict :
171                    buffer.append("%s attributes :" % attrtype.title())
172                    for key in attrdict.keys() :
173                        buffer.append("  %s : %s" % (key, attrdict[key]))
[2313]174            if self.data :           
175                buffer.append("IPP datas : %s" % repr(message.data))
[2311]176            return "\n".join(buffer)
177       
178    def dump(self) :   
179        """Generates an IPP Message.
180       
181           Returns the message as a string of text.
182        """   
183        buffer = []
184        if None not in (self.version, self.operation_id, self.request_id) :
[2313]185            buffer.append(chr(self.version[0]) + chr(self.version[1]))
186            buffer.append(pack(">H", self.operation_id))
187            buffer.append(pack(">I", self.request_id))
[2311]188            for attrtype in self.attributes_types :
189                tagprinted = 0
190                for (attrname, value) in getattr(self, "%s_attributes" % attrtype).items() :
191                    if not tagprinted :
192                        buffer.append(chr(self.dictags["%s-attributes-tag" % attrtype]))
193                        tagprinted = 1
194                    if type(value) != type([]) :
195                        value = [ value ]
196                    for (vtype, val) in value :
197                        buffer.append(chr(self.dictags[vtype]))
198                        buffer.append(pack(">H", len(attrname)))
199                        buffer.append(attrname)
200                        if vtype in ("integer", "enum") :
201                            buffer.append(pack(">H", 4))
202                            buffer.append(pack(">I", val))
203                        elif vtype == "boolean" :
204                            buffer.append(pack(">H", 1))
205                            buffer.append(chr(val))
206                        else :   
207                            buffer.append(pack(">H", len(val)))
208                            buffer.append(val)
209            buffer.append(chr(self.dictags["end-of-attributes-tag"]))
[2313]210        buffer.append(self.data)   
[2311]211        return "".join(buffer)
[2313]212           
[2311]213    def parse(self) :
[2313]214        """Parses an IPP Request.
[2311]215       
216           NB : Only a subset of RFC2910 is implemented.
217        """
218        self._curname = None
219        self._curdict = None
[2313]220        self.version = (ord(self._data[0]), ord(self._data[1]))
221        self.operation_id = unpack(">H", self._data[2:4])[0]
222        self.request_id = unpack(">I", self._data[4:8])[0]
[2311]223        self.position = 8
224        endofattributes = self.dictags["end-of-attributes-tag"]
[2313]225        maxdelimiter = self.dictags["event-notification-attributes-tag"]
[2311]226        try :
[2313]227            tag = ord(self._data[self.position])
[2311]228            while tag != endofattributes :
229                self.position += 1
230                name = self.tags[tag]
231                if name is not None :
232                    func = getattr(self, name.replace("-", "_"), None)
233                    if func is not None :
234                        self.position += func()
[2313]235                        if ord(self._data[self.position]) > maxdelimiter :
[2311]236                            self.position -= 1
237                            continue
[2313]238                tag = ord(self._data[self.position])
[2311]239        except IndexError :
240            raise IPPError, "Unexpected end of IPP message."
241           
242        # Now transform all one-element lists into single values
243        for attrtype in self.attributes_types :
244            attrdict = getattr(self, "%s_attributes" % attrtype)
245            for (key, value) in attrdict.items() :
246                if len(value) == 1 :
247                    attrdict[key] = value[0]
[2313]248        self.data = self._data[self.position+1:]           
[2311]249        self.parsed = 1           
250       
[1901]251    def parseTag(self) :   
252        """Extracts information from an IPP tag."""
253        pos = self.position
[2313]254        tagtype = self.tags[ord(self._data[pos])]
[1901]255        pos += 1
256        posend = pos2 = pos + 2
[2313]257        namelength = unpack(">H", self._data[pos:pos2])[0]
[1901]258        if not namelength :
[2254]259            name = self._curname
[1901]260        else :   
261            posend += namelength
[2313]262            self._curname = name = self._data[pos2:posend]
[1901]263        pos2 = posend + 2
[2313]264        valuelength = unpack(">H", self._data[posend:pos2])[0]
[1901]265        posend = pos2 + valuelength
[2313]266        value = self._data[pos2:posend]
[2254]267        if tagtype in ("integer", "enum") :
268            value = unpack(">I", value)[0]
[2311]269        elif tagtype == "boolean" :   
270            value = ord(value)
[2254]271        oldval = self._curdict.setdefault(name, [])
272        oldval.append((tagtype, value))
[2272]273        self.printInfo("%s(%s) : %s" % (name, tagtype, value))
[1901]274        return posend - self.position
275       
276    def operation_attributes_tag(self) : 
277        """Indicates that the parser enters into an operation-attributes-tag group."""
[2254]278        self.printInfo("Start of operation_attributes_tag")
279        self._curdict = self.operation_attributes
[1901]280        return self.parseTag()
281       
282    def job_attributes_tag(self) : 
[2254]283        """Indicates that the parser enters into a job-attributes-tag group."""
284        self.printInfo("Start of job_attributes_tag")
285        self._curdict = self.job_attributes
[1901]286        return self.parseTag()
287       
288    def printer_attributes_tag(self) : 
[2254]289        """Indicates that the parser enters into a printer-attributes-tag group."""
290        self.printInfo("Start of printer_attributes_tag")
291        self._curdict = self.printer_attributes
[1901]292        return self.parseTag()
293       
[2254]294    def unsupported_attributes_tag(self) : 
295        """Indicates that the parser enters into an unsupported-attributes-tag group."""
296        self.printInfo("Start of unsupported_attributes_tag")
297        self._curdict = self.unsupported_attributes
298        return self.parseTag()
[2313]299       
300    def subscription_attributes_tag(self) : 
301        """Indicates that the parser enters into a subscription-attributes-tag group."""
302        self.printInfo("Start of subscription_attributes_tag")
303        self._curdict = self.subscription_attributes
304        return self.parseTag()
305       
306    def event_notification_attributes_tag(self) : 
307        """Indicates that the parser enters into an event-notification-attributes-tag group."""
308        self.printInfo("Start of event_notification_attributes_tag")
309        self._curdict = self.event_notification_attributes
310        return self.parseTag()
[1901]311           
[1972]312if __name__ == "__main__" :           
[2254]313    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
314        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
[1972]315    else :   
[2254]316        infile = open(sys.argv[1], "rb")
[2313]317        data = infile.read()
318        message = IPPRequest(data, debug=(sys.argv[-1]=="--debug"))
[1972]319        infile.close()
[2311]320        message.parse()
321       
[2313]322        message2 = IPPRequest(message.dump())
[2311]323        message2.parse()
324       
325        # We now compare the message parsed, and the output
326        # of the parsing of the dumped message which was parsed.
327        # We sort the results for the test, because the attributes ordering
328        # may vary since Python dictionnaries are not sorted.
329        strmessage = str(message)
330        strmessage2 = str(message2)
331        lstmessage = strmessage.split("\n")
332        lstmessage.sort()
333        lstmessage2 = strmessage2.split("\n")
334        lstmessage2.sort()
335        if lstmessage == lstmessage2 :
336            print "Test OK : parsing original and parsing the output of the dump produce the same result !"
337            print strmessage
338        else :   
339            print "Test Failed !"
340            print strmessage
341            print
342            print strmessage2
Note: See TracBrowser for help on using the browser.