Changeset 2313 for pykota/trunk/pykota/ipp.py
- Timestamp:
- 06/14/05 14:40:15 (19 years ago)
- Files:
-
- 1 modified
Legend:
- Unmodified
- Added
- Removed
-
pykota/trunk/pykota/ipp.py
r2311 r2313 37 37 __str__ = __repr__ 38 38 39 class IPP Message:40 """A class for IPP message files.39 class IPPRequest : 40 """A class for IPP requests. 41 41 42 42 Usage : 43 43 44 44 fp = open("/var/spool/cups/c00001", "rb") 45 message = IPP Message(fp.read())45 message = IPPRequest(fp.read()) 46 46 fp.close() 47 47 message.parse() 48 # print str(message)49 # print message.dump()50 print "IPP version : %s " % message.version51 print "IPP operation Id : %s" % message.operation_id52 print "IPP request Id : %s" % message.request_id53 for attrtype in ("operation", "job", "printer", "unsupported"):48 # print message.dump() # dumps an equivalent to the original IPP message 49 # print str(message) # returns a string of text with the same content as below 50 print "IPP version : %s.%s" % message.version 51 print "IPP operation Id : 0x%04x" % message.operation_id 52 print "IPP request Id : 0x%08x" % message.request_id 53 for attrtype in message.attributes_types : 54 54 attrdict = getattr(message, "%s_attributes" % attrtype) 55 55 if attrdict : … … 57 57 for key in attrdict.keys() : 58 58 print " %s : %s" % (key, attrdict[key]) 59 if message.data : 60 print "IPP datas : ", repr(message.data) 59 61 """ 60 attributes_types = ("operation", "job", "printer", "unsupported") 61 def __init__(self, data="", version=None, operation_id=None, request_id=None, debug=0) : 62 attributes_types = ("operation", "job", "printer", "unsupported", \ 63 "subscription", "event_notification") 64 def __init__(self, data="", version=None, operation_id=None, \ 65 request_id=None, debug=0) : 62 66 """Initializes an IPP Message object. 63 67 64 68 Parameters : 65 69 66 data : the IPP Message's content.70 data : the complete IPP Message's content. 67 71 debug : a boolean value to output debug info on stderr. 68 72 """ 69 73 self.debug = debug 70 self. data = data74 self._data = data 71 75 self.parsed = 0 72 76 73 self.version = version 77 # Initializes message 78 if version is not None : 79 try : 80 self.version = [int(p) for p in version.split(".")] 81 except AttributeError : 82 if len(version) == 2 : # 2-tuple 83 self.version = version 84 else : 85 try : 86 self.version = [int(p) for p in str(float(version)).split(".")] 87 except : 88 self.version = (1, 1) # default version number 74 89 self.operation_id = operation_id 75 90 self.request_id = request_id 76 91 self.data = "" 92 93 # Initialize attributes mappings 77 94 for attrtype in self.attributes_types : 78 95 setattr(self, "%s_attributes" % attrtype, {}) 79 self.tags = [ None ] * 256 # by default all tags reserved 96 97 # Initialize tags 98 self.tags = [ None ] * 256 # by default all tags reserved 80 99 81 100 # Delimiter tags … … 85 104 self.tags[0x04] = "printer-attributes-tag" 86 105 self.tags[0x05] = "unsupported-attributes-tag" 106 self.tags[0x06] = "subscription-attributes-tag" 107 self.tags[0x07] = "event-notification-attributes-tag" 87 108 88 109 # out of band values … … 91 112 self.tags[0x12] = "unknown" 92 113 self.tags[0x13] = "no-value" 93 114 self.tags[0x15] = "not-settable" 115 self.tags[0x16] = "delete-attribute" 116 self.tags[0x17] = "admin-define" 117 94 118 # integer values 95 119 self.tags[0x20] = "generic-integer" … … 103 127 self.tags[0x32] = "resolution" 104 128 self.tags[0x33] = "rangeOfInteger" 105 self.tags[0x34] = " reserved-for-collection"129 self.tags[0x34] = "begCollection" # TODO : find sample files for testing 106 130 self.tags[0x35] = "textWithLanguage" 107 131 self.tags[0x36] = "nameWithLanguage" 132 self.tags[0x37] = "endCollection" 108 133 109 134 # character strings 110 self.tags[0x 20] = "generic-character-string"135 self.tags[0x40] = "generic-character-string" 111 136 self.tags[0x41] = "textWithoutLanguage" 112 137 self.tags[0x42] = "nameWithoutLanguage" 113 self.tags[0x43] = "reserved"114 138 self.tags[0x44] = "keyword" 115 139 self.tags[0x45] = "uri" … … 118 142 self.tags[0x48] = "naturalLanguage" 119 143 self.tags[0x49] = "mimeMediaType" 144 self.tags[0x4a] = "memberAttrName" 120 145 121 146 # Reverse mapping to generate IPP messages … … 138 163 else : 139 164 buffer = [] 140 buffer.append("IPP version : %s " % self.version)141 buffer.append("IPP operation Id : %s" % self.operation_id)142 buffer.append("IPP request Id : %s" % self.request_id)165 buffer.append("IPP version : %s.%s" % self.version) 166 buffer.append("IPP operation Id : 0x%04x" % self.operation_id) 167 buffer.append("IPP request Id : 0x%08x" % self.request_id) 143 168 for attrtype in self.attributes_types : 144 169 attrdict = getattr(self, "%s_attributes" % attrtype) … … 147 172 for key in attrdict.keys() : 148 173 buffer.append(" %s : %s" % (key, attrdict[key])) 174 if self.data : 175 buffer.append("IPP datas : %s" % repr(message.data)) 149 176 return "\n".join(buffer) 150 177 … … 156 183 buffer = [] 157 184 if None not in (self.version, self.operation_id, self.request_id) : 158 version = [int(p) for p in self.version.split('.')] 159 buffer.append(chr(version[0]) + chr(version[1])) 160 buffer.append(pack(">H", int(self.operation_id, 16))) 161 buffer.append(pack(">I", int(self.request_id, 16))) 185 buffer.append(chr(self.version[0]) + chr(self.version[1])) 186 buffer.append(pack(">H", self.operation_id)) 187 buffer.append(pack(">I", self.request_id)) 162 188 for attrtype in self.attributes_types : 163 189 tagprinted = 0 … … 182 208 buffer.append(val) 183 209 buffer.append(chr(self.dictags["end-of-attributes-tag"])) 210 buffer.append(self.data) 184 211 return "".join(buffer) 185 212 186 213 def parse(self) : 187 """Parses an IPP Message.214 """Parses an IPP Request. 188 215 189 216 NB : Only a subset of RFC2910 is implemented. … … 191 218 self._curname = None 192 219 self._curdict = None 193 self.version = "%s.%s" % (ord(self.data[0]), ord(self.data[1]))194 self.operation_id = "0x%04x" % unpack(">H", self.data[2:4])[0]195 self.request_id = "0x%08x" % unpack(">I", self.data[4:8])[0]220 self.version = (ord(self._data[0]), ord(self._data[1])) 221 self.operation_id = unpack(">H", self._data[2:4])[0] 222 self.request_id = unpack(">I", self._data[4:8])[0] 196 223 self.position = 8 197 224 endofattributes = self.dictags["end-of-attributes-tag"] 198 unsupportedattributes = self.dictags["unsupported-attributes-tag"]225 maxdelimiter = self.dictags["event-notification-attributes-tag"] 199 226 try : 200 tag = ord(self. data[self.position])227 tag = ord(self._data[self.position]) 201 228 while tag != endofattributes : 202 229 self.position += 1 … … 206 233 if func is not None : 207 234 self.position += func() 208 if ord(self. data[self.position]) > unsupportedattributes:235 if ord(self._data[self.position]) > maxdelimiter : 209 236 self.position -= 1 210 237 continue 211 tag = ord(self. data[self.position])238 tag = ord(self._data[self.position]) 212 239 except IndexError : 213 240 raise IPPError, "Unexpected end of IPP message." … … 219 246 if len(value) == 1 : 220 247 attrdict[key] = value[0] 248 self.data = self._data[self.position+1:] 221 249 self.parsed = 1 222 250 … … 224 252 """Extracts information from an IPP tag.""" 225 253 pos = self.position 226 tagtype = self.tags[ord(self. data[pos])]254 tagtype = self.tags[ord(self._data[pos])] 227 255 pos += 1 228 256 posend = pos2 = pos + 2 229 namelength = unpack(">H", self. data[pos:pos2])[0]257 namelength = unpack(">H", self._data[pos:pos2])[0] 230 258 if not namelength : 231 259 name = self._curname 232 260 else : 233 261 posend += namelength 234 self._curname = name = self. data[pos2:posend]262 self._curname = name = self._data[pos2:posend] 235 263 pos2 = posend + 2 236 valuelength = unpack(">H", self. data[posend:pos2])[0]264 valuelength = unpack(">H", self._data[posend:pos2])[0] 237 265 posend = pos2 + valuelength 238 value = self. data[pos2:posend]266 value = self._data[pos2:posend] 239 267 if tagtype in ("integer", "enum") : 240 268 value = unpack(">I", value)[0] … … 269 297 self._curdict = self.unsupported_attributes 270 298 return self.parseTag() 299 300 def subscription_attributes_tag(self) : 301 """Indicates that the parser enters into a subscription-attributes-tag group.""" 302 self.printInfo("Start of subscription_attributes_tag") 303 self._curdict = self.subscription_attributes 304 return self.parseTag() 305 306 def event_notification_attributes_tag(self) : 307 """Indicates that the parser enters into an event-notification-attributes-tag group.""" 308 self.printInfo("Start of event_notification_attributes_tag") 309 self._curdict = self.event_notification_attributes 310 return self.parseTag() 271 311 272 312 if __name__ == "__main__" : … … 275 315 else : 276 316 infile = open(sys.argv[1], "rb") 277 message = IPPMessage(infile.read(), debug=(sys.argv[-1]=="--debug")) 317 data = infile.read() 318 message = IPPRequest(data, debug=(sys.argv[-1]=="--debug")) 278 319 infile.close() 279 320 message.parse() 280 321 281 message2 = IPP Message(message.dump())322 message2 = IPPRequest(message.dump()) 282 323 message2.parse() 283 324