root / pykota / trunk / pykota / ipp.py @ 2273

Revision 2273, 8.5 kB (checked in by jerome, 19 years ago)

Cleaned the ipp class a bit

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
RevLine 
[1901]1#! /usr/bin/env python
2# -*- coding: ISO-8859-15 -*-
3#
[2254]4# PyKota
[1901]5#
[2254]6# PyKota : Print Quotas for CUPS and LPRng
7#
8# (c) 2003, 2004, 2005 Jerome Alet <alet@librelogiciel.com>
[1901]9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
22#
23# $Id$
24#
[1972]25#
[1901]26
27import sys
28from struct import unpack
29
30OPERATION_ATTRIBUTES_TAG = 0x01
31JOB_ATTRIBUTES_TAG = 0x02
32END_OF_ATTRIBUTES_TAG = 0x03
33PRINTER_ATTRIBUTES_TAG = 0x04
34UNSUPPORTED_ATTRIBUTES_TAG = 0x05
35
[2254]36class IPPError(Exception):
37    """An exception for IPP related stuff."""
[1901]38    def __init__(self, message = ""):
39        self.message = message
40        Exception.__init__(self, message)
41    def __repr__(self):
42        return self.message
43    __str__ = __repr__
44
45class IPPMessage :
[2254]46    """A class for IPP message files.
47   
48       Usage :
49       
50         fp = open("/var/spool/cups/c00001", "rb")
51         message = IPPMessage(fp.read())
52         fp.close()
53         print "IPP version : %s" % message.version
54         print "IPP operation Id : %s" % message.operation_id
55         print "IPP request Id : %s" % message.request_id
56         for attrtype in ("operation", "job", "printer", "unsupported") :
57             attrdict = getattr(message, "%s_attributes" % attrtype)
58             if attrdict :
59                 print "%s attributes :" % attrtype.title()
60                 for key in attrdict.keys() :
61                     print "  %s : %s" % (key, attrdict[key])
62    """
[2273]63    attributes_types = ("operation", "job", "printer", "unsupported")
[2254]64    def __init__(self, data, debug=0) :
65        """Initializes and parses IPP Message object.
66       
67           Parameters :
68           
69             data : the IPP Message's content.
70             debug : a boolean value to output debug info on stderr.
71        """
72        self.debug = debug
[1901]73        self.data = data
[2273]74        for attrtype in self.attributes_types :
[2272]75            setattr(self, "%s_attributes" % attrtype, {})
[1901]76        self.tags = [ None ] * 256      # by default all tags reserved
77       
78        # Delimiter tags
[2254]79        self.tags[OPERATION_ATTRIBUTES_TAG] = "operation-attributes-tag"
80        self.tags[JOB_ATTRIBUTES_TAG] = "job-attributes-tag"
81        self.tags[END_OF_ATTRIBUTES_TAG] = "end-of-attributes-tag"
82        self.tags[PRINTER_ATTRIBUTES_TAG] = "printer-attributes-tag"
83        self.tags[UNSUPPORTED_ATTRIBUTES_TAG] = "unsupported-attributes-tag"
[1901]84       
85        # out of band values
86        self.tags[0x10] = "unsupported"
87        self.tags[0x11] = "reserved-for-future-default"
88        self.tags[0x12] = "unknown"
89        self.tags[0x13] = "no-value"
90       
91        # integer values
92        self.tags[0x20] = "generic-integer"
93        self.tags[0x21] = "integer"
94        self.tags[0x22] = "boolean"
95        self.tags[0x23] = "enum"
96       
97        # octetString
98        self.tags[0x30] = "octetString-with-an-unspecified-format"
99        self.tags[0x31] = "dateTime"
100        self.tags[0x32] = "resolution"
101        self.tags[0x33] = "rangeOfInteger"
102        self.tags[0x34] = "reserved-for-collection"
103        self.tags[0x35] = "textWithLanguage"
104        self.tags[0x36] = "nameWithLanguage"
105       
106        # character strings
107        self.tags[0x20] = "generic-character-string"
108        self.tags[0x41] = "textWithoutLanguage"
109        self.tags[0x42] = "nameWithoutLanguage"
110        # self.tags[0x43] = "reserved"
111        self.tags[0x44] = "keyword"
112        self.tags[0x45] = "uri"
113        self.tags[0x46] = "uriScheme"
114        self.tags[0x47] = "charset"
115        self.tags[0x48] = "naturalLanguage"
116        self.tags[0x49] = "mimeMediaType"
117       
118        # now parses the IPP message
119        self.parse()
120       
[2254]121    def printInfo(self, msg) :   
122        """Prints a debug message."""
123        if self.debug :
124            sys.stderr.write("%s\n" % msg)
125            sys.stderr.flush()
[1901]126           
127    def parseTag(self) :   
128        """Extracts information from an IPP tag."""
129        pos = self.position
[2254]130        tagtype = self.tags[ord(self.data[pos])]
[1901]131        pos += 1
132        posend = pos2 = pos + 2
133        namelength = unpack(">H", self.data[pos:pos2])[0]
134        if not namelength :
[2254]135            name = self._curname
[1901]136        else :   
137            posend += namelength
[2254]138            self._curname = name = self.data[pos2:posend]
[1901]139        pos2 = posend + 2
140        valuelength = unpack(">H", self.data[posend:pos2])[0]
141        posend = pos2 + valuelength
142        value = self.data[pos2:posend]
[2254]143        if tagtype in ("integer", "enum") :
144            value = unpack(">I", value)[0]
145        oldval = self._curdict.setdefault(name, [])
146        oldval.append((tagtype, value))
[2272]147        self.printInfo("%s(%s) : %s" % (name, tagtype, value))
[1901]148        return posend - self.position
149       
150    def operation_attributes_tag(self) : 
151        """Indicates that the parser enters into an operation-attributes-tag group."""
[2254]152        self.printInfo("Start of operation_attributes_tag")
153        self._curdict = self.operation_attributes
[1901]154        return self.parseTag()
155       
156    def job_attributes_tag(self) : 
[2254]157        """Indicates that the parser enters into a job-attributes-tag group."""
158        self.printInfo("Start of job_attributes_tag")
159        self._curdict = self.job_attributes
[1901]160        return self.parseTag()
161       
162    def printer_attributes_tag(self) : 
[2254]163        """Indicates that the parser enters into a printer-attributes-tag group."""
164        self.printInfo("Start of printer_attributes_tag")
165        self._curdict = self.printer_attributes
[1901]166        return self.parseTag()
167       
[2254]168    def unsupported_attributes_tag(self) : 
169        """Indicates that the parser enters into an unsupported-attributes-tag group."""
170        self.printInfo("Start of unsupported_attributes_tag")
171        self._curdict = self.unsupported_attributes
172        return self.parseTag()
173       
[1901]174    def parse(self) :
175        """Parses an IPP Message.
176       
177           NB : Only a subset of RFC2910 is implemented.
178        """
[2254]179        self._curname = None
180        self._curdict = None
[1901]181        self.version = "%s.%s" % (ord(self.data[0]), ord(self.data[1]))
182        self.operation_id = "0x%04x" % unpack(">H", self.data[2:4])[0]
183        self.request_id = "0x%08x" % unpack(">I", self.data[4:8])[0]
184        self.position = 8
185        try :
186            tag = ord(self.data[self.position])
187            while tag != END_OF_ATTRIBUTES_TAG :
188                self.position += 1
189                name = self.tags[tag]
190                if name is not None :
191                    func = getattr(self, name.replace("-", "_"), None)
192                    if func is not None :
193                        self.position += func()
194                        if ord(self.data[self.position]) > UNSUPPORTED_ATTRIBUTES_TAG :
195                            self.position -= 1
196                            continue
197                tag = ord(self.data[self.position])
198        except IndexError :
[2254]199            raise IPPError, "Unexpected end of IPP message."
[1901]200           
[2254]201        # Now transform all one-element lists into single values
[2273]202        for attrtype in self.attributes_types :
[2254]203            attrdict = getattr(self, "%s_attributes" % attrtype)
204            for (key, value) in attrdict.items() :
205                if len(value) == 1 :
206                    attrdict[key] = value[0]
207           
[1972]208if __name__ == "__main__" :           
[2254]209    if (len(sys.argv) < 2) or (sys.argv[1] == "--debug") :
210        print "usage : python ipp.py /var/spool/cups/c00005 [--debug] (for example)\n"
[1972]211    else :   
[2254]212        infile = open(sys.argv[1], "rb")
213        message = IPPMessage(infile.read(), debug=(sys.argv[-1]=="--debug"))
[1972]214        infile.close()
[2254]215        print "IPP version : %s" % message.version
216        print "IPP operation Id : %s" % message.operation_id
217        print "IPP request Id : %s" % message.request_id
[2273]218        for attrtype in message.attributes_types :
[2254]219            attrdict = getattr(message, "%s_attributes" % attrtype)
220            if attrdict :
221                print "%s attributes :" % attrtype.title()
222                for key in attrdict.keys() :
223                    print "  %s : %s" % (key, attrdict[key])
Note: See TracBrowser for help on using the browser.