Class: OCI::ObjectStorage::Transfer::Multipart::MultipartObjectAssembler
- Inherits:
-
Object
- Object
- OCI::ObjectStorage::Transfer::Multipart::MultipartObjectAssembler
- Defined in:
- lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb
Overview
MultipartObjectAssembler provides a simplified interaction when uploading large objects using multi-part uploads.
An assembler can be used to begin a new upload, or resume a previous one. A new assembler should be created per new upload or resumed upload to be performed. The same assembler is not resubale across multiple new uploads/resumes.
Constant Summary collapse
- MD5_CALC_PART_READ_BYTES =
8 * OCI::ObjectStorage::Transfer::MEBIBYTE
- DEFAULT_MAX_ATTEMPTS =
Settings for the exponential backoff and retry (with jitter) which the assembler does
3
- DEFAULT_BASE_SLEEP_MILLIS =
1000
- DEFAULT_MAX_SLEEP_TIME_MILLIS =
8000
- DEFAULT_EXPONENTIAL_GROWTH_FACTOR =
2
Instance Attribute Summary collapse
-
#base_sleep_millis ⇒ Integer
For exponential backoff and retry, the base time to use in our retry calculation in milliseconds.
-
#bucket_name ⇒ String
The bucket where we'll upload the object.
-
#exponential_growth_factor ⇒ Integer
For exponential backoff and retry, the exponent which we will raise to the power of the number of attempts.
-
#manifest ⇒ Hash
readonly
Contains the upload ID for the multipart upload and other upload-specific information (e.g. the destination namespace, bucket and object, and the parts which have been uploaded).
-
#max_attempts ⇒ Integer
If we encounter a failure when performing an operation and need to retry, the maximum number of attempts we can make before declaring failure.
-
#max_sleep_time_millis ⇒ Integer
The maximum amount of time to wait between retries.
-
#multipart_part_size ⇒ Integer
The size, in bytes, of each part of a multipart upload.
-
#multipart_upload_opts ⇒ Hash
A bag of optional parameter (e.g. the client request ID, metadata) which we can use when making calls to the Object Storage service.
-
#namespace ⇒ String
The namespace containing the bucket in which to store the object.
-
#non_file_io_multipart_part_size ⇒ Integer
The size, in bytes, of each part of a multipart upload when we are reading from stdin or a non-file IO-like source (e.g. a StringIO).
-
#object_name ⇒ String
The name of the object in Object Storage.
-
#object_storage_client ⇒ OCI::ObjectStorage::ObjectStorageClient
The client used to interact with the Object Storage service.
-
#parallel_process_count ⇒ Integer
How many parts we can upload in parallel.
Instance Method Summary collapse
-
#abort(upload_id) ⇒ Response
Aborts a multipart upload.
-
#commit ⇒ Response
Commits the multipart upload.
-
#initialize(object_storage_client:, namespace:, bucket_name:, object_name:, multipart_part_size: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE, non_file_io_multipart_part_size: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE, parallel_process_count: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT, multipart_upload_opts: {}) ⇒ MultipartObjectAssembler
constructor
A new instance of MultipartObjectAssembler.
-
#io_for_transfer=(object_io_or_file_path) ⇒ Object
Initializes the parts in the manifest based on the provided input.
-
#new_upload ⇒ Response
Initializes a new multipart upload.
-
#resume(upload_id) ⇒ Array
Resume uploading a multipart object to Object Storage.
-
#upload ⇒ Array
Performs a multipart upload.
Constructor Details
#initialize(object_storage_client:, namespace:, bucket_name:, object_name:, multipart_part_size: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE, non_file_io_multipart_part_size: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE, parallel_process_count: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT, multipart_upload_opts: {}) ⇒ MultipartObjectAssembler
Returns a new instance of MultipartObjectAssembler.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 104 def initialize(object_storage_client:, namespace:, bucket_name:, object_name:, multipart_part_size: OCI::ObjectStorage::Transfer::MULTIPART_PART_SIZE, non_file_io_multipart_part_size: OCI::ObjectStorage::Transfer::NON_FILE_IO_MULTIPART_PART_SIZE, parallel_process_count: OCI::ObjectStorage::Transfer::Multipart::DEFAULT_PARALLEL_PROCESS_COUNT, multipart_upload_opts: {}) @object_storage_client = object_storage_client @namespace = namespace @bucket_name = bucket_name @object_name = object_name @multipart_part_size = multipart_part_size @non_file_io_multipart_part_size = non_file_io_multipart_part_size @parallel_process_count = parallel_process_count @object_io = nil @manifest = { upload_id: nil, namespace: @namespace, bucket_name: @bucket_name, object_name: @object_name, object_io_or_file_path: nil, parts: OCI::ObjectStorage::Transfer::Multipart::Internal::MultipartUploadPartsCollection.new([]) } @multipart_upload_opts = multipart_upload_opts @multipart_upload_opts.delete('content_md5') if @multipart_upload_opts.key?('content_md5') @max_attempts = DEFAULT_MAX_ATTEMPTS @base_sleep_millis = DEFAULT_BASE_SLEEP_MILLIS @exponential_growth_factor = DEFAULT_EXPONENTIAL_GROWTH_FACTOR @max_sleep_time_millis = DEFAULT_MAX_SLEEP_TIME_MILLIS end |
Instance Attribute Details
#base_sleep_millis ⇒ Integer
For exponential backoff and retry, the base time to use in our retry calculation in milliseconds. Defaults to 1000ms
74 75 76 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 74 def base_sleep_millis @base_sleep_millis end |
#bucket_name ⇒ String
The bucket where we'll upload the object
42 43 44 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 42 def bucket_name @bucket_name end |
#exponential_growth_factor ⇒ Integer
For exponential backoff and retry, the exponent which we will raise to the power of the number of attempts. Defaults to 2
82 83 84 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 82 def exponential_growth_factor @exponential_growth_factor end |
#manifest ⇒ Hash (readonly)
Contains the upload ID for the multipart upload and other upload-specific information (e.g. the destination namespace, bucket and object, and the parts which have been uploaded)
87 88 89 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 87 def manifest @manifest end |
#max_attempts ⇒ Integer
If we encounter a failure when performing an operation and need to retry, the maximum number of attempts we can make before declaring failure. Attempts are 1-based, i.e. the first call we make is considered attempt 1.
70 71 72 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 70 def max_attempts @max_attempts end |
#max_sleep_time_millis ⇒ Integer
The maximum amount of time to wait between retries. Defaults to 8000ms
78 79 80 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 78 def max_sleep_time_millis @max_sleep_time_millis end |
#multipart_part_size ⇒ Integer
The size, in bytes, of each part of a multipart upload. This applies when we are uploading files from disk and defaults to 128 MiB
51 52 53 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 51 def multipart_part_size @multipart_part_size end |
#multipart_upload_opts ⇒ Hash
A bag of optional parameter (e.g. the client request ID, metadata) which we can use when making calls to the Object Storage service.
65 66 67 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 65 def multipart_upload_opts @multipart_upload_opts end |
#namespace ⇒ String
The namespace containing the bucket in which to store the object
38 39 40 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 38 def namespace @namespace end |
#non_file_io_multipart_part_size ⇒ Integer
The size, in bytes, of each part of a multipart upload when we are reading from stdin or a non-file IO-like source (e.g. a StringIO). Defaults to 10 MiB
56 57 58 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 56 def non_file_io_multipart_part_size @non_file_io_multipart_part_size end |
#object_name ⇒ String
The name of the object in Object Storage
46 47 48 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 46 def object_name @object_name end |
#object_storage_client ⇒ OCI::ObjectStorage::ObjectStorageClient
The client used to interact with the Object Storage service
34 35 36 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 34 def object_storage_client @object_storage_client end |
#parallel_process_count ⇒ Integer
How many parts we can upload in parallel. Defaults to 3. If this is set to 1, this is the equivalent of not allowing parts to be uploaded in parallel.
61 62 63 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 61 def parallel_process_count @parallel_process_count end |
Instance Method Details
#abort(upload_id) ⇒ Response
Aborts a multipart upload.
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 365 def abort(upload_id) abort_opts = {} abort_opts[:opc_client_request_id] = @multipart_upload_opts[:opc_client_request_id] \ if @multipart_upload_opts[:opc_client_request_id] response = do @object_storage_client.abort_multipart_upload( @namespace, @bucket_name, @object_name, upload_id, abort_opts ) end response end |
#commit ⇒ Response
Commits the multipart upload.
an opc-multipart-md5 key.
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 327 def commit raise 'Cannot commit as this MultipartObjectAssembler has not been initialized with an upload' \ unless @manifest[:upload_id] parts = @manifest[:parts].to_a commit_upload_details = OCI::ObjectStorage::Models::CommitMultipartUploadDetails.new commit_upload_details.parts_to_commit = [] commit_upload_details.parts_to_exclude = [] parts.each do |part| if part[:etag] commit_upload_details.parts_to_commit << OCI::ObjectStorage::Models::CommitMultipartUploadPartDetails.new( partNum: part[:part_number], etag: part[:etag] ) end commit_upload_details.parts_to_exclude << part[:part_number] unless part[:etag] end response = do @object_storage_client.commit_multipart_upload( @namespace, @bucket_name, @object_name, @manifest[:upload_id], commit_upload_details ) end response end |
#io_for_transfer=(object_io_or_file_path) ⇒ Object
Initializes the parts in the manifest based on the provided input. If the input is stdin then the list of parts in the manifest will be left empty (the parts in this case are figured out dynamically at upload time since we may not have all the information in advance). If the input is the path to a file (a String), a file or a IO-like object (e.g. a StringIO) then the parts in the manifest will be initialized so that the assembler can go through and upload them.
No uploads will be performed until the upload method is called.
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 197 def io_for_transfer=(object_io_or_file_path) # Supports IO-wrapping objects we can convert to an IO. An example is Rails' # ActionDispatch::Http::UploadedFile, which wraps an IO (a Tempfile) but # doesn't expose all the IO operations directly (e.g. you can't write to it, it's not seekable) # # This should be safe to use with IO and its subclasses as well as to_io is a method on IO: # http://ruby-doc.org/core-2.3.1/IO.html#method-i-to_io and returns itself if called on # an IO @manifest[:object_io_or_file_path] = object_io_or_file_path.to_io if object_io_or_file_path.respond_to?(:to_io) @manifest[:object_io_or_file_path] = object_io_or_file_path unless object_io_or_file_path.respond_to?(:to_io) return if stdin?(object_io_or_file_path) opened_file = false if object_io_or_file_path.is_a?(String) object_io = File.open(object_io_or_file_path, 'rb') opened_file = true end object_io = object_io_or_file_path if object_io_or_file_path.respond_to?(:seek) total_size = object_io.size if object_io.respond_to?(:size) total_size = object_io.stat.size unless object_io.respond_to?(:size) part_size_to_use = @multipart_part_size if file_based_io?(object_io_or_file_path) part_size_to_use = @non_file_io_multipart_part_size unless file_based_io?(object_io_or_file_path) offset = 0 part_number = 1 while offset < total_size part_info = { offset: offset, part_size: calculate_part_size(total_size, offset, part_size_to_use), part_number: part_number, part_md5_hash: nil } @manifest[:parts].push(part_info) offset += part_size_to_use part_number += 1 end nil ensure object_io.close if opened_file && object_io end |
#new_upload ⇒ Response
Initializes a new multipart upload.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 142 def new_upload raise 'This MultipartObjectAssembler has already been initialized with an upload' if @manifest[:upload_id] create_multipart_upload_opts = {} create_multipart_upload_opts[:if_match] = @multipart_upload_opts[:if_match] if @multipart_upload_opts[:if_match] create_multipart_upload_opts[:if_none_match] = @multipart_upload_opts[:if_none_match] \ if @multipart_upload_opts[:if_none_match] create_multipart_upload_opts[:opc_client_request_id] = @multipart_upload_opts[:opc_client_request_id] \ if @multipart_upload_opts[:opc_client_request_id] create_multipart_upload_details = OCI::ObjectStorage::Models::CreateMultipartUploadDetails.new(object: @object_name) create_multipart_upload_details.content_type = @multipart_upload_opts[:content_type] \ if @multipart_upload_opts[:content_type] create_multipart_upload_details.content_language = @multipart_upload_opts[:content_language] \ if @multipart_upload_opts[:content_language] create_multipart_upload_details.content_encoding = @multipart_upload_opts[:content_encoding] \ if @multipart_upload_opts[:content_encoding] create_multipart_upload_details.storage_tier = @multipart_upload_opts[:storage_tier] \ if @multipart_upload_opts[:storage_tier] if @multipart_upload_opts[:metadata] && !@multipart_upload_opts[:metadata].empty? = {} @multipart_upload_opts[:metadata].each do |key, value| [key] = value if key.to_s.start_with?('opc-meta-') ["opc-meta-#{key}"] = value unless key.to_s.start_with?('opc-meta-') end create_multipart_upload_details. = end response = do @object_storage_client.create_multipart_upload( @namespace, @bucket_name, create_multipart_upload_details, create_multipart_upload_opts ) end @manifest[:upload_id] = response.data.upload_id response end |
#resume(upload_id) ⇒ Array
Resume uploading a multipart object to Object Storage. This assumes that the assembler already has knowledge of the parts which it could potentially need to upload (i.e. they have been prepared via the set_io_for_transfer method).
Prior to resuming the upload, we'll attempt to reconcile with Object Storage any previously uploaded parts so that they are not uploaded again
one element per error which occurred.
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 296 def resume(upload_id) raise 'An upload ID must be provided' if upload_id.nil? raise 'Parts must be initialized prior to resuming' if @manifest[:parts].length.zero? manifest[:upload_id] = upload_id upload_parts = list_uploaded_parts(upload_id) known_parts = @manifest[:parts].to_a raise 'There are more parts on the server than parts to resume, please check the upload ID.' \ if upload_parts.length > known_parts.length upload_parts.each do |up| index = up.part_number - 1 manifest_part = known_parts[index] if manifest_part[:part_size] != up.size raise 'Cannot resume upload with different part size. ' \ + "Parts were uploaded with a part size of #{up.size / OCI::ObjectStorage::Transfer::MEBIBYTE} MiB" end manifest_part[:etag] = up.etag manifest_part[:opc_md5] = up.md5 end upload end |
#upload ⇒ Array
Performs a multipart upload
one element per error which occurred.
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/oci/object_storage/transfer/multipart/multipart_object_assembler.rb', line 246 def upload all_parts = @manifest[:parts].to_a pending_parts = OCI::ObjectStorage::Transfer::Multipart::Internal::MultipartUploadPartsCollection.new( all_parts.select { |ap| ap[:opc_md5].nil? && ap[:etag].nil? } ) unless file_based_io?(@manifest[:object_io_or_file_path]) || stdin?(@manifest[:object_io_or_file_path]) seekable_io_wrapper = OCI::ObjectStorage::Transfer::Multipart::Internal::SeekableNonFilePartIOWrapper.new( source: @manifest[:object_io_or_file_path] ) end if stdin?(@manifest[:object_io_or_file_path]) stdin_io_wrapper = OCI::ObjectStorage::Transfer::Multipart::Internal::StdinPartIOWrapper.new( source: @manifest[:object_io_or_file_path] ) end threads = [] @parallel_process_count.times do thread = Thread.new do begin upload_non_stdin(pending_parts, seekable_io_wrapper) unless stdin?(@manifest[:object_io_or_file_path]) upload_stdin(stdin_io_wrapper) if stdin?(@manifest[:object_io_or_file_path]) nil rescue => e pending_parts.clear! # Stop any futher processing on error e end end thread.abort_on_exception = true # Do not continue if we encounter exceptions threads << thread end threads.map(&:value).compact end |