38 MySQL Protocol Handshake Objects
77 'SSL_VERIFY_SERVER_CERT',
85 'MORE_RESULTS_EXISTS',
86 'QUERY_NO_GOOD_INDEX_USED',
87 'QUERY_NO_INDEX_USED',
91 'NO_BACKSLASH_ESCAPES',
96 '''This class represents the initial handshake sent from server to client.'''
98 def __init__(self, packed=None, protocol_version=10, server_version='',
99 thread_id=0, scramble=tuple([0] * 20), null1=0, capabilities=0,
100 charset=0, status=0, unused=tuple([0] * 13), null2=0):
114 server_version_length = packed[1:].index(
'\x00')
116 data = struct.unpack(
'<I8BB2BB2B13B12BB', packed[2+server_version_length:])
118 self.
scramble = data[1:9] + data[28:40]
124 self.
null2 = data[40]
129 data += struct.pack(
'<I', self.
thread_id)
130 data +=
''.join(map(chr, self.
scramble[:8]))
131 data += struct.pack(
'B2BB2B',
133 self.capabilities.value() & 0xFF,
134 (self.capabilities.value() >> 8) & 0xFF,
136 self.status.value() & 0xFF,
137 (self.status.value() >> 8) & 0xFF)
138 data +=
''.join(map(chr, self.
unused))
139 data +=
''.join(map(chr, self.
scramble[8:]))
140 data += struct.pack(
'B', self.
null2)
144 return '''ServerHandshake
145 protocol_version = %s
161 def testDefaultInit(self):
166 def testKeywordInit(self):
168 server_version=
'test',
170 scramble=tuple([5] * 20),
175 unused=tuple([6] * 13),
180 def testUnpackInit(self):
181 data = struct.pack(
'B', 11)
183 data += struct.pack(
'<I', 1234)
184 data +=
''.join([chr(5)] * 8)
185 data += struct.pack(
'B2BB2B', 1, 255, 254, 253, 252, 251)
186 data +=
''.join([chr(6)] * 13)
187 data +=
''.join([chr(5)] * 12)
188 data += struct.pack(
'B', 2)
197 def verifyDefault(self, handshake):
198 self.assertEqual(handshake.protocol_version, 10)
199 self.assertEqual(handshake.server_version,
'')
200 self.assertEqual(handshake.thread_id, 0)
201 self.assertEqual(handshake.scramble, tuple([0] * 20))
202 self.assertEqual(handshake.null1, 0)
203 self.assertEqual(handshake.capabilities.value(), 0)
204 self.assertEqual(handshake.charset, 0)
205 self.assertEqual(handshake.status.value(), 0)
206 self.assertEqual(handshake.unused, tuple([0] * 13))
207 self.assertEqual(handshake.null2, 0)
209 def verifyCustom(self, handshake):
210 self.assertEqual(handshake.protocol_version, 11)
211 self.assertEqual(handshake.server_version,
'test')
212 self.assertEqual(handshake.thread_id, 1234)
213 self.assertEqual(handshake.scramble, tuple([5] * 20))
214 self.assertEqual(handshake.null1, 1)
215 self.assertEqual(handshake.capabilities.value(), 65279)
216 self.assertEqual(handshake.charset, 253)
217 self.assertEqual(handshake.status.value(), 64508)
218 self.assertEqual(handshake.unused, tuple([6] * 13))
219 self.assertEqual(handshake.null2, 2)
222 '''This class represents the client handshake sent back to the server.'''
224 def __init__(self, packed=None, capabilities=0, max_packet_size=0, charset=0,
225 unused=tuple([0] * 23), user=
'', scramble_size=0,
226 scramble=
None, db=
''):
237 data = struct.unpack(
'<IIB23B', packed[:32])
243 user_length = packed.index(
'\x00')
244 self.
user = packed[:user_length]
245 packed = packed[1+user_length:]
250 self.
scramble = tuple(map(ord, packed[1:21]))
251 if packed[-1:] ==
'\x00':
252 self.
db = packed[21:-1]
254 self.
db = packed[21:]
257 data = struct.pack(
'<IIB',
258 self.capabilities.value(),
261 data +=
''.join(map(chr, self.
unused))
262 data += self.
user +
'\x00'
265 data +=
''.join(map(chr, self.
scramble))
266 data += self.
db +
'\x00'
270 return '''ClientHandshake
284 def testDefaultInit(self):
289 def testKeywordInit(self):
291 max_packet_size=64508,
293 unused=tuple([6] * 23),
296 scramble=tuple([5] * 20),
301 def testUnpackInit(self):
302 data = struct.pack(
'<IIB', 65279, 64508, 253)
303 data +=
''.join([chr(6)] * 23)
306 data +=
''.join([chr(5)] * 20)
316 def verifyDefault(self, handshake):
317 self.assertEqual(handshake.capabilities.value(), 0)
318 self.assertEqual(handshake.max_packet_size, 0)
319 self.assertEqual(handshake.charset, 0)
320 self.assertEqual(handshake.unused, tuple([0] * 23))
321 self.assertEqual(handshake.user,
'')
322 self.assertEqual(handshake.scramble_size, 0)
323 self.assertEqual(handshake.scramble,
None)
324 self.assertEqual(handshake.db,
'')
326 def verifyCustom(self, handshake):
327 self.assertEqual(handshake.capabilities.value(), 65279)
328 self.assertEqual(handshake.max_packet_size, 64508)
329 self.assertEqual(handshake.charset, 253)
330 self.assertEqual(handshake.unused, tuple([6] * 23))
331 self.assertEqual(handshake.user,
'user')
332 self.assertEqual(handshake.scramble_size, 20)
333 self.assertEqual(handshake.scramble, tuple([5] * 20))
334 self.assertEqual(handshake.db,
'db')
336 if __name__ ==
'__main__':