Skip to content

Kinesis module

Index > Kinesis

Auto-generated documentation for Kinesis type annotations stubs module types-aiobotocore-kinesis.

How to install

From PyPI with pip

Install types-aioboto3 for Kinesis service.

# install with aioboto3 type annotations
python -m pip install 'types-aioboto3[kinesis]'


# Lite version does not provide session.client/resource overloads
# it is more RAM-friendly, but requires explicit type annotations
python -m pip install 'types-aioboto3-lite[kinesis]'


# standalone installation
python -m pip install types-aiobotocore-kinesis

How to uninstall

python -m pip uninstall -y types-aiobotocore-kinesis

Usage

Code samples can be found in Examples.

KinesisClient

Type annotations and code completion for session.client("kinesis") as KinesisClient boto3 documentation

Usage example
from aioboto3.session import Session

from types_aiobotocore_kinesis.client import KinesisClient


session = Session()
async with session.client("kinesis") as client:
    client: KinesisClient

Paginators

Type annotations and code completion for paginators from session.client("kinesis").get_paginator("...").

Usage example
from types_aiobotocore_kinesis.paginator import DescribeStreamPaginator

def get_describe_stream_paginator() -> DescribeStreamPaginator:
    return client.get_paginator("describe_stream"))

Waiters

Type annotations and code completion for waiters from session.client("kinesis").get_waiter("...").

Usage example
from types_aiobotocore_kinesis.waiter import StreamExistsWaiter

def get_stream_exists_waiter() -> StreamExistsWaiter:
    return Session().client("kinesis").get_waiter("stream_exists")

Literals

Type annotations for literals used in methods and schema.

Usage example
from types_aiobotocore_kinesis.literals import ConsumerStatusType

def get_value() -> ConsumerStatusType:
    return "ACTIVE"

Typed dictionaries

Type annotations for typed dictionaries used in methods and schema.

Usage example
from types_aiobotocore_kinesis.type_defs import AddTagsToStreamInputRequestTypeDef

def get_value() -> AddTagsToStreamInputRequestTypeDef:
    return {
        "StreamName": ...,
        "Tags": ...,
    }