@@ -24,6 +24,7 @@ use IPC::Run qw(run);
2424use JSON::MaybeXS;
2525use File::Slurper qw( read_text) ;
2626
27+
2728sub main {
2829 my $show_usage ;
2930 my $dryrun = 0;
@@ -65,7 +66,6 @@ sub main {
6566
6667 # get the schema updates from the configuration directory compared to what is in influxdb
6768 my $updates = extract_updates($influxdb_client , $schema_dir , $dryrun , $force );
68-
6969 # only print the updates if we're in diff mode
7070 if ($diff ) {
7171 print_diff($updates );
@@ -77,6 +77,7 @@ sub main {
7777 exit ($unapplied_updates_count == 0 ? 0 : 1);
7878}
7979
80+
8081sub extract_updates {
8182 my ($influxdb_client , $schema_dir , $dryrun , $force ) = @_ ;
8283
@@ -92,7 +93,9 @@ sub extract_updates {
9293 return $updates ;
9394}
9495
96+
9597sub apply_updates {
98+
9699 my ($updates , $influxdb_client ) = @_ ;
97100
98101 my %object_str = (db => ' database' , rp => ' retention policy' , cq => ' continuous query' );
@@ -107,12 +110,13 @@ sub apply_updates {
107110 next ;
108111 }
109112 say " $action_prefix {$update ->{action}} $description " ;
110- query_influxql($influxdb_client , $update -> {query })
113+ query_influxql($influxdb_client , $update -> {query });
111114 }
112115
113116 return $skipped_count ;
114117}
115118
119+
116120sub print_diff {
117121 my ($updates ) = @_ ;
118122
@@ -122,6 +126,7 @@ sub print_diff {
122126 }
123127}
124128
129+
125130# Databases and Retention Policies ---------------------------------------------
126131
127132# returns an array of updates:
@@ -137,12 +142,13 @@ sub print_diff {
137142# ...
138143# ]
139144sub extract_database_updates {
145+
140146 my ($db_schemas_in_influxdb , $db_schemas_in_config , $dryrun , $force ) = @_ ;
141147
142148 my ($old_dbs , $eq_dbs , $new_dbs ) = get_Ldifference_intersection_Rdifference([keys %{$db_schemas_in_influxdb }], [keys %{$db_schemas_in_config }]);
143-
149+
144150 my %rp_updates ;
145- for my $db (@$eq_dbs ) {
151+ for my $db (@$eq_dbs , @$new_dbs ) {
146152 my ($old , $updated , $new ) = extract_retention_policy_updates($db , $db_schemas_in_influxdb -> {$db }, $db_schemas_in_config -> {$db }-> {rps }, $dryrun , $force );
147153 $rp_updates {old_rps }-> {$db } = $old ;
148154 $rp_updates {updated_rps }-> {$db } = $updated ;
@@ -156,6 +162,7 @@ sub extract_database_updates {
156162 push @updates , @{$rp_updates {old_rps }-> {$db }};
157163 }
158164 # old databases
165+ # will not delete them by default (skip => 1) or if dryrun=1. Will delete them if dryrun=0 and force=1
159166 for my $db (reverse sort @$old_dbs ){
160167 push @updates , {
161168 action => ' delete' ,
@@ -189,6 +196,7 @@ sub extract_database_updates {
189196 return \@updates ;
190197}
191198
199+
192200sub extract_retention_policy_updates {
193201 my ($db , $rps_in_influxdb , $rps_in_config , $dryrun , $force ) = @_ ;
194202
@@ -225,14 +233,15 @@ sub extract_retention_policy_updates {
225233 object => ' rp' ,
226234 db => $db ,
227235 name => $rp ,
228- query => " CREATE RETENTION POLICY \" $rp \" ON $db DURATION $rps_in_config ->{$rp }->{duration} REPLICATION 1 SHARD DURATION $rps_in_config ->{$rp }->{shard_duration}" . ($rps_in_config -> {$rp }-> {default } ? ' DEFAULT;' : ' ;' ),
236+ query => " CREATE RETENTION POLICY \" $rp \" ON $db DURATION $rps_in_config ->{$rp }->{duration} REPLICATION $rps_in_config ->{ $rp }->{rp_replication} SHARD DURATION $rps_in_config ->{$rp }->{shard_duration}" . ($rps_in_config -> {$rp }-> {default } ? ' DEFAULT;' : ' ;' ),
229237 skip => $dryrun ,
230238 };
231239 }
232240
233241 return (\@old_rps , \@updated_rps , \@new_rps );
234242}
235243
244+
236245sub compare_rps {
237246 my ($rp1 , $rp2 ) = @_ ;
238247
@@ -241,8 +250,10 @@ sub compare_rps {
241250 || ($rp1 -> {default } xor $rp2 -> {default });
242251}
243252
253+
244254# Continuous Queries -----------------------------------------------------------
245255
256+
246257sub extract_continuous_query_updates {
247258 my ($all_cqs_in_influxdb , $all_cqs_in_config , $dryrun , $force ) = @_ ;
248259
@@ -298,6 +309,7 @@ sub extract_continuous_query_updates {
298309 return (\@cq_deletions , \@cq_updates_and_creations );
299310}
300311
312+
301313sub compare_cqs {
302314 my ($cq1 , $cq2 ) = @_ ;
303315 for my $cq ($cq1 , $cq2 ) {
@@ -349,6 +361,16 @@ sub load_db_schemas_in_influxdb {
349361 return \%db_schemas_in_influxdb ;
350362}
351363
364+
365+ #
366+ # Iterates over all files found in the schema directory. For each file it parses the statements and adds the valid ones in a hash data structure.
367+ #
368+ # Arguments:
369+ # $schema_dir string: the directory name where the config files are
370+ #
371+ # Returns:
372+ # a reference to a hash holding the parsed statements. This hash is structured as below.
373+ #
352374# {
353375# <db> => {
354376# create_query => '...',
@@ -363,28 +385,142 @@ sub load_db_schemas_in_influxdb {
363385# },
364386# ...
365387# }
388+ #
366389sub load_db_schemas_in_config {
367390 my ($schema_dir ) = @_ ;
368391
369- my %dbs_struct ;
392+ # this will be populated with all statements parsed from the files
393+ my %parsed_statements ;
370394 my $db_files = get_schema_files_for_dir(" $schema_dir /db" );
371-
395+
372396 for my $db_file (@$db_files ) {
373- my $create_queries = parse_create_queries(" $schema_dir /db/$db_file " );
397+ my $statements_in_file = read_text(" $schema_dir /db/$db_file " );
398+ my $parsed_statements_from_file = parse_statements($statements_in_file );
399+
400+ # add the statements from this file to the initial hash
401+ %parsed_statements = (%parsed_statements , %$parsed_statements_from_file );
402+ }
403+ return \%parsed_statements ;
404+ }
374405
375- for my $db (keys %$create_queries ) {
376- my $create_query = $create_queries -> {$db };
377- my $rps = parse_retention_policies($db , $create_query );
378406
379- $dbs_struct {$db } = {
380- create_query => $create_query ,
381- rps => $rps ,
382- };
407+ #
408+ # Iterates over the lines of a config file, parses the valid statements and adds them to the given hash.
409+ # Valid statements are:
410+ # - Create database with optional retention policy defined
411+ # - Create retention policy
412+ #
413+ # Arguments:
414+ # $string_to_parse string: the contents of the config file (loaded as string)
415+ # $parsed_statements reference: a reference to a hash. The function will populate it with the parsed statements.
416+ #
417+ # Returns:
418+ # a reference to a hash holding the parsed statements from the file.
419+ #
420+ sub parse_statements {
421+ my ($string_to_parse ) = @_ ;
422+ # we want to iterate line-by-line
423+ my @splitted_lines = split " \n " , $string_to_parse ;
424+ my %parsed_statements ;
425+
426+ # captures a 'create database' statement with an optional retention policy definition
427+ my $db_regex = qr / ^\s *(create \s + database \s + (\w +)) # a create db statement, optionally starting with a space. Any valid word as db name. Captured group.
428+ (
429+ \s + with \s + duration # optional statement to define a retention policy in the db creation statement
430+ \s + ((?:\d +[smhdw])|(?:inf)) # captured policy duration as one or more numbers followed by a letter, or 'inf'
431+ \s + replication \s + (\d +) # captured policy replication
432+ \s + shard \s + duration \s + "?(\d +[smhdw])"? # captured shard duration, as above
433+ \s + name \s + "?([\w .]+)"? # captured policy name as one or more word letters or dots, optionally enclosed in double quotes.
434+ )?
435+ \s *;?\s *$ # optional semilocon and end of line
436+ / xi ;
437+
438+ # captures a 'create retention policy' definition statement
439+ my $rp_regex = qr / ^\s *create \s + retention \s + policy
440+ \s + "?([\w .]+)"? # any string containing word chars or '.', optionally enclosed in double quotes
441+ \s + on \s + (\w +)
442+ \s + duration \s + ((?:\d +[smhdw])|(?:inf)) # policy duration: either one or more numbers followed by a letter in the brackets, or 'inf'. Capture only the enclosing group.
443+ \s + replication \s + (\d +) \s + shard \s + duration # capture policy replication
444+ \s + "?(\d +[smhdw])"? # shard duration: one or more numbers followed by a letter in the brackets, optionally enclosed in double quotes
445+ (\s + default)? # default retention policy: optional 'default' value
446+ \s *;?\s *$ # optional semilocon and end of line
447+ / xi ;
448+
449+ # a line in the file can be a database creation or retention policy creation
450+ my $integrated_regex = qr /$db_regex |$rp_regex / ;
451+
452+ # parse each matching line in a loop
453+ for my $line (@splitted_lines ) {
454+ # ignore empty and commented lines
455+ next if $line =~ / ^\s *(#.*)?$ / ;
456+
457+ if ($line =~ / $integrated_regex / ) {
458+ # capture groups from the regex
459+ my $create_db_statement = $1 ;
460+ my $db_name = $2 ;
461+ my $inline_rp_statement = $3 ;
462+ my $inline_rp_duration = $4 ;
463+ my $inline_rp_replication = $5 ;
464+ my $inline_rp_shard_duration = $6 ;
465+ my $inline_rp_name = $7 ;
466+ my $rp_name = $8 ;
467+ my $rp_db_name = $9 ;
468+ my $rp_duration = $10 ;
469+ my $rp_replication = $11 ;
470+ my $rp_shard_duration = $12 ;
471+ my $default_rp = $13 ? 1 : 0;
472+
473+ # the line is a 'create database' statement...
474+ if ($create_db_statement ) {
475+ check_if_db_statement_exists_and_die(\%parsed_statements , $db_name );
476+ $parsed_statements {$db_name }{create_query } = $create_db_statement . ' ;' ;
477+ # ...and has a retention policy defined inline
478+ if ($inline_rp_statement ) {
479+ # flagged as the default policy (assume that the inline policy is the default)
480+ $parsed_statements {$db_name }{rps }{$inline_rp_name } = {duration => $inline_rp_duration ,
481+ shard_duration => $inline_rp_shard_duration ,
482+ rp_replication => $inline_rp_replication ,
483+ default => 1};
484+ }
485+ }
486+ # the line is a 'create retention policy' statement
487+ elsif ($rp_name ) {
488+ $parsed_statements {$rp_db_name }{rps }{$rp_name } = {duration => $rp_duration ,
489+ shard_duration => $rp_shard_duration ,
490+ rp_replication => $rp_replication ,
491+ default => $default_rp }
492+ }
493+ else {
494+ die " error, should never reach this"
495+ }
496+ }
497+ else {
498+ die " could not parse input: $line " ;
383499 }
384500 }
385- return \%dbs_struct ;
501+ return \%parsed_statements ;
386502}
387503
504+
505+ #
506+ # Checks if the db creation statement is already added in the parsed statements hash. If yes then it throws an exception.
507+ #
508+ # Arguments:
509+ # $parsed_statements reference: a reference to the hash containing the db statements
510+ # $db_name string:
511+ # Returns:
512+ # -
513+ #
514+ sub check_if_db_statement_exists_and_die {
515+ my ($parsed_statements , $db_name ) = @_ ;
516+
517+ # there should be no 'create database' statement yet for this database
518+ if ($parsed_statements -> {$db_name }{create_query }) {
519+ die " Create statement already defined for database $db_name . Fix the config file" ;
520+ }
521+ }
522+
523+
388524# {
389525# <db> => {
390526# <cq_name> => <cq_create_query>,
@@ -431,73 +567,6 @@ sub get_schema_files_for_dir {
431567
432568# Parsers ----------------------------------------------------------------------
433569
434- # parse a file possibly containing multiple database create queries (with their respective RPs)
435- sub parse_create_queries {
436- my ($file_name ) = @_ ;
437- my $file_content = read_text($file_name );
438-
439- my %create_queries ;
440- while ($file_content =~ / (create database (\w +)[\s\S ]+?)(?=(create database)|\z )/ig ) {
441- my ($create_query , $db ) = ($1 , $2 );
442- $create_query =~ s / ^\s +|\s +$// g ;
443- $create_queries {$db } = $create_query ;
444- }
445- if (! %create_queries ) {
446- die " No create query was found in $file_name , there is probably something wrong with the regex" ;
447- }
448-
449- return \%create_queries ;
450- }
451-
452- # parse RPs from something like:
453- # CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2;
454- # CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w;
455- sub parse_retention_policies {
456- my ($db , $db_create_query ) = @_ ;
457-
458- my $default_rp ;
459- # first rp in the config
460- my %rps ;
461- if ($db_create_query =~ / create database $db with duration ((?:\d +[smhdw])|(?:inf)) [\w ]*? shard duration (\d +[smhdw]) name "?([\w .]+)"?/ig ) {
462- $rps {$3 } = {
463- duration => $1 ,
464- shard_duration => $2 ,
465- };
466- $default_rp = $3 ;
467- }
468- else { # https://docs.influxdata.com/influxdb/v1.5/query_language/database_management/#retention-policy-management
469- $rps {' autogen' } = {
470- duration => ' INF' ,
471- shard_duration => ' 7d' ,
472- };
473- $default_rp = ' autogen' ;
474- }
475- # loop over the rest
476- my $at_least_one_rp = scalar %rps ;
477- while ($db_create_query =~ / create retention policy "?([\w .]+)"? on $db duration ((?:\d +[smhdw])|(?:inf)) replication 1 shard duration (\d +[smhdw])( default)?/ig ) {
478- $at_least_one_rp = 1;
479- $rps {$1 } = {
480- duration => $2 ,
481- shard_duration => $3 ,
482- };
483- $default_rp = $1 if $4 ;
484- }
485- if (!$at_least_one_rp ) {
486- die " No retention policy was matched for $db , there is probably something wrong with the regex!" ;
487- }
488- # drop rps if needed
489- while ($db_create_query =~ / drop retention policy "?([\w .]+)"? on "?$db "?/ig ) {
490- delete $rps {$1 };
491- }
492-
493- # only take in account the last rp declared as default
494- if ($default_rp ) {
495- $rps {$default_rp }-> {default } = 1;
496- }
497-
498- return \%rps ;
499- }
500-
501570# parse CQs from something like:
502571# CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END;
503572# CREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT MAX(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END;
0 commit comments