select_response.py
9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import platform
import struct
import requests
from .compat import to_bytes
from .exceptions import RequestError
from .exceptions import SelectOperationFailed
from .exceptions import SelectOperationClientError
from .exceptions import InconsistentError
from . import utils
import logging
logger = logging.getLogger(__name__)
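"""
The adapter class for a Select object response.
The response consists of frames. Each frame has the following format:

    Type    | Payload Length | Header Checksum | Payload | Payload Checksum
    4 bytes | 4 bytes        | 4 bytes         | n bytes | 4 bytes

where n is the value of Payload Length. The first byte of Type is a version
byte and is masked off before the type is decoded.

There are five kinds of frames.

Data Frame:
    Type: 8388609
    Payload: Offset (8 bytes) | Data (remaining bytes)
Continuous Frame:
    Type: 8388612
    Payload: Offset (8 bytes)
End Frame:
    Type: 8388613
    Payload: Offset (8 bytes) | total scanned bytes (8 bytes) |
             HTTP status code (4 bytes) | error message (variable)
Meta End Frame / JSON Meta End Frame:
    Type: 8388614 / 8388615
    Payload: Offset (8 bytes) | total scanned bytes (8 bytes) |
             HTTP status code (4 bytes) | splits (4 bytes) | rows (8 bytes) |
             columns (4 bytes, Meta End Frame only) | error message (variable)

A non-empty error message may have the form "ErrorCode.Message".
"""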
class SelectResponseAdapter(object):
    """Iterate over the decoded output of a Select object response.

    ``response`` must expose ``headers`` (a mapping) and be iterable, yielding
    byte chunks. When the ``x-oss-select-output-raw`` response header is
    ``"true"`` the chunks are passed through unchanged. Otherwise they are
    parsed as frames (type | payload length | header checksum | payload |
    payload checksum), and only the bytes carried by data frames are yielded.

    Multi-byte integer fields are decoded with
    ``utils.change_endianness_if_needed`` followed by a native-order ``struct``
    unpack. Presumably the wire order is big-endian; TODO confirm against utils.

    :param response: the HTTP response being adapted.
    :param progress_callback: optional callable, invoked as
        ``progress_callback(file_offset, content_length)``. It is called every
        ``_FRAMES_FOR_PROGRESS_UPDATE`` frames and when an End Frame with a
        2xx status is received.
    :param content_length: passed unchanged as the second argument of
        ``progress_callback``.
    :param enable_crc: when true, verify the CRC32 payload checksum of every
        data frame and raise ``InconsistentError`` on a mismatch.
    """

    _CHUNK_SIZE = 8 * 1024
    _CONTINIOUS_FRAME_TYPE = 8388612
    _DATA_FRAME_TYPE = 8388609
    _END_FRAME_TYPE = 8388613
    _META_END_FRAME_TYPE = 8388614
    _JSON_META_END_FRAME_TYPE = 8388615
    _FRAMES_FOR_PROGRESS_UPDATE = 10
    # Frame types this adapter knows how to decode.
    _KNOWN_FRAME_TYPES = (
        _DATA_FRAME_TYPE,
        _CONTINIOUS_FRAME_TYPE,
        _END_FRAME_TYPE,
        _META_END_FRAME_TYPE,
        _JSON_META_END_FRAME_TYPE,
    )

    def __init__(self, response, progress_callback=None, content_length=None, enable_crc=False):
        self.response = response
        self.frame_off_set = 0  # start of the not-yet-returned part of frame_data
        self.frame_length = 0  # end of the not-yet-returned part of frame_data
        self.frame_data = b''  # data bytes of the most recent data frame
        self.check_sum_flag = 0  # set to 1 by data/continuous frames; not read in this class
        self.file_offset = 0  # offset field of the most recent frame
        self.finished = 0  # becomes 1 once an end frame has been processed
        self.raw_buffer = b''  # current chunk taken from the response iterator
        self.raw_buffer_offset = 0  # bytes of raw_buffer already consumed
        self.callback = progress_callback
        self.frames_since_last_progress_report = 0
        self.content_length = content_length
        self.resp_content_iter = response.__iter__()
        self.enable_crc = enable_crc
        self.payload = b''  # payload of the most recent frame
        self.output_raw_data = response.headers.get("x-oss-select-output-raw", '') == "true"
        self.request_id = response.headers.get("x-oss-request-id", '')
        self.splits = 0
        self.rows = 0
        self.columns = 0

    def read(self):
        """Return all remaining output as one bytes object (``b''`` once finished)."""
        if self.finished:
            return b''
        # join avoids the quadratic cost of repeated ``+=`` on bytes.
        return b''.join(self)

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the next chunk of output.

        In raw-output mode the next HTTP body chunk is returned as is, and an
        empty chunk ends the iteration. Otherwise frames are read until a data
        frame provides bytes or an end frame marks the stream finished.

        :raises StopIteration: when no more output is available.
        """
        if self.output_raw_data:
            chunk = next(self.resp_content_iter)
            if chunk:
                return chunk
            raise StopIteration

        while not self.finished:
            if self.frame_off_set < self.frame_length:
                data = self.frame_data[self.frame_off_set:self.frame_length]
                self.frame_length = self.frame_off_set = 0
                return data

            self.read_next_frame()
            self.frames_since_last_progress_report += 1
            if (self.frames_since_last_progress_report >= SelectResponseAdapter._FRAMES_FOR_PROGRESS_UPDATE
                    and self.callback is not None):
                self.callback(self.file_offset, self.content_length)
                self.frames_since_last_progress_report = 0

        raise StopIteration

    def read_raw(self, amt):
        """Read up to ``amt`` bytes from the response chunk iterator.

        The result is shorter than ``amt`` when ``finished`` is set or when an
        empty chunk is received. ``StopIteration`` raised by the exhausted
        iterator propagates to the caller.
        """
        parts = []
        while amt > 0 and self.finished == 0:
            if not self.raw_buffer:
                self.raw_buffer = next(self.resp_content_iter)
                self.raw_buffer_offset = 0
                if not self.raw_buffer:
                    break

            available = len(self.raw_buffer) - self.raw_buffer_offset
            if available >= amt:
                data = self.raw_buffer[self.raw_buffer_offset:self.raw_buffer_offset + amt]
                self.raw_buffer_offset += len(data)
            else:
                # Take the rest of this chunk; the next loop pass fetches another.
                data = self.raw_buffer[self.raw_buffer_offset:]
                self.raw_buffer = b''
            parts.append(data)
            amt -= len(data)

        return b''.join(parts)

    def _read_exact(self, amt):
        """Read exactly ``amt`` bytes of frame data.

        :raises SelectOperationClientError: if the response ends first. Without
            this check a truncated response would end the iteration silently.
        """
        try:
            data = self.read_raw(amt)
        except StopIteration:
            data = None
        if data is None or len(data) < amt:
            logger.warning("Select response ended before a complete frame was received. RequestId:%s",
                           self.request_id)
            raise SelectOperationClientError(
                self.request_id,
                "Incomplete frame: response ended while reading " + str(amt) + " bytes")
        return data

    @staticmethod
    def _unpack(fmt, raw):
        """Decode one integer field with ``struct`` format ``fmt`` from ``raw``."""
        buf = bytearray(raw)
        utils.change_endianness_if_needed(buf)  # presumably wire order -> host order
        return struct.unpack(fmt, bytes(buf))[0]

    @staticmethod
    def _split_error(error_msg):
        """Split an end-frame error message into ``(error_code, error_msg)``.

        If a ``.`` occurs before the last byte, the text before the first ``.``
        becomes the code and the text after it becomes the message. Otherwise
        the code is ``b''`` and the message is returned unchanged.
        """
        dot = error_msg.find(b'.')
        if 0 <= dot < len(error_msg) - 1:
            return error_msg[:dot], error_msg[dot + 1:]
        return b'', error_msg

    def read_next_frame(self):
        """Read one frame from the response and update the adapter's state.

        A data frame exposes its bytes through ``frame_data`` and
        ``frame_length``. An end frame sets ``finished``.

        :raises SelectOperationClientError: on an unknown frame type, or when
            the response ends before a complete frame is received.
        :raises InconsistentError: on a data-frame CRC mismatch (only when
            ``enable_crc`` is set).
        :raises SelectOperationFailed: when an end frame reports a non-2xx status.
        """
        frame_type = bytearray(self._read_exact(4))
        payload_length_val = self._unpack("I", self._read_exact(4))
        self._read_exact(4)  # header checksum: consumed but not verified
        frame_type[0] = 0  # mask the version byte
        frame_type_val = self._unpack("I", frame_type)

        if frame_type_val not in SelectResponseAdapter._KNOWN_FRAME_TYPES:
            logger.warning("Unexpected frame type: %s. RequestId:%s. This could be due to the old version of client.",
                           frame_type_val, self.request_id)
            raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))

        self.payload = self._read_exact(payload_length_val)
        # Every frame's payload starts with an 8-byte offset.
        self.file_offset = self._unpack("Q", self.payload[0:8])

        if frame_type_val == SelectResponseAdapter._DATA_FRAME_TYPE:
            self._read_data_frame(payload_length_val)
        elif frame_type_val == SelectResponseAdapter._CONTINIOUS_FRAME_TYPE:
            self.frame_length = self.frame_off_set = 0
            self.check_sum_flag = 1
            self._read_exact(4)  # payload checksum: consumed but not verified
        elif frame_type_val == SelectResponseAdapter._END_FRAME_TYPE:
            self._read_end_frame()
        else:
            self._read_meta_end_frame(frame_type_val)

    def _read_data_frame(self, payload_length_val):
        """Expose a data frame's bytes and, if enabled, verify its payload CRC."""
        self.frame_length = payload_length_val - 8
        self.frame_off_set = 0
        self.check_sum_flag = 1
        self.frame_data = self.payload[8:]
        checksum_val = self._unpack("I", self._read_exact(4))  # payload crc32
        if self.enable_crc:
            crc32 = utils.Crc32()
            crc32.update(self.payload)
            checksum_calc = crc32.crc
            if checksum_val != checksum_calc:
                logger.warning("Incorrect checksum: Actual %s and calculated %s. RequestId:%s",
                               checksum_val, checksum_calc, self.request_id)
                raise InconsistentError("Incorrect checksum: Actual" + str(checksum_val) + ". Calculated:" + str(checksum_calc), self.request_id)

    def _read_end_frame(self):
        """Handle an End Frame: check its status, report progress and finish."""
        self.frame_off_set = 0
        # payload[8:16] holds the total scanned bytes, which is not used here.
        status = self._unpack("I", self.payload[16:20])
        error_code, error_msg = self._split_error(self.payload[20:])
        if status // 100 != 2:
            raise SelectOperationFailed(status, error_code, error_msg)
        self.frame_length = 0
        if self.callback is not None:
            self.callback(self.file_offset, self.content_length)
        self._read_exact(4)  # payload checksum: consumed but not verified
        self.finished = 1

    def _read_meta_end_frame(self, frame_type_val):
        """Handle a (JSON) Meta End Frame: record split/row/column counts and finish."""
        self.frame_off_set = 0
        # payload[8:16] holds the total scanned bytes, which is not used here.
        status = self._unpack("I", self.payload[16:20])
        self.splits = self._unpack("I", self.payload[20:24])
        self.rows = self._unpack("Q", self.payload[24:32])
        if frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
            self.columns = self._unpack("I", self.payload[32:36])
            error_index = 36
        else:
            error_index = 32  # the JSON variant carries no column count
        error_code, error_msg = self._split_error(self.payload[error_index:])
        self._read_exact(4)  # payload checksum: consumed but not verified
        self.final_status = status
        self.frame_length = 0
        self.finished = 1
        # Floor division: the former "status / 100" is true division on
        # Python 3, so a 2xx such as 206 (2.06 != 2) was reported as a failure.
        if status // 100 != 2:
            raise SelectOperationFailed(status, error_code, error_msg)