1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+
18
+ package org .apache .hadoop .fs .cosn .auth ;
19
+
20
+ import com .qcloud .cos .auth .BasicSessionCredentials ;
21
+ import com .qcloud .cos .auth .COSCredentials ;
22
+ import com .qcloud .cos .auth .COSCredentialsProvider ;
23
+
24
+ import org .apache .hadoop .conf .Configuration ;
25
+ import org .apache .hadoop .fs .cosn .CosNConfigKeys ;
26
+
27
+ import com .tencentcloudapi .common .Credential ;
28
+ import com .tencentcloudapi .common .profile .ClientProfile ;
29
+ import com .tencentcloudapi .common .profile .HttpProfile ;
30
+ import com .tencentcloudapi .sts .v20180813 .StsClient ;
31
+ import com .tencentcloudapi .sts .v20180813 .models .GetFederationTokenRequest ;
32
+ import com .tencentcloudapi .sts .v20180813 .models .GetFederationTokenResponse ;
33
+ import org .slf4j .Logger ;
34
+ import org .slf4j .LoggerFactory ;
35
+
36
+ import java .io .IOException ;
37
+ import java .util .concurrent .atomic .AtomicReference ;
38
+
39
+ /**
40
+ * A COSCredentialsProvider that generates temporary credentials from Tencent Cloud STS.
41
+ * This provider requires a long-term secret ID and key with permission to call
42
+ * the STS GetFederationToken action.
43
+ */
44
+ public class DynamicTemporaryCosnCredentialsProvider implements COSCredentialsProvider {
45
+ private static final Logger LOG =
46
+ LoggerFactory .getLogger (DynamicTemporaryCosnCredentialsProvider .class );
47
+
48
+ public static final String STS_SECRET_ID_KEY = "fs.cosn.auth.sts.secret.id" ;
49
+ public static final String STS_SECRET_KEY_KEY = "fs.cosn.auth.sts.secret.key" ;
50
+ public static final String STS_ENDPOINT_KEY = "fs.cosn.auth.sts.endpoint" ;
51
+ public static final String DEFAULT_STS_ENDPOINT = "sts.tencentcloudapi.com" ;
52
+ public static final String TOKEN_DURATION_SECONDS_KEY = "fs.cosn.auth.sts.token.duration.seconds" ;
53
+ public static final int DEFAULT_TOKEN_DURATION_SECONDS = 900 ; // 15 minutes
54
+
55
+ private final String longTermSecretId ;
56
+ private final String longTermSecretKey ;
57
+ private final String stsEndpoint ;
58
+ private final String region ;
59
+ private final String bucketName ;
60
+ private final long durationSeconds ;
61
+
62
+ private final AtomicReference <ExpiringCredentials > expiringCredentialsRef =
63
+ new AtomicReference <>();
64
+
65
+ public DynamicTemporaryCosnCredentialsProvider (Configuration conf ) throws IOException {
66
+ this .longTermSecretId = conf .get (STS_SECRET_ID_KEY );
67
+ this .longTermSecretKey = conf .get (STS_SECRET_KEY_KEY );
68
+ this .stsEndpoint = conf .get (STS_ENDPOINT_KEY , DEFAULT_STS_ENDPOINT );
69
+ this .region = conf .get (CosNConfigKeys .COSN_REGION_KEY );
70
+ this .bucketName = conf .get ("fs.defaultFS" ).replace ("cosn://" , "" );
71
+ this .durationSeconds = conf .getLong (TOKEN_DURATION_SECONDS_KEY , DEFAULT_TOKEN_DURATION_SECONDS );
72
+
73
+ if (this .longTermSecretId == null || this .longTermSecretKey == null ) {
74
+ throw new IOException (
75
+ "Long-term STS credentials not provided in configuration. Please set " + STS_SECRET_ID_KEY
76
+ + " and " + STS_SECRET_KEY_KEY );
77
+ }
78
+ if (this .region == null || this .bucketName == null ) {
79
+ throw new IOException ("Bucket region or name not configured." );
80
+ }
81
+ }
82
+
83
+ @ Override
84
+ public COSCredentials getCredentials () {
85
+ ExpiringCredentials current = expiringCredentialsRef .get ();
86
+ // Refresh if credentials are not present, or are within 60 seconds of expiry.
87
+ if (current == null
88
+ || System .currentTimeMillis () >= current .getExpirationTimeMillis () - 60000 ) {
89
+ LOG .info ("STS credentials expired or not found, requesting new token." );
90
+ refresh ();
91
+ }
92
+ return expiringCredentialsRef .get ().getCredentials ();
93
+ }
94
+
95
+ @ Override
96
+ public void refresh () {
97
+ try {
98
+ Credential cred = new Credential (this .longTermSecretId , this .longTermSecretKey );
99
+ HttpProfile httpProfile = new HttpProfile ();
100
+ httpProfile .setEndpoint (this .stsEndpoint );
101
+ ClientProfile clientProfile = new ClientProfile ();
102
+ clientProfile .setHttpProfile (httpProfile );
103
+
104
+ StsClient client = new StsClient (cred , this .region , clientProfile );
105
+ GetFederationTokenRequest req = new GetFederationTokenRequest ();
106
+
107
+ String policyTemplate = "{\" version\" :\" 2.0\" ,\" statement\" :[{\" action\" :[\" cos:*\" ],"
108
+ + "\" effect\" :\" allow\" ,\" resource\" :[\" qcs::cos:%s:uid/%s:%s/*\" ]}]}" ;
109
+ String policy =
110
+ String .format (policyTemplate , this .region , getAppIdFromBucket (this .bucketName ),
111
+ this .bucketName );
112
+ req .setPolicy (policy );
113
+
114
+ req .setDurationSeconds (this .durationSeconds );
115
+ req .setName ("HadoopCosNContractTest" );
116
+
117
+ GetFederationTokenResponse resp = client .GetFederationToken (req );
118
+
119
+ long expirationTimeMillis = (resp .getExpiredTime () * 1000 );
120
+ BasicSessionCredentials credentials =
121
+ new BasicSessionCredentials (resp .getCredentials ().getTmpSecretId (),
122
+ resp .getCredentials ().getTmpSecretKey (), resp .getCredentials ().getToken ());
123
+
124
+ expiringCredentialsRef .set (new ExpiringCredentials (credentials , expirationTimeMillis ));
125
+ LOG .info ("Successfully refreshed STS credentials. Expiration: {}" ,
126
+ new java .util .Date (expirationTimeMillis ));
127
+
128
+ } catch (Exception e ) {
129
+ LOG .error ("Failed to get token from STS: {}" , e .toString ());
130
+ throw new RuntimeException ("Failed to get token from STS" , e );
131
+ }
132
+ }
133
+
134
+ private String getAppIdFromBucket (String bucket ) {
135
+ int lastDash = bucket .lastIndexOf ('-' );
136
+ if (lastDash != -1 && lastDash < bucket .length () - 1 ) {
137
+ return bucket .substring (lastDash + 1 );
138
+ }
139
+ throw new IllegalArgumentException ("Could not determine AppID from bucket name: " + bucket );
140
+ }
141
+
142
+ /**
143
+ * Helper class to hold credentials and their expiration time.
144
+ */
145
+ private static class ExpiringCredentials {
146
+ private final BasicSessionCredentials credentials ;
147
+ private final long expirationTimeMillis ;
148
+
149
+ ExpiringCredentials (BasicSessionCredentials credentials , long expirationTimeMillis ) {
150
+ this .credentials = credentials ;
151
+ this .expirationTimeMillis = expirationTimeMillis ;
152
+ }
153
+
154
+ BasicSessionCredentials getCredentials () {
155
+ return credentials ;
156
+ }
157
+
158
+ long getExpirationTimeMillis () {
159
+ return expirationTimeMillis ;
160
+ }
161
+ }
162
+ }
0 commit comments