Direct IO#

CTERA Direct IO is integrated into the CTERA Python SDK to enable high-speed, programmatic access to files within the CTERA Global Namespace. By leveraging CTERA Direct IO, applications can retrieve file metadata from the CTERA Portal while directly accessing data blocks from the underlying object storage, whether on-premises or in the cloud. This approach ensures efficient, concurrent, and secure retrieval, making it ideal for high-performance data pipelines.

The CTERA SDK provides two key extensions for CTERA Direct IO:

Blocks API: Enables concurrent retrieval of all blocks (chunks) that make up a file. It returns data payloads alongside metadata such as block offset and length, with no guaranteed order. This API is optimized for bulk data processing workflows.

Streamer API: Supports prioritized, sequential retrieval of file content, ensuring efficient access from the beginning to the end of a file. It also allows for byte-range retrieval by specifying start and end offsets, making it ideal for real-time streaming and partial file access scenarios.
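
Because the Blocks API returns blocks in no guaranteed order, a consumer typically places each payload at its reported offset. The sketch below illustrates that idea with a stand-in `Block` dataclass and a `reassemble` helper. Both are hypothetical names, modeled on the offset, length, and data fields described above, and are not the SDK's own types.

```python
from dataclasses import dataclass


@dataclass
class Block:
    """Stand-in for a retrieved block (assumption: not the SDK's class)."""
    offset: int
    length: int
    data: bytes


def reassemble(blocks, size):
    """Place each block at its offset, regardless of arrival order."""
    buffer = bytearray(size)
    for block in blocks:
        buffer[block.offset:block.offset + block.length] = block.data
    return bytes(buffer)


# Blocks arriving out of order still produce the original content.
arrived = [
    Block(offset=6, length=5, data=b'world'),
    Block(offset=0, length=6, data=b'hello '),
]
content = reassemble(arrived, size=11)
```

The Streamer API removes the need for this bookkeeping when content must be consumed from start to end.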

Prerequisites#

Credentials#

The cterasdk.direct.client.Client object is instantiated using the CTERA Portal URL, an access key, and a secret key.

The API keys must be linked to a Team Portal user with the Read Write Administrator role. For key generation via the CTERA Portal administrator interface, see Setting Up API Keys. For key generation via the end-user interface, see Creating Access Key IDs and Secret Access Keys.
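
To keep the access key and secret key out of source code, one option is to read them from the environment. This is a minimal sketch. The function name `load_credentials` and the environment variable names are assumptions made for illustration and are not defined by the SDK.

```python
import os


def load_credentials(environ=os.environ):
    """Read the Portal URL and API keys from environment variables.

    The variable names below are illustrative, not part of the SDK.
    """
    names = ('CTERA_URL', 'CTERA_ACCESS_KEY_ID', 'CTERA_SECRET_ACCESS_KEY')
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))
    # Returned in the order: URL, access key, secret key.
    return tuple(environ[name] for name in names)
```

The returned tuple holds the three values, in order, that the client is instantiated with.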

Network#

Using CTERA Direct IO requires connectivity to both the CTERA Portal and the Object Storage over HTTPS (TCP 443).
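
A quick way to confirm this requirement before running a pipeline is to attempt a TCP connection to each endpoint. The helper below is a generic sketch using only the Python standard library. The name `can_connect` is hypothetical, and the check covers TCP reachability only, not TLS trust or proxy configuration.

```python
import socket


def can_connect(host, port=443, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

For example, `can_connect('tenant.ctera.com')` would test the Portal address used in the examples on this page. The Object Storage hostname depends on the deployment and must be supplied separately.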

Getting Started#

In this example, a file is downloaded using its unique ID (e.g., 12345) and written to disk as example.pdf. For information on how to obtain the file ID and name, refer to the Event object returned by the Notification Service.

import asyncio
import aiofiles
from cterasdk import ctera_direct

url = 'https://tenant.ctera.com'
access_key_id = 'your-access-key-id'
secret_access_key = 'your-secret-key'

name = 'example.pdf'
file_id = 12345  # unique identifier of a file


async def download():
    async with aiofiles.open(name, 'wb') as f:
        async with ctera_direct.client.DirectIO(url, access_key_id, secret_access_key) as client:
            futures = await client.blocks(file_id)
            # Blocks complete in no guaranteed order, so write each one at its own offset.
            for future in asyncio.as_completed(futures):
                block = await future
                await f.seek(block.offset)
                await f.write(block.data)


asyncio.run(download())

TLS#

During testing, you may need to disable TLS verification if the Portal or Object Storage certificate is not trusted by your host client.

import cterasdk.settings

cterasdk.settings.sessions.ctera_direct.api.ssl = False  # disable CTERA Portal TLS verification
cterasdk.settings.sessions.ctera_direct.storage.ssl = False  # disable Object Storage TLS verification

Blocks API#

async Client.blocks(file_id, blocks, max_workers)

Blocks API.

Parameters:
  • file_id (int) – File ID.

  • blocks (list[cterasdk.direct.exceptions.BlockInfo]) – List of BlockInfo objects, or list of integers identifying the block position.

  • max_workers (int) – Maximum number of concurrent tasks. If no limit is specified, a task is dispatched for each block.

Returns:

List of Blocks.

Return type:

list[cterasdk.direct.types.Block]

import asyncio
import aiofiles
import cterasdk.settings
from cterasdk import ctera_direct

async def download(file_id, name):
    url = 'https://tenant.ctera.com'
    access_key_id = 'your-access-key-id'
    secret_access_key = 'your-secret-key'

    async with aiofiles.open(name, 'wb') as f:
        async with ctera_direct.client.DirectIO(url, access_key_id, secret_access_key) as client:
            futures = await client.blocks(file_id)
            for future in asyncio.as_completed(futures):
                block = await future
                await f.seek(block.offset)
                await f.write(block.data)

if __name__ == '__main__':
    cterasdk.settings.sessions.ctera_direct.api.ssl = False
    cterasdk.settings.sessions.ctera_direct.storage.ssl = False

    file_id = 12345

    asyncio.run(download(file_id, 'example.pdf'))
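
The effect of a concurrency limit such as max_workers can be pictured with a semaphore that caps how many retrievals run at once. The following is a generic asyncio sketch of bounded concurrency, not a description of how the SDK implements the parameter. `fetch_all` and `fake_fetch` are hypothetical names, and the download is simulated with a short sleep.

```python
import asyncio


async def fetch_all(positions, fetch, max_workers=None):
    """Run fetch(position) for every position, at most max_workers at a time."""
    # Without a limit, every position gets its own concurrent task.
    limit = max_workers or len(positions) or 1
    semaphore = asyncio.Semaphore(limit)

    async def bounded(position):
        async with semaphore:
            return await fetch(position)

    return await asyncio.gather(*(bounded(p) for p in positions))


async def demo():
    active = peak = 0

    async def fake_fetch(position):
        # Simulated download that records how many calls overlap.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return position

    results = await fetch_all(list(range(10)), fake_fetch, max_workers=3)
    return results, peak


results, peak = asyncio.run(demo())
```

A lower limit reduces memory use and the number of open connections at the cost of throughput.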

Streamer API#

async Client.streamer(file_id, byte_range)

Streamer API.

Parameters:
  • file_id (int) – File ID.

  • byte_range (cterasdk.direct.types.ByteRange) – Byte range to stream.

Returns:

Streamer Object

Return type:

cterasdk.direct.stream.Streamer

import logging
import asyncio

import cterasdk.settings
from cterasdk import ctera_direct


logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger('app')


async def start_stream(file_id, offset):

    url = 'https://tenant.ctera.com'
    access_key_id = 'your-access-key-id'
    secret_access_key = 'your-secret-key'

    async with ctera_direct.client.DirectIO(url, access_key_id, secret_access_key) as client:
        streamer = await client.streamer(file_id, byte_range=ctera_direct.types.ByteRange(offset))

        logger.info('Starting Stream. Offset: %s.', offset or 0)
        async for block in streamer.start():
            await handle_block(block)
        logger.info('Ending Stream.')


async def handle_block(block):
    logger.info('Playing video. Offset: %s, Size: %s.', block.offset, block.length)


async def handle_error(error):
    logger.error('Streaming error for file: %s. Retrying in 5 seconds...', error.filename)
    await asyncio.sleep(5)


async def stream(file_id, offset=None):
    success = False
    while not success:
        try:
            await start_stream(file_id, offset)
            success = True
        except ctera_direct.exceptions.StreamError as error:
            await handle_error(error)
            offset = error.offset  # Try to play from where stream was interrupted.


if __name__ == '__main__':
    cterasdk.settings.sessions.ctera_direct.api.ssl = False
    cterasdk.settings.sessions.ctera_direct.storage.ssl = False

    file_id = 12345

    asyncio.run(stream(file_id))
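
The resume-from-offset pattern in the example above can be exercised without a Portal by simulating an interruption. In this sketch, `StreamInterrupted`, `FlakySource`, and `read_with_resume` are hypothetical stand-ins. Only the idea of an error carrying the interrupted offset is taken from the example.

```python
import asyncio


class StreamInterrupted(OSError):
    """Stand-in for a stream error that records the failing offset."""

    def __init__(self, offset):
        super().__init__('stream interrupted')
        self.offset = offset


class FlakySource:
    """Hypothetical source that fails once, partway through the data."""

    def __init__(self, data, chunk_size, fail_at):
        self.data, self.chunk_size, self.fail_at = data, chunk_size, fail_at

    async def read_from(self, offset):
        for start in range(offset, len(self.data), self.chunk_size):
            if self.fail_at is not None and start >= self.fail_at:
                self.fail_at = None  # fail only once
                raise StreamInterrupted(start)
            yield self.data[start:start + self.chunk_size]


async def read_with_resume(source, retry_delay=0.0):
    """Read everything, restarting from the reported offset after a failure."""
    received, offset = bytearray(), 0
    while True:
        try:
            async for chunk in source.read_from(offset):
                received.extend(chunk)
            return bytes(received)
        except StreamInterrupted as error:
            offset = error.offset  # continue where the stream stopped
            await asyncio.sleep(retry_delay)


payload = bytes(range(20))
source = FlakySource(payload, chunk_size=4, fail_at=8)
result = asyncio.run(read_with_resume(source))
```

Resuming from the reported offset avoids both re-reading data that already arrived and leaving a gap in the output.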

Exceptions#

class cterasdk.direct.exceptions.DirectIOError

Bases: OSError

Base Exception for Direct IO Errors

class cterasdk.direct.exceptions.DirectIOAPIError(error, strerror, filename)

Bases: DirectIOError

Direct IO API Error

class cterasdk.direct.exceptions.BlockError(error, strerror, file_id, chunk)

Bases: DirectIOError

Direct IO Block Error

Variables:

block (cterasdk.direct.exceptions.BlockInfo) – Block info

class cterasdk.direct.exceptions.StreamError(filename, offset)

Bases: DirectIOError

Stream Error

Variables:

offset (int) – Stream offset

Exceptions Hierarchy#

  • IOError
    • DirectIOError
      • DirectIOAPIError
        • NotFoundError
        • UnAuthorized
        • UnprocessableContent
        • BlocksNotFoundError
        • BlockListConnectionError
        • BlockListTimeout
      • DecryptKeyError
      • BlockError
        • DownloadError
        • DownloadTimeout
        • DownloadConnectionError
        • DecryptBlockError
        • DecompressBlockError
        • BlockValidationException
      • StreamError
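
A hierarchy like this lets callers handle specific errors first and fall back to a base class. The sketch below mirrors a few of the class names with local stand-ins so that it runs without the SDK. The `handle` function and the 'skip', 'retry', and 'fail' decisions are illustrative assumptions, not SDK behavior.

```python
class DirectIOError(OSError):
    """Local stand-in for the base Direct IO error."""


class DirectIOAPIError(DirectIOError):
    pass


class NotFoundError(DirectIOAPIError):
    pass


class BlockError(DirectIOError):
    pass


class DownloadTimeout(BlockError):
    pass


def handle(operation):
    """Run operation and map failures to a decision, most specific first."""
    try:
        operation()
    except NotFoundError:
        return 'skip'    # the file no longer exists
    except BlockError:
        return 'retry'   # a single block failed; any BlockError subclass lands here
    except DirectIOError:
        return 'fail'    # any other Direct IO error
    return 'ok'


def failing(error):
    """Return a callable that raises the given error."""
    def operation():
        raise error
    return operation


outcomes = [
    handle(failing(NotFoundError())),
    handle(failing(DownloadTimeout())),
    handle(failing(DirectIOAPIError())),
    handle(lambda: None),
]
```

Because the base class derives from OSError, of which IOError is an alias in Python 3, existing handlers for file system errors also catch these exceptions.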