@@ -27,18 +27,18 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
2727 The simple source generates messages with incrementing numbers.
2828 """
2929 _LOGGER .info (f"Read request: num_records={ datum .num_records } , timeout_ms={ datum .timeout_ms } " )
30-
30+
3131 # Generate the requested number of messages
3232 for i in range (datum .num_records ):
3333 # Create message payload
3434 payload = f"message-{ self .counter } " .encode ("utf-8" )
35-
35+
3636 # Create offset
3737 offset = sourcer .Offset (
3838 offset = str (self .counter ).encode ("utf-8" ),
3939 partition_id = self .partition_idx
4040 )
41-
41+
4242 # Create message
4343 message = sourcer .Message (
4444 payload = payload ,
@@ -47,12 +47,12 @@ async def read_handler(self, datum: sourcer.ReadRequest) -> AsyncIterator[source
4747 keys = ["key1" ],
4848 headers = {"source" : "simple" }
4949 )
50-
50+
5151 _LOGGER .info (f"Generated message: { self .counter } " )
5252 self .counter += 1
53-
53+
5454 yield message
55-
55+
5656 # Small delay to simulate real source
5757 await asyncio .sleep (0.1 )
5858
@@ -94,9 +94,7 @@ async def partitions_handler(self) -> sourcer.PartitionsResponse:
9494
9595
9696async def start ():
97- sock_file = "/var/run/numaflow/source.sock"
98- server_info_file = "/var/run/numaflow/sourcer-server-info"
99- server = sourcer .SourceAsyncServer (sock_file , server_info_file )
97+ server = sourcer .SourceAsyncServer ()
10098
10199 # Create an instance of the source handler
102100 handler = SimpleSource ()
@@ -122,4 +120,3 @@ async def start():
122120
123121if __name__ == "__main__" :
124122 asyncio .run (start ())
125-
0 commit comments