diff --git a/endpoints/openrtb2/auction.go b/endpoints/openrtb2/auction.go index bb01f452f97..bd3e1d008de 100644 --- a/endpoints/openrtb2/auction.go +++ b/endpoints/openrtb2/auction.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/prebid/prebid-server/privacy" "io" "io/ioutil" "net/http" @@ -15,8 +16,6 @@ import ( "strings" "time" - "github.com/prebid/prebid-server/privacy" - "github.com/buger/jsonparser" "github.com/gofrs/uuid" "github.com/golang/glog" @@ -205,13 +204,6 @@ func (deps *endpointDeps) Auction(w http.ResponseWriter, r *http.Request, _ http ctx := context.Background() timeout := deps.cfg.AuctionTimeouts.LimitAuctionTimeout(time.Duration(req.TMax) * time.Millisecond) - - // adjust tmax for requests with MSB feature enabled - msbConfig := exchange.ExtractMSBInfoReq(req.BidRequest) - if msbConfig.LastPeek.PeekStartTimeMilliSeconds > 0 { - timeout += time.Duration(msbConfig.LastPeek.PeekStartTimeMilliSeconds) * time.Millisecond - } - if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithDeadline(ctx, start.Add(timeout)) diff --git a/exchange/exchange.go b/exchange/exchange.go index 5852b53caac..8fb1f45de42 100644 --- a/exchange/exchange.go +++ b/exchange/exchange.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/prebid/prebid-server/privacy" "math/rand" "net/url" "runtime/debug" @@ -14,11 +15,6 @@ import ( "strings" "time" - "github.com/buger/jsonparser" - uuid "github.com/gofrs/uuid" - "github.com/golang/glog" - "github.com/prebid/openrtb/v19/openrtb2" - "github.com/prebid/openrtb/v19/openrtb3" "github.com/prebid/prebid-server/adapters" "github.com/prebid/prebid-server/adservertargeting" "github.com/prebid/prebid-server/bidadjustment" @@ -35,11 +31,16 @@ import ( "github.com/prebid/prebid-server/metrics" "github.com/prebid/prebid-server/openrtb_ext" "github.com/prebid/prebid-server/prebid_cache_client" - "github.com/prebid/prebid-server/privacy" "github.com/prebid/prebid-server/stored_requests" "github.com/prebid/prebid-server/stored_responses" "github.com/prebid/prebid-server/usersync" "github.com/prebid/prebid-server/util/maputil" + + "github.com/buger/jsonparser" + "github.com/gofrs/uuid" + "github.com/golang/glog" + "github.com/prebid/openrtb/v19/openrtb2" + "github.com/prebid/openrtb/v19/openrtb3" ) type extCacheInstructions struct { @@ -655,86 +656,6 @@ func (e *exchange) makeAuctionContext(ctx context.Context, needsCache bool) (auc return } -func (e *exchange) processBidder( - ctx context.Context, - bidderRequests []BidderRequest, - bidAdjustments map[string]float64, - conversions currency.Conversions, - accountDebugAllowed bool, - globalPrivacyControlHeader string, - headerDebugAllowed bool, - alternateBidderCodes openrtb_ext.ExtAlternateBidderCodes, - experiment *openrtb_ext.Experiment, - hookExecutor hookexecution.StageExecutor, - bidAdjustmentRules map[string][]openrtb_ext.Adjustment, - chBids chan *bidResponseWrapper, - bidder BidderRequest) { - // Here we actually call the adapters and collect the bids. - bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) { - // Passing in aName so a doesn't change out from under the go routine - if bidderRequest.BidderLabels.Adapter == "" { - glog.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName) - bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName - } - brw := new(bidResponseWrapper) - brw.bidder = bidderRequest.BidderName - brw.adapter = bidderRequest.BidderCoreName - // Defer basic metrics to insure we capture them after all the values have been set - defer func() { - e.me.RecordAdapterRequest(bidderRequest.BidderLabels) - }() - start := time.Now() - - reqInfo := adapters.NewExtraRequestInfo(conversions) - reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType - reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader - reqInfo.BidderRequestStartTime = start - - bidReqOptions := bidRequestOptions{ - accountDebugAllowed: accountDebugAllowed, - headerDebugAllowed: headerDebugAllowed, - addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]), - bidAdjustments: bidAdjustments, - } - - seatBids, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules) - - // Add in time reporting - elapsed := time.Since(start) - brw.adapterSeatBids = seatBids - // Structure to record extra tracking data generated during bidding - ae := new(seatResponseExtra) - ae.ResponseTimeMillis = int(elapsed / time.Millisecond) - if len(seatBids) != 0 { - ae.HttpCalls = seatBids[0].HttpCalls - } - // SeatBidsPreparationStartTime is needed to calculate duration for openrtb response preparation time metric - // No metric needs to be logged for requests which error out - if err == nil { - ae.MakeBidsTimeInfo = reqInfo.MakeBidsTimeInfo - } - // Timing statistics - e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed) - bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids) - bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err) - // Append any bid validation errors to the error list - ae.Errors = errsToBidderErrors(err) - ae.Warnings = errsToBidderWarnings(err) - brw.adapterExtra = ae - for _, seatBid := range seatBids { - if seatBid != nil { - for _, bid := range seatBid.Bids { - var cpm = float64(bid.Bid.Price * 1000) - e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm) - e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "") - } - } - } - chBids <- brw - }, chBids) - go bidderRunner(bidder, conversions) -} - // This piece sends all the requests to the bidder adapters and gathers the results. func (e *exchange) getAllBids( ctx context.Context, @@ -760,30 +681,71 @@ func (e *exchange) getAllBids( bidsFound := false e.me.RecordOverheadTime(metrics.MakeBidderRequests, time.Since(pbsRequestStartTime)) - lastPeekBidderRequests := []BidderRequest{} - msbConfig := extractMSBInfoBidders(bidderRequests) for _, bidder := range bidderRequests { - // save 2nd - nth peek tier bidder requests and process later - // if needed add in the future - - // save last peek bidder requests and process later - bidderName := bidder.BidderName.String() - if _, isLastPeekBidder := msbConfig.LastPeek.PeekBidderFloorMultMap[bidderName]; isLastPeekBidder { - lastPeekBidderRequests = append(lastPeekBidderRequests, bidder) - continue - } - e.processBidder(ctx, bidderRequests, bidAdjustments, conversions, accountDebugAllowed, globalPrivacyControlHeader, headerDebugAllowed, alternateBidderCodes, experiment, hookExecutor, bidAdjustmentRules, chBids, bidder) - } - - // process msb 2nd - nth peek tier bidder request: - // if needed add in the future - - // process msb last peek bidder requests: - if len(lastPeekBidderRequests) > 0 { - for _, bidder := range mspUpdateLastPeekBiddersRequest(chBids, lastPeekBidderRequests, msbConfig.LastPeek, len(bidderRequests)-len(lastPeekBidderRequests)) { - e.processBidder(ctx, bidderRequests, bidAdjustments, conversions, accountDebugAllowed, globalPrivacyControlHeader, headerDebugAllowed, alternateBidderCodes, experiment, hookExecutor, bidAdjustmentRules, chBids, bidder) - } + // Here we actually call the adapters and collect the bids. + bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) { + // Passing in aName so a doesn't change out from under the go routine + if bidderRequest.BidderLabels.Adapter == "" { + glog.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName) + bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName + } + brw := new(bidResponseWrapper) + brw.bidder = bidderRequest.BidderName + brw.adapter = bidderRequest.BidderCoreName + // Defer basic metrics to insure we capture them after all the values have been set + defer func() { + e.me.RecordAdapterRequest(bidderRequest.BidderLabels) + }() + start := time.Now() + + reqInfo := adapters.NewExtraRequestInfo(conversions) + reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType + reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader + reqInfo.BidderRequestStartTime = start + + bidReqOptions := bidRequestOptions{ + accountDebugAllowed: accountDebugAllowed, + headerDebugAllowed: headerDebugAllowed, + addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]), + bidAdjustments: bidAdjustments, + } + seatBids, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules) + + // Add in time reporting + elapsed := time.Since(start) + brw.adapterSeatBids = seatBids + // Structure to record extra tracking data generated during bidding + ae := new(seatResponseExtra) + ae.ResponseTimeMillis = int(elapsed / time.Millisecond) + if len(seatBids) != 0 { + ae.HttpCalls = seatBids[0].HttpCalls + } + // SeatBidsPreparationStartTime is needed to calculate duration for openrtb response preparation time metric + // No metric needs to be logged for requests which error out + if err == nil { + ae.MakeBidsTimeInfo = reqInfo.MakeBidsTimeInfo + } + // Timing statistics + e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed) + bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids) + bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err) + // Append any bid validation errors to the error list + ae.Errors = errsToBidderErrors(err) + ae.Warnings = errsToBidderWarnings(err) + brw.adapterExtra = ae + for _, seatBid := range seatBids { + if seatBid != nil { + for _, bid := range seatBid.Bids { + var cpm = float64(bid.Bid.Price * 1000) + e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm) + e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "") + } + } + } + chBids <- brw + }, chBids) + go bidderRunner(bidder, conversions) } var fledge *openrtb_ext.Fledge diff --git a/exchange/msp_exchange.go b/exchange/msp_exchange.go index 6508f121ec7..9a712c382ca 100644 --- a/exchange/msp_exchange.go +++ b/exchange/msp_exchange.go @@ -2,78 +2,17 @@ package exchange import ( "encoding/json" - "math" "time" - "github.com/buger/jsonparser" - "github.com/golang/glog" "github.com/prebid/openrtb/v19/openrtb2" "github.com/prebid/prebid-server/exchange/entities" "github.com/prebid/prebid-server/openrtb_ext" - jsonpatch "gopkg.in/evanphx/json-patch.v4" ) const ( MSP_SEAT_IN_HOUSE = "msp-in-house" ) -type MSBExt struct { - MSB MSBConfig `json:"msb"` -} - -type MSBConfig struct { - LastPeek MSBLastPeekConfig `json:"last_peek"` -} - -type MSBLastPeekConfig struct { - // start peek available bids after this - PeekStartTimeMilliSeconds int64 `json:"peek_start_time_miliseconds"` - // bidder -> FloorMult - // set floor for last peek bidder = current_available_max_bid_price * FloorMult to: - // 1. break tie - // 2. gain more revenue - PeekBidderFloorMultMap map[string]float64 `json:"peek_bidder_floor_mult_map"` -} - -/* -MSB feature controller(req.ext) example: -{ - "ext": { - "msb": { - "last_peek": { - "peek_start_time_miliseconds": 900, - "peek_bidder_floor_mult_map": { - "msp_google": 1.01, - "msp_nova": 1.01 - } - } - } - } -} -MSP server is response for adding above MSB info to requests for certain traffic/placement/exps. peek_bidder_floor_mult_map controls which bidders -are there for different peek tiers - -In the bove example - there are two peek tiers for bidders: - 1. last peek tier - 2. the rest(normal tier) - - after all reponses are ready for normal tier or timeout=peek_start_time_miliseconds(900ms), for bidders in last peek tier(msp_google and msp_nova) peek available responses from normal tier, - get max_available_bid_prices_for_normal_tier and set floor = max_available_bid_prices_for_normal_tier * peek_bidder_floor_mult_map[bidder] -*/ - -type MSPFloor struct { - Floor float64 `json:"floor"` -} - -var mspBidders = map[openrtb_ext.BidderName]int{ - openrtb_ext.BidderMspGoogle: 1, - openrtb_ext.BidderMspFbAlpha: 1, - openrtb_ext.BidderMspFbBeta: 1, - openrtb_ext.BidderMspFbGamma: 1, - openrtb_ext.BidderMspNova: 1, -} - func mspUpdateStoredAuctionResponse(r *AuctionRequest) bool { if len(r.StoredAuctionResponses) > 0 { if rawSeatBid, ok := r.StoredAuctionResponses[r.BidRequestWrapper.Imp[0].ID]; ok { @@ -121,107 +60,3 @@ func mspPostProcessAuction( return adapterBids, fledge, liveAdapters, nil, anyBidsReturned } - -// peek max available(ready) bid price from channel without comsuming -func peekChannelAvailableMaxBidPriceWithinTimeout(peekTier string, chBids chan *bidResponseWrapper, lastPeekConfig MSBLastPeekConfig, totalNormalBidders int) float64 { - timeout := time.After(time.Duration(lastPeekConfig.PeekStartTimeMilliSeconds) * time.Millisecond) - maxPrice := 0.0 - hasData := true - peekedRespList := []*bidResponseWrapper{} - availableBidders := []string{} - // keep consuming message from channel until all normal bidder requests are collected or timeout reaches - for hasData { - select { - case resp, ok := <-chBids: - if !ok { - hasData = false - } else { - for _, bids := range resp.adapterSeatBids { - for _, bid := range bids.Bids { - if bid.Bid != nil { - maxPrice = math.Max(maxPrice, bid.Bid.Price) - } - } - } - peekedRespList = append(peekedRespList, resp) - availableBidders = append(availableBidders, resp.bidder.String()) - if len(peekedRespList) == totalNormalBidders { - hasData = false - } - } - - case <-timeout: - hasData = false - } - - } - // push message back - for _, resp := range peekedRespList { - chBids <- resp - } - glog.Infof("MSB tier %s, peeked from available bidders %v, current max bid price: %f", peekTier, availableBidders, maxPrice) - return maxPrice -} - -func mspUpdateLastPeekBiddersRequest( - chBids chan *bidResponseWrapper, - lastPeekBidderRequests []BidderRequest, - lastPeekConfig MSBLastPeekConfig, - totalNormalBidders int, -) []BidderRequest { - maxPrice := peekChannelAvailableMaxBidPriceWithinTimeout("lastPeek", chBids, lastPeekConfig, totalNormalBidders) - for reqIdx := range lastPeekBidderRequests { - mult := lastPeekConfig.PeekBidderFloorMultMap[lastPeekBidderRequests[reqIdx].BidderName.String()] - updatedFloor := mult * maxPrice - for idx := range lastPeekBidderRequests[reqIdx].BidRequest.Imp { - bidder := &lastPeekBidderRequests[reqIdx] - // update req.imp.bidfloor - bidder.BidRequest.Imp[idx].BidFloor = math.Max(bidder.BidRequest.Imp[idx].BidFloor, updatedFloor) - - // for msp bidders, update req.imp.ext.bidder.floor which is the source of truth for msp bidder's floor and - // will be updated/overwritten later by msp module stage: https://github.com/ParticleMedia/msp/blob/master/pkg/modules/dam_buckets/module/hook_bidder_request.go#L69 - if _, found := mspBidders[bidder.BidderName]; found { - extBytes, err := jsonObject(bidder.BidRequest.Imp[idx].Ext, "bidder") - if err == nil { - var impExt MSPFloor - err = json.Unmarshal(extBytes, &impExt) - if err == nil { - impExt.Floor = math.Max(impExt.Floor, updatedFloor) - updatedBytes, _ := json.Marshal(impExt) - updatedBidderBytes, _ := jsonpatch.MergePatch(extBytes, updatedBytes) - updatedExtBytes, _ := jsonparser.Set(bidder.BidRequest.Imp[idx].Ext, updatedBidderBytes, "bidder") - bidder.BidRequest.Imp[idx].Ext = updatedExtBytes - } - } - } - } - } - return lastPeekBidderRequests -} - -func jsonObject(data []byte, keys ...string) ([]byte, error) { - if result, dataType, _, err := jsonparser.Get(data, keys...); err == nil && dataType == jsonparser.Object { - return result, nil - } else { - return nil, err - } -} - -func extractMSBInfoBidders(reqList []BidderRequest) MSBConfig { - if len(reqList) > 0 { - return ExtractMSBInfoReq(reqList[0].BidRequest) - } - - return MSBConfig{} -} - -func ExtractMSBInfoReq(req *openrtb2.BidRequest) MSBConfig { - var config MSBExt - if req != nil { - err := json.Unmarshal(req.Ext, &config) - if err != nil { - glog.Error("MSB extract bidder config:", err) - } - } - return config.MSB -}