root / pykota / branches / 1.26_fixes / pykota / ipp.py @ 3525

Revision 3184, 13.8 kB (checked in by jerome, 17 years ago)

Added some docstrings.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
4# PyKota
5#
6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005, 2006, 2007 Jerome Alet <alet@librelogiciel.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22#
23# $Id$
24#
25#
26
27"""This module provides basic IPP request parsing facilities."""
28
29import sys
30from struct import pack, unpack
31
32class IPPError(Exception):
33    """An exception for IPP related stuff."""
34    def __init__(self, message = ""):
35        self.message = message
36        Exception.__init__(self, message)
37    def __repr__(self):
38        return self.message
39    __str__ = __repr__
40
41class IPPRequest :
42    """A class for IPP requests.
43   
44       Usage :
45       
46         fp = open("/var/spool/cups/c00001", "rb")
47         message = IPPRequest(fp.read())
48         fp.close()
49         message.parse()
50         # print message.dump() # dumps an equivalent to the original IPP message
51         # print str(message)   # returns a string of text with the same content as below
52         print "IPP version : %s.%s" % message.version
53         print "IPP operation Id : 0x%04x" % message.operation_id
54         print "IPP request Id : 0x%08x" % message.request_id
55         for attrtype in message.attributes_types :
56             attrdict = getattr(message, "%s_attributes" % attrtype)
57             if attrdict :
58                 print "%s attributes :" % attrtype.title()
59                 for key in attrdict.keys() :
60                     print "  %s : %s" % (key, attrdict[key])
61         if message.data :           
62             print "IPP datas : ", repr(message.data)           
63    """
64    attributes_types = ("operation", "job", "printer", "unsupported", \
65                                     "subscription", "event_notification")
66    def __init__(self, data="", version=None, operation_id=None, \
67                                              request_id=None, debug=0) :
68        """Initializes an IPP Message object.
69       
70           Parameters :
71           
72             data : the complete IPP Message's content.
73             debug : a boolean value to output debug info on stderr.
74        """
75        self.debug = debug
76        self._data = data
77        self.parsed = 0
78       
79        # Initializes message
80        if version is not None :
81            try :
82                self.version = [int(p) for p in version.split(".")]
83            except AttributeError :
84                if len(version) == 2 : # 2-tuple
85                    self.version = version
86                else :   
87                    try :
88                        self.version = [int(p) for p in str(float(version)).split(".")]
89                    except :
90                        self.version = (1, 1) # default version number
91        self.operation_id = operation_id
92        self.request_id = request_id
93        self.data = ""
94       
95        # Initialize attributes mappings
96        for attrtype in self.attributes_types :
97            setattr(self, "%s_attributes" % attrtype, {})
98           
99        # Initialize tags   
100        self.tags = [ None ] * 256 # by default all tags reserved
101       
102        # Delimiter tags
103        self.tags[0x01] = "operation-attributes-tag"
104        self.tags[0x02] = "job-attributes-tag"
105        self.tags[0x03] = "end-of-attributes-tag"
106        self.tags[0x04] = "printer-attributes-tag"
107        self.tags[0x05] = "unsupported-attributes-tag"
108        self.tags[0x06] = "subscription-attributes-tag"
109        self.tags[0x07] = "event_notification-attributes-tag"
110       
111        # out of band values
112        self.tags[0x10] = "unsupported"
113        self.tags[0x11] = "reserved-for-future-default"
114        self.tags[0x12] = "unknown"
115        self.tags[0x13] = "no-value"
116        self.tags[0x15] = "not-settable"
117        self.tags[0x16] = "delete-attribute"
118        self.tags[0x17] = "admin-define"
119 
120        # integer values
121        self.tags[0x20] = "generic-integer"
122        self.tags[0x21] = "integer"
123        self.tags[0x22] = "boolean"
124        self.tags[0x23] = "enum"
125       
126        # octetString
127        self.tags[0x30] = "octetString-with-an-unspecified-format"
128        self.tags[0x31] = "dateTime"
129        self.tags[0x32] = "resolution"
130        self.tags[0x33] = "rangeOfInteger"
131        self.tags[0x34] = "begCollection" # TODO : find sample files for testing
132        self.tags[0x35] = "textWithLanguage"
133        self.tags[0x36] = "nameWithLanguage"
134        self.tags[0x37] = "endCollection"
135       
136        # character strings
137        self.tags[0x40] = "generic-character-string"
138        self.tags[0x41] = "textWithoutLanguage"
139        self.tags[0x42] = "nameWithoutLanguage"
140        self.tags[0x44] = "keyword"
141        self.tags[0x45] = "uri"
142        self.tags[0x46] = "uriScheme"
143        self.tags[0x47] = "charset"
144        self.tags[0x48] = "naturalLanguage"
145        self.tags[0x49] = "mimeMediaType"
146        self.tags[0x4a] = "memberAttrName"
147       
148        # Reverse mapping to generate IPP messages
149        self.dictags = {}
150        for i in range(len(self.tags)) :
151            value = self.tags[i]
152            if value is not None :
153                self.dictags[value] = i
154       
155    def logdebug(self, msg) :   
156        """Prints a debug message."""
157        if self.debug :
158            sys.stderr.write("%s\n" % msg)
159            sys.stderr.flush()
160           
161    def __str__(self) :       
162        """Returns the parsed IPP message in a readable form."""
163        if not self.parsed :
164            return ""
165        mybuffer = []
166        mybuffer.append("IPP version : %s.%s" % self.version)
167        mybuffer.append("IPP operation Id : 0x%04x" % self.operation_id)
168        mybuffer.append("IPP request Id : 0x%08x" % self.request_id)
169        for attrtype in self.attributes_types :
170            attrdict = getattr(self, "%s_attributes" % attrtype)
171            if attrdict :
172                mybuffer.append("%s attributes :" % attrtype.title())
173                for key in attrdict.keys() :
174                    mybuffer.append("  %s : %s" % (key, attrdict[key]))
175        if self.data :           
176            mybuffer.append("IPP datas : %s" % repr(self.data))
177        return "\n".join(mybuffer)
178       
179    def dump(self) :   
180        """Generates an IPP Message.
181       
182           Returns the message as a string of text.
183        """   
184        mybuffer = []
185        if None not in (self.version, self.operation_id, self.request_id) :
186            mybuffer.append(chr(self.version[0]) + chr(self.version[1]))
187            mybuffer.append(pack(">H", self.operation_id))
188            mybuffer.append(pack(">I", self.request_id))
189            for attrtype in self.attributes_types :
190                tagprinted = 0
191                for (attrname, value) in getattr(self, "%s_attributes" % attrtype).items() :
192                    if not tagprinted :
193                        mybuffer.append(chr(self.dictags["%s-attributes-tag" % attrtype]))
194                        tagprinted = 1
195                    if type(value) != type([]) :
196                        value = [ value ]
197                    for (vtype, val) in value :
198                        mybuffer.append(chr(self.dictags[vtype]))
199                        mybuffer.append(pack(">H", len(attrname)))
200                        mybuffer.append(attrname)
201                        if vtype in ("integer", "enum") :
202                            mybuffer.append(pack(">H", 4))
203                            mybuffer.append(pack(">I", val))
204                        elif vtype == "boolean" :
205                            mybuffer.append(pack(">H", 1))
206                            mybuffer.append(chr(val))
207                        else :   
208                            mybuffer.append(pack(">H", len(val)))
209                            mybuffer.append(val)
210            mybuffer.append(chr(self.dictags["end-of-attributes-tag"]))
211        mybuffer.append(self.data)   
212        return "".join(mybuffer)
213           
214    def parse(self) :
215        """Parses an IPP Request.
216       
217           NB : Only a subset of RFC2910 is implemented.
218        """
219        self._curname = None
220        self._curdict = None
221        self.version = (ord(self._data[0]), ord(self._data[1]))
222        self.operation_id = unpack(">H", self._data[2:4])[0]
223        self.request_id = unpack(">I", self._data[4:8])[0]
224        self.position = 8
225        endofattributes = self.dictags["end-of-attributes-tag"]
226        maxdelimiter = self.dictags["event_notification-attributes-tag"]
227        try :
228            tag = ord(self._data[self.position])
229            while tag != endofattributes :
230                self.position += 1
231                name = self.tags[tag]
232                if name is not None :
233                    func = getattr(self, name.replace("-", "_"), None)
234                    if func is not None :
235                        self.position += func()
236                        if ord(self._data[self.position]) > maxdelimiter :
237                            self.position -= 1
238                            continue
239                tag = ord(self._data[self.position])
240        except IndexError :
241            raise IPPError, "Unexpected end of IPP message."
242           
243        # Now transform all one-element lists into single values
244        for attrtype in self.attributes_types :
245            attrdict = getattr(self, "%s_attributes" % attrtype)
246            for (key, value) in attrdict.items() :
247                if len(value) == 1 :
248                    attrdict[key] = value[0]
249        self.data = self._data[self.position+1:]           
250        self.parsed = 1           
251       
252    def parseTag(self) :   
253        """Extracts information from an IPP tag."""
254        pos = self.position
255        tagtype = self.tags[ord(self._data[pos])]
256        pos += 1
257        posend = pos2 = pos + 2
258        namelength = unpack(">H", self._data[pos:pos2])[0]
259        if not namelength :
260            name = self._curname
261        else :   
262            posend += namelength
263            self._curname = name = self._data[pos2:posend]
264        pos2 = posend + 2
265        valuelength = unpack(">H", self._data[posend:pos2])[0]
266        posend = pos2 + valuelength
267        value = self._data[pos2:posend]
268        if tagtype in ("integer", "enum") :
269            value = unpack(">I", value)[0]
270        elif tagtype == "boolean" :   
271            value = ord(value)
272        oldval = self._curdict.setdefault(name, [])
273        oldval.append((tagtype, value))
274        self.logdebug("%s(%s) : %s" % (name, tagtype, value))
275        return posend - self.position
276       
277    def operation_attributes_tag(self) : 
278        """Indicates that the parser enters into an operation-attributes-tag group."""
279        self.logdebug("Start of operation_attributes_tag")
280        self._curdict = self.operation_attributes
281        return self.parseTag()
282       
283    def job_attributes_tag(self) : 
284        """Indicates that the parser enters into a job-attributes-tag group."""
285        self.logdebug("Start of job_attributes_tag")
286        self._curdict = self.job_attributes
287        return self.parseTag()
288       
289    def printer_attributes_tag(self) : 
290        """Indicates that the parser enters into a printer-attributes-tag group."""
291        self.logdebug("Start of printer_attributes_tag")
292        self._curdict = self.printer_attributes
293        return self.parseTag()
294       
295    def unsupported_attributes_tag(self) : 
296        """Indicates that the parser enters into an unsupported-attributes-tag group."""
297        self.logdebug("Start of unsupported_attributes_tag")
298        self._curdict = self.unsupported_attributes
299        return self.parseTag()
300       
301    def subscription_attributes_tag(self) : 
302        """Indicates that the parser enters into a subscription-attributes-tag group."""
303        self.logdebug("Start of subscription_attributes_tag")
304        self._curdict = self.subscription_attributes
305        return self.parseTag()
306       
307    def event_notification_attributes_tag(self) : 
308        """Indicates that the parser enters into an event-notification-attributes-tag group."""
309        self.logdebug("Start of event_notification_attributes_tag")
310        self._curdict = self.event_notification_attributes
311        return self.parseTag()
312           
313if __name__ == "__main__" :           
314    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
315        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
316    else :   
317        infile = open(sys.argv[1], "rb")
318        data = infile.read()
319        message = IPPRequest(data, debug=(sys.argv[-1]=="--debug"))
320        infile.close()
321        message.parse()
322       
323        message2 = IPPRequest(message.dump())
324        message2.parse()
325       
326        # We now compare the message parsed, and the output
327        # of the parsing of the dumped message which was parsed.
328        # We sort the results for the test, because the attributes ordering
329        # may vary since Python dictionnaries are not sorted.
330        strmessage = str(message)
331        strmessage2 = str(message2)
332        lstmessage = strmessage.split("\n")
333        lstmessage.sort()
334        lstmessage2 = strmessage2.split("\n")
335        lstmessage2.sort()
336        if lstmessage == lstmessage2 :
337            print "Test OK : parsing original and parsing the output of the dump produce the same result !"
338            print strmessage
339        else :   
340            print "Test Failed !"
341            print strmessage
342            print
343            print strmessage2
Note: See TracBrowser for help on using the browser.