1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .eventmesh .connector .file .sink .connector ;
19+
20+
21+ import static org .mockito .Mockito .mock ;
22+ import static org .mockito .Mockito .when ;
23+
24+ import org .apache .eventmesh .common .config .connector .file .FileSinkConfig ;
25+ import org .apache .eventmesh .common .config .connector .file .SinkConnectorConfig ;
26+ import org .apache .eventmesh .openconnect .offsetmgmt .api .data .ConnectRecord ;
27+
28+ import java .nio .charset .StandardCharsets ;
29+ import java .nio .file .Files ;
30+ import java .nio .file .Path ;
31+ import java .nio .file .Paths ;
32+ import java .util .Calendar ;
33+ import java .util .Collections ;
34+ import java .util .List ;
35+ import java .util .Locale ;
36+
37+ import org .junit .jupiter .api .Assertions ;
38+ import org .junit .jupiter .api .Test ;
39+ import org .mockito .Mock ;
40+
41+
42+ public class FileSinkConnectorTest {
43+
44+ private FileSinkConnector fileSinkConnector ;
45+
46+ @ Mock
47+ private FileSinkConfig fileSinkConfig ;
48+
49+ @ Test
50+ void testFileSinkConnector () throws Exception {
51+
52+ fileSinkConfig = mock (FileSinkConfig .class );
53+ SinkConnectorConfig connectorConfig = mock (SinkConnectorConfig .class );
54+ when (fileSinkConfig .getConnectorConfig ()).thenReturn (connectorConfig );
55+ when (connectorConfig .getTopic ()).thenReturn ("test-topic" );
56+ when (fileSinkConfig .getFlushSize ()).thenReturn (10 );
57+ when (fileSinkConfig .isHourlyFlushEnabled ()).thenReturn (false );
58+
59+ fileSinkConnector = new FileSinkConnector ();
60+ fileSinkConnector .init (fileSinkConfig );
61+ fileSinkConnector .start ();
62+
63+ String content = "line1\n line2\n line3" ;
64+ ConnectRecord record = new ConnectRecord (null , null , System .currentTimeMillis (), content .getBytes (StandardCharsets .UTF_8 ));
65+ List <ConnectRecord > connectRecords = Collections .singletonList (record );
66+ fileSinkConnector .put (connectRecords );
67+ fileSinkConnector .stop ();
68+
69+ Calendar calendar = Calendar .getInstance (Locale .CHINA );
70+ int year = calendar .get (Calendar .YEAR );
71+ int month = calendar .get (Calendar .MONTH ) + 1 ;
72+ int day = calendar .get (Calendar .DATE );
73+ Path topicPath = Paths .get ("test-topic" ,
74+ String .valueOf (year ),
75+ String .valueOf (month ),
76+ String .valueOf (day ));
77+ Assertions .assertTrue (Files .exists (topicPath ), "Directory for topic should exist" );
78+
79+ Path outputPath = Files .list (topicPath )
80+ .filter (path -> path .toString ().contains ("test-topic" ))
81+ .findFirst ()
82+ .orElseThrow (() -> new RuntimeException ("No output file found with 'test-topic' in the name" ));
83+
84+ List <String > lines = Files .readAllLines (outputPath , StandardCharsets .UTF_8 );
85+ String actualContent = String .join ("\n " , lines );
86+ Assertions .assertEquals (content , actualContent );
87+
88+ }
89+ }
0 commit comments