A simple module for putting objects into and getting them from Amazon S3 compatible endpoints.
```bash
pip install aiohttp-s3-client
```

```python
from http import HTTPStatus
from aiohttp import ClientSession
from aiohttp_s3_client import S3Client
async with ClientSession(raise_for_status=True) as session:
    client = S3Client(
        url="http://s3-url",
        session=session,
        access_key_id="key-id",
        secret_access_key="hackme",
        region="us-east-1"
    )

    # Upload str object to bucket "bucket" and key "str"
    async with client.put("bucket/str", "hello, world") as resp:
        assert resp.status == HTTPStatus.OK

    # Upload bytes object to bucket "bucket" and key "bytes"
    async with client.put("bucket/bytes", b"hello, world") as resp:
        assert resp.status == HTTPStatus.OK

    # Upload AsyncIterable to bucket "bucket" and key "iterable"
    async def gen():
        yield b'some bytes'

    async with client.put("bucket/iterable", gen()) as resp:
        assert resp.status == HTTPStatus.OK

    # Upload file to bucket "bucket" and key "file"
    async with client.put_file("bucket/file", "/path_to_file") as resp:
        assert resp.status == HTTPStatus.OK

    # Check that the object exists using bucket+key
    async with client.head("bucket/key") as resp:
        assert resp.status == HTTPStatus.OK

    # Get object by bucket+key
    async with client.get("bucket/key") as resp:
        data = await resp.read()

    # Make a presigned URL
    url = client.presign_url("GET", "bucket/key", expires=60 * 60)

    # Delete object using bucket+key
    async with client.delete("bucket/key") as resp:
        assert resp.status == HTTPStatus.NO_CONTENT

    # List objects by prefix
    async for result, prefixes in client.list_objects_v2("bucket/", prefix="prefix"):
        # Each result is a list of metadata objects representing an object
        # stored in the bucket. Each prefixes is a list of common prefixes.
        do_work(result, prefixes)
```
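A presigned URL carries its authentication in the query string, so it can be fetched with any plain HTTP client. A minimal sketch using aiohttp directly (the `fetch_presigned` helper below is only an illustration, not part of the library API):

```python
import aiohttp


async def fetch_presigned(url: str) -> bytes:
    # Hypothetical helper: the presigned URL already carries the signature,
    # so a plain GET request without extra headers is enough.
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        async with session.get(url) as resp:
            return await resp.read()
```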
Bucket may be specified as a subdomain or in the object name:

```python
import aiohttp
from aiohttp_s3_client import S3Client

client = S3Client(url="http://bucket.your-s3-host",
                  session=aiohttp.ClientSession())
async with client.put("key", gen()) as resp:
    ...

client = S3Client(url="http://your-s3-host",
                  session=aiohttp.ClientSession())
async with client.put("bucket/key", gen()) as resp:
    ...

client = S3Client(url="http://your-s3-host/bucket",
                  session=aiohttp.ClientSession())
async with client.put("key", gen()) as resp:
    ...
```

Auth may be specified with keywords or in the URL:

```python
import aiohttp
from aiohttp_s3_client import S3Client
client_credentials_as_kw = S3Client(
    url="http://your-s3-host",
    access_key_id="key_id",
    secret_access_key="access_key",
    session=aiohttp.ClientSession(),
)

client_credentials_in_url = S3Client(
    url="http://key_id:access_key@your-s3-host",
    session=aiohttp.ClientSession(),
)
```

By default, S3Client collects credentials from several sources in order of priority: first from the keyword arguments `access_key_id=` and `secret_access_key=`, then from the username and password of the passed `url` argument, then from environment variables, and finally from the config file.
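For example, a client constructed without any explicit credentials falls back to the environment variables and then to the config file (a minimal sketch of the default behaviour described above):

```python
import aiohttp
from aiohttp_s3_client import S3Client

# No credentials are passed and the URL contains no username or password,
# so S3Client falls back to environment variables and then to the config file.
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
)
```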
You can also pass credentials explicitly using the `aiohttp_s3_client.credentials` module:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import StaticCredentials
credentials = StaticCredentials(
    access_key_id='aaaa',
    secret_access_key='bbbb',
    region='us-east-1',
)
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

Using credentials parsed from a URL:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import URLCredentials
url = "http://key:hack-me@your-s3-host"
credentials = URLCredentials(url, region="us-east-1")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

Using environment variables:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import EnvironmentCredentials
credentials = EnvironmentCredentials(region="us-east-1")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

Using the user config file:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials
credentials = ConfigCredentials()  # The ~/.aws/credentials config will be used
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

Using a custom config location:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import ConfigCredentials
credentials = ConfigCredentials("~/.my-custom-aws-credentials")
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

The `merge_credentials` function collects all passed credentials instances and returns a new one containing every non-blank field from the passed instances. Earlier arguments take priority.

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import (
    ConfigCredentials, EnvironmentCredentials, merge_credentials
)

credentials = merge_credentials(
    EnvironmentCredentials(),
    ConfigCredentials(),
)
client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)
```

Getting credentials from the metadata service:

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import MetadataCredentials
credentials = MetadataCredentials()

# Start refreshing credentials from the metadata server
await credentials.start()

client = S3Client(
    url="http://your-s3-host",
    session=aiohttp.ClientSession(),
    credentials=credentials,
)

await credentials.stop()
```
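The metadata credentials are started and stopped explicitly; if you want `stop()` to run even when an error occurs in between, a `try`/`finally` block works. A minimal sketch (illustrative only, not a required usage pattern):

```python
import aiohttp
from aiohttp_s3_client import S3Client
from aiohttp_s3_client.credentials import MetadataCredentials

credentials = MetadataCredentials()
await credentials.start()
try:
    client = S3Client(
        url="http://your-s3-host",
        session=aiohttp.ClientSession(),
        credentials=credentials,
    )
    ...  # perform uploads and downloads here
finally:
    # Ensure the credentials refresher is always stopped.
    await credentials.stop()
```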
For uploading large files, multipart upload can be used. It allows you to asynchronously upload multiple parts of a file to S3. S3Client handles retries of part uploads and calculates a hash of each part for integrity checks.

```python
import aiohttp
from aiohttp_s3_client import S3Client

client = S3Client(url="http://your-s3-host", session=aiohttp.ClientSession())
await client.put_file_multipart(
    "test/bigfile.csv",
    headers={
        "Content-Type": "text/csv",
    },
    workers_count=8,
)
```

S3 supports GET requests with the `Range` header, so objects can be downloaded in parallel over multiple connections for a speedup. S3Client handles retries of partial requests and uses the `ETag` header to make sure the file is not changed during the download. If your system supports the `pwrite` syscall (Linux, macOS, etc.), it is used to write simultaneously to a single file. Otherwise, each worker writes to its own file, and the files are concatenated after the download finishes.

```python
import aiohttp
from aiohttp_s3_client import S3Client
client = S3Client(url="http://your-s3-host", session=aiohttp.ClientSession())
await client.get_file_parallel(
    "dump/bigfile.csv",
    "/home/user/bigfile.csv",
    workers_count=8,
)
```

You can also control the multipart upload process manually using the `multipart_upload` method. It returns an async context manager which handles upload creation and completion. This gives you more control over the upload process: for example, you can specify the part size, add custom metadata, or control concurrency.
- Minimum part size: 5 MiB (`5 * 1024 * 1024` bytes). Every part must be at least 5 MiB in size, except for the final part.
- Maximum number of parts: 10,000. The total number of uploaded parts must not exceed 10,000.
- Choosing a part size: pick a part size that satisfies both constraints. A safe formula when you know the total object size is `part_size = max(5 * 1024 * 1024, math.ceil(total_size / 10000))`; a sketch of splitting a file this way follows this list.
- If you don't know the total size in advance, choose a conservative part size (for example 8 MiB or 16 MiB) so you are unlikely to exceed 10,000 parts.
- The uploader implements retries for failed part uploads; you should still ensure parts (except the last) meet the 5 MiB minimum before uploading.
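As referenced in the list above, here is a minimal sketch of splitting a file into parts that respect these constraints. The `iter_parts` helper is hypothetical (not part of the library); the `chunks` iterable used in the final example below could be produced this way:

```python
import math
import os
from typing import Iterator

MIN_PART_SIZE = 5 * 1024 * 1024  # 5 MiB minimum part size
MAX_PARTS = 10_000               # S3 limit on the number of parts


def iter_parts(path: str) -> Iterator[bytes]:
    # Hypothetical helper: pick a part size that satisfies both the 5 MiB
    # minimum and the 10,000 part maximum, then read the file in chunks of
    # that size (only the final chunk may be smaller than 5 MiB).
    total_size = os.path.getsize(path)
    part_size = max(MIN_PART_SIZE, math.ceil(total_size / MAX_PARTS))
    with open(path, "rb") as fp:
        while True:
            chunk = fp.read(part_size)
            if not chunk:
                break
            yield chunk
```

For example, `chunks = list(iter_parts("video.mov"))` would yield parts suitable for the upload shown below.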
The `put_part` method returns a coroutine: calling `put_part(...)` does not perform the network upload immediately. It registers the part (and its part number) and returns a coroutine which performs the actual upload when awaited, so you can schedule uploads and then await them concurrently. Awaiting the coroutine also runs the uploader's automatic retries for that part, so you don't need to retry manually; the uploader handles integrity checks and retrying according to its retry policy.

- You MUST call `put_part(...)` in the logical part sequence so parts get the correct part numbers (the uploader assigns part numbers in call order).
- You MAY await the returned coroutines later and in any concurrency pattern you like (for example with `asyncio.gather`), which enables concurrent part uploads.
Create the parts, then upload them concurrently:

```python
import asyncio
import hashlib
import aiohttp
from aiohttp_s3_client import S3Client
client = S3Client(url="http://your-s3-host", session=aiohttp.ClientSession())
async with client.multipart_upload("test/video.mov") as uploader:
    uploads = []

    # Call put_part in the correct part sequence and collect the coroutines.
    # The uploader assigns part numbers in the order put_part is called.
    for chunk in chunks:
        uploads.append(
            uploader.put_part(
                chunk,
                content_sha256=hashlib.sha256(chunk).hexdigest(),
            )
        )

    # Now execute all part uploads concurrently. The uploader handles
    # retries and integrity checks for each part.
    await asyncio.gather(*uploads)
```