Kinesis
Abstracted Boto3 Methods:
put_records()
put_record()
Usage
Batch Put items to a Kinesis stream.
Initialise the client by providing it with the name of the Kinesis Stream.
Submit the records which should be sent.
Finally, flush any remaining, unsent payloads.
from boto3_batch_utils import KinesisBatchDispatcher
kn = KinesisBatchDispatcher('MyExampleStreamName')
kn.submit_payload({'something': 'in', 'my': 'message'})
kn.submit_payload({'tells': 'me', 'this': 'is', 'easy': True})
unprocessed_records = kn.flush_payloads()
Advanced Usage
Using the basic features of the Kinesis client will suit most use cases. However, in some scenarios you may need additional control over how the data is transmitted to Kinesis. The Kinesis client allows configuration of the following behaviour:
Batch Size
For information about batch sizes click here.
The Kinesis client has the following maximum batch limitations:
Limit Type | Limit |
---|---|
Number of Records | 500 |
Byte size of a single record | 1,000,000 bytes |
Byte size of a batch | 5,000,000 bytes |
Partition Keys
A Kinesis stream is made up of 1 or more Shards. For more information about Shards, refer to the Kinesis docs here. By default Boto3 Batch Utils will evenly distribute records across all Shards in a Stream. It does this by assigning a random unique id to each record, which is used as that records Partition Key. If your Kinesis Stream has multiple Shard purely as a means of achieving greater scale, the default will work very nicely.
However, you may require more control over the allocation of a record to a given Shard. This can be done by telling the client which part of a record to use as the Partition Key value.
Example
In this example, records can have varying content, keys and values. However, all records will have a unique id. In this case I wish to use this id as the Partition Key. The id is located against the ‘Id’ key in the record:
record = {'id': 'a4f315f8ef6b40d9bfdc3837fa6c0d64', 'type': 'furniture', 'name': 'table', 'legs': {'count': 4,
'material': 'wood'}}
Note: The attribute to be used as the Partition Key must be at the top level of the record’s hierarchy.
In order to configure the client, pass in the optional partition_key_identifier
argument. Provide the key name of the
key to be used:
kn_client = KinesisBatchDispatcher('MyExampleStreamName', partition_key_identifier='id')
With this configuration the example record would be dispatched to Kinesis with in the following payload structure:
{
'Data': {'id': 'a4f315f8ef6b40d9bfdc3837fa6c0d64', 'type': 'furniture', 'name': 'table', 'legs': {'count': 4,
'material': 'wood'}},
'PartitionKey': 'a4f315f8ef6b40d9bfdc3837fa6c0d64'
}
Uniqueness
When a record is submitted to the Kinesis client using submit_payload
it is NOT checked for uniqueness. When sending
data to Kinesis Stream, it is perfectly acceptable to send duplicate messages. In fact this may be desired behaviour.