1818 */
1919package io .streamnative .function .mesh .proxy ;
2020
21+ import io .netty .util .concurrent .DefaultThreadFactory ;
2122import lombok .extern .slf4j .Slf4j ;
23+ import org .apache .bookkeeper .common .util .OrderedExecutor ;
2224import org .apache .pulsar .broker .PulsarServerException ;
2325import org .apache .pulsar .broker .ServiceConfiguration ;
2426import org .apache .pulsar .broker .authentication .AuthenticationService ;
2527import org .apache .pulsar .broker .authorization .AuthorizationService ;
28+ import org .apache .pulsar .broker .cache .ConfigurationCacheService ;
2629import org .apache .pulsar .common .configuration .PulsarConfigurationLoader ;
2730import org .apache .pulsar .functions .worker .ErrorNotifier ;
2831import org .apache .pulsar .functions .worker .Worker ;
2932import org .apache .pulsar .functions .worker .WorkerConfig ;
3033import org .apache .pulsar .functions .worker .WorkerService ;
3134import org .apache .pulsar .functions .worker .rest .WorkerServer ;
35+ import org .apache .pulsar .zookeeper .GlobalZooKeeperCache ;
36+ import org .apache .pulsar .zookeeper .ZooKeeperClientFactory ;
37+ import org .apache .pulsar .zookeeper .ZookeeperBkClientFactoryImpl ;
38+
39+ import java .io .IOException ;
40+ import java .util .concurrent .Executors ;
41+ import java .util .concurrent .ScheduledExecutorService ;
3242
3343/**
3444 * This class for test.
3545 */
3646@ Slf4j
3747public class FunctionMeshProxyWorker {
3848
49+ private ZooKeeperClientFactory zkClientFactory = null ;
50+ private final OrderedExecutor orderedExecutor = OrderedExecutor .newBuilder ().numThreads (8 ).name ("zk-cache-ordered" ).build ();
51+ private final ScheduledExecutorService cacheExecutor = Executors .newScheduledThreadPool (10 ,
52+ new DefaultThreadFactory ("zk-cache-callback" ));
53+ private GlobalZooKeeperCache globalZkCache ;
54+ private ConfigurationCacheService configurationCacheService ;
3955 private final WorkerConfig workerConfig ;
4056 private final WorkerService workerService ;
4157 private final ErrorNotifier errorNotifier ;
@@ -50,8 +66,7 @@ public FunctionMeshProxyWorker(WorkerConfig workerConfig) {
5066
5167 protected void start () throws Exception {
5268 workerService .initAsStandalone (workerConfig );
53- // To do add authorization and authentication
54- workerService .start (getAuthenticationService (), null , errorNotifier );
69+ workerService .start (getAuthenticationService (), getAuthorizationService (), errorNotifier );
5570 server = new WorkerServer (workerService , getAuthenticationService ());
5671 server .start ();
5772 log .info ("/** Started worker server on port={} **/" , this .workerConfig .getWorkerPort ());
@@ -64,6 +79,37 @@ protected void start() throws Exception {
6479 }
6580 }
6681
82+ public ZooKeeperClientFactory getZooKeeperClientFactory () {
83+ if (zkClientFactory == null ) {
84+ zkClientFactory = new ZookeeperBkClientFactoryImpl (orderedExecutor );
85+ }
86+ // Return default factory
87+ return zkClientFactory ;
88+ }
89+
90+ private AuthorizationService getAuthorizationService () throws PulsarServerException {
91+ if (this .workerConfig .isAuthorizationEnabled ()) {
92+ log .info ("starting configuration cache service" );
93+
94+ this .globalZkCache = new GlobalZooKeeperCache (getZooKeeperClientFactory (),
95+ (int ) workerConfig .getZooKeeperSessionTimeoutMillis (),
96+ workerConfig .getZooKeeperOperationTimeoutSeconds (),
97+ workerConfig .getConfigurationStoreServers (),
98+ orderedExecutor , cacheExecutor ,
99+ workerConfig .getZooKeeperOperationTimeoutSeconds ());
100+ try {
101+ this .globalZkCache .start ();
102+ } catch (IOException e ) {
103+ throw new PulsarServerException (e );
104+ }
105+
106+ this .configurationCacheService = new ConfigurationCacheService (
107+ this .globalZkCache , this .workerConfig .getPulsarFunctionsCluster ());
108+ return new AuthorizationService (getServiceConfiguration (), this .configurationCacheService );
109+ }
110+ return null ;
111+ }
112+
67113 private AuthenticationService getAuthenticationService () throws PulsarServerException {
68114 return new AuthenticationService (getServiceConfiguration ());
69115 }
@@ -79,5 +125,12 @@ protected void stop() {
79125 this .server .stop ();
80126 }
81127 workerService .stop ();
128+ if (this .globalZkCache != null ) {
129+ try {
130+ this .globalZkCache .close ();
131+ } catch (IOException e ) {
132+ log .warn ("Failed to close global zk cache " , e );
133+ }
134+ }
82135 }
83136}
0 commit comments