Skip to content

Commit efce99b

Browse files
committed
wip
1 parent 1e6c44d commit efce99b

File tree

15 files changed

+919
-118
lines changed

15 files changed

+919
-118
lines changed

nats-jetstream/schemas/jetstream/api/v1/consumer_getnext_request.json

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@
1111
},
1212
"batch": {
1313
"$ref": "definitions.json#/definitions/golang_int",
14-
"description": "How many messages the server should deliver to the requester",
14+
"description": "How many messages the server should deliver to the requestor",
1515
"minimum": 0,
1616
"maximum": 256
1717
},
1818
"max_bytes": {
1919
"$ref": "definitions.json#/definitions/golang_int",
20-
"description": "Sends at most this many bytes to the requester, limited by consumer configuration max_bytes",
20+
"description": "Sends at most this many bytes to the requestor, limited by consumer configuration max_bytes",
2121
"minimum": 0
2222
},
2323
"no_wait": {
@@ -27,6 +27,30 @@
2727
"idle_heartbeat": {
2828
"$ref": "definitions.json#/definitions/golang_duration_nanos",
2929
"description": "When not 0 idle heartbeats will be sent on this interval"
30+
},
31+
"group": {
32+
"type": "string",
33+
"description": "The consumer group to pull from"
34+
},
35+
"min_pending": {
36+
"$ref": "definitions.json#/definitions/golang_int64",
37+
"description": "The minimum number of messages the server should have in the consumer's pending queue before serving this pull",
38+
"minimum": 0
39+
},
40+
"min_ack_pending": {
41+
"$ref": "definitions.json#/definitions/golang_int64",
42+
"description": "The minimum number of messages the server should have in the consumer's ack pending queue before serving this pull",
43+
"minimum": 0
44+
},
45+
"id": {
46+
"type": "string",
47+
"description": "When pulling from a Pinned Client consumer this is the unique client ID"
48+
},
49+
"priority": {
50+
"type": "integer",
51+
"description": "The priority of the pull request",
52+
"minimum": 0,
53+
"maximum": 9
3054
}
3155
}
3256
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"$schema": "http://json-schema.org/draft-07/schema#",
3+
"$id": "https://nats.io/schemas/jetstream/api/v1/consumer_leader_stepdown_request.json",
4+
"description": "A request for the JetStream $JS.API.CONSUMER.INFO API",
5+
"title": "io.nats.jetstream.api.v1.consumer_info_request",
6+
"type": "object",
7+
"properties": {}
8+
}

nats-jetstream/schemas/jetstream/api/v1/consumer_list_response.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@
3333
"type": "array",
3434
"items": {
3535
"type": "string"
36+
},
37+
"offline": {
38+
"description": "List of streams that are offline and reasons for being offline",
39+
"type": "object",
40+
"additionalProperties": {
41+
"type": "string"
42+
}
3643
}
3744
}
3845
}

nats-jetstream/schemas/jetstream/api/v1/consumer_unpin_response.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
},
1414
"type": {
1515
"type": "string",
16-
"const": "io.nats.jetstream.api.v1.consumer_pause_response"
16+
"const": "io.nats.jetstream.api.v1.consumer_unpin_response"
1717
}
1818
}
1919
}

nats-jetstream/schemas/jetstream/api/v1/definitions.json

Lines changed: 48 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
"definitions": {
88
"golang_duration_nanos": {
99
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
10-
"$ref": "#/definitions/golang_int64"
10+
"$ref": "#/definitions/golang_int64",
11+
"minimum": 0
1112
},
1213
"golang_int": {
1314
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
@@ -259,9 +260,22 @@
259260
"$ref": "#/definitions/peer_info"
260261
}
261262
},
263+
"leader_since": {
264+
"type": "string",
265+
"description": "The time that it was elected as leader in RFC3339 format, absent when not the leader",
266+
"format": "date-time"
267+
},
262268
"raft_group": {
263269
"type": "string",
264270
"description": "In clustered environments the name of the Raft group managing the asset"
271+
},
272+
"system_account": {
273+
"type": "boolean",
274+
"description": "Indicates if the traffic_account is the system account. When true, replication traffic goes over the system account."
275+
},
276+
"traffic_account": {
277+
"type": "string",
278+
"description": "The account where the replication traffic goes over."
265279
}
266280
}
267281
},
@@ -427,7 +441,8 @@
427441
},
428442
"max_ack_pending": {
429443
"type": "integer",
430-
"description": "The maximum number of outstanding ACKs any consumer may configure"
444+
"description": "The maximum number of outstanding ACKs any consumer may configure",
445+
"minimum": -1
431446
},
432447
"memory_max_stream_bytes": {
433448
"type": "integer",
@@ -556,7 +571,8 @@
556571
"enum": [
557572
"none",
558573
"overflow",
559-
"pinned_client"
574+
"pinned_client",
575+
"prioritized"
560576
]
561577
},
562578
"deliver_policy": {
@@ -936,15 +952,16 @@
936952
"max_batch": {
937953
"type": "integer",
938954
"description": "The largest batch property that may be specified when doing a pull on a Pull Consumer",
939-
"default": 0
955+
"default": 0,
956+
"minimum": 0
940957
},
941958
"max_expires": {
942959
"description": "The maximum expires value that may be set when doing a pull on a Pull Consumer",
943960
"$ref": "#/definitions/golang_duration_nanos",
944961
"default": 0
945962
},
946963
"max_bytes": {
947-
"description": "The maximum bytes value that maybe set when dong a pull on a Pull Consumer",
964+
"description": "The maximum bytes value that maybe set when doing a pull on a Pull Consumer",
948965
"$ref": "definitions.json#/definitions/golang_int",
949966
"minimum": 0,
950967
"default": 0
@@ -997,7 +1014,9 @@
9971014
"$ref": "#/definitions/priority_policy"
9981015
},
9991016
"priority_timeout": {
1000-
"description": "For pinned_client priority policy how long before the client times out"
1017+
"description": "For pinned_client priority policy how long before the client times out",
1018+
"$ref": "#/definitions/golang_duration_nanos",
1019+
"default": 0
10011020
},
10021021
"opt_start_seq": {
10031022
"description": "Start sequence used with the DeliverByStartSequence deliver policy.",
@@ -1380,6 +1399,21 @@
13801399
"default": false,
13811400
"description": "Allow higher performance, direct access to get individual messages"
13821401
},
1402+
"allow_atomic": {
1403+
"type": "boolean",
1404+
"default": false,
1405+
"description": "Allow atomic batched publishes"
1406+
},
1407+
"allow_msg_counter": {
1408+
"type": "boolean",
1409+
"default": false,
1410+
"description": "Configures a stream to be a counter and to reject all other messages"
1411+
},
1412+
"allow_msg_schedules": {
1413+
"type": "boolean",
1414+
"default": false,
1415+
"description": "Allows the scheduling of messages"
1416+
},
13831417
"mirror_direct": {
13841418
"type": "boolean",
13851419
"default": false,
@@ -1413,49 +1447,16 @@
14131447
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
14141448
"$ref": "#/definitions/golang_duration_nanos",
14151449
"minimum": 0
1416-
}
1417-
}
1418-
},
1419-
"stream_template_info": {
1420-
"type": "object",
1421-
"required": [
1422-
"config",
1423-
"streams"
1424-
],
1425-
"properties": {
1426-
"config": {
1427-
"$ref": "#/definitions/stream_template_configuration"
14281450
},
1429-
"streams": {
1430-
"description": "List of Streams managed by this Template",
1431-
"type": "array",
1432-
"items": {
1433-
"type": "string"
1434-
}
1435-
}
1436-
}
1437-
},
1438-
"stream_template_configuration": {
1439-
"type": "object",
1440-
"additionalProperties": false,
1441-
"required": [
1442-
"name",
1443-
"config",
1444-
"max_streams"
1445-
],
1446-
"properties": {
1447-
"name": {
1451+
"persist_mode": {
1452+
"description": "Sets a specific persistence mode for writing to the Stream",
14481453
"type": "string",
1449-
"description": "A unique name for the Template"
1450-
},
1451-
"config": {
1452-
"description": "The template configuration to create Streams with",
1453-
"$ref": "#/definitions/stream_configuration"
1454-
},
1455-
"max_streams": {
1456-
"type": "integer",
1457-
"description": "The maximum number of streams to allow using this Template",
1458-
"minimum": -1
1454+
"enum": [
1455+
"",
1456+
"default",
1457+
"async"
1458+
],
1459+
"default": ""
14591460
}
14601461
}
14611462
}

nats-jetstream/schemas/jetstream/api/v1/pub_ack_response.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,18 @@
3030
"domain": {
3131
"type": "string",
3232
"description": "If the Stream accepting the message is in a JetStream server configured for a domain this would be that domain"
33+
},
34+
"batch": {
35+
"type": "string",
36+
"description": "When doing Atomic Batch Publishes this will be the Batch ID being committed"
37+
},
38+
"count": {
39+
"type": "integer",
40+
"description": "When doing Atomic Batch Publishes how many messages was in the batch"
41+
},
42+
"val": {
43+
"type": "string",
44+
"description": "The current value of the counter on counter enabled streams"
3345
}
3446
}
3547
}

nats-jetstream/schemas/jetstream/api/v1/stream_list_response.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@
3535
"items": {
3636
"type": "string"
3737
}
38+
},
39+
"offline": {
40+
"description": "List of streams that are offline and reasons for being offline",
41+
"type": "object",
42+
"additionalProperties": {
43+
"type": "string"
44+
}
3845
}
3946
}
4047
}

nats-jetstream/src/nats/jetstream/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
JetStreamNotEnabledForAccountError,
2222
MaximumConsumersLimitError,
2323
MessageNotFoundError,
24+
PinIDMismatchError,
2425
StreamNameAlreadyInUseError,
2526
StreamNotFoundError,
2627
)
@@ -861,6 +862,7 @@ def new(client: Client, prefix: str = "$JS.API", domain: str | None = None, stri
861862
"JetStreamNotEnabledForAccountError",
862863
"MaximumConsumersLimitError",
863864
"MessageNotFoundError",
865+
"PinIDMismatchError",
864866
"StreamNameAlreadyInUseError",
865867
"StreamNotFoundError",
866868
]

nats-jetstream/src/nats/jetstream/api/client.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
ConsumerNamesResponse,
3939
ConsumerPauseRequest,
4040
ConsumerPauseResponse,
41+
ConsumerUnpinRequest,
42+
ConsumerUnpinResponse,
4143
ErrorResponse,
4244
StreamCreateRequest,
4345
StreamCreateResponse,
@@ -239,6 +241,32 @@ async def consumer_pause(
239241
) from e
240242
raise
241243

244+
async def consumer_unpin(
245+
self, stream_name: str, consumer_name: str, /, **request: Unpack[ConsumerUnpinRequest]
246+
) -> ConsumerUnpinResponse:
247+
"""Unpin a consumer from its current pinned client.
248+
249+
Args:
250+
stream_name: The stream name
251+
consumer_name: The consumer name
252+
**request: Request body with group field
253+
254+
Returns:
255+
ConsumerUnpinResponse
256+
"""
257+
try:
258+
return await self.request_json(
259+
f"{self._prefix}.CONSUMER.UNPIN.{stream_name}.{consumer_name}",
260+
request if request else None,
261+
response_type=ConsumerUnpinResponse,
262+
)
263+
except JetStreamError as e:
264+
if e.error_code == ErrorCode.CONSUMER_NOT_FOUND:
265+
raise ConsumerNotFoundError(
266+
e.description, code=e.code, error_code=e.error_code, description=e.description
267+
) from e
268+
raise
269+
242270
async def stream_create(self, name: str, /, **request: Unpack[StreamCreateRequest]) -> StreamCreateResponse:
243271
try:
244272
return await self.request_json(

0 commit comments

Comments
 (0)