Skip to content

Commit 3ce6dac

Browse files
authored
Merge pull request #1980 from grafana/aws/kinesis-documentation
add KinesisClient documentation for k6-jslib-aws
2 parents cd1309f + 47ca240 commit 3ce6dac

File tree

16 files changed

+1536
-0
lines changed

16 files changed

+1536
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
---
2+
title: 'KinesisClient'
3+
description: 'KinesisClient allows interacting with AWS Kinesis streams'
4+
weight: 00
5+
---
6+
7+
# KinesisClient
8+
9+
{{< docs/shared source="k6" lookup="blocking-aws-blockquote.md" version="<K6_VERSION>" >}}
10+
11+
`KinesisClient` interacts with the AWS Kinesis service.
12+
13+
With it, you can perform operations such as creating streams, putting records, listing streams, and reading records from streams. For a full list of supported operations, see [Methods](#methods).
14+
15+
Both the dedicated `kinesis.js` jslib bundle and the all-encompassing `aws.js` bundle include the `KinesisClient`.
16+
17+
### Methods
18+
19+
| Function | Description |
20+
| :---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------------------------------------- |
21+
| [createStream(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/createstream) | Create a new Kinesis stream |
22+
| [deleteStream(streamName)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/deletestream) | Delete a Kinesis stream |
23+
| [listStreams([options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/liststreams) | List available Kinesis streams |
24+
| [putRecords(streamName, records)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/putrecords) | Put multiple records into a Kinesis stream |
25+
| [getRecords(shardIterator, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getrecords) | Get records from a Kinesis stream shard |
26+
| [listShards(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/listshards) | List shards in a Kinesis stream |
27+
| [getShardIterator(streamName, shardId, shardIteratorType, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getsharditerator) | Get a shard iterator for reading records from a stream |
28+
29+
### Throws
30+
31+
KinesisClient methods will throw errors in case of failure.
32+
33+
| Error | Condition |
34+
| :-------------------- | :--------------------------------------------------------- |
35+
| InvalidSignatureError | When invalid credentials are provided. |
36+
| KinesisServiceError | When AWS replies to the requested operation with an error. |
37+
38+
### Examples
39+
40+
41+
<!-- md-k6:skip -->
42+
43+
```javascript
44+
import { check } from 'k6';
45+
import exec from 'k6/execution';
46+
47+
import {
48+
AWSConfig,
49+
KinesisClient,
50+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
51+
52+
const awsConfig = new AWSConfig({
53+
region: __ENV.AWS_REGION,
54+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
55+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
56+
});
57+
58+
const kinesis = new KinesisClient(awsConfig);
59+
const testStreamName = 'test-stream';
60+
61+
export default async function () {
62+
// List available streams
63+
const streams = await kinesis.listStreams();
64+
console.log('Available streams:', streams.streamNames);
65+
66+
// Check if our test stream exists
67+
if (!streams.streamNames.includes(testStreamName)) {
68+
// Create the stream if it doesn't exist
69+
await kinesis.createStream(testStreamName, { shardCount: 1 });
70+
console.log(`Created stream: ${testStreamName}`);
71+
}
72+
73+
// Put some records into the stream
74+
const records = [
75+
{
76+
data: JSON.stringify({ message: 'Hello from k6!', timestamp: Date.now() }),
77+
partitionKey: 'test-partition-1',
78+
},
79+
{
80+
data: JSON.stringify({ message: 'Another message', timestamp: Date.now() }),
81+
partitionKey: 'test-partition-2',
82+
},
83+
];
84+
85+
const putResult = await kinesis.putRecords(testStreamName, records);
86+
console.log('Put records result:', putResult);
87+
88+
// List shards in the stream
89+
const shards = await kinesis.listShards(testStreamName);
90+
console.log('Stream shards:', shards.shards);
91+
92+
// Get a shard iterator for reading records
93+
if (shards.shards.length > 0) {
94+
const shardId = shards.shards[0].shardId;
95+
const shardIterator = await kinesis.getShardIterator(testStreamName, shardId, 'TRIM_HORIZON');
96+
97+
// Get records from the shard
98+
const getResult = await kinesis.getRecords(shardIterator.shardIterator);
99+
console.log('Retrieved records:', getResult.records);
100+
}
101+
}
102+
```
103+
104+
105+
#### Stream management
106+
107+
108+
<!-- md-k6:skip -->
109+
110+
```javascript
111+
import {
112+
AWSConfig,
113+
KinesisClient,
114+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
115+
116+
const awsConfig = new AWSConfig({
117+
region: __ENV.AWS_REGION,
118+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
119+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
120+
});
121+
122+
const kinesis = new KinesisClient(awsConfig);
123+
124+
export default async function () {
125+
const streamName = 'my-test-stream';
126+
127+
// Create a stream with on-demand billing
128+
await kinesis.createStream(streamName, {
129+
streamModeDetails: {
130+
streamMode: 'ON_DEMAND',
131+
},
132+
});
133+
134+
// List all streams
135+
const streams = await kinesis.listStreams();
136+
console.log('All streams:', streams.streamNames);
137+
138+
// Clean up - delete the stream
139+
await kinesis.deleteStream(streamName);
140+
console.log(`Deleted stream: ${streamName}`);
141+
}
142+
```
143+
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
---
2+
title: 'createStream'
3+
description: 'KinesisClient.createStream creates a new Kinesis stream'
4+
weight: 10
5+
---
6+
7+
# createStream
8+
9+
`KinesisClient.createStream(streamName, [options])` creates a new Kinesis stream.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :--------- | :----- | :---------------------------------------------- |
15+
| streamName | string | The name of the Kinesis stream to create. |
16+
| options | object | Optional configuration for the stream creation. |
17+
18+
#### Options
19+
20+
| Parameter | Type | Description |
21+
| :--------------------------- | :----- | :-------------------------------------------------------------------- |
22+
| shardCount | number | The number of shards for the stream (for provisioned mode). |
23+
| streamModeDetails | object | Configuration for the stream mode. |
24+
| streamModeDetails.streamMode | string | The billing mode for the stream. Either `PROVISIONED` or `ON_DEMAND`. |
25+
26+
### Returns
27+
28+
| Type | Description |
29+
| :-------------- | :-------------------------------------------------------------------- |
30+
| `Promise<void>` | A Promise that fulfills when the stream creation request is complete. |
31+
32+
### Example
33+
34+
35+
<!-- md-k6:skip -->
36+
37+
```javascript
38+
import {
39+
AWSConfig,
40+
KinesisClient,
41+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
42+
43+
const awsConfig = new AWSConfig({
44+
region: __ENV.AWS_REGION,
45+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
46+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
47+
});
48+
49+
const kinesis = new KinesisClient(awsConfig);
50+
51+
export default async function () {
52+
// Create a stream with provisioned billing and 2 shards
53+
await kinesis.createStream('my-provisioned-stream', {
54+
shardCount: 2,
55+
});
56+
57+
// Create a stream with on-demand billing
58+
await kinesis.createStream('my-on-demand-stream', {
59+
streamModeDetails: {
60+
streamMode: 'ON_DEMAND',
61+
},
62+
});
63+
}
64+
```
65+
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
---
2+
title: 'deleteStream'
3+
description: 'KinesisClient.deleteStream deletes a Kinesis stream'
4+
weight: 10
5+
---
6+
7+
# deleteStream
8+
9+
`KinesisClient.deleteStream(streamName)` deletes a Kinesis stream.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :--------- | :----- | :---------------------------------------- |
15+
| streamName | string | The name of the Kinesis stream to delete. |
16+
17+
### Returns
18+
19+
| Type | Description |
20+
| :-------------- | :-------------------------------------------------------------------- |
21+
| `Promise<void>` | A Promise that fulfills when the stream deletion request is complete. |
22+
23+
### Example
24+
25+
26+
<!-- md-k6:skip -->
27+
28+
```javascript
29+
import {
30+
AWSConfig,
31+
KinesisClient,
32+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
33+
34+
const awsConfig = new AWSConfig({
35+
region: __ENV.AWS_REGION,
36+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
37+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
38+
});
39+
40+
const kinesis = new KinesisClient(awsConfig);
41+
42+
export default async function () {
43+
const streamName = 'my-test-stream';
44+
45+
// Delete the stream
46+
await kinesis.deleteStream(streamName);
47+
console.log(`Stream ${streamName} deleted`);
48+
}
49+
```
50+
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
---
2+
title: 'getRecords'
3+
description: 'KinesisClient.getRecords gets records from a Kinesis stream shard'
4+
weight: 10
5+
---
6+
7+
# getRecords
8+
9+
`KinesisClient.getRecords(shardIterator, [options])` gets records from a Kinesis stream shard using a shard iterator.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :------------ | :----- | :---------------------------------------------------- |
15+
| shardIterator | string | The shard iterator from which to get records. |
16+
| options | object | Optional configuration for the get records operation. |
17+
18+
#### Options
19+
20+
| Parameter | Type | Description |
21+
| :-------- | :----- | :--------------------------------------- |
22+
| limit | number | The maximum number of records to return. |
23+
24+
### Returns
25+
26+
| Type | Description |
27+
| :---------------- | :------------------------------------------------- |
28+
| `Promise<Object>` | A Promise that fulfills with the records response. |
29+
30+
#### Returns object
31+
32+
| Property | Type | Description |
33+
| :----------------- | :------------ | :--------------------------------------------------- |
34+
| records | Array<Object> | An array of records retrieved from the stream. |
35+
| nextShardIterator | string | The next shard iterator to use for subsequent calls. |
36+
| millisBehindLatest | number | The number of milliseconds behind the latest record. |
37+
38+
#### Record object
39+
40+
| Property | Type | Description |
41+
| :-------------------------- | :----- | :----------------------------------------------- |
42+
| sequenceNumber | string | The sequence number of the record. |
43+
| approximateArrivalTimestamp | Date | The approximate arrival timestamp of the record. |
44+
| data | string | The data payload of the record. |
45+
| partitionKey | string | The partition key of the record. |
46+
47+
### Example
48+
49+
50+
<!-- md-k6:skip -->
51+
52+
```javascript
53+
import {
54+
AWSConfig,
55+
KinesisClient,
56+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
57+
58+
const awsConfig = new AWSConfig({
59+
region: __ENV.AWS_REGION,
60+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
61+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
62+
});
63+
64+
const kinesis = new KinesisClient(awsConfig);
65+
66+
export default async function () {
67+
const streamName = 'my-test-stream';
68+
69+
// First, get the shards for the stream
70+
const shards = await kinesis.listShards(streamName);
71+
72+
if (shards.shards.length > 0) {
73+
const shardId = shards.shards[0].shardId;
74+
75+
// Get a shard iterator for the first shard
76+
const shardIteratorResponse = await kinesis.getShardIterator(
77+
streamName,
78+
shardId,
79+
'TRIM_HORIZON'
80+
);
81+
82+
const shardIterator = shardIteratorResponse.shardIterator;
83+
84+
// Get records from the shard
85+
const recordsResponse = await kinesis.getRecords(shardIterator, { limit: 10 });
86+
87+
console.log('Records retrieved:', recordsResponse.records.length);
88+
console.log('Milliseconds behind latest:', recordsResponse.millisBehindLatest);
89+
90+
// Process the records
91+
recordsResponse.records.forEach((record, index) => {
92+
console.log(`Record ${index}:`);
93+
console.log(' Sequence number:', record.sequenceNumber);
94+
console.log(' Partition key:', record.partitionKey);
95+
console.log(' Data:', record.data);
96+
console.log(' Arrival timestamp:', record.approximateArrivalTimestamp);
97+
98+
// Parse JSON data if applicable
99+
try {
100+
const jsonData = JSON.parse(record.data);
101+
console.log(' Parsed data:', jsonData);
102+
} catch (e) {
103+
console.log(' Data is not JSON');
104+
}
105+
});
106+
107+
// Continue reading with the next shard iterator
108+
if (recordsResponse.nextShardIterator) {
109+
const nextBatch = await kinesis.getRecords(recordsResponse.nextShardIterator, { limit: 5 });
110+
console.log('Next batch size:', nextBatch.records.length);
111+
}
112+
}
113+
}
114+
```
115+

0 commit comments

Comments
 (0)