root / pykota / trunk / pykota / ipp.py @ 2311

Revision 2311, 11.8 kB (checked in by jerome, 19 years ago)

Fixed bug : exception name was incorrect in an except statement in cupspykota.
Improved the IPP module : can now generate IPP messages as well as parsing them.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
4# PyKota
5#
6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005 Jerome Alet <alet@librelogiciel.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22#
23# $Id$
24#
25#
26
27import sys
28from struct import pack, unpack
29
30class IPPError(Exception):
31    """An exception for IPP related stuff."""
32    def __init__(self, message = ""):
33        self.message = message
34        Exception.__init__(self, message)
35    def __repr__(self):
36        return self.message
37    __str__ = __repr__
38
39class IPPMessage :
40    """A class for IPP message files.
41   
42       Usage :
43       
44         fp = open("/var/spool/cups/c00001", "rb")
45         message = IPPMessage(fp.read())
46         fp.close()
47         message.parse()
48         # print str(message)
49         # print message.dump()
50         print "IPP version : %s" % message.version
51         print "IPP operation Id : %s" % message.operation_id
52         print "IPP request Id : %s" % message.request_id
53         for attrtype in ("operation", "job", "printer", "unsupported") :
54             attrdict = getattr(message, "%s_attributes" % attrtype)
55             if attrdict :
56                 print "%s attributes :" % attrtype.title()
57                 for key in attrdict.keys() :
58                     print "  %s : %s" % (key, attrdict[key])
59    """
60    attributes_types = ("operation", "job", "printer", "unsupported")
61    def __init__(self, data="", version=None, operation_id=None, request_id=None, debug=0) :
62        """Initializes an IPP Message object.
63       
64           Parameters :
65           
66             data : the IPP Message's content.
67             debug : a boolean value to output debug info on stderr.
68        """
69        self.debug = debug
70        self.data = data
71        self.parsed = 0
72       
73        self.version = version
74        self.operation_id = operation_id
75        self.request_id = request_id
76       
77        for attrtype in self.attributes_types :
78            setattr(self, "%s_attributes" % attrtype, {})
79        self.tags = [ None ] * 256      # by default all tags reserved
80       
81        # Delimiter tags
82        self.tags[0x01] = "operation-attributes-tag"
83        self.tags[0x02] = "job-attributes-tag"
84        self.tags[0x03] = "end-of-attributes-tag"
85        self.tags[0x04] = "printer-attributes-tag"
86        self.tags[0x05] = "unsupported-attributes-tag"
87       
88        # out of band values
89        self.tags[0x10] = "unsupported"
90        self.tags[0x11] = "reserved-for-future-default"
91        self.tags[0x12] = "unknown"
92        self.tags[0x13] = "no-value"
93       
94        # integer values
95        self.tags[0x20] = "generic-integer"
96        self.tags[0x21] = "integer"
97        self.tags[0x22] = "boolean"
98        self.tags[0x23] = "enum"
99       
100        # octetString
101        self.tags[0x30] = "octetString-with-an-unspecified-format"
102        self.tags[0x31] = "dateTime"
103        self.tags[0x32] = "resolution"
104        self.tags[0x33] = "rangeOfInteger"
105        self.tags[0x34] = "reserved-for-collection"
106        self.tags[0x35] = "textWithLanguage"
107        self.tags[0x36] = "nameWithLanguage"
108       
109        # character strings
110        self.tags[0x20] = "generic-character-string"
111        self.tags[0x41] = "textWithoutLanguage"
112        self.tags[0x42] = "nameWithoutLanguage"
113        self.tags[0x43] = "reserved"
114        self.tags[0x44] = "keyword"
115        self.tags[0x45] = "uri"
116        self.tags[0x46] = "uriScheme"
117        self.tags[0x47] = "charset"
118        self.tags[0x48] = "naturalLanguage"
119        self.tags[0x49] = "mimeMediaType"
120       
121        # Reverse mapping to generate IPP messages
122        self.dictags = {}
123        for i in range(len(self.tags)) :
124            value = self.tags[i]
125            if value is not None :
126                self.dictags[value] = i
127       
128    def printInfo(self, msg) :   
129        """Prints a debug message."""
130        if self.debug :
131            sys.stderr.write("%s\n" % msg)
132            sys.stderr.flush()
133           
134    def __str__(self) :       
135        """Returns the parsed IPP message in a readable form."""
136        if not self.parsed :
137            return ""
138        else :   
139            buffer = []
140            buffer.append("IPP version : %s" % self.version)
141            buffer.append("IPP operation Id : %s" % self.operation_id)
142            buffer.append("IPP request Id : %s" % self.request_id)
143            for attrtype in self.attributes_types :
144                attrdict = getattr(self, "%s_attributes" % attrtype)
145                if attrdict :
146                    buffer.append("%s attributes :" % attrtype.title())
147                    for key in attrdict.keys() :
148                        buffer.append("  %s : %s" % (key, attrdict[key]))
149            return "\n".join(buffer)
150       
151    def dump(self) :   
152        """Generates an IPP Message.
153       
154           Returns the message as a string of text.
155        """   
156        buffer = []
157        if None not in (self.version, self.operation_id, self.request_id) :
158            version = [int(p) for p in self.version.split('.')]
159            buffer.append(chr(version[0]) + chr(version[1]))
160            buffer.append(pack(">H", int(self.operation_id, 16)))
161            buffer.append(pack(">I", int(self.request_id, 16)))
162            for attrtype in self.attributes_types :
163                tagprinted = 0
164                for (attrname, value) in getattr(self, "%s_attributes" % attrtype).items() :
165                    if not tagprinted :
166                        buffer.append(chr(self.dictags["%s-attributes-tag" % attrtype]))
167                        tagprinted = 1
168                    if type(value) != type([]) :
169                        value = [ value ]
170                    for (vtype, val) in value :
171                        buffer.append(chr(self.dictags[vtype]))
172                        buffer.append(pack(">H", len(attrname)))
173                        buffer.append(attrname)
174                        if vtype in ("integer", "enum") :
175                            buffer.append(pack(">H", 4))
176                            buffer.append(pack(">I", val))
177                        elif vtype == "boolean" :
178                            buffer.append(pack(">H", 1))
179                            buffer.append(chr(val))
180                        else :   
181                            buffer.append(pack(">H", len(val)))
182                            buffer.append(val)
183            buffer.append(chr(self.dictags["end-of-attributes-tag"]))
184        return "".join(buffer)
185       
186    def parse(self) :
187        """Parses an IPP Message.
188       
189           NB : Only a subset of RFC2910 is implemented.
190        """
191        self._curname = None
192        self._curdict = None
193        self.version = "%s.%s" % (ord(self.data[0]), ord(self.data[1]))
194        self.operation_id = "0x%04x" % unpack(">H", self.data[2:4])[0]
195        self.request_id = "0x%08x" % unpack(">I", self.data[4:8])[0]
196        self.position = 8
197        endofattributes = self.dictags["end-of-attributes-tag"]
198        unsupportedattributes = self.dictags["unsupported-attributes-tag"]
199        try :
200            tag = ord(self.data[self.position])
201            while tag != endofattributes :
202                self.position += 1
203                name = self.tags[tag]
204                if name is not None :
205                    func = getattr(self, name.replace("-", "_"), None)
206                    if func is not None :
207                        self.position += func()
208                        if ord(self.data[self.position]) > unsupportedattributes :
209                            self.position -= 1
210                            continue
211                tag = ord(self.data[self.position])
212        except IndexError :
213            raise IPPError, "Unexpected end of IPP message."
214           
215        # Now transform all one-element lists into single values
216        for attrtype in self.attributes_types :
217            attrdict = getattr(self, "%s_attributes" % attrtype)
218            for (key, value) in attrdict.items() :
219                if len(value) == 1 :
220                    attrdict[key] = value[0]
221        self.parsed = 1           
222       
223    def parseTag(self) :   
224        """Extracts information from an IPP tag."""
225        pos = self.position
226        tagtype = self.tags[ord(self.data[pos])]
227        pos += 1
228        posend = pos2 = pos + 2
229        namelength = unpack(">H", self.data[pos:pos2])[0]
230        if not namelength :
231            name = self._curname
232        else :   
233            posend += namelength
234            self._curname = name = self.data[pos2:posend]
235        pos2 = posend + 2
236        valuelength = unpack(">H", self.data[posend:pos2])[0]
237        posend = pos2 + valuelength
238        value = self.data[pos2:posend]
239        if tagtype in ("integer", "enum") :
240            value = unpack(">I", value)[0]
241        elif tagtype == "boolean" :   
242            value = ord(value)
243        oldval = self._curdict.setdefault(name, [])
244        oldval.append((tagtype, value))
245        self.printInfo("%s(%s) : %s" % (name, tagtype, value))
246        return posend - self.position
247       
248    def operation_attributes_tag(self) : 
249        """Indicates that the parser enters into an operation-attributes-tag group."""
250        self.printInfo("Start of operation_attributes_tag")
251        self._curdict = self.operation_attributes
252        return self.parseTag()
253       
254    def job_attributes_tag(self) : 
255        """Indicates that the parser enters into a job-attributes-tag group."""
256        self.printInfo("Start of job_attributes_tag")
257        self._curdict = self.job_attributes
258        return self.parseTag()
259       
260    def printer_attributes_tag(self) : 
261        """Indicates that the parser enters into a printer-attributes-tag group."""
262        self.printInfo("Start of printer_attributes_tag")
263        self._curdict = self.printer_attributes
264        return self.parseTag()
265       
266    def unsupported_attributes_tag(self) : 
267        """Indicates that the parser enters into an unsupported-attributes-tag group."""
268        self.printInfo("Start of unsupported_attributes_tag")
269        self._curdict = self.unsupported_attributes
270        return self.parseTag()
271           
272if __name__ == "__main__" :           
273    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
274        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
275    else :   
276        infile = open(sys.argv[1], "rb")
277        message = IPPMessage(infile.read(), debug=(sys.argv[-1]=="--debug"))
278        infile.close()
279        message.parse()
280       
281        message2 = IPPMessage(message.dump())
282        message2.parse()
283       
284        # We now compare the message parsed, and the output
285        # of the parsing of the dumped message which was parsed.
286        # We sort the results for the test, because the attributes ordering
287        # may vary since Python dictionnaries are not sorted.
288        strmessage = str(message)
289        strmessage2 = str(message2)
290        lstmessage = strmessage.split("\n")
291        lstmessage.sort()
292        lstmessage2 = strmessage2.split("\n")
293        lstmessage2.sort()
294        if lstmessage == lstmessage2 :
295            print "Test OK : parsing original and parsing the output of the dump produce the same result !"
296            print strmessage
297        else :   
298            print "Test Failed !"
299            print strmessage
300            print
301            print strmessage2
Note: See TracBrowser for help on using the browser.