1+ package com .yugabyte .simulation .service ;
2+
3+ import com .yugabyte .simulation .dao .InvocationResult ;
4+ import com .yugabyte .simulation .dao .ParamValue ;
5+ import com .yugabyte .simulation .dao .WorkloadDesc ;
6+ import com .yugabyte .simulation .dao .WorkloadParamDesc ;
7+ import com .yugabyte .simulation .services .ServiceManager ;
8+ import com .yugabyte .simulation .workload .*;
9+ import org .springframework .beans .factory .annotation .Autowired ;
10+ import org .springframework .beans .factory .annotation .Value ;
11+ import org .springframework .jdbc .core .JdbcTemplate ;
12+ import org .springframework .jdbc .core .RowCallbackHandler ;
13+ import org .springframework .stereotype .Repository ;
14+
15+ import java .sql .ResultSet ;
16+ import java .sql .SQLException ;
17+ import java .sql .Types ;
18+ import java .util .*;
19+ import java .util .concurrent .ThreadLocalRandom ;
20+
21+ @ Repository
22+ public class CloudantWorkload extends WorkloadSimulationBase implements WorkloadSimulation {
23+
24+ @ Autowired
25+ private JdbcTemplate jdbcTemplate ;
26+
27+ @ Autowired
28+ private ServiceManager serviceManager ;
29+
30+ @ Value ("${SPRING_APPLICATION_NAME:}" )
31+ private String applicationName ;
32+
33+ @ Override
34+ public String getName () {
35+ return "Cloudant" + ((applicationName != null && !applicationName .equals ("" )) ? " [" + applicationName + "]" : "" );
36+ }
37+
38+ private static final String DROP_TRANSACTIONS_TABLE = "DROP TABLE IF EXISTS transactions CASCADE;" ;
39+ private static final String DROP_USERS_TABLE = "DROP TABLE IF EXISTS users CASCADE;" ;
40+
41+ private static final String CREATE_TRANSACTIONS_TABLE = "CREATE TABLE transactions (\n " +
42+ " transaction_id UUID PRIMARY KEY,\n " +
43+ " \" user\" UUID NOT NULL,\n " +
44+ " walletId TEXT,\n " +
45+ " type TEXT,\n " +
46+ " device UUID,\n " +
47+ " amount DECIMAL(10, 2) NOT NULL,\n " +
48+ " item TEXT NOT NULL,\n " +
49+ " time TIMESTAMP WITH TIME ZONE NOT NULL,\n " +
50+ " surcharge DECIMAL(10, 2),\n " +
51+ " offer JSONB,\n " +
52+ " status INT NOT NULL,\n " +
53+ " authKey TEXT,\n " +
54+ " created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,\n " +
55+ " updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n " +
56+ ");" ;
57+
58+ private static final String CREATE_USERS_TABLE = "CREATE TABLE users (\n " +
59+ " user_id UUID PRIMARY KEY,\n " +
60+ " auth JSONB,\n " +
61+ " auth_key TEXT,\n " +
62+ " created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,\n " +
63+ " updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n " +
64+ ");" ;
65+
66+ private static final String CREATE_INDEX_TRANSACTIONS_USER_STATUS_AMOUNT_WALLETID = "CREATE INDEX idx_transactions_user_status_amount_walletid \n " +
67+ "ON transactions (\" user\" , status, amount, walletId);" ;
68+
69+ private static final String CREATE_INDEX_TRANSACTIONS_USER_STATUS_AMOUNT_WALLETID_PARTIAL = "CREATE INDEX idx_transactions_user_status_amount_walletid_partial \n " +
70+ "ON transactions (\" user\" , status, amount, walletId)\n " +
71+ "WHERE status NOT IN (1, 2) AND amount IS NOT NULL AND \" user\" IS NOT NULL;" ;
72+
73+ private static final String CREATE_INDEX_TRANSACTIONS_STATUS_AMOUNT_USER_DEVICE = "CREATE INDEX idx_transactions_status_amount_user_device \n " +
74+ "ON transactions (status, amount, \" user\" , device);" ;
75+
76+ private static final String INSERT_USER_RECORD = "INSERT INTO users (user_id, auth, auth_key) VALUES (?, ?::jsonb, ?);" ;
77+
78+ private static final String INSERT_TRANSACTION_RECORD = "INSERT INTO transactions (transaction_id, \" user\" , walletId, type, device, amount, item, time, surcharge, offer, status, authKey) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?, ?);" ;
79+
80+ private static final String SELECT_TRANSACTIONS_QUERY = "SELECT \" user\" , \n " +
81+ " COALESCE(\n " +
82+ " CASE \n " +
83+ " WHEN walletId IN ('prepaid_usd', 'prepaid_cad') THEN type \n " +
84+ " ELSE walletId \n " +
85+ " END, \n " +
86+ " type\n " +
87+ " ) AS wallet_key, \n " +
88+ " SUM(\n " +
89+ " CASE \n " +
90+ " WHEN status NOT IN (1, 2) AND amount IS NOT NULL AND \" user\" IS NOT NULL THEN \n " +
91+ " CASE \n " +
92+ " WHEN device IS NOT NULL THEN \n " +
93+ " -(amount + COALESCE(surcharge, 0) - COALESCE(offer->>'offerAmount', '0')::numeric)\n " +
94+ " ELSE \n " +
95+ " amount + COALESCE(surcharge, 0) - COALESCE(offer->>'offerAmount', '0')::numeric\n " +
96+ " END \n " +
97+ " ELSE 0 \n " +
98+ " END\n " +
99+ " ) AS balance \n " +
100+ "FROM transactions \n " +
101+ "WHERE status NOT IN (1, 2) \n " +
102+ " AND amount IS NOT NULL \n " +
103+ " AND \" user\" IS NOT NULL \n " +
104+ " AND \" user\" = ? \n " +
105+ "GROUP BY \" user\" , wallet_key \n " +
106+ "ORDER BY \" user\" , wallet_key;" ;
107+
108+ private static final int ROWS_TO_PRELOAD = 10000 ;
109+
110+ private enum WorkloadType {
111+ CREATE_TABLES ,
112+ SEED_DATA ,
113+ RUN_SIMULATION_FIXED_WORKLOAD
114+ }
115+
116+ private final FixedStepsWorkloadType createTablesWorkloadType ;
117+ private final FixedTargetWorkloadType seedingWorkloadType ;
118+ private final FixedTargetWorkloadType simulationFixedWorkloadType ;
119+
120+ public CloudantWorkload () {
121+ this .createTablesWorkloadType = new FixedStepsWorkloadType (
122+ new Step ("Drop transactions table" , (a , b ) -> jdbcTemplate .execute (DROP_TRANSACTIONS_TABLE )),
123+ new Step ("Drop users table" , (a , b ) -> jdbcTemplate .execute (DROP_USERS_TABLE )),
124+ new Step ("Create transactions table" , (a , b ) -> jdbcTemplate .execute (CREATE_TRANSACTIONS_TABLE )),
125+ new Step ("Create users table" , (a , b ) -> jdbcTemplate .execute (CREATE_USERS_TABLE )),
126+ new Step ("Create index on transactions (user, status, amount, walletId)" , (a , b ) -> jdbcTemplate .execute (CREATE_INDEX_TRANSACTIONS_USER_STATUS_AMOUNT_WALLETID )),
127+ new Step ("Create partial index on transactions (user, status, amount, walletId)" , (a , b ) -> jdbcTemplate .execute (CREATE_INDEX_TRANSACTIONS_USER_STATUS_AMOUNT_WALLETID_PARTIAL )),
128+ new Step ("Create index on transactions (status, amount, user, device)" , (a , b ) -> jdbcTemplate .execute (CREATE_INDEX_TRANSACTIONS_STATUS_AMOUNT_USER_DEVICE )),
129+ new Step ("Populate Users" , (a , b ) -> populateUsers ())
130+ );
131+
132+ this .seedingWorkloadType = new FixedTargetWorkloadType ();
133+ this .simulationFixedWorkloadType = new FixedTargetWorkloadType ();
134+ }
135+
136+ private WorkloadDesc createTablesWorkload = new WorkloadDesc (
137+ CloudantWorkload .WorkloadType .CREATE_TABLES .toString (),
138+ "Create Tables" ,
139+ "Create the database tables. If the table already exists it will be dropped"
140+ );
141+
142+ private WorkloadDesc seedingWorkload = new WorkloadDesc (
143+ CloudantWorkload .WorkloadType .SEED_DATA .toString (),
144+ "Seed Data" ,
145+ "Load data into the transactions table" ,
146+ new WorkloadParamDesc ("Items to generate:" , 1 , Integer .MAX_VALUE , 10000 ),
147+ new WorkloadParamDesc ("Threads" , 1 , Integer .MAX_VALUE , 32 )
148+ );
149+
150+ private WorkloadDesc simulationFixedWorkload = new WorkloadDesc (
151+ CloudantWorkload .WorkloadType .RUN_SIMULATION_FIXED_WORKLOAD .toString (),
152+ "Simulation" ,
153+ "Run a simulation of reads on transactions" ,
154+ new WorkloadParamDesc ("Invocations" , 1 , Integer .MAX_VALUE , 1000000 ),
155+ new WorkloadParamDesc ("Max Threads" , 1 , Integer .MAX_VALUE , 64 )
156+ );
157+
158+ @ Override
159+ public List <WorkloadDesc > getWorkloads () {
160+ return Arrays .asList (
161+ createTablesWorkload ,
162+ seedingWorkload ,
163+ simulationFixedWorkload
164+ );
165+ }
166+
167+ @ Override
168+ public InvocationResult invokeWorkload (String workloadId , ParamValue [] values ) {
169+ CloudantWorkload .WorkloadType type = CloudantWorkload .WorkloadType .valueOf (workloadId );
170+ try {
171+ switch (type ) {
172+ case CREATE_TABLES :
173+ this .createTables ();
174+ return new InvocationResult ("Ok" );
175+ case SEED_DATA :
176+ this .seedData (values [0 ].getIntValue (), values [1 ].getIntValue ());
177+ return new InvocationResult ("Ok" );
178+ case RUN_SIMULATION_FIXED_WORKLOAD :
179+ this .runSimulationFixedWorkload (values );
180+ return new InvocationResult ("Ok" );
181+ }
182+ throw new IllegalArgumentException ("Unknown workload " + workloadId );
183+ } catch (Exception e ) {
184+ return new InvocationResult (e );
185+ }
186+ }
187+
188+ private void createTables () {
189+ createTablesWorkloadType .createInstance (serviceManager ).execute ();
190+ }
191+
192+ private void populateUsers () {
193+ for (int i = 0 ; i < ROWS_TO_PRELOAD ; i ++) {
194+ UUID userId = LoadGeneratorUtils .getUUID ();
195+ String auth = LoadGeneratorUtils .getJson ();
196+ String authKey = LoadGeneratorUtils .getText (10 , 20 );
197+ jdbcTemplate .update (INSERT_USER_RECORD , userId , auth , authKey );
198+ }
199+ }
200+
201+ private void seedData (int numberToGenerate , int threads ) {
202+ List <UUID > userIds = new ArrayList <>();
203+ String query = "SELECT user_id FROM users LIMIT " +ROWS_TO_PRELOAD ;
204+ jdbcTemplate .query (query , new RowCallbackHandler () {
205+ @ Override
206+ public void processRow (ResultSet rs ) throws SQLException {
207+ userIds .add (UUID .fromString (rs .getString ("user_id" )));
208+ }
209+ });
210+ seedingWorkloadType
211+ .createInstance (serviceManager )
212+ .execute (threads , numberToGenerate , (customData , threadData ) -> {
213+ UUID userId = userIds .get (ThreadLocalRandom .current ().nextInt (userIds .size ()));
214+ runInserts (userId );
215+ return threadData ;
216+ });
217+ }
218+
219+ private void runSimulationFixedWorkload (ParamValue [] values ) {
220+ int numOfInvocations = values [0 ].getIntValue ();
221+ int maxThreads = values [1 ].getIntValue ();
222+ List <UUID > userIds = new ArrayList <>();
223+ String query = "SELECT user_id FROM users LIMIT " +ROWS_TO_PRELOAD ;
224+ jdbcTemplate .query (query , new RowCallbackHandler () {
225+ @ Override
226+ public void processRow (ResultSet rs ) throws SQLException {
227+ userIds .add (UUID .fromString (rs .getString ("user_id" )));
228+ }
229+ });
230+ seedingWorkloadType
231+ .createInstance (serviceManager )
232+ .execute (maxThreads , numOfInvocations , (customData , threadData ) -> {
233+ UUID userId = userIds .get (ThreadLocalRandom .current ().nextInt (userIds .size ()));
234+ runSelectQuery (userId );
235+ return threadData ;
236+ });
237+ }
238+
239+ private void runInserts (UUID userId ) {
240+ UUID transactionId = LoadGeneratorUtils .getUUID ();
241+ String walletId = LoadGeneratorUtils .getText (10 , 20 );
242+ String type = LoadGeneratorUtils .getText (5 , 10 );
243+ UUID device = LoadGeneratorUtils .getUUID ();
244+ double amount = LoadGeneratorUtils .getDouble (1.00 , 1000.00 );
245+ String item = LoadGeneratorUtils .getText (10 , 40 );
246+ Date time = LoadGeneratorUtils .getTimestamp ();
247+ double surcharge = LoadGeneratorUtils .getDouble (0.00 , 10.00 );
248+ String offer = LoadGeneratorUtils .getJson ();
249+ int status = LoadGeneratorUtils .getInt (0 , 2 );
250+ String authKey = LoadGeneratorUtils .getText (10 , 20 );
251+ jdbcTemplate .update (INSERT_TRANSACTION_RECORD , transactionId , userId , walletId , type , device , amount , item , time , surcharge , offer , status , authKey );
252+ }
253+
254+ private void runSelectQuery (UUID userId ) {
255+ jdbcTemplate .query (SELECT_TRANSACTIONS_QUERY , new Object [] {userId }, new int [] {Types .OTHER }, new RowCallbackHandler () {
256+ @ Override
257+ public void processRow (ResultSet rs ) throws SQLException {
258+ // Process the result set
259+ // System.out.printf("User: %s, Wallet Key: %s, Balance: %s%n",
260+ // rs.getString("user"), // "user" column
261+ // rs.getString("wallet_key"), // "wallet_key" alias
262+ // rs.getBigDecimal("balance") // "balance" alias
263+ // );
264+
265+ }
266+ });
267+ }
268+ }
0 commit comments