The dwm package is a standalone set of business logic for maintaining marketing database quality. This repo is an Openshift-based implementation that applies that package to an Eloqua instance.
The data flow of this app uses a queue-based processing system (using the package pyqm):
- `Eloqua_Contacts_GetDWM.py`
  - Export the specified contacts from Eloqua via Bulk API (using the Python package `pyeloqua`)
  - Add them to the queue `dwmQueue`
- `Eloqua_Contacts_PostDWM.py`
  - Pick up records that have finished processing and import them back to Eloqua
  - Limit of 30k records per run to avoid data limits
  - References the queue `processedQueue`
  - Removes records from the shared list on import
- `Eloqua_Contacts_RunDWM.py`
  - Run the `dwmAll` function on a set of contacts (currently 600)
  - When done, remove from `dwmQueue` and add to `indicatorQueue`
- `Eloqua_Contacts_CleanQueues.py`
  - Run a queue cleanup script
    - Timeout records with locks older than 300 seconds
    - Report current queue size and timeout stats to Prometheus for monitoring
- `Eloqua_Contacts_UpdateContactsIndicators.py`
  - Retrieve job from `indicatorQueue`
  - Update record in Contacts.Indicators (by `emailAddress`) and set `Contacts.Indicators.Data_Status='PROCESS as MOD'` via Bulk API
  - Remove from `indicatorQueue` and add to `processedQueue`
- `Eloqua_Contacts.Indicators_Refresh.py`
  - Retrieve a max of 80k Contacts.Indicators records from Eloqua where `Contacts.Indicators.Updated_Timestamp` > 180 days ago and `Contacts.Indicators.Data_Status=='PROCESSED'`
  - Set `Contacts.Indicators.Data_Status='PROCESS as MOD'`
  - Import records back to Eloqua via Bulk API
This system provides enough redundancy to allow for troubleshooting of a crashed script, and it helps minimize the impact on Bulk API utilization limits.
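The queues themselves are managed by the `pyqm` package. As an illustration of the add/lock/timeout pattern described above (not `pyqm`'s actual API), here is a minimal in-memory stand-in; the lock timeout mirrors the 300-second release behavior of `Eloqua_Contacts_CleanQueues.py`:

```python
import time


class SimpleQueue:
    """In-memory illustration of a lock/timeout work queue (pyqm itself is MongoDB-backed)."""

    def __init__(self, timeout=300):
        self.timeout = timeout  # seconds before a locked record is considered stale
        self.items = []         # each item: {'data': record, 'locked_at': None or timestamp}

    def add(self, record):
        """Add a record to the queue, unlocked."""
        self.items.append({'data': record, 'locked_at': None})

    def next(self, n=1):
        """Lock and return up to n unlocked records for processing."""
        now = time.time()
        batch = []
        for item in self.items:
            if item['locked_at'] is None and len(batch) < n:
                item['locked_at'] = now
                batch.append(item)
        return batch

    def complete(self, batch):
        """Remove finished records (e.g. after moving them to the next queue)."""
        for item in batch:
            self.items.remove(item)

    def clean(self):
        """Release records whose locks are older than the timeout; returns count released."""
        now = time.time()
        released = 0
        for item in self.items:
            if item['locked_at'] is not None and now - item['locked_at'] > self.timeout:
                item['locked_at'] = None
                released += 1
        return released


# Usage sketch: RunDWM-style batch of up to 600 records
queue = SimpleQueue(timeout=300)
queue.add({'emailAddress': 'jane@example.com'})  # hypothetical record
batch = queue.next(600)
queue.complete(batch)
```

A crashed script simply leaves its records locked; the cleanup pass releases them back into the queue, which is the redundancy mentioned above.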
Current implementation has two custom functions:

- `CleanZipcodeUS`: apply zipcode standardization to contacts where `country='US'`
  - Takes only the first string of numbers before a non-digit character
  - Strips down to the first 5 digits
  - Adds leading 0s which may have been stripped by Excel auto-formatting
- `CleanAnnualRevenue`
  - Remove any "," or "$" characters
  - Try converting to integer
  - If successful, group into a pre-determined `annualRevenue` bucket
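The two functions above can be sketched as follows; the `annualRevenue` bucket boundaries and labels here are illustrative placeholders, not the production values:

```python
import re


def clean_zipcode_us(value):
    """Standardize a US zipcode: first digit run, first 5 digits, restore leading 0s."""
    match = re.search(r'\d+', str(value))  # first string of numbers before a non-digit
    if not match:
        return None
    return match.group(0)[:5].zfill(5)     # first 5 digits, pad leading 0s


def clean_annual_revenue(value):
    """Strip formatting characters and group into a revenue bucket."""
    cleaned = str(value).replace(',', '').replace('$', '')
    try:
        revenue = int(cleaned)
    except ValueError:
        return None  # conversion failed; leave record unbucketed
    # Hypothetical bucket boundaries for illustration
    buckets = [
        (1000000, 'Less than $1 Million'),
        (10000000, '$1 Million to $10 Million'),
        (100000000, '$10 Million to $100 Million'),
    ]
    for limit, label in buckets:
        if revenue < limit:
            return label
    return 'Over $100 Million'
```

For example, `clean_zipcode_us("501")` restores the Excel-stripped leading zeros and returns `"00501"`, and `clean_zipcode_us("12345-6789")` returns `"12345"`.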
- Logfile: `$OPENSHIFT_LOG_DIR/CLEAN_QUEUE_DWM_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_CleanQueues_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
- Prometheus metrics (SLI):
- QueueSize: # of records currently in each queue
- QueueTimeout: # of records "released" back into queue after timeout
- last_success_unixtime: Last time of successful run
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_DWM_GET_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_GetDWM_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
- Prometheus metrics (SLI):
- last_success_unixtime: Last time of successful run
- total_seconds: # of seconds to complete entire script
- total_records_total: # of records processed
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_DWM_RUN_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_RunDWM_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
  - If argument `verbose=True`, includes `tqdm` progress-bar output showing # records/second
- Prometheus metrics (SLI):
- last_success_unixtime: Last time of successful run
- total_seconds: # of seconds to complete entire script
- total_records_total: # of records processed
- total_seconds_dwm: # of seconds to complete DWM functions (not including queue processing time)
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_DWM_POST_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_PostDWM_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
- Prometheus metrics (SLI):
- last_success_unixtime: Last time of successful run
- total_seconds: # of seconds to complete entire script
- total_records_total: # of records processed
- total_records_errored: # of records from batches which received an error on import
- total_records_warning: # of records from batches which received a warning on import
- total_records_success: # of records which successfully imported to Eloqua
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_DWM_INDICATORS_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts_UpdateContactsIndicators_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
- Prometheus metrics (SLI):
- last_success_unixtime: Last time of successful run
- total_seconds: # of seconds to complete entire script
- total_records_total: # of records processed
- total_records_errored: # of records from batches which received an error on import
- total_records_warning: # of records from batches which received a warning on import
- total_records_success: # of records which successfully imported to Eloqua
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts.Indicators_Refresh_YYYY_MM_DD.log`
  - Explicitly logged runtime info
  - Accounted-for exceptions
- Logfile: `$OPENSHIFT_LOG_DIR/Eloqua_Contacts.Indicators_Refresh_Console_YYYY_MM_DD.log`
  - Runtime console output (including uncaught exceptions)
- Prometheus metrics (SLI):
- last_success_unixtime: Last time of successful run
- total_seconds: # of seconds to complete entire script
- total_records_total: # of records processed
- total_records_errored: # of records from batches which received an error on import
- total_records_warning: # of records from batches which received a warning on import
- total_records_success: # of records which successfully imported to Eloqua
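The metrics above are pushed to a Prometheus Pushgateway at the end of each batch run. A minimal sketch using the `prometheus_client` package (the job name and gateway address are placeholders; in this repo the gateway would come from the `PUSHGATEWAY` environment variable):

```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()

# Metrics common to each batch script
last_success = Gauge('last_success_unixtime', 'Last time of successful run',
                     registry=registry)
total_seconds = Gauge('total_seconds', 'Seconds to complete entire script',
                      registry=registry)
total_records = Gauge('total_records_total', 'Number of records processed',
                      registry=registry)


def report(records_processed, runtime_seconds, gateway=None):
    """Record final run stats and, if a gateway is configured, push them."""
    last_success.set_to_current_time()
    total_seconds.set(runtime_seconds)
    total_records.set(records_processed)
    if gateway:  # e.g. the value of the PUSHGATEWAY environment variable
        push_to_gateway(gateway, job='Eloqua_Contacts_GetDWM', registry=registry)


# Without a gateway configured, this just records the values locally
report(records_processed=600, runtime_seconds=12.5)
```

Pushing (rather than scraping) fits these scripts because they are short-lived batch jobs that may not be running when Prometheus comes around to scrape.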
This implementation uses ITOS (Red Hat IT-hosted Openshift v2; comparable to Openshift Enterprise). The Openshift PaaS is a good rapid-deployment solution: setup is fast and consistent.
- Internal-hosted medium gear
- Python 3.3
- Scalable; set to 1
- 1GB storage
- Eloqua variables (service account): `ELOQUA_COMPANY`, `ELOQUA_USERNAME`, `ELOQUA_PASSWORD`
- Monitoring: `PUSHGATEWAY` (for Prometheus monitoring of batch jobs)
- Internal-hosted medium gear
- MongoDB 3.2
- 10GB storage
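The scripts read these settings from the gear's environment; a sketch of that configuration step (the fallback values here are placeholders, and the commented-out client construction follows `pyeloqua`'s documented pattern but requires live credentials and network access):

```python
import os

# Eloqua service-account credentials, set as Openshift environment variables
ELOQUA_COMPANY = os.environ.get('ELOQUA_COMPANY', 'example-company')
ELOQUA_USERNAME = os.environ.get('ELOQUA_USERNAME', 'example-user')
ELOQUA_PASSWORD = os.environ.get('ELOQUA_PASSWORD', 'example-password')

# Prometheus Pushgateway address for batch-job monitoring
PUSHGATEWAY = os.environ.get('PUSHGATEWAY', 'localhost:9091')

# The Bulk API client would then be created roughly as (not run here):
# from pyeloqua import Eloqua
# elq = Eloqua(company=ELOQUA_COMPANY, username=ELOQUA_USERNAME,
#              password=ELOQUA_PASSWORD)
```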
Best practices for testing in an Openshift DEV environment, then promoting to an Openshift PROD environment.
- Test under database load by replicating prod DB
- New features interacting with MongoDB may require additional indexing; testing with a full data replication has a greater chance of catching these issues
- Establish port forward to PROD: `rhc port-forward dwmops -n PRODUCTION_NAMESPACE`
- In a separate terminal: `mongodump --port 12345`
- Kill the original port forward
- Establish port forward to DEV: `rhc port-forward dwmops -n DEV_NAMESPACE`
- In a separate terminal: `mongorestore`
- Ensure `runscripts` are uncommented for non-PROD environments
  - In DEV, `runscripts` for non-PROD should normally be commented out to avoid extra load on Eloqua's Bulk API
- Wait up to 2 hours for next load from Eloqua
- Monitor queues via Prometheus to ensure proper flow
- If testing features expecting a different return result, check sample data from the final queue and the `dwmdev.contactHistory` collection to ensure proper application of business rules
- SSH into the gear and manually comment out the python command in `runscripts/halfhour_getdwm.sh`
- Wait until all queues have been emptied
- Manually backup MongoDB
- Create a new release using `git flow release start vX.Y.Z`
- Populate any relevant release notes in `CHANGELOG.md`
- Update the version in `setup.py`
- Finish the release using `git flow release finish vX.Y.Z`
- Verify that the PROD app has `deployment-branch==master`
- Push to PROD
- Monitor queues regularly for next 8 hours, then next 2 mornings, to ensure proper data flow
- Set the deployment branch to the previous stable release: `rhc app-configure dwmops -n PRODUCTION_NAMESPACE --deployment-branch vA.B.C`
- Redeploy: `git push`
- Build an API operating off the same MongoDB
