Skip to content

Commit 2ef46a5

Browse files
Merge pull request #4871 from linuxfoundation/unicron-if-fivetran-synced-is-used-update-approx-date-fields-in-backfill-script
Address Slack feedback regarding dates backfill
2 parents 82f9935 + e26d703 commit 2ef46a5

File tree

3 files changed

+81
-177
lines changed

3 files changed

+81
-177
lines changed

cla-backend-go/cmd/signatures_timestamp_backfill/main.go

Lines changed: 78 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,27 @@ const (
3434
regionDefault = "us-east-1"
3535

3636
// attribute names
37-
attrDateCreated = "date_created"
38-
attrDateModified = "date_modified"
37+
attrDateCreated = "date_created"
38+
attrDateModified = "date_modified"
39+
attrApproxDateCreated = "approx_date_created"
40+
attrApproxDateModified = "approx_date_modified"
3941

4042
// cutoff date for _FIVETRAN_SYNCED usage
4143
fivetranCutoffDate = "2024-03-09T00:00:00Z"
4244

4345
// update expression helpers
44-
setPrefix = "SET "
45-
commaSep = ", "
46-
exprSetDateCreated = "#date_created = :date_created"
47-
exprSetDateModified = "#date_modified = :date_modified"
48-
condAnyMissing = "attribute_not_exists(#date_created) OR #date_created = :empty OR attribute_not_exists(#date_modified) OR #date_modified = :empty"
46+
setPrefix = "SET "
47+
commaSep = ", "
48+
exprSetDateCreated = "#date_created = :date_created"
49+
exprSetDateModified = "#date_modified = :date_modified"
50+
exprSetApproxDateCreated = "#approx_date_created = :approx_date_created"
51+
exprSetApproxDateModified = "#approx_date_modified = :approx_date_modified"
52+
condAnyMissing = "attribute_not_exists(#date_created) OR #date_created = :empty OR attribute_not_exists(#date_modified) OR #date_modified = :empty"
4953

5054
// source labels
5155
labelFromCreated = "from_created"
5256
labelFromModified = "from_modified"
5357
labelFivetranSynced = "fivetran_synced"
54-
labelNow = "now"
5558
labelSignURLCreated = "signurl_createdat"
5659
labelSignURLIssued = "signurl_issuedat"
5760
labelSignedOn = "signed_on"
@@ -91,6 +94,8 @@ type SignatureRecord struct {
9194
SignatureID string `dynamodbav:"signature_id"`
9295
DateCreated string `dynamodbav:"date_created"`
9396
DateModified string `dynamodbav:"date_modified"`
97+
ApproxDateCreated string `dynamodbav:"approx_date_created"`
98+
ApproxDateModified string `dynamodbav:"approx_date_modified"`
9499
SignedOn string `dynamodbav:"signed_on"`
95100
UserDocusignDateSigned string `dynamodbav:"user_docusign_date_signed"`
96101
UserDocusignRawXML string `dynamodbav:"user_docusign_raw_xml"`
@@ -104,12 +109,19 @@ type Counter map[string]int
104109
func (c Counter) Inc(label string) { c[label]++ }
105110

106111
type UpdateStats struct {
107-
Created Counter
108-
Modified Counter
112+
Created Counter
113+
Modified Counter
114+
ApproxCreated Counter
115+
ApproxModified Counter
109116
}
110117

111118
func newStats() UpdateStats {
112-
return UpdateStats{Created: Counter{}, Modified: Counter{}}
119+
return UpdateStats{
120+
Created: Counter{},
121+
Modified: Counter{},
122+
ApproxCreated: Counter{},
123+
ApproxModified: Counter{},
124+
}
113125
}
114126

115127
// -----------------------------------------------------------------------------
@@ -122,7 +134,6 @@ func main() {
122134
stage = "dev"
123135
}
124136
dryRun := getEnvBool("DRY_RUN")
125-
allowCurrentTime := getEnvBool("ALLOW_CURRENT_TIME")
126137
debug = getEnvBool("DEBUG")
127138

128139
// Snowflake helper & table
@@ -141,7 +152,7 @@ func main() {
141152
if fallbackCLIPath == "" {
142153
fallbackCLIPath = fmt.Sprintf("backfill-fallback-commands-cla-%s-signatures-%s.sh", stage, time.Now().UTC().Format("20060102T150405Z"))
143154
}
144-
fmt.Printf("Signature backfill | stage=%s dry-run=%t allow-current-time(after SF)=%t DEBUG=%t\n", stage, dryRun, allowCurrentTime, debug)
155+
fmt.Printf("Signature backfill | stage=%s dry-run=%t DEBUG=%t\n", stage, dryRun, debug)
145156
fmt.Printf("Snowflake: table=%s via %s (batch=%d)\n", sfTable, sfCmd, sfBatchSize)
146157

147158
awsSession, err := session.NewSession(&aws.Config{Region: aws.String(regionDefault)})
@@ -168,7 +179,17 @@ func main() {
168179
return
169180
}
170181
cliFile = f
171-
if _, e := fmt.Fprintf(cliFile, "#!/usr/bin/env bash\nset -euo pipefail\n# generated %s UTC, stage=%s, table=%s\n\n", time.Now().UTC().Format(time.RFC3339), stage, tableName); e != nil {
182+
if _, e := fmt.Fprintf(
183+
cliFile,
184+
"# Copyright The Linux Foundation and each contributor to CommunityBridge.\n"+
185+
"# SPDX-License-Identifier: MIT\n"+
186+
"#!/usr/bin/env bash\n"+
187+
"set -euo pipefail\n"+
188+
"# generated %s UTC, stage=%s, table=%s\n\n",
189+
time.Now().UTC().Format(time.RFC3339),
190+
stage,
191+
tableName,
192+
); e != nil {
172193
log.Printf("WARN: writing header to %s: %v", clean, e)
173194
}
174195
cliOpen = true
@@ -211,21 +232,6 @@ func main() {
211232
)
212233
cliCount += sfCliCount
213234
updated += sfFixed
214-
215-
// 3) Final now() pass (only if allowed)
216-
nowFixed, nowCliCount := finalNowFix(
217-
ddb, tableName, stage, region, dryRun, &stats, pending, allowCurrentTime,
218-
func(cmd string) {
219-
openCLI()
220-
if cliFile != nil {
221-
if _, werr := fmt.Fprintln(cliFile, cmd); werr != nil {
222-
log.Printf("WARN: could not append CLI line: %v", werr)
223-
}
224-
}
225-
},
226-
)
227-
cliCount += nowCliCount
228-
updated += nowFixed
229235
skipped := len(pending)
230236

231237
fmt.Printf("\nCompleted. Updated: %d | Still pending (skipped): %d\n", updated, skipped)
@@ -599,34 +605,58 @@ func snowflakeFix(
599605
updateExpr := setPrefix
600606
vals := map[string]*dynamodb.AttributeValue{":empty": {S: aws.String("")}}
601607
names := map[string]*string{
602-
"#date_created": aws.String(attrDateCreated),
603-
"#date_modified": aws.String(attrDateModified),
608+
"#date_created": aws.String(attrDateCreated),
609+
"#date_modified": aws.String(attrDateModified),
610+
"#approx_date_created": aws.String(attrApproxDateCreated),
611+
"#approx_date_modified": aws.String(attrApproxDateModified),
604612
}
605613
first := true
606614
if setCreated {
607615
if !first {
608616
updateExpr += commaSep
609617
}
610-
updateExpr += exprSetDateCreated
611-
vals[":date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
618+
// Use approx field if source is Fivetran synced, otherwise use regular field
619+
if srcC == labelFivetranSynced {
620+
updateExpr += exprSetApproxDateCreated
621+
vals[":approx_date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
622+
} else {
623+
updateExpr += exprSetDateCreated
624+
vals[":date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
625+
}
612626
first = false
613627
}
614628
if setModified {
615629
if !first {
616630
updateExpr += commaSep
617631
}
618-
updateExpr += exprSetDateModified
619-
vals[":date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
632+
// Use approx field if source is Fivetran synced, otherwise use regular field
633+
if srcM == labelFivetranSynced {
634+
updateExpr += exprSetApproxDateModified
635+
vals[":approx_date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
636+
} else {
637+
updateExpr += exprSetDateModified
638+
vals[":date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
639+
}
620640
}
621641

622642
// Stats
623643
if setCreated {
624-
stats.Created.Inc(srcC)
625-
stats.Created.Inc("_total")
644+
if srcC == labelFivetranSynced {
645+
stats.ApproxCreated.Inc(srcC)
646+
stats.ApproxCreated.Inc("_total")
647+
} else {
648+
stats.Created.Inc(srcC)
649+
stats.Created.Inc("_total")
650+
}
626651
}
627652
if setModified {
628-
stats.Modified.Inc(srcM)
629-
stats.Modified.Inc("_total")
653+
if srcM == labelFivetranSynced {
654+
stats.ApproxModified.Inc(srcM)
655+
stats.ApproxModified.Inc("_total")
656+
} else {
657+
stats.Modified.Inc(srcM)
658+
stats.Modified.Inc("_total")
659+
}
630660
}
631661

632662
cmd := buildAwsCliUpdate(region, stage, tableName, id, updateExpr, names, vals, condAnyMissing)
@@ -665,140 +695,6 @@ func snowflakeFix(
665695
return fixed, cliCount
666696
}
667697

668-
// -----------------------------------------------------------------------------
669-
// Final now()-fill pass (only if allowed)
670-
// -----------------------------------------------------------------------------
671-
672-
func finalNowFix(
673-
ddb *dynamodb.DynamoDB,
674-
tableName, stage, region string,
675-
dryRun bool,
676-
stats *UpdateStats,
677-
pending map[string]*pendingInfo,
678-
allowNow bool,
679-
emitCLI func(string),
680-
) (fixed int, cliCount int) {
681-
if !allowNow || len(pending) == 0 {
682-
return 0, 0
683-
}
684-
now := time.Now().UTC().Format(time.RFC3339)
685-
686-
for id, info := range pending {
687-
mC := info.MissingC
688-
mM := info.MissingM
689-
if !mC && !mM {
690-
delete(pending, id)
691-
continue
692-
}
693-
694-
var newC, srcC string
695-
if mC {
696-
if !isMissing(info.Record.DateModified) {
697-
newC, srcC = normalize(info.Record.DateModified), labelFromModified
698-
} else {
699-
newC, srcC = now, labelNow
700-
}
701-
}
702-
703-
var newM, srcM string
704-
if mM {
705-
switch {
706-
case !isMissing(info.Record.DateCreated):
707-
newM, srcM = normalize(info.Record.DateCreated), labelFromCreated
708-
case mC && newC != "":
709-
newM, srcM = newC, labelFromCreated
710-
default:
711-
newM, srcM = now, labelNow
712-
}
713-
}
714-
715-
finalC := ifEmpty(info.Record.DateCreated, newC)
716-
finalM := ifEmpty(info.Record.DateModified, newM)
717-
718-
setCreated := mC && finalC != ""
719-
setModified := mM && finalM != ""
720-
if !setCreated && !setModified {
721-
delete(pending, id)
722-
continue
723-
}
724-
725-
// Monotonic clamp
726-
tc := parseTime(finalC)
727-
tm := parseTime(finalM)
728-
if !tc.IsZero() && !tm.IsZero() && tm.Before(tc) {
729-
finalM = finalC
730-
srcM = labelFromCreated
731-
setModified = mM
732-
}
733-
734-
updateExpr := setPrefix
735-
vals := map[string]*dynamodb.AttributeValue{":empty": {S: aws.String("")}}
736-
names := map[string]*string{
737-
"#date_created": aws.String(attrDateCreated),
738-
"#date_modified": aws.String(attrDateModified),
739-
}
740-
first := true
741-
if setCreated {
742-
if !first {
743-
updateExpr += commaSep
744-
}
745-
updateExpr += exprSetDateCreated
746-
vals[":date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
747-
first = false
748-
}
749-
if setModified {
750-
if !first {
751-
updateExpr += commaSep
752-
}
753-
updateExpr += exprSetDateModified
754-
vals[":date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
755-
}
756-
757-
// Stats
758-
if setCreated {
759-
stats.Created.Inc(srcC)
760-
stats.Created.Inc("_total")
761-
}
762-
if setModified {
763-
stats.Modified.Inc(srcM)
764-
stats.Modified.Inc("_total")
765-
}
766-
767-
cmd := buildAwsCliUpdate(region, stage, tableName, id, updateExpr, names, vals, condAnyMissing)
768-
dbg(" NOW CLI: %s", cmd)
769-
770-
if dryRun {
771-
if emitCLI != nil {
772-
emitCLI(cmd)
773-
cliCount++
774-
}
775-
fixed++
776-
delete(pending, id)
777-
continue
778-
}
779-
780-
_, uerr := ddb.UpdateItem(&dynamodb.UpdateItemInput{
781-
TableName: aws.String(tableName),
782-
Key: map[string]*dynamodb.AttributeValue{"signature_id": {S: aws.String(id)}},
783-
UpdateExpression: aws.String(updateExpr),
784-
ExpressionAttributeNames: names,
785-
ExpressionAttributeValues: vals,
786-
ConditionExpression: aws.String(condAnyMissing),
787-
})
788-
if uerr != nil {
789-
log.Printf("Update failed (now) %s: %v", id, uerr)
790-
if emitCLI != nil {
791-
emitCLI(cmd)
792-
cliCount++
793-
}
794-
continue
795-
}
796-
fixed++
797-
delete(pending, id)
798-
}
799-
return fixed, cliCount
800-
}
801-
802698
// -----------------------------------------------------------------------------
803699
// Candidate collection & selection
804700
// -----------------------------------------------------------------------------
@@ -1181,14 +1077,20 @@ func buildAwsCliUpdate(region, stage, table, sigID, updateExpr string, names map
11811077
if av, ok := values[":date_modified"]; ok && av != nil && av.S != nil {
11821078
valsFlat[":date_modified"] = map[string]string{"S": *av.S}
11831079
}
1080+
if av, ok := values[":approx_date_created"]; ok && av != nil && av.S != nil {
1081+
valsFlat[":approx_date_created"] = map[string]string{"S": *av.S}
1082+
}
1083+
if av, ok := values[":approx_date_modified"]; ok && av != nil && av.S != nil {
1084+
valsFlat[":approx_date_modified"] = map[string]string{"S": *av.S}
1085+
}
11841086

11851087
kb, kerr := json.Marshal(key)
11861088
if kerr != nil {
11871089
kb = []byte(fmt.Sprintf(`{"signature_id":{"S":"%s"}}`, sigID))
11881090
}
11891091
nb, nerr := json.Marshal(namesFlat)
11901092
if nerr != nil {
1191-
nb = []byte(`{"#date_created":"date_created","#date_modified":"date_modified"}`)
1093+
nb = []byte(`{"#date_created":"date_created","#date_modified":"date_modified","#approx_date_created":"approx_date_created","#approx_date_modified":"approx_date_modified"}`)
11921094
}
11931095
vb, verr := json.Marshal(valsFlat)
11941096
if verr != nil {
@@ -1220,4 +1122,6 @@ func printStats(stats UpdateStats) {
12201122
}
12211123
print(attrDateCreated, stats.Created)
12221124
print(attrDateModified, stats.Modified)
1125+
print(attrApproxDateCreated, stats.ApproxCreated)
1126+
print(attrApproxDateModified, stats.ApproxModified)
12231127
}

cla-backend-go/signatures_timestamp_backfill_dev.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# SPDX-License-Identifier: MIT
33
#!/bin/bash
44
# source setenv.sh
5-
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG=true STAGE=dev DRY_RUN=true ./signatures_timestamp_backfill
5+
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG=true STAGE=dev DRY_RUN=true ./signatures_timestamp_backfill

cla-backend-go/signatures_timestamp_backfill_prod.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
# SPDX-License-Identifier: MIT
33
#!/bin/bash
44
# source setenv-prod.sh.secret
5-
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG='' STAGE=prod DRY_RUN=true ./signatures_timestamp_backfill
6-
# go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG=true STAGE=prod DRY_RUN='' ./signatures_timestamp_backfill
5+
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG='' STAGE=prod DRY_RUN=true ./signatures_timestamp_backfill
6+
# go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG=true STAGE=prod DRY_RUN='' ./signatures_timestamp_backfill

0 commit comments

Comments
 (0)