3232import org .apache .beam .it .common .PipelineLauncher .LaunchConfig ;
3333import org .apache .beam .it .common .PipelineLauncher .LaunchInfo ;
3434import org .apache .beam .it .common .PipelineOperator .Result ;
35+ import org .apache .beam .it .common .TestProperties ;
3536import org .apache .beam .it .common .utils .ResourceManagerUtils ;
3637import org .apache .beam .it .gcp .TemplateTestBase ;
3738import org .apache .beam .it .gcp .pubsub .PubsubResourceManager ;
4344import org .junit .runners .JUnit4 ;
4445import org .slf4j .Logger ;
4546import org .slf4j .LoggerFactory ;
47+ import org .testcontainers .containers .GenericContainer ;
48+ import org .testcontainers .utility .DockerImageName ;
4649import redis .clients .jedis .Jedis ;
4750
4851/** Integration test for {@link PubSubToRedis}. */
@@ -54,13 +57,29 @@ public final class PubSubToRedisIT extends TemplateTestBase {
5457
5558 private static final Logger LOG = LoggerFactory .getLogger (PubSubToRedis .class );
5659
60+ private static final int REDIS_PORT = 6379 ;
61+
5762 private PubsubResourceManager pubsubResourceManager ;
63+ private GenericContainer <?> redisContainer ;
5864 private Jedis redisClient ;
65+ private String redisHost ;
66+ private int redisMappedPort ;
5967
6068 @ Before
6169 public void setup () throws IOException {
6270 pubsubResourceManager =
6371 PubsubResourceManager .builder (testName , PROJECT , credentialsProvider ).build ();
72+
73+ // Start Redis container
74+ redisContainer =
75+ new GenericContainer <>(DockerImageName .parse ("redis:7-alpine" ))
76+ .withExposedPorts (REDIS_PORT );
77+ redisContainer .start ();
78+
79+ // Get the host IP that is accessible from Dataflow workers
80+ redisHost = TestProperties .hostIp ();
81+ redisMappedPort = redisContainer .getFirstMappedPort ();
82+ LOG .info ("Redis container started at {}:{}" , redisHost , redisMappedPort );
6483 }
6584
6685 @ After
@@ -69,6 +88,17 @@ public void tearDown() {
6988 if (redisClient != null ) {
7089 redisClient .close ();
7190 }
91+ if (redisContainer != null ) {
92+ redisContainer .stop ();
93+ }
94+ }
95+
96+ /** Creates a Jedis client connected to the Redis container. */
97+ private Jedis createRedisClient () {
98+ LOG .info ("Connecting to Redis at {}:{}" , redisHost , redisMappedPort );
99+ Jedis jedis = new Jedis (redisHost , redisMappedPort );
100+ jedis .select (0 );
101+ return jedis ;
72102 }
73103
74104 @ Test
@@ -83,13 +113,7 @@ public void pubSubToRedisStringSink(
83113 String inSubscriptionName = testName + "-sub" ;
84114 pubsubResourceManager .createSubscription (tc , inSubscriptionName );
85115
86- String redisHost = "127.0.0.1" ;
87- int redisPort = 6379 ;
88- String redisPassword = "" ;
89-
90- redisClient = new Jedis (redisHost , redisPort );
91- redisClient .select (0 );
92- redisClient .flushDB ();
116+ redisClient = createRedisClient ();
93117
94118 LaunchConfig .Builder options =
95119 paramsAdder .apply (
@@ -98,8 +122,8 @@ public void pubSubToRedisStringSink(
98122 "inputSubscription" ,
99123 "projects/" + PROJECT + "/subscriptions/" + inSubscriptionName )
100124 .addParameter ("redisHost" , redisHost )
101- .addParameter ("redisPort" , String .valueOf (redisPort ))
102- .addParameter ("redisPassword" , redisPassword )
125+ .addParameter ("redisPort" , String .valueOf (redisMappedPort ))
126+ .addParameter ("redisPassword" , "" )
103127 .addParameter ("redisSinkType" , "STRING_SINK" ));
104128
105129 // Act
@@ -152,13 +176,7 @@ public void pubSubToRedisHashSink(
152176 String inSubscriptionName = testId + "-sub" ;
153177 pubsubResourceManager .createSubscription (tc , inSubscriptionName );
154178
155- String redisHost = "127.0.0.1" ;
156- int redisPort = 6379 ;
157- String redisPassword = "" ;
158-
159- redisClient = new Jedis (redisHost , redisPort );
160- redisClient .select (0 );
161- redisClient .flushDB ();
179+ redisClient = createRedisClient ();
162180
163181 LaunchConfig .Builder options =
164182 paramsAdder .apply (
@@ -167,8 +185,8 @@ public void pubSubToRedisHashSink(
167185 "inputSubscription" ,
168186 "projects/" + PROJECT + "/subscriptions/" + inSubscriptionName )
169187 .addParameter ("redisHost" , redisHost )
170- .addParameter ("redisPort" , String .valueOf (redisPort ))
171- .addParameter ("redisPassword" , redisPassword )
188+ .addParameter ("redisPort" , String .valueOf (redisMappedPort ))
189+ .addParameter ("redisPassword" , "" )
172190 .addParameter ("redisSinkType" , "HASH_SINK" ));
173191
174192 // Act
@@ -223,13 +241,7 @@ public void pubSubToRedisStreamsSink(
223241 String inSubscriptionName = testId + "-sub" ;
224242 pubsubResourceManager .createSubscription (tc , inSubscriptionName );
225243
226- String redisHost = "127.0.0.1" ;
227- int redisPort = 6379 ;
228- String redisPassword = "" ;
229-
230- redisClient = new Jedis (redisHost , redisPort );
231- redisClient .select (0 );
232- redisClient .flushDB ();
244+ redisClient = createRedisClient ();
233245
234246 LaunchConfig .Builder options =
235247 paramsAdder .apply (
@@ -238,8 +250,8 @@ public void pubSubToRedisStreamsSink(
238250 "inputSubscription" ,
239251 "projects/" + PROJECT + "/subscriptions/" + inSubscriptionName )
240252 .addParameter ("redisHost" , redisHost )
241- .addParameter ("redisPort" , String .valueOf (redisPort ))
242- .addParameter ("redisPassword" , redisPassword )
253+ .addParameter ("redisPort" , String .valueOf (redisMappedPort ))
254+ .addParameter ("redisPassword" , "" )
243255 .addParameter ("redisSinkType" , "STREAMS_SINK" ));
244256
245257 // Act
0 commit comments