اتصال به فضای ابری با زبان برنامه‌نویسی python

شروع به کار پارتیشن S3 RGW #

اطلاعات دسترسی پارتیشن S3 #

پس از ساختن پارتیشن S3 و دریافت ایمیل اطلاع رسانی وارد داشبورد پشتیبان شده و در صفحه خانه داشبورد در وسط صفحه به سربرگ پارتیشن S3 بروید و روی منوی سه نقطه سمت چپ کارت پارتیشن کلیک کنید تا وارد بخش تنظیمات پارتیشن شوید. در سربرگ اطلاع دسترسی و سربرگ آدرس ها اطلاع دسترسی پارتیشن s3 خود را مشاهده خواهید کرد.

این مشخصات برای اتصال به آبجکت استوریج در اختیار شما قرار خواهد داشت.

کلید دسترسی (Access Key ID)

کلید خصوصی (Secret Access Key)

اندپوینت S3 (یک نام دامنه تایید شده برای فضای ابری شما یا FQDM)

نام  Bucket (که پس از ساخت پارتیشن به هر تعداد که نیازداشته باشید در پارتیشن ایجاد می‌کنید)

کلید دسترسی و کلید خصوصی مانند نام کاربری و رمز عبور پارتیشن S3 هستند و باید به خوبی از آنها محافظت کنید و در اختیار دیگران قرار ندهید.

مستندات aws-sdk-python #

برای دسترسی به مستندات کامل و جزییات بیشتر  به مستندات و گیت‌هاب این کتابخانه مراجعه کنید.

پیش نیازها و نصب boto3  #

کتابخانه boto3 هستید. می‌توانید با استفاده از pip این کتابخانه را نصب کنید:

pip install boto3

یا نصب با pip3

pip3 install boto3

نمونه کد pyhton برای اتصال به فضای ابری #

import boto3
import logging 

logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )
except Exception as exc:
    logging.info(exc)


ساخت باکت عمومی (public) و باکت خصوصی (private) #

import boto3
import logging
from botocore.exceptions import ClientError


logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )
except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket_name = 'sample-bucket_name'
        bucket = s3_resource.Bucket(bucket_name)
        bucket.create(ACL='public-read') # ACL='private'|'public-read'
    except ClientError as exc:
        logging.error(exc)

حذف باکت #

import boto3
import logging 
from botocore.exceptions import ClientError


logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )
except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket_name = 'sample-bucket_name'
        bucket = s3_resource.Bucket(bucket_name)
        bucket.delete()
    except ClientError as exc:
        logging.error(exc)

دریافت محتوای باکت #

import boto3
import logging
from botocore.exceptions import ClientError


logging.basicConfig(level=logging.INFO)

try:
    s3_client = boto3.client(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )
except Exception as exc:
    logging.error(exc)
else:
    try:
        response = s3_client.head_bucket(Bucket="my_bucket_name")
    except ClientError as err:
        status = err.response["ResponseMetadata"]["HTTPStatusCode"]
        errcode = err.response["Error"]["Code"]

        if status == 404:
            logging.warning("Missing object, %s", errcode)
        elif status == 403:
            logging.error("Access denied, %s", errcode)
        else:
            logging.exception("Error in request, %s", errcode)
    else:
        print(response)

دریافت لیست باکت‌ها #

import boto3
import logging 
from botocore.exceptions import ClientError


logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )
except Exception as exc:
    logging.error(exc)
else:
    try:
        for bucket in s3_resource.buckets.all():
            logging.info(f'bucket_name: {bucket.name}')
    except ClientError as exc:
        logging.error(exc)

دریافت bucket policy #

import boto3
import logging
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )

except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket_name = 'sample_bucket'
        bucket_policy = s3_resource.BucketPolicy(bucket_name)
        bucket_policy.load()
        logging.info(bucket_policy.policy)
    except ClientError as e:
        logging.error(e)

تنظیم پالیسی بر روی باکت با استفاده از کد python #

راهنمای پالیسی ها را اینجا مشاهده کنید. در این نمونه کد پالیسی read-only برای باکت تنظیم شده است:

import boto3
import logging
import json
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )

except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket_name = 'sample_bucket'
        bucket_policy = s3_resource.BucketPolicy(bucket_name)

        policy = {
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "AllowPublicRead",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-brand-new-bucket/*"
      ]
    }
  ]
}

        # Convert the policy from JSON dict to string
        policy = json.dumps(policy)
        bucket_policy.put(Policy=policy)
        logging.info(bucket_policy.policy)

    except ClientError as e:
        logging.error(e)

حذف bucket policy #

import boto3
import logging
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )

except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket_name = 'sample_bucket'
        bucket_policy = s3_resource.BucketPolicy(bucket_name)
        bucket_policy.delete()
        logging.info(bucket_policy.policy)
    except ClientError as e:
        logging.error(e)

آپلود فایل در فضای ابری با python #

import boto3
import logging
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO)

try:
    s3_resource = boto3.resource(
        's3',
        endpoint_url='endpoint_url',
        aws_access_key_id='access_key',
        aws_secret_access_key='secret_key'
    )

except Exception as exc:
    logging.error(exc)
else:
    try:
        bucket = s3_resource.Bucket('bucket-name')
        file_path = 'the/abs/path/to/file.txt'
        object_name = 'file.txt'

        with open(file_path, "rb") as file:
            bucket.put_object(
                ACL='private',
                Body=file,
                Key=object_name
            )
    except ClientError as e:
        logging.error(e)

آپلود multipart برای فایل‌های با حجم بالا با زبان python #

import json
import logging
import os
import pathlib
import sys
import threading
from typing import Optional

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError

# Constant variables
KB = 1024
MB = KB * KB
GB = MB * KB

# Configure logging
logging.basicConfig(level=logging.INFO)

# S3 client instance
s3_client = boto3.client(
    's3',
    endpoint_url='endpoint_url',
    aws_access_key_id='access_key',
    aws_secret_access_key='secret_key]
)

class ProgressPercentage:
    def __init__(self, file_path: str):
        self._file_path = file_path
        self._size = float(os.path.getsize(file_path))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        """
        To simplify, assume this is hooked up to a single file_path

        :param bytes_amount: uploaded bytes
        """
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s / %s  (%.2f%%)" % (self._file_path, self._seen_so_far, self._size, percentage)
            )
            sys.stdout.flush()


def upload_file(file_path: str, bucket: str, object_name: Optional[str] = None):
    """
    Upload a file to an S3 bucket

    :param file_path: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_path is used
    :return: True if file was uploaded, else False
    """
    # If S3 object_name was not specified, use file_path
    if object_name is None:
        object_name = file_path

    # Upload the file
    try:
        # Set the desired multipart threshold value (400 MB)
        config = TransferConfig(multipart_threshold=400 * MB, max_concurrency=5)
        s3_client.upload_file(
            file_path,
            bucket,
            object_name,
            ExtraArgs={'ACL': 'public-read'},
            Callback=ProgressPercentage(file_path),
            Config=config
        )
    except ClientError as e:
        logging.error(e)
        return False

    return True


# file
object_name = 'file.png'
file_rel_path: str = os.path.join('files', object_name)
file_abs_path: str = os.path.join(base_directory, file_rel_path)

upload_file(file_abs_path, 'sample_bucket', object_name)

لغو آپلود multipart #

ابتدا لازم است لیست پلود ها را در اختیار داشته باشید هنگام آپلود multipart یک کلید ‍UploadId در جواب به کلاینت داده میشود ، در صورتی که این کلید را ندارید می توانید توسط کد زیر ابتدا لیست آپلودها را دریافت کنید و سپس بارگذاری مورد نظر که به اتمام نرسیده است را لغو کنید.

import logging
import boto3

# Configure logging
logging.basicConfig(level=logging.INFO)

# S3 client instance
s3_client = boto3.client(
    's3',
    endpoint_url='endpoint_url',
    aws_access_key_id='access_key',
    aws_secret_access_key='secret_key]
)

multiple_uploads_list = s3_client.list_multipart_uploads(Bucket='Your_Bucket_name')

uploads = multiple_uploads_list.get('Uploads')

if not uploads:
    # there is no multiple upload inprogress
    logging.info('no un-finished multiple uploads found')
    return

# prints out each unfinished mulipart upload
for upload in uploads:
    logging.info(f'UploadId: {upload['UploadId']}, Key: {upload['Key']}')

حالا با استفاده از کد زیر آپلود multipart را لغو کنید:

import logging
import boto3

# Configure logging
logging.basicConfig(level=logging.INFO)

# S3 client instance
s3_client = boto3.client(
    's3',
    endpoint_url='endpoint_url',
    aws_access_key_id='access_key',
    aws_secret_access_key='secret_key]
)

s3_client.abort_multipart_upload(
    Bucket='Your_Bucket_Name', 
    Key='Key_In_Previous_Code_Example', 
    UploadId='UploadId_In_previous_Code_Example'
)

دریافت لیست آپلود‌های multipart #

import boto3
import logging
from botocore.exceptions import ClientError


logging.basicConfig(level=logging.INFO)

try:
    s3_client = boto3.client(
        's3',
        endpoint_url='<ENDPOINT>',
        aws_access_key_id='<ACCESS-KEY>',
        aws_secret_access_key='<SECRET-KEY>'
    )
except Exception as exc:
    logging.error(exc)
else:
    try:
        response = s3_client.list_multipart_uploads(
            Bucket='<BUCKET_NAME>',
            Delimiter='string',
            EncodingType='url',
            KeyMarker='string',
            MaxUploads=123,
            Prefix='string',
            UploadIdMarker='string',
            ExpectedBucketOwner='string'
        )
        logging.info(response)
    except ClientError as exc:
        logging.error(exc)


رای دسترسی به مستندات کامل و جزییات بیشتر  به مستندات و گیت‌هاب این کتابخانه مراجعه کنید.