2929import org .apache .flink .configuration .Configuration ;
3030import org .apache .flink .types .Row ;
3131import org .apache .flink .util .Preconditions ;
32- import org .apache .hadoop .hbase .HBaseConfiguration ;
33- import org .apache .hadoop .hbase .TableName ;
32+ import org .apache .hadoop .hbase .*;
3433import org .apache .hadoop .hbase .client .Connection ;
3534import org .apache .hadoop .hbase .client .ConnectionFactory ;
3635import org .apache .hadoop .hbase .client .Delete ;
@@ -70,6 +69,8 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7069 private String regionserverPrincipal ;
7170 private String securityKrb5Conf ;
7271 private String zookeeperSaslClient ;
72+ private String clientPrincipal ;
73+ private String clientKeytabFile ;
7374
7475 private String [] families ;
7576 private String [] qualifiers ;
@@ -78,48 +79,78 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7879 private transient Connection conn ;
7980 private transient Table table ;
8081
82+ private transient ChoreService choreService ;
83+
8184 @ Override
8285 public void configure (Configuration parameters ) {
8386 LOG .warn ("---configure---" );
8487 conf = HBaseConfiguration .create ();
85- conf .set ("hbase.zookeeper.quorum" , host );
86- if (zkParent != null && !"" .equals (zkParent )) {
87- conf .set ("zookeeper.znode.parent" , zkParent );
88- }
89- LOG .warn ("---configure end ---" );
9088 }
9189
9290 @ Override
9391 public void open (int taskNumber , int numTasks ) throws IOException {
9492 LOG .warn ("---open---" );
95- if (kerberosAuthEnable ) {
96- conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , host );
97- conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM , zkParent );
98- fillSyncKerberosConfig (conf , regionserverKeytabFile , regionserverPrincipal , zookeeperSaslClient , securityKrb5Conf );
99-
100- UserGroupInformation userGroupInformation = HbaseConfigUtils .loginAndReturnUGI (conf , regionserverPrincipal , regionserverKeytabFile );
101- org .apache .hadoop .conf .Configuration finalConf = conf ;
102- conn = userGroupInformation .doAs (new PrivilegedAction <Connection >() {
103- @ Override
104- public Connection run () {
105- try {
106- return ConnectionFactory .createConnection (finalConf );
107- } catch (IOException e ) {
108- LOG .error ("Get connection fail with config:{}" , finalConf );
109- throw new RuntimeException (e );
110- }
111- }
112- });
113- } else {
114- conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , host );
115- conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM , zkParent );
116- conn = ConnectionFactory .createConnection (conf );
117- }
93+ openConn ();
11894 table = conn .getTable (TableName .valueOf (tableName ));
11995 LOG .warn ("---open end(get table from hbase) ---" );
12096 initMetric ();
12197 }
12298
99+ private void openConn (){
100+ try {
101+ if (kerberosAuthEnable ) {
102+ LOG .info ("open kerberos conn" );
103+ openKerberosConn ();
104+ } else {
105+ LOG .info ("open conn" );
106+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , host );
107+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM , zkParent );
108+ conn = ConnectionFactory .createConnection (conf );
109+ }
110+ }catch (Exception e ){
111+ throw new RuntimeException (e );
112+ }
113+
114+ }
115+ private void openKerberosConn () throws IOException {
116+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_QUORUM , host );
117+ conf .set (HbaseConfigUtils .KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM , zkParent );
118+
119+ LOG .info ("kerberos config:{}" , this .toString ());
120+ Preconditions .checkArgument (!StringUtils .isEmpty (clientPrincipal ), " clientPrincipal not null!" );
121+ Preconditions .checkArgument (!StringUtils .isEmpty (clientKeytabFile ), " clientKeytabFile not null!" );
122+
123+ fillSyncKerberosConfig (conf , regionserverPrincipal , zookeeperSaslClient , securityKrb5Conf );
124+
125+ clientKeytabFile = System .getProperty ("user.dir" ) + File .separator + clientKeytabFile ;
126+ clientPrincipal = !StringUtils .isEmpty (clientPrincipal ) ? clientPrincipal : regionserverPrincipal ;
127+
128+ conf .set (HbaseConfigUtils .KEY_HBASE_CLIENT_KEYTAB_FILE , clientKeytabFile );
129+ conf .set (HbaseConfigUtils .KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL , clientPrincipal );
130+
131+ UserGroupInformation userGroupInformation = HbaseConfigUtils .loginAndReturnUGI (conf , clientPrincipal , clientKeytabFile );
132+ org .apache .hadoop .conf .Configuration finalConf = conf ;
133+ conn = userGroupInformation .doAs (new PrivilegedAction <Connection >() {
134+ @ Override
135+ public Connection run () {
136+ try {
137+ ScheduledChore authChore = AuthUtil .getAuthChore (finalConf );
138+ if (authChore != null ) {
139+ choreService = new ChoreService ("hbaseKerberosSink" );
140+ choreService .scheduleChore (authChore );
141+ }
142+
143+ return ConnectionFactory .createConnection (finalConf );
144+ } catch (IOException e ) {
145+ LOG .error ("Get connection fail with config:{}" , finalConf );
146+ throw new RuntimeException (e );
147+ }
148+ }
149+ });
150+ }
151+
152+
153+
123154 @ Override
124155 public void writeRecord (Tuple2 tuple2 ) {
125156 Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
@@ -227,7 +258,6 @@ public void close() throws IOException {
227258 conn = null ;
228259 }
229260 }
230-
231261 private HbaseOutputFormat () {
232262 }
233263
@@ -264,11 +294,6 @@ public HbaseOutputFormatBuilder setRowkey(String rowkey) {
264294 return this ;
265295 }
266296
267- public HbaseOutputFormatBuilder setUpdateMode (String updateMode ) {
268- format .updateMode = updateMode ;
269- return this ;
270- }
271-
272297 public HbaseOutputFormatBuilder setColumnNames (String [] columnNames ) {
273298 format .columnNames = columnNames ;
274299 return this ;
@@ -283,6 +308,7 @@ public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNa
283308 format .columnNameFamily = columnNameFamily ;
284309 return this ;
285310 }
311+
286312 public HbaseOutputFormatBuilder setKerberosAuthEnable (boolean kerberosAuthEnable ) {
287313 format .kerberosAuthEnable = kerberosAuthEnable ;
288314 return this ;
@@ -308,6 +334,16 @@ public HbaseOutputFormatBuilder setZookeeperSaslClient(String zookeeperSaslClien
308334 return this ;
309335 }
310336
337+ public HbaseOutputFormatBuilder setClientPrincipal (String clientPrincipal ) {
338+ format .clientPrincipal = clientPrincipal ;
339+ return this ;
340+ }
341+
342+ public HbaseOutputFormatBuilder setClientKeytabFile (String clientKeytabFile ) {
343+ format .clientKeytabFile = clientKeytabFile ;
344+ return this ;
345+ }
346+
311347
312348 public HbaseOutputFormat finish () {
313349 Preconditions .checkNotNull (format .host , "zookeeperQuorum should be specified" );
@@ -323,7 +359,7 @@ public HbaseOutputFormat finish() {
323359 String [] columns = keySet .toArray (new String [keySet .size ()]);
324360 for (int i = 0 ; i < columns .length ; ++i ) {
325361 String col = columns [i ];
326- String [] part = StringUtils .split (col , ":" ); ;
362+ String [] part = col .split (":" );
327363 families [i ] = part [0 ];
328364 qualifiers [i ] = part [1 ];
329365 }
@@ -336,16 +372,8 @@ public HbaseOutputFormat finish() {
336372
337373 }
338374
339- private void fillSyncKerberosConfig ( org .apache .hadoop .conf .Configuration config , String regionserverKeytabFile , String regionserverPrincipal ,
375+ private void fillSyncKerberosConfig (org .apache .hadoop .conf .Configuration config , String regionserverPrincipal ,
340376 String zookeeperSaslClient , String securityKrb5Conf ) throws IOException {
341- if (StringUtils .isEmpty (regionserverKeytabFile )) {
342- throw new IllegalArgumentException ("Must provide regionserverKeytabFile when authentication is Kerberos" );
343- }
344- String regionserverKeytabFilePath = System .getProperty ("user.dir" ) + File .separator + regionserverKeytabFile ;
345- LOG .info ("regionserverKeytabFilePath:{}" ,regionserverKeytabFilePath );
346- config .set (HbaseConfigUtils .KEY_HBASE_MASTER_KEYTAB_FILE , regionserverKeytabFilePath );
347- config .set (HbaseConfigUtils .KEY_HBASE_REGIONSERVER_KEYTAB_FILE , regionserverKeytabFilePath );
348-
349377 if (StringUtils .isEmpty (regionserverPrincipal )) {
350378 throw new IllegalArgumentException ("Must provide regionserverPrincipal when authentication is Kerberos" );
351379 }
@@ -366,5 +394,17 @@ private void fillSyncKerberosConfig( org.apache.hadoop.conf.Configuration config
366394 }
367395 }
368396
397+ @ Override
398+ public String toString () {
399+ return "HbaseOutputFormat kerberos{" +
400+ "kerberosAuthEnable=" + kerberosAuthEnable +
401+ ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' +
402+ ", regionserverPrincipal='" + regionserverPrincipal + '\'' +
403+ ", securityKrb5Conf='" + securityKrb5Conf + '\'' +
404+ ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
405+ ", clientPrincipal='" + clientPrincipal + '\'' +
406+ ", clientKeytabFile='" + clientKeytabFile + '\'' +
407+ '}' ;
408+ }
369409
370410}
0 commit comments