Skip to content

Commit 47ca240

Browse files
Apply to latest
1 parent a69a9b4 commit 47ca240

File tree

8 files changed

+759
-0
lines changed

8 files changed

+759
-0
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
---
2+
title: 'KinesisClient'
3+
description: 'KinesisClient allows interacting with AWS Kinesis streams'
4+
weight: 00
5+
---
6+
7+
# KinesisClient
8+
9+
{{< docs/shared source="k6" lookup="blocking-aws-blockquote.md" version="<K6_VERSION>" >}}
10+
11+
`KinesisClient` interacts with the AWS Kinesis service.
12+
13+
With it, you can perform operations such as creating streams, putting records, listing streams, and reading records from streams. For a full list of supported operations, see [Methods](#methods).
14+
15+
Both the dedicated `kinesis.js` jslib bundle and the all-encompassing `aws.js` bundle include the `KinesisClient`.
16+
17+
### Methods
18+
19+
| Function | Description |
20+
| :---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------------------------------------- |
21+
| [createStream(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/createstream) | Create a new Kinesis stream |
22+
| [deleteStream(streamName)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/deletestream) | Delete a Kinesis stream |
23+
| [listStreams([options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/liststreams) | List available Kinesis streams |
24+
| [putRecords(streamName, records)](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/putrecords) | Put multiple records into a Kinesis stream |
25+
| [getRecords(shardIterator, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getrecords) | Get records from a Kinesis stream shard |
26+
| [listShards(streamName, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/listshards) | List shards in a Kinesis stream |
27+
| [getShardIterator(streamName, shardId, shardIteratorType, [options])](https://grafana.com/docs/k6/<K6_VERSION>/javascript-api/jslib/aws/kinesisclient/getsharditerator) | Get a shard iterator for reading records from a stream |
28+
29+
### Throws
30+
31+
KinesisClient methods will throw errors in case of failure.
32+
33+
| Error | Condition |
34+
| :-------------------- | :--------------------------------------------------------- |
35+
| InvalidSignatureError | When invalid credentials are provided. |
36+
| KinesisServiceError | When AWS replies to the requested operation with an error. |
37+
38+
### Examples
39+
40+
<!-- md-k6:skip -->
41+
42+
```javascript
43+
import { check } from 'k6';
44+
import exec from 'k6/execution';
45+
46+
import {
47+
AWSConfig,
48+
KinesisClient,
49+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
50+
51+
const awsConfig = new AWSConfig({
52+
region: __ENV.AWS_REGION,
53+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
54+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
55+
});
56+
57+
const kinesis = new KinesisClient(awsConfig);
58+
const testStreamName = 'test-stream';
59+
60+
export default async function () {
61+
// List available streams
62+
const streams = await kinesis.listStreams();
63+
console.log('Available streams:', streams.streamNames);
64+
65+
// Check if our test stream exists
66+
if (!streams.streamNames.includes(testStreamName)) {
67+
// Create the stream if it doesn't exist
68+
await kinesis.createStream(testStreamName, { shardCount: 1 });
69+
console.log(`Created stream: ${testStreamName}`);
70+
}
71+
72+
// Put some records into the stream
73+
const records = [
74+
{
75+
data: JSON.stringify({ message: 'Hello from k6!', timestamp: Date.now() }),
76+
partitionKey: 'test-partition-1',
77+
},
78+
{
79+
data: JSON.stringify({ message: 'Another message', timestamp: Date.now() }),
80+
partitionKey: 'test-partition-2',
81+
},
82+
];
83+
84+
const putResult = await kinesis.putRecords(testStreamName, records);
85+
console.log('Put records result:', putResult);
86+
87+
// List shards in the stream
88+
const shards = await kinesis.listShards(testStreamName);
89+
console.log('Stream shards:', shards.shards);
90+
91+
// Get a shard iterator for reading records
92+
if (shards.shards.length > 0) {
93+
const shardId = shards.shards[0].shardId;
94+
const shardIterator = await kinesis.getShardIterator(testStreamName, shardId, 'TRIM_HORIZON');
95+
96+
// Get records from the shard
97+
const getResult = await kinesis.getRecords(shardIterator.shardIterator);
98+
console.log('Retrieved records:', getResult.records);
99+
}
100+
}
101+
```
102+
103+
#### Stream management
104+
105+
<!-- md-k6:skip -->
106+
107+
```javascript
108+
import {
109+
AWSConfig,
110+
KinesisClient,
111+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
112+
113+
const awsConfig = new AWSConfig({
114+
region: __ENV.AWS_REGION,
115+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
116+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
117+
});
118+
119+
const kinesis = new KinesisClient(awsConfig);
120+
121+
export default async function () {
122+
const streamName = 'my-test-stream';
123+
124+
// Create a stream with on-demand billing
125+
await kinesis.createStream(streamName, {
126+
streamModeDetails: {
127+
streamMode: 'ON_DEMAND',
128+
},
129+
});
130+
131+
// List all streams
132+
const streams = await kinesis.listStreams();
133+
console.log('All streams:', streams.streamNames);
134+
135+
// Clean up - delete the stream
136+
await kinesis.deleteStream(streamName);
137+
console.log(`Deleted stream: ${streamName}`);
138+
}
139+
```
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
---
2+
title: 'createStream'
3+
description: 'KinesisClient.createStream creates a new Kinesis stream'
4+
weight: 10
5+
---
6+
7+
# createStream
8+
9+
`KinesisClient.createStream(streamName, [options])` creates a new Kinesis stream.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :--------- | :----- | :---------------------------------------------- |
15+
| streamName | string | The name of the Kinesis stream to create. |
16+
| options | object | Optional configuration for the stream creation. |
17+
18+
#### Options
19+
20+
| Parameter | Type | Description |
21+
| :--------------------------- | :----- | :-------------------------------------------------------------------- |
22+
| shardCount | number | The number of shards for the stream (for provisioned mode). |
23+
| streamModeDetails | object | Configuration for the stream mode. |
24+
| streamModeDetails.streamMode | string | The billing mode for the stream. Either `PROVISIONED` or `ON_DEMAND`. |
25+
26+
### Returns
27+
28+
| Type | Description |
29+
| :-------------- | :-------------------------------------------------------------------- |
30+
| `Promise<void>` | A Promise that fulfills when the stream creation request is complete. |
31+
32+
### Example
33+
34+
<!-- md-k6:skip -->
35+
36+
```javascript
37+
import {
38+
AWSConfig,
39+
KinesisClient,
40+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
41+
42+
const awsConfig = new AWSConfig({
43+
region: __ENV.AWS_REGION,
44+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
45+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
46+
});
47+
48+
const kinesis = new KinesisClient(awsConfig);
49+
50+
export default async function () {
51+
// Create a stream with provisioned billing and 2 shards
52+
await kinesis.createStream('my-provisioned-stream', {
53+
shardCount: 2,
54+
});
55+
56+
// Create a stream with on-demand billing
57+
await kinesis.createStream('my-on-demand-stream', {
58+
streamModeDetails: {
59+
streamMode: 'ON_DEMAND',
60+
},
61+
});
62+
}
63+
```
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
---
2+
title: 'deleteStream'
3+
description: 'KinesisClient.deleteStream deletes a Kinesis stream'
4+
weight: 10
5+
---
6+
7+
# deleteStream
8+
9+
`KinesisClient.deleteStream(streamName)` deletes a Kinesis stream.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :--------- | :----- | :---------------------------------------- |
15+
| streamName | string | The name of the Kinesis stream to delete. |
16+
17+
### Returns
18+
19+
| Type | Description |
20+
| :-------------- | :-------------------------------------------------------------------- |
21+
| `Promise<void>` | A Promise that fulfills when the stream deletion request is complete. |
22+
23+
### Example
24+
25+
<!-- md-k6:skip -->
26+
27+
```javascript
28+
import {
29+
AWSConfig,
30+
KinesisClient,
31+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
32+
33+
const awsConfig = new AWSConfig({
34+
region: __ENV.AWS_REGION,
35+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
36+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
37+
});
38+
39+
const kinesis = new KinesisClient(awsConfig);
40+
41+
export default async function () {
42+
const streamName = 'my-test-stream';
43+
44+
// Delete the stream
45+
await kinesis.deleteStream(streamName);
46+
console.log(`Stream ${streamName} deleted`);
47+
}
48+
```
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
title: 'getRecords'
3+
description: 'KinesisClient.getRecords gets records from a Kinesis stream shard'
4+
weight: 10
5+
---
6+
7+
# getRecords
8+
9+
`KinesisClient.getRecords(shardIterator, [options])` gets records from a Kinesis stream shard using a shard iterator.
10+
11+
### Parameters
12+
13+
| Parameter | Type | Description |
14+
| :------------ | :----- | :---------------------------------------------------- |
15+
| shardIterator | string | The shard iterator from which to get records. |
16+
| options | object | Optional configuration for the get records operation. |
17+
18+
#### Options
19+
20+
| Parameter | Type | Description |
21+
| :-------- | :----- | :--------------------------------------- |
22+
| limit | number | The maximum number of records to return. |
23+
24+
### Returns
25+
26+
| Type | Description |
27+
| :---------------- | :------------------------------------------------- |
28+
| `Promise<Object>` | A Promise that fulfills with the records response. |
29+
30+
#### Returns object
31+
32+
| Property | Type | Description |
33+
| :----------------- | :------------ | :--------------------------------------------------- |
34+
| records | Array<Object> | An array of records retrieved from the stream. |
35+
| nextShardIterator | string | The next shard iterator to use for subsequent calls. |
36+
| millisBehindLatest | number | The number of milliseconds behind the latest record. |
37+
38+
#### Record object
39+
40+
| Property | Type | Description |
41+
| :-------------------------- | :----- | :----------------------------------------------- |
42+
| sequenceNumber | string | The sequence number of the record. |
43+
| approximateArrivalTimestamp | Date | The approximate arrival timestamp of the record. |
44+
| data | string | The data payload of the record. |
45+
| partitionKey | string | The partition key of the record. |
46+
47+
### Example
48+
49+
<!-- md-k6:skip -->
50+
51+
```javascript
52+
import {
53+
AWSConfig,
54+
KinesisClient,
55+
} from 'https://jslib.k6.io/aws/{{< param "JSLIB_AWS_VERSION" >}}/kinesis.js';
56+
57+
const awsConfig = new AWSConfig({
58+
region: __ENV.AWS_REGION,
59+
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
60+
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
61+
});
62+
63+
const kinesis = new KinesisClient(awsConfig);
64+
65+
export default async function () {
66+
const streamName = 'my-test-stream';
67+
68+
// First, get the shards for the stream
69+
const shards = await kinesis.listShards(streamName);
70+
71+
if (shards.shards.length > 0) {
72+
const shardId = shards.shards[0].shardId;
73+
74+
// Get a shard iterator for the first shard
75+
const shardIteratorResponse = await kinesis.getShardIterator(
76+
streamName,
77+
shardId,
78+
'TRIM_HORIZON'
79+
);
80+
81+
const shardIterator = shardIteratorResponse.shardIterator;
82+
83+
// Get records from the shard
84+
const recordsResponse = await kinesis.getRecords(shardIterator, { limit: 10 });
85+
86+
console.log('Records retrieved:', recordsResponse.records.length);
87+
console.log('Milliseconds behind latest:', recordsResponse.millisBehindLatest);
88+
89+
// Process the records
90+
recordsResponse.records.forEach((record, index) => {
91+
console.log(`Record ${index}:`);
92+
console.log(' Sequence number:', record.sequenceNumber);
93+
console.log(' Partition key:', record.partitionKey);
94+
console.log(' Data:', record.data);
95+
console.log(' Arrival timestamp:', record.approximateArrivalTimestamp);
96+
97+
// Parse JSON data if applicable
98+
try {
99+
const jsonData = JSON.parse(record.data);
100+
console.log(' Parsed data:', jsonData);
101+
} catch (e) {
102+
console.log(' Data is not JSON');
103+
}
104+
});
105+
106+
// Continue reading with the next shard iterator
107+
if (recordsResponse.nextShardIterator) {
108+
const nextBatch = await kinesis.getRecords(recordsResponse.nextShardIterator, { limit: 5 });
109+
console.log('Next batch size:', nextBatch.records.length);
110+
}
111+
}
112+
}
113+
```

0 commit comments

Comments
 (0)