root / pykota / trunk / pykota / ipp.py @ 2272

Revision 2272, 8.5 kB (checked in by jerome, 19 years ago)

Unsupported attributes are now parsed correctly like the supported ones.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
4# PyKota
5#
6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005 Jerome Alet <alet@librelogiciel.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
22#
23# $Id$
24#
25#
26
27import sys
28from struct import unpack
29
30OPERATION_ATTRIBUTES_TAG = 0x01
31JOB_ATTRIBUTES_TAG = 0x02
32END_OF_ATTRIBUTES_TAG = 0x03
33PRINTER_ATTRIBUTES_TAG = 0x04
34UNSUPPORTED_ATTRIBUTES_TAG = 0x05
35
36class IPPError(Exception):
37    """An exception for IPP related stuff."""
38    def __init__(self, message = ""):
39        self.message = message
40        Exception.__init__(self, message)
41    def __repr__(self):
42        return self.message
43    __str__ = __repr__
44
45attributes_types = ("operation", "job", "printer", "unsupported")
46
47class IPPMessage :
48    """A class for IPP message files.
49   
50       Usage :
51       
52         fp = open("/var/spool/cups/c00001", "rb")
53         message = IPPMessage(fp.read())
54         fp.close()
55         print "IPP version : %s" % message.version
56         print "IPP operation Id : %s" % message.operation_id
57         print "IPP request Id : %s" % message.request_id
58         for attrtype in ("operation", "job", "printer", "unsupported") :
59             attrdict = getattr(message, "%s_attributes" % attrtype)
60             if attrdict :
61                 print "%s attributes :" % attrtype.title()
62                 for key in attrdict.keys() :
63                     print "  %s : %s" % (key, attrdict[key])
64    """
65    def __init__(self, data, debug=0) :
66        """Initializes and parses IPP Message object.
67       
68           Parameters :
69           
70             data : the IPP Message's content.
71             debug : a boolean value to output debug info on stderr.
72        """
73        self.debug = debug
74        self.data = data
75        for attrtype in attributes_types :
76            setattr(self, "%s_attributes" % attrtype, {})
77        self.tags = [ None ] * 256      # by default all tags reserved
78       
79        # Delimiter tags
80        self.tags[OPERATION_ATTRIBUTES_TAG] = "operation-attributes-tag"
81        self.tags[JOB_ATTRIBUTES_TAG] = "job-attributes-tag"
82        self.tags[END_OF_ATTRIBUTES_TAG] = "end-of-attributes-tag"
83        self.tags[PRINTER_ATTRIBUTES_TAG] = "printer-attributes-tag"
84        self.tags[UNSUPPORTED_ATTRIBUTES_TAG] = "unsupported-attributes-tag"
85       
86        # out of band values
87        self.tags[0x10] = "unsupported"
88        self.tags[0x11] = "reserved-for-future-default"
89        self.tags[0x12] = "unknown"
90        self.tags[0x13] = "no-value"
91       
92        # integer values
93        self.tags[0x20] = "generic-integer"
94        self.tags[0x21] = "integer"
95        self.tags[0x22] = "boolean"
96        self.tags[0x23] = "enum"
97       
98        # octetString
99        self.tags[0x30] = "octetString-with-an-unspecified-format"
100        self.tags[0x31] = "dateTime"
101        self.tags[0x32] = "resolution"
102        self.tags[0x33] = "rangeOfInteger"
103        self.tags[0x34] = "reserved-for-collection"
104        self.tags[0x35] = "textWithLanguage"
105        self.tags[0x36] = "nameWithLanguage"
106       
107        # character strings
108        self.tags[0x20] = "generic-character-string"
109        self.tags[0x41] = "textWithoutLanguage"
110        self.tags[0x42] = "nameWithoutLanguage"
111        # self.tags[0x43] = "reserved"
112        self.tags[0x44] = "keyword"
113        self.tags[0x45] = "uri"
114        self.tags[0x46] = "uriScheme"
115        self.tags[0x47] = "charset"
116        self.tags[0x48] = "naturalLanguage"
117        self.tags[0x49] = "mimeMediaType"
118       
119        # now parses the IPP message
120        self.parse()
121       
122    def printInfo(self, msg) :   
123        """Prints a debug message."""
124        if self.debug :
125            sys.stderr.write("%s\n" % msg)
126            sys.stderr.flush()
127           
128    def parseTag(self) :   
129        """Extracts information from an IPP tag."""
130        pos = self.position
131        tagtype = self.tags[ord(self.data[pos])]
132        pos += 1
133        posend = pos2 = pos + 2
134        namelength = unpack(">H", self.data[pos:pos2])[0]
135        if not namelength :
136            name = self._curname
137        else :   
138            posend += namelength
139            self._curname = name = self.data[pos2:posend]
140        pos2 = posend + 2
141        valuelength = unpack(">H", self.data[posend:pos2])[0]
142        posend = pos2 + valuelength
143        value = self.data[pos2:posend]
144        if tagtype in ("integer", "enum") :
145            value = unpack(">I", value)[0]
146        oldval = self._curdict.setdefault(name, [])
147        oldval.append((tagtype, value))
148        self.printInfo("%s(%s) : %s" % (name, tagtype, value))
149        return posend - self.position
150       
151    def operation_attributes_tag(self) : 
152        """Indicates that the parser enters into an operation-attributes-tag group."""
153        self.printInfo("Start of operation_attributes_tag")
154        self._curdict = self.operation_attributes
155        return self.parseTag()
156       
157    def job_attributes_tag(self) : 
158        """Indicates that the parser enters into a job-attributes-tag group."""
159        self.printInfo("Start of job_attributes_tag")
160        self._curdict = self.job_attributes
161        return self.parseTag()
162       
163    def printer_attributes_tag(self) : 
164        """Indicates that the parser enters into a printer-attributes-tag group."""
165        self.printInfo("Start of printer_attributes_tag")
166        self._curdict = self.printer_attributes
167        return self.parseTag()
168       
169    def unsupported_attributes_tag(self) : 
170        """Indicates that the parser enters into an unsupported-attributes-tag group."""
171        self.printInfo("Start of unsupported_attributes_tag")
172        self._curdict = self.unsupported_attributes
173        return self.parseTag()
174       
175    def parse(self) :
176        """Parses an IPP Message.
177       
178           NB : Only a subset of RFC2910 is implemented.
179        """
180        self._curname = None
181        self._curdict = None
182        self.version = "%s.%s" % (ord(self.data[0]), ord(self.data[1]))
183        self.operation_id = "0x%04x" % unpack(">H", self.data[2:4])[0]
184        self.request_id = "0x%08x" % unpack(">I", self.data[4:8])[0]
185        self.position = 8
186        try :
187            tag = ord(self.data[self.position])
188            while tag != END_OF_ATTRIBUTES_TAG :
189                self.position += 1
190                name = self.tags[tag]
191                if name is not None :
192                    func = getattr(self, name.replace("-", "_"), None)
193                    if func is not None :
194                        self.position += func()
195                        if ord(self.data[self.position]) > UNSUPPORTED_ATTRIBUTES_TAG :
196                            self.position -= 1
197                            continue
198                tag = ord(self.data[self.position])
199        except IndexError :
200            raise IPPError, "Unexpected end of IPP message."
201           
202        # Now transform all one-element lists into single values
203        for attrtype in attributes_types :
204            attrdict = getattr(self, "%s_attributes" % attrtype)
205            for (key, value) in attrdict.items() :
206                if len(value) == 1 :
207                    attrdict[key] = value[0]
208           
209if __name__ == "__main__" :           
210    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
211        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
212    else :   
213        infile = open(sys.argv[1], "rb")
214        message = IPPMessage(infile.read(), debug=(sys.argv[-1]=="--debug"))
215        infile.close()
216        print "IPP version : %s" % message.version
217        print "IPP operation Id : %s" % message.operation_id
218        print "IPP request Id : %s" % message.request_id
219        for attrtype in attributes_types :
220            attrdict = getattr(message, "%s_attributes" % attrtype)
221            if attrdict :
222                print "%s attributes :" % attrtype.title()
223                for key in attrdict.keys() :
224                    print "  %s : %s" % (key, attrdict[key])
Note: See TracBrowser for help on using the browser.