root / pykota / trunk / pykota / ipp.py @ 2313

Revision 2313, 13.8 kB (checked in by jerome, 19 years ago)

Improved IPP parser.
Severity : Jamuel, you don't even need to give it a look ;-)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
4# PyKota
5#
6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005 Jerome Alet <alet@librelogiciel.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22#
23# $Id$
24#
25#
26
27import sys
28from struct import pack, unpack
29
30class IPPError(Exception):
31    """An exception for IPP related stuff."""
32    def __init__(self, message = ""):
33        self.message = message
34        Exception.__init__(self, message)
35    def __repr__(self):
36        return self.message
37    __str__ = __repr__
38
39class IPPRequest :
40    """A class for IPP requests.
41   
42       Usage :
43       
44         fp = open("/var/spool/cups/c00001", "rb")
45         message = IPPRequest(fp.read())
46         fp.close()
47         message.parse()
48         # print message.dump() # dumps an equivalent to the original IPP message
49         # print str(message)   # returns a string of text with the same content as below
50         print "IPP version : %s.%s" % message.version
51         print "IPP operation Id : 0x%04x" % message.operation_id
52         print "IPP request Id : 0x%08x" % message.request_id
53         for attrtype in message.attributes_types :
54             attrdict = getattr(message, "%s_attributes" % attrtype)
55             if attrdict :
56                 print "%s attributes :" % attrtype.title()
57                 for key in attrdict.keys() :
58                     print "  %s : %s" % (key, attrdict[key])
59         if message.data :           
60             print "IPP datas : ", repr(message.data)           
61    """
62    attributes_types = ("operation", "job", "printer", "unsupported", \
63                                     "subscription", "event_notification")
64    def __init__(self, data="", version=None, operation_id=None, \
65                                              request_id=None, debug=0) :
66        """Initializes an IPP Message object.
67       
68           Parameters :
69           
70             data : the complete IPP Message's content.
71             debug : a boolean value to output debug info on stderr.
72        """
73        self.debug = debug
74        self._data = data
75        self.parsed = 0
76       
77        # Initializes message
78        if version is not None :
79            try :
80                self.version = [int(p) for p in version.split(".")]
81            except AttributeError :
82                if len(version) == 2 : # 2-tuple
83                    self.version = version
84                else :   
85                    try :
86                        self.version = [int(p) for p in str(float(version)).split(".")]
87                    except :
88                        self.version = (1, 1) # default version number
89        self.operation_id = operation_id
90        self.request_id = request_id
91        self.data = ""
92       
93        # Initialize attributes mappings
94        for attrtype in self.attributes_types :
95            setattr(self, "%s_attributes" % attrtype, {})
96           
97        # Initialize tags   
98        self.tags = [ None ] * 256 # by default all tags reserved
99       
100        # Delimiter tags
101        self.tags[0x01] = "operation-attributes-tag"
102        self.tags[0x02] = "job-attributes-tag"
103        self.tags[0x03] = "end-of-attributes-tag"
104        self.tags[0x04] = "printer-attributes-tag"
105        self.tags[0x05] = "unsupported-attributes-tag"
106        self.tags[0x06] = "subscription-attributes-tag"
107        self.tags[0x07] = "event-notification-attributes-tag"
108       
109        # out of band values
110        self.tags[0x10] = "unsupported"
111        self.tags[0x11] = "reserved-for-future-default"
112        self.tags[0x12] = "unknown"
113        self.tags[0x13] = "no-value"
114        self.tags[0x15] = "not-settable"
115        self.tags[0x16] = "delete-attribute"
116        self.tags[0x17] = "admin-define"
117 
118        # integer values
119        self.tags[0x20] = "generic-integer"
120        self.tags[0x21] = "integer"
121        self.tags[0x22] = "boolean"
122        self.tags[0x23] = "enum"
123       
124        # octetString
125        self.tags[0x30] = "octetString-with-an-unspecified-format"
126        self.tags[0x31] = "dateTime"
127        self.tags[0x32] = "resolution"
128        self.tags[0x33] = "rangeOfInteger"
129        self.tags[0x34] = "begCollection" # TODO : find sample files for testing
130        self.tags[0x35] = "textWithLanguage"
131        self.tags[0x36] = "nameWithLanguage"
132        self.tags[0x37] = "endCollection"
133       
134        # character strings
135        self.tags[0x40] = "generic-character-string"
136        self.tags[0x41] = "textWithoutLanguage"
137        self.tags[0x42] = "nameWithoutLanguage"
138        self.tags[0x44] = "keyword"
139        self.tags[0x45] = "uri"
140        self.tags[0x46] = "uriScheme"
141        self.tags[0x47] = "charset"
142        self.tags[0x48] = "naturalLanguage"
143        self.tags[0x49] = "mimeMediaType"
144        self.tags[0x4a] = "memberAttrName"
145       
146        # Reverse mapping to generate IPP messages
147        self.dictags = {}
148        for i in range(len(self.tags)) :
149            value = self.tags[i]
150            if value is not None :
151                self.dictags[value] = i
152       
153    def printInfo(self, msg) :   
154        """Prints a debug message."""
155        if self.debug :
156            sys.stderr.write("%s\n" % msg)
157            sys.stderr.flush()
158           
159    def __str__(self) :       
160        """Returns the parsed IPP message in a readable form."""
161        if not self.parsed :
162            return ""
163        else :   
164            buffer = []
165            buffer.append("IPP version : %s.%s" % self.version)
166            buffer.append("IPP operation Id : 0x%04x" % self.operation_id)
167            buffer.append("IPP request Id : 0x%08x" % self.request_id)
168            for attrtype in self.attributes_types :
169                attrdict = getattr(self, "%s_attributes" % attrtype)
170                if attrdict :
171                    buffer.append("%s attributes :" % attrtype.title())
172                    for key in attrdict.keys() :
173                        buffer.append("  %s : %s" % (key, attrdict[key]))
174            if self.data :           
175                buffer.append("IPP datas : %s" % repr(message.data))
176            return "\n".join(buffer)
177       
178    def dump(self) :   
179        """Generates an IPP Message.
180       
181           Returns the message as a string of text.
182        """   
183        buffer = []
184        if None not in (self.version, self.operation_id, self.request_id) :
185            buffer.append(chr(self.version[0]) + chr(self.version[1]))
186            buffer.append(pack(">H", self.operation_id))
187            buffer.append(pack(">I", self.request_id))
188            for attrtype in self.attributes_types :
189                tagprinted = 0
190                for (attrname, value) in getattr(self, "%s_attributes" % attrtype).items() :
191                    if not tagprinted :
192                        buffer.append(chr(self.dictags["%s-attributes-tag" % attrtype]))
193                        tagprinted = 1
194                    if type(value) != type([]) :
195                        value = [ value ]
196                    for (vtype, val) in value :
197                        buffer.append(chr(self.dictags[vtype]))
198                        buffer.append(pack(">H", len(attrname)))
199                        buffer.append(attrname)
200                        if vtype in ("integer", "enum") :
201                            buffer.append(pack(">H", 4))
202                            buffer.append(pack(">I", val))
203                        elif vtype == "boolean" :
204                            buffer.append(pack(">H", 1))
205                            buffer.append(chr(val))
206                        else :   
207                            buffer.append(pack(">H", len(val)))
208                            buffer.append(val)
209            buffer.append(chr(self.dictags["end-of-attributes-tag"]))
210        buffer.append(self.data)   
211        return "".join(buffer)
212           
213    def parse(self) :
214        """Parses an IPP Request.
215       
216           NB : Only a subset of RFC2910 is implemented.
217        """
218        self._curname = None
219        self._curdict = None
220        self.version = (ord(self._data[0]), ord(self._data[1]))
221        self.operation_id = unpack(">H", self._data[2:4])[0]
222        self.request_id = unpack(">I", self._data[4:8])[0]
223        self.position = 8
224        endofattributes = self.dictags["end-of-attributes-tag"]
225        maxdelimiter = self.dictags["event-notification-attributes-tag"]
226        try :
227            tag = ord(self._data[self.position])
228            while tag != endofattributes :
229                self.position += 1
230                name = self.tags[tag]
231                if name is not None :
232                    func = getattr(self, name.replace("-", "_"), None)
233                    if func is not None :
234                        self.position += func()
235                        if ord(self._data[self.position]) > maxdelimiter :
236                            self.position -= 1
237                            continue
238                tag = ord(self._data[self.position])
239        except IndexError :
240            raise IPPError, "Unexpected end of IPP message."
241           
242        # Now transform all one-element lists into single values
243        for attrtype in self.attributes_types :
244            attrdict = getattr(self, "%s_attributes" % attrtype)
245            for (key, value) in attrdict.items() :
246                if len(value) == 1 :
247                    attrdict[key] = value[0]
248        self.data = self._data[self.position+1:]           
249        self.parsed = 1           
250       
251    def parseTag(self) :   
252        """Extracts information from an IPP tag."""
253        pos = self.position
254        tagtype = self.tags[ord(self._data[pos])]
255        pos += 1
256        posend = pos2 = pos + 2
257        namelength = unpack(">H", self._data[pos:pos2])[0]
258        if not namelength :
259            name = self._curname
260        else :   
261            posend += namelength
262            self._curname = name = self._data[pos2:posend]
263        pos2 = posend + 2
264        valuelength = unpack(">H", self._data[posend:pos2])[0]
265        posend = pos2 + valuelength
266        value = self._data[pos2:posend]
267        if tagtype in ("integer", "enum") :
268            value = unpack(">I", value)[0]
269        elif tagtype == "boolean" :   
270            value = ord(value)
271        oldval = self._curdict.setdefault(name, [])
272        oldval.append((tagtype, value))
273        self.printInfo("%s(%s) : %s" % (name, tagtype, value))
274        return posend - self.position
275       
276    def operation_attributes_tag(self) : 
277        """Indicates that the parser enters into an operation-attributes-tag group."""
278        self.printInfo("Start of operation_attributes_tag")
279        self._curdict = self.operation_attributes
280        return self.parseTag()
281       
282    def job_attributes_tag(self) : 
283        """Indicates that the parser enters into a job-attributes-tag group."""
284        self.printInfo("Start of job_attributes_tag")
285        self._curdict = self.job_attributes
286        return self.parseTag()
287       
288    def printer_attributes_tag(self) : 
289        """Indicates that the parser enters into a printer-attributes-tag group."""
290        self.printInfo("Start of printer_attributes_tag")
291        self._curdict = self.printer_attributes
292        return self.parseTag()
293       
294    def unsupported_attributes_tag(self) : 
295        """Indicates that the parser enters into an unsupported-attributes-tag group."""
296        self.printInfo("Start of unsupported_attributes_tag")
297        self._curdict = self.unsupported_attributes
298        return self.parseTag()
299       
300    def subscription_attributes_tag(self) : 
301        """Indicates that the parser enters into a subscription-attributes-tag group."""
302        self.printInfo("Start of subscription_attributes_tag")
303        self._curdict = self.subscription_attributes
304        return self.parseTag()
305       
306    def event_notification_attributes_tag(self) : 
307        """Indicates that the parser enters into an event-notification-attributes-tag group."""
308        self.printInfo("Start of event_notification_attributes_tag")
309        self._curdict = self.event_notification_attributes
310        return self.parseTag()
311           
312if __name__ == "__main__" :           
313    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
314        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
315    else :   
316        infile = open(sys.argv[1], "rb")
317        data = infile.read()
318        message = IPPRequest(data, debug=(sys.argv[-1]=="--debug"))
319        infile.close()
320        message.parse()
321       
322        message2 = IPPRequest(message.dump())
323        message2.parse()
324       
325        # We now compare the message parsed, and the output
326        # of the parsing of the dumped message which was parsed.
327        # We sort the results for the test, because the attributes ordering
328        # may vary since Python dictionnaries are not sorted.
329        strmessage = str(message)
330        strmessage2 = str(message2)
331        lstmessage = strmessage.split("\n")
332        lstmessage.sort()
333        lstmessage2 = strmessage2.split("\n")
334        lstmessage2.sort()
335        if lstmessage == lstmessage2 :
336            print "Test OK : parsing original and parsing the output of the dump produce the same result !"
337            print strmessage
338        else :   
339            print "Test Failed !"
340            print strmessage
341            print
342            print strmessage2
Note: See TracBrowser for help on using the browser.