11import { afterEach , beforeEach , expect , inject , test } from 'vitest'
2+
3+ import { create } from '@bufbuild/protobuf'
4+ import { CreateTopicRequestSchema , DropTopicRequestSchema , TopicServiceDefinition } from '@ydbjs/api/topic'
25import { Driver } from '@ydbjs/core'
6+ import { query } from '@ydbjs/query'
7+ import type { TopicMessage } from '@ydbjs/topic/message'
38import { createTopicReader , createTopicTxReader } from '@ydbjs/topic/reader'
49import { createTopicTxWriter , createTopicWriter } from '@ydbjs/topic/writer'
5- import { CreateTopicRequestSchema , DropTopicRequestSchema , TopicServiceDefinition } from '@ydbjs/api/topic'
6- import { create } from '@bufbuild/protobuf'
7- import { query } from '@ydbjs/query'
810
911// #region setup
1012declare module 'vitest' {
@@ -42,15 +44,17 @@ beforeEach(async () => {
4244 name : testConsumerName ,
4345 } ,
4446 ] ,
45- } )
47+ } ) ,
48+ { signal : AbortSignal . timeout ( 1000 ) }
4649 )
4750} )
4851
4952afterEach ( async ( ) => {
5053 await topicService . dropTopic (
5154 create ( DropTopicRequestSchema , {
5255 path : testTopicName ,
53- } )
56+ } ) ,
57+ { signal : AbortSignal . timeout ( 1000 ) }
5458 )
5559} )
5660// #endregion
@@ -68,6 +72,7 @@ test('writes and reads in tx', async () => {
6872 await writer . close ( )
6973
7074 // Begin a transaction
75+ let batchInsideTx : TopicMessage [ ] | undefined
7176 await yql . begin ( { idempotent : true } , async ( tx ) => {
7277 let readerTx = createTopicTxReader ( tx , driver , {
7378 topic : testTopicName ,
@@ -86,17 +91,20 @@ test('writes and reads in tx', async () => {
8691 // Read messages inside the transaction.
8792 // Expect to see the message written outside the transaction (1).
8893 // Expect NOT to see the message written in the transaction (2).
89- for await ( let batch of readerTx . read ( ) ) {
90- expect ( batch ) . toHaveLength ( 1 )
91-
92- let message = batch [ 0 ] !
93- expect ( message . seqNo ) . toBe ( 1n )
94- expect ( message . offset ) . toBe ( 0n )
95- expect ( message . payload ) . toStrictEqual ( Buffer . from ( 'written' , 'utf-8' ) )
94+ for await ( let batch of readerTx . read ( { signal : AbortSignal . timeout ( 5000 ) } ) ) {
95+ batchInsideTx = batch
9696 break
9797 }
9898 } )
9999
100+ expect ( batchInsideTx ) . toStrictEqual ( [
101+ expect . objectContaining ( {
102+ seqNo : 1n ,
103+ offset : 0n ,
104+ payload : Buffer . from ( 'written' , 'utf-8' ) ,
105+ } ) ,
106+ ] )
107+
100108 await using reader = createTopicReader ( driver , {
101109 topic : testTopicName ,
102110 consumer : testConsumerName ,
@@ -105,16 +113,20 @@ test('writes and reads in tx', async () => {
105113 // Read messages outside of the transaction.
106114 // Expect to see the message written in the transaction (2).
107115 // Expect NOT to see the message written outside the transaction (1).
108- for await ( let batch of reader . read ( ) ) {
109- expect ( batch ) . toHaveLength ( 1 )
110-
111- let message = batch [ 0 ] !
112- expect ( message . seqNo ) . toBe ( 2n )
113- expect ( message . offset ) . toBe ( 1n )
114- expect ( message . payload ) . toStrictEqual ( Buffer . from ( 'written in tx' , 'utf-8' ) )
116+ let batchOutsideTx : TopicMessage [ ] | undefined
117+ for await ( let batch of reader . read ( { signal : AbortSignal . timeout ( 5000 ) } ) ) {
118+ batchOutsideTx = batch
115119 await reader . commit ( batch )
116120 break
117121 }
122+
123+ expect ( batchOutsideTx ) . toStrictEqual ( [
124+ expect . objectContaining ( {
125+ seqNo : 2n ,
126+ offset : 1n ,
127+ payload : Buffer . from ( 'written in tx' , 'utf-8' ) ,
128+ } ) ,
129+ ] )
118130} )
119131
120132test ( 'rollbacks reads' , async ( ) => {
@@ -129,31 +141,24 @@ test('rollbacks reads', async () => {
129141 writer . write ( Buffer . from ( 'written' , 'utf-8' ) , { seqNo : 1n } )
130142 await writer . close ( )
131143
132- await yql
133- . begin ( { idempotent : true } , async ( tx ) => {
144+ await expect ( async ( ) => {
145+ await yql . begin ( { idempotent : true } , async ( tx ) => {
134146 let readerTx = createTopicTxReader ( tx , driver , {
135147 topic : testTopicName ,
136148 consumer : testConsumerName ,
137149 } )
138150
139-
140- // Read messages inside the transaction.
141- // Expect to see the message written outside the transaction (1).
142- for await ( let batch of readerTx . read ( ) ) {
143- expect ( batch ) . toHaveLength ( 1 )
144- expect ( batch [ 0 ] ?. payload ) . toStrictEqual ( Buffer . from ( 'written' , 'utf-8' ) )
151+ for await ( let _ of readerTx . read ( ) ) {
145152 break
146153 }
147154
148155 // Simulate a transaction failure. User error is always non-retriable.
149156 throw new Error ( 'User error' )
150157 } )
151- . catch ( ( error ) => {
152- expect ( error ) . toBeInstanceOf ( Error )
153- expect ( error . message ) . toBe ( 'Transaction failed.' )
154- expect ( ( error as Error ) . cause ) . toBeInstanceOf ( Error )
155- expect ( ( ( error as Error ) . cause as Error ) . message ) . toBe ( 'User error' )
156- } )
158+ } ) . rejects . toMatchObject ( {
159+ message : 'Transaction failed.' ,
160+ cause : expect . objectContaining ( { message : 'User error' } ) ,
161+ } )
157162
158163 await using reader = createTopicReader ( driver , {
159164 topic : testTopicName ,
@@ -162,16 +167,20 @@ test('rollbacks reads', async () => {
162167
163168 // Read messages outside of the transaction.
164169 // Expect to see the message written outside the transaction (1).
170+ let committedBatch : Array < TopicMessage > | undefined
165171 for await ( let batch of reader . read ( ) ) {
166- expect ( batch ) . toHaveLength ( 1 )
167-
168- let message = batch [ 0 ] !
169- expect ( message . seqNo ) . toBe ( 1n )
170- expect ( message . offset ) . toBe ( 0n )
171- expect ( message . payload ) . toStrictEqual ( Buffer . from ( 'written' , 'utf-8' ) )
172+ committedBatch = batch
172173 await reader . commit ( batch )
173174 break
174175 }
176+
177+ expect ( committedBatch ) . toStrictEqual ( [
178+ expect . objectContaining ( {
179+ seqNo : 1n ,
180+ offset : 0n ,
181+ payload : Buffer . from ( 'written' , 'utf-8' ) ,
182+ } ) ,
183+ ] )
175184} )
176185
177186test ( 'rollbacks writes' , async ( ) => {
@@ -182,8 +191,8 @@ test('rollbacks writes', async () => {
182191 consumer : testConsumerName ,
183192 } )
184193
185- await yql
186- . begin ( { idempotent : true } , async ( tx ) => {
194+ await expect ( async ( ) => {
195+ await yql . begin ( { idempotent : true } , async ( tx ) => {
187196 let writerTx = createTopicTxWriter ( tx , driver , {
188197 topic : testTopicName ,
189198 producer : testProducerName ,
@@ -196,17 +205,18 @@ test('rollbacks writes', async () => {
196205 // Simulate a transaction failure. User error is always non-retriable.
197206 throw new Error ( 'User error' )
198207 } )
199- . catch ( ( error ) => {
200- expect ( error ) . toBeInstanceOf ( Error )
201- expect ( error . message ) . toBe ( 'Transaction failed.' )
202- expect ( ( error as Error ) . cause ) . toBeInstanceOf ( Error )
203- expect ( ( ( error as Error ) . cause as Error ) . message ) . toBe ( 'User error' )
204- } )
208+ } ) . rejects . toMatchObject ( {
209+ message : 'Transaction failed.' ,
210+ cause : expect . objectContaining ( { message : 'User error' } ) ,
211+ } )
205212
206213 // Read messages outside of the transaction.
207214 // Expect NOT to see the message written inside the transaction (2).
215+ let observedBatch : Array < TopicMessage > | undefined
208216 for await ( let batch of reader . read ( { waitMs : 1000 } ) ) {
209- expect ( batch ) . toHaveLength ( 0 )
217+ observedBatch = batch
210218 break
211219 }
220+
221+ expect ( observedBatch ) . toStrictEqual ( [ ] )
212222} )
0 commit comments