2525import com .dtstack .flink .sql .side .FieldInfo ;
2626import com .dtstack .flink .sql .side .JoinInfo ;
2727import com .dtstack .flink .sql .side .hbase .table .HbaseSideTableInfo ;
28+ import com .dtstack .flink .sql .side .hbase .utils .HbaseConfigUtils ;
2829import org .apache .calcite .sql .JoinType ;
2930import org .apache .commons .collections .map .HashedMap ;
31+ import org .apache .commons .lang .StringUtils ;
3032import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3133import com .google .common .collect .Maps ;
3234import org .apache .flink .table .runtime .types .CRow ;
3638import org .apache .hadoop .conf .Configuration ;
3739import org .apache .hadoop .hbase .Cell ;
3840import org .apache .hadoop .hbase .CellUtil ;
41+ import org .apache .hadoop .hbase .HBaseConfiguration ;
3942import org .apache .hadoop .hbase .TableName ;
4043import org .apache .hadoop .hbase .client .Connection ;
4144import org .apache .hadoop .hbase .client .ConnectionFactory ;
4447import org .apache .hadoop .hbase .client .Scan ;
4548import org .apache .hadoop .hbase .client .Table ;
4649import org .apache .hadoop .hbase .util .Bytes ;
50+ import org .apache .hadoop .security .UserGroupInformation ;
4751import org .slf4j .Logger ;
4852import org .slf4j .LoggerFactory ;
4953
54+ import java .io .File ;
5055import java .io .IOException ;
56+ import java .security .PrivilegedAction ;
5157import java .sql .SQLException ;
5258import java .sql .Timestamp ;
5359import java .util .Calendar ;
@@ -166,13 +172,44 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
166172 private void loadData (Map <String , Map <String , Object >> tmpCache ) throws SQLException {
167173 AbstractSideTableInfo sideTableInfo = sideInfo .getSideTableInfo ();
168174 HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo ) sideTableInfo ;
169- Configuration conf = new Configuration ();
170- conf .set ("hbase.zookeeper.quorum" , hbaseSideTableInfo .getHost ());
171- Connection conn = null ;
172- Table table = null ;
173- ResultScanner resultScanner = null ;
175+ boolean openKerberos = hbaseSideTableInfo .isKerberosAuthEnable ();
176+ int loadDataCount = 0 ;
174177 try {
175- conn = ConnectionFactory .createConnection (conf );
178+ conf = HBaseConfiguration .create ();
179+ if (openKerberos ) {
180+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , hbaseSideTableInfo .getHost ());
181+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC , hbaseSideTableInfo .getParent ());
182+
183+ fillSyncKerberosConfig (conf ,hbaseSideTableInfo );
184+ LOG .info ("hbase.security.authentication:{}" , conf .get ("hbase.security.authentication" ));
185+ LOG .info ("hbase.security.authorization:{}" , conf .get ("hbase.security.authorization" ));
186+ LOG .info ("hbase.master.keytab.file:{}" , conf .get ("hbase.master.keytab.file" ));
187+ LOG .info ("hbase.master.kerberos.principal:{}" , conf .get ("hbase.master.kerberos.principal" ));
188+ LOG .info ("hbase.regionserver.keytab.file:{}" , conf .get ("hbase.regionserver.keytab.file" ));
189+ LOG .info ("hbase.regionserver.kerberos.principal:{}" , conf .get ("hbase.regionserver.kerberos.principal" ));
190+
191+ UserGroupInformation userGroupInformation = HbaseConfigUtils .loginAndReturnUGI (conf , hbaseSideTableInfo .getRegionserverPrincipal (),
192+ hbaseSideTableInfo .getRegionserverKeytabFile ());
193+
194+ Configuration finalConf = conf ;
195+ conn = userGroupInformation .doAs (new PrivilegedAction <Connection >() {
196+ @ Override
197+ public Connection run () {
198+ try {
199+ return ConnectionFactory .createConnection (finalConf );
200+ } catch (IOException e ) {
201+ LOG .error ("Get connection fail with config:{}" , finalConf );
202+ throw new RuntimeException (e );
203+ }
204+ }
205+ });
206+
207+ } else {
208+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , hbaseSideTableInfo .getHost ());
209+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM_SYNC , hbaseSideTableInfo .getParent ());
210+ conn = ConnectionFactory .createConnection (conf );
211+ }
212+
176213 table = conn .getTable (TableName .valueOf (tableName ));
177214 resultScanner = table .getScanner (new Scan ());
178215 for (Result r : resultScanner ) {
@@ -187,13 +224,15 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
187224
188225 kv .put (aliasNameInversion .get (key .toString ()), value );
189226 }
227+ loadDataCount ++;
190228 tmpCache .put (new String (r .getRow ()), kv );
191229 }
192230 } catch (IOException e ) {
193- LOG . error ( "" , e );
231+ throw new RuntimeException ( e );
194232 } finally {
233+ LOG .info ("load Data count: {}" , loadDataCount );
195234 try {
196- if (null != conn && ! conn . isClosed () ) {
235+ if (null != conn ) {
197236 conn .close ();
198237 }
199238
@@ -209,4 +248,34 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
209248 }
210249 }
211250 }
251+
252+ private void fillSyncKerberosConfig (Configuration config , HbaseSideTableInfo hbaseSideTableInfo ) throws IOException {
253+ String regionserverKeytabFile = hbaseSideTableInfo .getRegionserverKeytabFile ();
254+ if (StringUtils .isEmpty (regionserverKeytabFile )) {
255+ throw new IllegalArgumentException ("Must provide regionserverKeytabFile when authentication is Kerberos" );
256+ }
257+ String regionserverKeytabFilePath = System .getProperty ("user.dir" ) + File .separator + regionserverKeytabFile ;
258+ LOG .info ("regionserverKeytabFilePath:{}" , regionserverKeytabFilePath );
259+ config .set (HbaseConfigUtils .KEY_HBASE_MASTER_KEYTAB_FILE , regionserverKeytabFilePath );
260+ config .set (HbaseConfigUtils .KEY_HBASE_REGIONSERVER_KEYTAB_FILE , regionserverKeytabFilePath );
261+
262+ String regionserverPrincipal = hbaseSideTableInfo .getRegionserverPrincipal ();
263+ if (StringUtils .isEmpty (regionserverPrincipal )) {
264+ throw new IllegalArgumentException ("Must provide regionserverPrincipal when authentication is Kerberos" );
265+ }
266+ config .set (HbaseConfigUtils .KEY_HBASE_MASTER_KERBEROS_PRINCIPAL , regionserverPrincipal );
267+ config .set (HbaseConfigUtils .KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL , regionserverPrincipal );
268+ config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHORIZATION , "true" );
269+ config .set (HbaseConfigUtils .KEY_HBASE_SECURITY_AUTHENTICATION , "kerberos" );
270+
271+ if (!StringUtils .isEmpty (hbaseSideTableInfo .getZookeeperSaslClient ())) {
272+ System .setProperty (HbaseConfigUtils .KEY_ZOOKEEPER_SASL_CLIENT , hbaseSideTableInfo .getZookeeperSaslClient ());
273+ }
274+
275+ if (!StringUtils .isEmpty (hbaseSideTableInfo .getSecurityKrb5Conf ())) {
276+ String krb5ConfPath = System .getProperty ("user.dir" ) + File .separator + hbaseSideTableInfo .getSecurityKrb5Conf ();
277+ LOG .info ("krb5ConfPath:{}" , krb5ConfPath );
278+ System .setProperty (HbaseConfigUtils .KEY_JAVA_SECURITY_KRB5_CONF , krb5ConfPath );
279+ }
280+ }
212281}
0 commit comments