Skip to content

Commit 4b5fea8

Browse files
committed
add new field and logger for testing
Add logger to get whole history sync js change in idl and remove logger add history size and logger into task handlers Update reference to cadence-idl, sync field name with idl repo update idl ref
1 parent d720b1e commit 4b5fea8

File tree

11 files changed

+70
-56
lines changed

11 files changed

+70
-56
lines changed

.gen/go/shared/shared.go

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/shirou/gopsutil v3.21.11+incompatible
1616
github.com/stretchr/testify v1.8.1
1717
github.com/uber-go/tally v3.3.15+incompatible
18-
github.com/uber/cadence-idl v0.0.0-20230131090243-b690237fffaa
18+
github.com/uber/cadence-idl v0.0.0-20230829165610-d4f516f3afb0
1919
github.com/uber/jaeger-client-go v2.22.1+incompatible
2020
github.com/uber/tchannel-go v1.32.1
2121
go.uber.org/atomic v1.9.0
@@ -66,5 +66,3 @@ require (
6666
gopkg.in/yaml.v3 v3.0.1 // indirect
6767
honnef.co/go/tools v0.3.2 // indirect
6868
)
69-
70-
replace github.com/uber/cadence-idl => github.com/timl3136/cadence-idl v0.0.0-20230811235258-a2fc7af34b3e

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
194194
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
195195
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
196196
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
197-
github.com/timl3136/cadence-idl v0.0.0-20230811235258-a2fc7af34b3e h1:3E7jeCTrIErCi7XI1rSpxjSBUoyphW8ssLq7k7whTpY=
198-
github.com/timl3136/cadence-idl v0.0.0-20230811235258-a2fc7af34b3e/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
199197
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
200198
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
201199
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
@@ -206,6 +204,8 @@ github.com/uber-go/mapdecode v1.0.0/go.mod h1:b5nP15FwXTgpjTjeA9A2uTHXV5UJCl4arw
206204
github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
207205
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
208206
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
207+
github.com/uber/cadence-idl v0.0.0-20230829165610-d4f516f3afb0 h1:0bw3vE5IBDTKK2gOepGsM+UFiUC/ToACLLO3S9IZuc4=
208+
github.com/uber/cadence-idl v0.0.0-20230829165610-d4f516f3afb0/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
209209
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
210210
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
211211
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=

idls

Submodule idls updated from eb9698c to d4f516f

internal/compatibility/proto/response.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func PollForDecisionTaskResponse(t *shared.PollForDecisionTaskResponse) *apiv1.P
223223
StartedTime: unixNanoToTime(t.StartedTimestamp),
224224
Queries: WorkflowQueryMap(t.Queries),
225225
NextEventId: t.GetNextEventId(),
226-
HistorySize: t.GetHistorySize(),
226+
TotalHistoryBytes: t.GetTotalHistoryBytes(),
227227
}
228228
}
229229

internal/compatibility/thrift/response.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func PollForDecisionTaskResponse(t *apiv1.PollForDecisionTaskResponse) *shared.P
223223
StartedTimestamp: timeToUnixNano(t.StartedTime),
224224
Queries: WorkflowQueryMap(t.Queries),
225225
NextEventId: &t.NextEventId,
226-
HistorySize: &t.HistorySize,
226+
TotalHistoryBytes: &t.TotalHistoryBytes,
227227
}
228228
}
229229

internal/internal_task_handlers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,9 +824,12 @@ process_Workflow_Loop:
824824
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
825825
task := workflowTask.task
826826
historyIterator := workflowTask.historyIterator
827+
w.workflowInfo.TotalHistoryBytes = task.GetTotalHistoryBytes()
828+
w.workflowInfo.HistoryCount = task.GetNextEventId() - 1
827829
if err := w.ResetIfStale(task, historyIterator); err != nil {
828830
return nil, err
829831
}
832+
830833
w.SetCurrentTask(task)
831834

832835
eventHandler := w.getEventHandler()

internal/internal_task_pollers.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,16 @@ type (
110110
}
111111

112112
historyIteratorImpl struct {
113-
iteratorFunc func(nextPageToken []byte) (*s.History, []byte, error)
114-
execution *s.WorkflowExecution
115-
nextPageToken []byte
116-
domain string
117-
service workflowserviceclient.Interface
118-
metricsScope tally.Scope
119-
startedEventID int64
120-
maxEventID int64
121-
featureFlags FeatureFlags
113+
iteratorFunc func(nextPageToken []byte) (*s.History, []byte, error)
114+
execution *s.WorkflowExecution
115+
nextPageToken []byte
116+
domain string
117+
service workflowserviceclient.Interface
118+
metricsScope tally.Scope
119+
startedEventID int64
120+
maxEventID int64 // Equivalent to History Count
121+
featureFlags FeatureFlags
122+
totalHistoryBytes int64
122123
}
123124

124125
localActivityTaskPoller struct {
@@ -330,7 +331,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
330331
})
331332
return nil
332333
}
333-
334+
// TODO: get workflowinfo
334335
doneCh := make(chan struct{})
335336
laResultCh := make(chan *localActivityResult)
336337
// close doneCh so local activity worker won't get blocked forever when trying to send back result to laResultCh.
@@ -341,6 +342,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
341342
startTime := time.Now()
342343
task.doneCh = doneCh
343344
task.laResultCh = laResultCh
345+
// Process the task.
344346
completedRequest, err := wtp.taskHandler.ProcessWorkflowTask(
345347
task,
346348
func(response interface{}, startTime time.Time) (*workflowTask, error) {
@@ -852,14 +854,15 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskRes
852854
}
853855
}
854856
historyIterator := &historyIteratorImpl{
855-
nextPageToken: response.NextPageToken,
856-
execution: response.WorkflowExecution,
857-
domain: wtp.domain,
858-
service: wtp.service,
859-
metricsScope: wtp.metricsScope,
860-
startedEventID: startEventID,
861-
maxEventID: nextEventID - 1,
862-
featureFlags: wtp.featureFlags,
857+
nextPageToken: response.NextPageToken,
858+
execution: response.WorkflowExecution,
859+
domain: wtp.domain,
860+
service: wtp.service,
861+
metricsScope: wtp.metricsScope,
862+
startedEventID: startEventID,
863+
maxEventID: nextEventID - 1,
864+
featureFlags: wtp.featureFlags,
865+
totalHistoryBytes: response.GetTotalHistoryBytes(),
863866
}
864867
task := &workflowTask{
865868
task: response,
@@ -897,6 +900,16 @@ func (h *historyIteratorImpl) HasNextPage() bool {
897900
return h.nextPageToken != nil
898901
}
899902

903+
// GetTotalHistoryBytes returns History Size of current history
904+
func (h *historyIteratorImpl) GetTotalHistoryBytes() int64 {
905+
return h.totalHistoryBytes
906+
}
907+
908+
// GetHistoryCount returns History Event Count of current history (aka maxEventID)
909+
func (h *historyIteratorImpl) GetHistoryCount() int64 {
910+
return h.maxEventID
911+
}
912+
900913
func newGetHistoryPageFunc(
901914
ctx context.Context,
902915
service workflowserviceclient.Interface,

internal/internal_worker_base.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,7 @@ func (bw *baseWorker) Start() {
216216
go bw.runTaskDispatcher()
217217

218218
// We want the emit function run once per host instead of run once per worker
219-
//collectHardwareUsageOnce.Do(func() {
220-
// bw.shutdownWG.Add(1)
221-
// go bw.emitHardwareUsage()
222-
//})
223-
219+
// since the emit function is host level metric.
224220
bw.shutdownWG.Add(1)
225221
go bw.emitHardwareUsage()
226222

internal/workflow.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,8 @@ type WorkflowInfo struct {
11111111
BinaryChecksum *string // The identifier(generated by md5sum by default) of worker code that is making the current decision(can be used for auto-reset feature)
11121112
DecisionStartedEventID int64 // the eventID of DecisionStarted that is making the current decision(can be used for reset API)
11131113
RetryPolicy *s.RetryPolicy
1114+
TotalHistoryBytes int64
1115+
HistoryCount int64
11141116
}
11151117

11161118
// GetBinaryChecksum returns the binary checksum(identifier) of this worker
@@ -1158,14 +1160,16 @@ func (wc *workflowEnvironmentInterceptor) GetMetricsScope(ctx Context) tally.Sco
11581160
return wc.env.GetMetricsScope()
11591161
}
11601162

1161-
// GetHistorySize returns the current history size of that workflow
1162-
func GetHistorySize(ctx Context) int64 {
1163-
return 0
1163+
// GetTotalHistoryBytes returns the current history size of that workflow
1164+
func GetTotalHistoryBytes(ctx Context) int64 {
1165+
i := getWorkflowInterceptor(ctx)
1166+
return i.GetWorkflowInfo(ctx).TotalHistoryBytes
11641167
}
11651168

11661169
// GetHistoryCount returns the current number of events of that workflow
11671170
func GetHistoryCount(ctx Context) int64 {
1168-
return 0
1171+
i := getWorkflowInterceptor(ctx)
1172+
return i.GetWorkflowInfo(ctx).HistoryCount
11691173
}
11701174

11711175
// Now returns the current time in UTC. It corresponds to the time when the decision task is started or replayed.

0 commit comments

Comments
 (0)