Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions channels/sync_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ type SyncRunner struct {
expiry *uint32 // document expiry (in seconds) specified via expiry() callback
}

func NewSyncRunner(ctx context.Context, funcSource string, timeout time.Duration) (*SyncRunner, error) {
func NewSyncRunnerWithLogging(ctx context.Context, funcSource string, timeout time.Duration, errorLogFunc, infoLogFunc func(string)) (*SyncRunner, error) {
funcSource = wrappedFuncSource(funcSource)
runner := &SyncRunner{}
err := runner.InitWithLogging(funcSource, timeout,
func(s string) { base.ErrorfCtx(ctx, base.KeyJavascript.String()+": Sync %s", base.UD(s)) },
func(s string) { base.InfofCtx(ctx, base.KeyJavascript, "Sync %s", base.UD(s)) })
err := runner.InitWithLogging(funcSource, timeout, errorLogFunc, infoLogFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -212,6 +210,12 @@ func NewSyncRunner(ctx context.Context, funcSource string, timeout time.Duration
return runner, nil
}

func NewSyncRunner(ctx context.Context, funcSource string, timeout time.Duration) (*SyncRunner, error) {
errorLogFunc := func(s string) { base.ErrorfCtx(ctx, base.KeyJavascript.String()+": Sync %s", base.UD(s)) }
infoLogFunc := func(s string) { base.InfofCtx(ctx, base.KeyJavascript, "Sync %s", base.UD(s)) }
return NewSyncRunnerWithLogging(ctx, funcSource, timeout, errorLogFunc, infoLogFunc)
}

func (runner *SyncRunner) SetFunction(funcSource string) (bool, error) {
funcSource = wrappedFuncSource(funcSource)
return runner.JSRunner.SetFunction(funcSource)
Expand Down
41 changes: 19 additions & 22 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,8 +1697,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithBody(ctx context.Context
// SyncFnDryrun Runs the given document body through a sync function and returns expiry, channels doc was placed in,
// access map for users, roles, handler errors and sync fn exceptions.
// If syncFn is provided, it will be used instead of the one configured on the database.
func (db *DatabaseCollectionWithUser) SyncFnDryrun(ctx context.Context, newDoc, oldDoc *Document, syncFn string) (*channels.ChannelMapperOutput, error) {

func (db *DatabaseCollectionWithUser) SyncFnDryrun(ctx context.Context, newDoc, oldDoc *Document, syncFn string, errorLogFunc, infoLogFunc func(string)) (*channels.ChannelMapperOutput, error) {
mutableBody, metaMap, _, err := db.prepareSyncFn(oldDoc, newDoc)
if err != nil {
base.InfofCtx(ctx, base.KeyDiagnostic, "Failed to prepare to run sync function: %v", err)
Expand All @@ -1709,32 +1708,30 @@ func (db *DatabaseCollectionWithUser) SyncFnDryrun(ctx context.Context, newDoc,
if err != nil {
return nil, err
}
var output *channels.ChannelMapperOutput
var syncErr error
if syncFn == "" && db.ChannelMapper != nil {
output, err = db.ChannelMapper.MapToChannelsAndAccess(ctx, mutableBody, string(oldDoc._rawBody), metaMap, syncOptions)
if err != nil {
return nil, &base.SyncFnDryRunError{Err: err}
}
} else {
if syncFn == "" {

// fetch configured sync function if one is not provided
if syncFn == "" {
if db.ChannelMapper != nil {
syncFn = db.ChannelMapper.Function()
} else {
scopeAndCollectionName := db.ScopeAndCollectionName()
syncFn = channels.GetDefaultSyncFunction(scopeAndCollectionName.Scope, scopeAndCollectionName.Collection)
}
jsTimeout := time.Duration(base.DefaultJavascriptTimeoutSecs) * time.Second
syncRunner, err := channels.NewSyncRunner(ctx, syncFn, jsTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create sync runner: %v", err)
}
jsOutput, err := syncRunner.Call(ctx, mutableBody, string(oldDoc._rawBody), metaMap, syncOptions)
if err != nil {
}

return nil, &base.SyncFnDryRunError{Err: err}
}
output = jsOutput.(*channels.ChannelMapperOutput)
// create new sync runner instance for this dry run
jsTimeout := time.Duration(base.DefaultJavascriptTimeoutSecs) * time.Second
syncRunner, err := channels.NewSyncRunnerWithLogging(ctx, syncFn, jsTimeout, errorLogFunc, infoLogFunc)
if err != nil {
return nil, fmt.Errorf("failed to create sync runner: %v", err)
}

jsOutput, err := syncRunner.Call(ctx, mutableBody, sgbucket.JSONString(oldDoc._rawBody), metaMap, syncOptions)
if err != nil {
return nil, &base.SyncFnDryRunError{Err: err}
}

return output, syncErr
return jsOutput.(*channels.ChannelMapperOutput), nil
}

// revTreeConflictCheck checks for conflicts in the rev tree history and returns the parent revid, currentRevIndex
Expand Down
31 changes: 24 additions & 7 deletions rest/diagnostic_doc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"github.com/couchbase/sync_gateway/db"
)

type SyncFnDryRunLogging struct {
Errors []string `json:"errors"`
Info []string `json:"info"`
}

type SyncFnDryRun struct {
Channels base.Set `json:"channels"`
Access channels.AccessMap `json:"access"`
Roles channels.AccessMap `json:"roles"`
Exception string `json:"exception"`
Expiry *uint32 `json:"expiry,omitempty"`
Channels base.Set `json:"channels"`
Access channels.AccessMap `json:"access"`
Roles channels.AccessMap `json:"roles"`
Exception string `json:"exception,omitempty"`
Expiry *uint32 `json:"expiry,omitempty"`
Logging SyncFnDryRunLogging `json:"logging"`
}

type ImportFilterDryRun struct {
Expand Down Expand Up @@ -83,7 +89,7 @@ func (h *handler) handleSyncFnDryRun() error {
}

if syncDryRunPayload.Doc == nil && docid == "" {
return base.HTTPErrorf(http.StatusBadRequest, "no docid or document provided")
return base.HTTPErrorf(http.StatusBadRequest, "no doc_id or document provided")
}

oldDoc := &db.Document{ID: docid}
Expand Down Expand Up @@ -144,7 +150,16 @@ func (h *handler) handleSyncFnDryRun() error {
newRev := db.CreateRevIDWithBytes(generation, matchRev, rawDocBytes)
newDoc.RevID = newRev

output, err := h.collection.SyncFnDryrun(h.ctx(), newDoc, oldDoc, syncDryRunPayload.Function)
logErrors := make([]string, 0)
logInfo := make([]string, 0)
errorLogFn := func(s string) {
logErrors = append(logErrors, s)
}
infoLogFn := func(s string) {
logInfo = append(logInfo, s)
}

output, err := h.collection.SyncFnDryrun(h.ctx(), newDoc, oldDoc, syncDryRunPayload.Function, errorLogFn, infoLogFn)
if err != nil {
var syncFnDryRunErr *base.SyncFnDryRunError
if !errors.As(err, &syncFnDryRunErr) {
Expand All @@ -154,6 +169,7 @@ func (h *handler) handleSyncFnDryRun() error {
errMsg := syncFnDryRunErr.Error()
resp := SyncFnDryRun{
Exception: errMsg,
Logging: SyncFnDryRunLogging{Errors: logErrors, Info: logInfo},
}
h.writeJSON(resp)
return nil
Expand All @@ -169,6 +185,7 @@ func (h *handler) handleSyncFnDryRun() error {
output.Roles,
errorMsg,
output.Expiry,
SyncFnDryRunLogging{Errors: logErrors, Info: logInfo},
}
h.writeJSON(resp)
return nil
Expand Down
Loading