root / pykota / trunk / pykota / ipp.py @ 2956

Revision 2956, 13.7 kB (checked in by jerome, 16 years ago)

Fixed parser to correctly handle event notification attributes.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
4# PyKota
5#
6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005, 2006 Jerome Alet <alet@librelogiciel.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22#
23# $Id$
24#
25#
26
27import sys
28from struct import pack, unpack
29
30class IPPError(Exception):
31    """An exception for IPP related stuff."""
32    def __init__(self, message = ""):
33        self.message = message
34        Exception.__init__(self, message)
35    def __repr__(self):
36        return self.message
37    __str__ = __repr__
38
39class IPPRequest :
40    """A class for IPP requests.
41   
42       Usage :
43       
44         fp = open("/var/spool/cups/c00001", "rb")
45         message = IPPRequest(fp.read())
46         fp.close()
47         message.parse()
48         # print message.dump() # dumps an equivalent to the original IPP message
49         # print str(message)   # returns a string of text with the same content as below
50         print "IPP version : %s.%s" % message.version
51         print "IPP operation Id : 0x%04x" % message.operation_id
52         print "IPP request Id : 0x%08x" % message.request_id
53         for attrtype in message.attributes_types :
54             attrdict = getattr(message, "%s_attributes" % attrtype)
55             if attrdict :
56                 print "%s attributes :" % attrtype.title()
57                 for key in attrdict.keys() :
58                     print "  %s : %s" % (key, attrdict[key])
59         if message.data :           
60             print "IPP datas : ", repr(message.data)           
61    """
62    attributes_types = ("operation", "job", "printer", "unsupported", \
63                                     "subscription", "event_notification")
64    def __init__(self, data="", version=None, operation_id=None, \
65                                              request_id=None, debug=0) :
66        """Initializes an IPP Message object.
67       
68           Parameters :
69           
70             data : the complete IPP Message's content.
71             debug : a boolean value to output debug info on stderr.
72        """
73        self.debug = debug
74        self._data = data
75        self.parsed = 0
76       
77        # Initializes message
78        if version is not None :
79            try :
80                self.version = [int(p) for p in version.split(".")]
81            except AttributeError :
82                if len(version) == 2 : # 2-tuple
83                    self.version = version
84                else :   
85                    try :
86                        self.version = [int(p) for p in str(float(version)).split(".")]
87                    except :
88                        self.version = (1, 1) # default version number
89        self.operation_id = operation_id
90        self.request_id = request_id
91        self.data = ""
92       
93        # Initialize attributes mappings
94        for attrtype in self.attributes_types :
95            setattr(self, "%s_attributes" % attrtype, {})
96           
97        # Initialize tags   
98        self.tags = [ None ] * 256 # by default all tags reserved
99       
100        # Delimiter tags
101        self.tags[0x01] = "operation-attributes-tag"
102        self.tags[0x02] = "job-attributes-tag"
103        self.tags[0x03] = "end-of-attributes-tag"
104        self.tags[0x04] = "printer-attributes-tag"
105        self.tags[0x05] = "unsupported-attributes-tag"
106        self.tags[0x06] = "subscription-attributes-tag"
107        self.tags[0x07] = "event_notification-attributes-tag"
108       
109        # out of band values
110        self.tags[0x10] = "unsupported"
111        self.tags[0x11] = "reserved-for-future-default"
112        self.tags[0x12] = "unknown"
113        self.tags[0x13] = "no-value"
114        self.tags[0x15] = "not-settable"
115        self.tags[0x16] = "delete-attribute"
116        self.tags[0x17] = "admin-define"
117 
118        # integer values
119        self.tags[0x20] = "generic-integer"
120        self.tags[0x21] = "integer"
121        self.tags[0x22] = "boolean"
122        self.tags[0x23] = "enum"
123       
124        # octetString
125        self.tags[0x30] = "octetString-with-an-unspecified-format"
126        self.tags[0x31] = "dateTime"
127        self.tags[0x32] = "resolution"
128        self.tags[0x33] = "rangeOfInteger"
129        self.tags[0x34] = "begCollection" # TODO : find sample files for testing
130        self.tags[0x35] = "textWithLanguage"
131        self.tags[0x36] = "nameWithLanguage"
132        self.tags[0x37] = "endCollection"
133       
134        # character strings
135        self.tags[0x40] = "generic-character-string"
136        self.tags[0x41] = "textWithoutLanguage"
137        self.tags[0x42] = "nameWithoutLanguage"
138        self.tags[0x44] = "keyword"
139        self.tags[0x45] = "uri"
140        self.tags[0x46] = "uriScheme"
141        self.tags[0x47] = "charset"
142        self.tags[0x48] = "naturalLanguage"
143        self.tags[0x49] = "mimeMediaType"
144        self.tags[0x4a] = "memberAttrName"
145       
146        # Reverse mapping to generate IPP messages
147        self.dictags = {}
148        for i in range(len(self.tags)) :
149            value = self.tags[i]
150            if value is not None :
151                self.dictags[value] = i
152       
153    def logdebug(self, msg) :   
154        """Prints a debug message."""
155        if self.debug :
156            sys.stderr.write("%s\n" % msg)
157            sys.stderr.flush()
158           
159    def __str__(self) :       
160        """Returns the parsed IPP message in a readable form."""
161        if not self.parsed :
162            return ""
163        mybuffer = []
164        mybuffer.append("IPP version : %s.%s" % self.version)
165        mybuffer.append("IPP operation Id : 0x%04x" % self.operation_id)
166        mybuffer.append("IPP request Id : 0x%08x" % self.request_id)
167        for attrtype in self.attributes_types :
168            attrdict = getattr(self, "%s_attributes" % attrtype)
169            if attrdict :
170                mybuffer.append("%s attributes :" % attrtype.title())
171                for key in attrdict.keys() :
172                    mybuffer.append("  %s : %s" % (key, attrdict[key]))
173        if self.data :           
174            mybuffer.append("IPP datas : %s" % repr(self.data))
175        return "\n".join(mybuffer)
176       
177    def dump(self) :   
178        """Generates an IPP Message.
179       
180           Returns the message as a string of text.
181        """   
182        mybuffer = []
183        if None not in (self.version, self.operation_id, self.request_id) :
184            mybuffer.append(chr(self.version[0]) + chr(self.version[1]))
185            mybuffer.append(pack(">H", self.operation_id))
186            mybuffer.append(pack(">I", self.request_id))
187            for attrtype in self.attributes_types :
188                tagprinted = 0
189                for (attrname, value) in getattr(self, "%s_attributes" % attrtype).items() :
190                    if not tagprinted :
191                        mybuffer.append(chr(self.dictags["%s-attributes-tag" % attrtype]))
192                        tagprinted = 1
193                    if type(value) != type([]) :
194                        value = [ value ]
195                    for (vtype, val) in value :
196                        mybuffer.append(chr(self.dictags[vtype]))
197                        mybuffer.append(pack(">H", len(attrname)))
198                        mybuffer.append(attrname)
199                        if vtype in ("integer", "enum") :
200                            mybuffer.append(pack(">H", 4))
201                            mybuffer.append(pack(">I", val))
202                        elif vtype == "boolean" :
203                            mybuffer.append(pack(">H", 1))
204                            mybuffer.append(chr(val))
205                        else :   
206                            mybuffer.append(pack(">H", len(val)))
207                            mybuffer.append(val)
208            mybuffer.append(chr(self.dictags["end-of-attributes-tag"]))
209        mybuffer.append(self.data)   
210        return "".join(mybuffer)
211           
212    def parse(self) :
213        """Parses an IPP Request.
214       
215           NB : Only a subset of RFC2910 is implemented.
216        """
217        self._curname = None
218        self._curdict = None
219        self.version = (ord(self._data[0]), ord(self._data[1]))
220        self.operation_id = unpack(">H", self._data[2:4])[0]
221        self.request_id = unpack(">I", self._data[4:8])[0]
222        self.position = 8
223        endofattributes = self.dictags["end-of-attributes-tag"]
224        maxdelimiter = self.dictags["event_notification-attributes-tag"]
225        try :
226            tag = ord(self._data[self.position])
227            while tag != endofattributes :
228                self.position += 1
229                name = self.tags[tag]
230                if name is not None :
231                    func = getattr(self, name.replace("-", "_"), None)
232                    if func is not None :
233                        self.position += func()
234                        if ord(self._data[self.position]) > maxdelimiter :
235                            self.position -= 1
236                            continue
237                tag = ord(self._data[self.position])
238        except IndexError :
239            raise IPPError, "Unexpected end of IPP message."
240           
241        # Now transform all one-element lists into single values
242        for attrtype in self.attributes_types :
243            attrdict = getattr(self, "%s_attributes" % attrtype)
244            for (key, value) in attrdict.items() :
245                if len(value) == 1 :
246                    attrdict[key] = value[0]
247        self.data = self._data[self.position+1:]           
248        self.parsed = 1           
249       
250    def parseTag(self) :   
251        """Extracts information from an IPP tag."""
252        pos = self.position
253        tagtype = self.tags[ord(self._data[pos])]
254        pos += 1
255        posend = pos2 = pos + 2
256        namelength = unpack(">H", self._data[pos:pos2])[0]
257        if not namelength :
258            name = self._curname
259        else :   
260            posend += namelength
261            self._curname = name = self._data[pos2:posend]
262        pos2 = posend + 2
263        valuelength = unpack(">H", self._data[posend:pos2])[0]
264        posend = pos2 + valuelength
265        value = self._data[pos2:posend]
266        if tagtype in ("integer", "enum") :
267            value = unpack(">I", value)[0]
268        elif tagtype == "boolean" :   
269            value = ord(value)
270        oldval = self._curdict.setdefault(name, [])
271        oldval.append((tagtype, value))
272        self.logdebug("%s(%s) : %s" % (name, tagtype, value))
273        return posend - self.position
274       
275    def operation_attributes_tag(self) : 
276        """Indicates that the parser enters into an operation-attributes-tag group."""
277        self.logdebug("Start of operation_attributes_tag")
278        self._curdict = self.operation_attributes
279        return self.parseTag()
280       
281    def job_attributes_tag(self) : 
282        """Indicates that the parser enters into a job-attributes-tag group."""
283        self.logdebug("Start of job_attributes_tag")
284        self._curdict = self.job_attributes
285        return self.parseTag()
286       
287    def printer_attributes_tag(self) : 
288        """Indicates that the parser enters into a printer-attributes-tag group."""
289        self.logdebug("Start of printer_attributes_tag")
290        self._curdict = self.printer_attributes
291        return self.parseTag()
292       
293    def unsupported_attributes_tag(self) : 
294        """Indicates that the parser enters into an unsupported-attributes-tag group."""
295        self.logdebug("Start of unsupported_attributes_tag")
296        self._curdict = self.unsupported_attributes
297        return self.parseTag()
298       
299    def subscription_attributes_tag(self) : 
300        """Indicates that the parser enters into a subscription-attributes-tag group."""
301        self.logdebug("Start of subscription_attributes_tag")
302        self._curdict = self.subscription_attributes
303        return self.parseTag()
304       
305    def event_notification_attributes_tag(self) : 
306        """Indicates that the parser enters into an event-notification-attributes-tag group."""
307        self.logdebug("Start of event_notification_attributes_tag")
308        self._curdict = self.event_notification_attributes
309        return self.parseTag()
310           
311if __name__ == "__main__" :           
312    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
313        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
314    else :   
315        infile = open(sys.argv[1], "rb")
316        data = infile.read()
317        message = IPPRequest(data, debug=(sys.argv[-1]=="--debug"))
318        infile.close()
319        message.parse()
320       
321        message2 = IPPRequest(message.dump())
322        message2.parse()
323       
324        # We now compare the message parsed, and the output
325        # of the parsing of the dumped message which was parsed.
326        # We sort the results for the test, because the attributes ordering
327        # may vary since Python dictionnaries are not sorted.
328        strmessage = str(message)
329        strmessage2 = str(message2)
330        lstmessage = strmessage.split("\n")
331        lstmessage.sort()
332        lstmessage2 = strmessage2.split("\n")
333        lstmessage2.sort()
334        if lstmessage == lstmessage2 :
335            print "Test OK : parsing original and parsing the output of the dump produce the same result !"
336            print strmessage
337        else :   
338            print "Test Failed !"
339            print strmessage
340            print
341            print strmessage2
Note: See TracBrowser for help on using the browser.