crypto_bucket.py
17.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# -*- coding: utf-8 -*-
from . import http
from . import exceptions
from . import Bucket
from .api import _make_range_string
from .models import *
from .compat import to_string, urlsplit, parse_qs
from .crypto import BaseCryptoProvider
from .exceptions import ClientError
import copy
import threading
logger = logging.getLogger(__name__)
class CryptoBucket(Bucket):
"""用于加密Bucket和Object操作的类,诸如上传、下载Object等。创建、删除bucket的操作需使用Bucket类接口。
用法(假设Bucket属于杭州区域) ::
>>> import oss2
>>> auth = oss2.Auth('your-access-key-id', 'your-access-key-secret')
>>> bucket = oss2.CryptoBucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'your-bucket', oss2.LocalRsaProvider())
>>> bucket.put_object('readme.txt', 'content of the object')
<oss2.models.PutObjectResult object at 0x029B9930>
:param auth: 包含了用户认证信息的Auth对象
:type auth: oss2.Auth
:param str endpoint: 访问域名或者CNAME
:param str bucket_name: Bucket名
:param crypto_provider: 客户端加密类。该参数默认为空
:type crypto_provider: oss2.crypto.BaseCryptoProvider
:param bool is_cname: 如果endpoint是CNAME则设为True;反之,则为False。
:param session: 会话。如果是None表示新开会话,非None则复用传入的会话
:type session: oss2.Session
:param float connect_timeout: 连接超时时间,以秒为单位。
:param str app_name: 应用名。该参数不为空,则在User Agent中加入其值。
注意到,最终这个字符串是要作为HTTP Header的值传输的,所以必须要遵循HTTP标准。
:param bool enable_crc: 如果开启crc校验则设为True;反之,则为False
"""
def __init__(self, auth, endpoint, bucket_name, crypto_provider,
is_cname=False,
session=None,
connect_timeout=None,
app_name='',
enable_crc=True,
):
if not isinstance(crypto_provider, BaseCryptoProvider):
raise ClientError('crypto_provider must be an instance of BaseCryptoProvider')
logger.debug("Init CryptoBucket: {0}".format(bucket_name))
super(CryptoBucket, self).__init__(auth, endpoint, bucket_name, is_cname, session, connect_timeout, app_name,
enable_crc)
self.crypto_provider = crypto_provider
self.upload_contexts = {}
self.upload_contexts_lock = threading.Lock()
if self.app_name:
self.user_agent = http.USER_AGENT + '/' + self.app_name + '/' + OSS_ENCRYPTION_CLIENT
else:
self.user_agent = http.USER_AGENT + '/' + OSS_ENCRYPTION_CLIENT
def _init_user_agent(self, headers):
if 'User-Agent' not in headers:
headers['User-Agent'] = self.user_agent
else:
headers['User-Agent'] += '/' + OSS_ENCRYPTION_CLIENT
def put_object(self, key, data,
headers=None,
progress_callback=None):
"""上传一个普通文件。
用法 ::
>>> bucket.put_object('readme.txt', 'content of readme.txt')
>>> with open(u'local_file.txt', 'rb') as f:
>>> bucket.put_object('remote_file.txt', f)
:param mat_desc: map,对象文件的description
:param key: 上传到OSS的文件名
:param data: 待上传的内容。
:type data: bytes,str或file-like object
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。可以用来实现进度条等功能。参考 :ref:`progress_callback` 。
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
logger.debug("Start to put object to CryptoBucket")
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
content_crypto_material = self.crypto_provider.create_content_material()
data = self.crypto_provider.make_encrypt_adapter(data, content_crypto_material.cipher)
headers = content_crypto_material.to_object_meta(headers)
return super(CryptoBucket, self).put_object(key, data, headers, progress_callback)
def put_object_with_url(self, sign_url, data, headers=None, progress_callback=None):
""" 使用加签的url上传对象
:param sign_url: 加签的url
:param data: 待上传的数据
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return:
"""
raise ClientError("The operation is not support for CryptoBucket now")
def append_object(self, key, position, data,
headers=None,
progress_callback=None,
init_crc=None):
raise ClientError("The operation is not support for CryptoBucket")
def get_object(self, key,
byte_range=None,
headers=None,
progress_callback=None,
process=None,
params=None):
"""下载一个文件。
用法 ::
>>> result = bucket.get_object('readme.txt')
>>> print(result.read())
'hello world'
:param key: 文件名
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:param process: oss文件处理,如图像服务等。指定后process,返回的内容为处理后的文件。
:param params: http 请求的查询字符串参数
:type params: dict
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
if process:
raise ClientError("Process object operation is not support for Crypto Bucket")
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
discard = 0
range_string = ''
if byte_range:
if byte_range[0] is None and byte_range[1]:
raise ClientError("Don't support range get while start is none and end is not")
start, end = self.crypto_provider.adjust_range(byte_range[0], byte_range[1])
adjust_byte_range = (start, end)
range_string = _make_range_string(adjust_byte_range)
if range_string:
headers['range'] = range_string
if byte_range[0] and adjust_byte_range[0] < byte_range[0]:
discard = byte_range[0] - adjust_byte_range[0]
logger.debug("adjust range of get object, byte_range: {0}, adjust_byte_range: {1}, discard: {2}".format(
byte_range, adjust_byte_range, discard))
logger.debug(
"Start to get object from CryptoBucket: {0}, key: {1}, range: {2}, headers: {3}, params: {4}".format(
self.bucket_name, to_string(key), range_string, headers, params))
resp = self._do('GET', self.bucket_name, key, headers=headers, params=params)
logger.debug("Get object done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return GetObjectResult(resp, progress_callback, self.enable_crc, crypto_provider=self.crypto_provider,
discard=discard)
def get_object_with_url(self, sign_url,
byte_range=None,
headers=None,
progress_callback=None):
"""使用加签的url下载文件
:param sign_url: 加签的url
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
query = parse_qs(urlsplit(sign_url).query)
if query and (Bucket.PROCESS in query):
raise ClientError("Process object operation is not support for Crypto Bucket")
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
discard = 0
range_string = ''
if byte_range:
if not byte_range[0] and byte_range[1]:
raise ClientError("Don't support range get while start is none and end is not")
start, end = self.crypto_provider.adjust_range(byte_range[0], byte_range[1])
adjust_byte_range = (start, end)
range_string = _make_range_string(adjust_byte_range)
if range_string:
headers['range'] = range_string
if byte_range[0] and adjust_byte_range[0] < byte_range[0]:
discard = byte_range[0] - adjust_byte_range[0]
logger.debug("adjust range of get object, byte_range: {0}, adjust_byte_range: {1}, discard: {2}".format(
byte_range, adjust_byte_range, discard))
logger.debug(
"Start to get object with url from CryptoBucket: {0}, sign_url: {1}, range: {2}, headers: {3}".format(
self.bucket_name, sign_url, range_string, headers))
resp = self._do_url('GET', sign_url, headers=headers)
return GetObjectResult(resp, progress_callback, self.enable_crc,
crypto_provider=self.crypto_provider, discard=discard)
def create_select_object_meta(self, key, select_meta_params=None):
raise ClientError("The operation is not support for Crypto Bucket")
def select_object(self, key, sql,
progress_callback=None,
select_params=None
):
raise ClientError("The operation is not support for CryptoBucket")
def init_multipart_upload(self, key, headers=None, params=None, upload_context=None):
"""客户端加密初始化分片上传。
:param params
:param upload_context
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`InitMultipartUploadResult <oss2.models.InitMultipartUploadResult>`
返回值中的 `crypto_multipart_context` 记录了加密Meta信息,在upload_part时需要一并传入
"""
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
if not upload_context or not upload_context.data_size:
raise ClientError("It is not support none upload_context and must specify data_size of upload_context ")
logger.info("Start to init multipart upload by CryptoBucket, data_size: {0}, part_size: {1}".format(
upload_context.data_size, upload_context.part_size))
if upload_context.part_size:
res = self.crypto_provider.cipher.is_valid_part_size(upload_context.part_size, upload_context.data_size)
if not res:
raise ClientError("part_size is invalid for multipart upload for CryptoBucket")
else:
upload_context.part_size = self.crypto_provider.cipher.determine_part_size(upload_context.data_size)
content_crypto_material = self.crypto_provider.create_content_material()
upload_context.content_crypto_material = content_crypto_material
headers = content_crypto_material.to_object_meta(headers, upload_context)
resp = super(CryptoBucket, self).init_multipart_upload(key, headers)
return resp
def upload_part(self, key, upload_id, part_number, data, progress_callback=None, headers=None, upload_context=None):
"""客户端加密上传一个分片。
:param upload_context:
:param str key: 待上传文件名,这个文件名要和 :func:`init_multipart_upload` 的文件名一致。
:param str upload_id: 分片上传ID
:param int part_number: 分片号,最小值是1.
:param data: 待上传数据。
:param progress_callback: 用户指定进度回调函数。可以用来实现进度条等功能。参考 :ref:`progress_callback` 。
:param headers: 用户指定的HTTP头部。可以指定Content-MD5头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
logger.info(
"Start to upload multipart of CryptoBucket, upload_id = {0}, part_number = {1}".format(upload_id,
part_number))
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
if upload_context:
context = upload_context
else:
raise ClientError("Could not init upload context, upload contexts flag is False and upload context is none")
content_crypto_material = context.content_crypto_material
if content_crypto_material.cek_alg != self.crypto_provider.cipher.alg or content_crypto_material.wrap_alg != \
self.crypto_provider.wrap_alg:
err_msg = 'Envelope or data encryption/decryption algorithm is inconsistent'
raise InconsistentError(err_msg, self)
headers = content_crypto_material.to_object_meta(headers, context)
plain_key = self.crypto_provider.decrypt_encrypted_key(content_crypto_material.encrypted_key)
plain_iv = self.crypto_provider.decrypt_encrypted_iv(content_crypto_material.encrypted_iv)
offset = context.part_size * (part_number - 1)
counter = self.crypto_provider.cipher.calc_offset(offset)
cipher = copy.copy(content_crypto_material.cipher)
cipher.initialize(plain_key, plain_iv, counter)
data = self.crypto_provider.make_encrypt_adapter(data, cipher)
resp = super(CryptoBucket, self).upload_part(key, upload_id, part_number, data, progress_callback, headers)
return resp
def complete_multipart_upload(self, key, upload_id, parts, headers=None):
"""客户端加密完成分片上传,创建文件。
当所有分片均已上传成功,才可以调用此函数
:param str key: 待上传的文件名,这个文件名要和 :func:`init_multipart_upload` 的文件名一致。
:param str upload_id: 分片上传ID
:param parts: PartInfo列表。PartInfo中的part_number和etag是必填项。其中的etag可以从 :func:`upload_part` 的返回值中得到。
:type parts: list of `PartInfo <oss2.models.PartInfo>`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
logger.info("Start to complete multipart upload of CryptoBucket, upload_id = {0}".format(upload_id))
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
try:
resp = super(CryptoBucket, self).complete_multipart_upload(key, upload_id, parts, headers)
except exceptions as e:
raise e
return resp
def abort_multipart_upload(self, key, upload_id, headers=None):
"""取消分片上传。
:param headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param str key: 待上传的文件名,这个文件名要和 :func:`init_multipart_upload` 的文件名一致。
:param str upload_id: 分片上传ID
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.info("Start to abort multipart upload of CryptoBucket, upload_id = {0}".format(upload_id))
headers = http.CaseInsensitiveDict(headers)
self._init_user_agent(headers)
try:
resp = super(CryptoBucket, self).abort_multipart_upload(key, upload_id)
except exceptions as e:
raise e
return resp
def upload_part_copy(self, source_bucket_name, source_key, byte_range, target_key, target_upload_id,
target_part_number, headers=None):
"""分片拷贝。把一个已有文件的一部分或整体拷贝成目标文件的一个分片。
:param target_part_number:
:param target_upload_id:
:param target_key:
:param source_key:
:param source_bucket_name:
:param byte_range: 指定待拷贝内容在源文件里的范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
raise ClientError("The operation is not support for CryptoBucket now")
def process_object(self, key, process):
raise ClientError("The operation is not support for CryptoBucket")