Skip to content

Commit 37c2de2

Browse files
authored
Merge pull request #1759 from twitter/oscar/optional-unsafe-counters
Add a setting to skip null counters
2 parents f0776e0 + 02baaec commit 37c2de2

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

scalding-core/src/main/scala/com/twitter/scalding/Config.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,19 @@ abstract class Config extends Serializable {
432432
def setVerboseFileSourceLogging(b: Boolean): Config =
433433
this + (VerboseFileSourceLoggingKey -> b.toString)
434434

435+
def getSkipNullCounters: Boolean =
436+
get(SkipNullCounters)
437+
.map(_.toBoolean)
438+
.getOrElse(false)
439+
440+
/**
441+
* If this is true, on hadoop, when we get a null Counter
442+
* for a given name, we just ignore the counter instead
443+
* of NPE
444+
*/
445+
def setSkipNullCounters(boolean: Boolean): Config =
446+
this + (SkipNullCounters -> boolean.toString)
447+
435448
override def hashCode = toMap.hashCode
436449
override def equals(that: Any) = that match {
437450
case thatConf: Config => toMap == thatConf.toMap
@@ -451,6 +464,7 @@ object Config {
451464
val ScaldingJobArgs: String = "scalding.job.args"
452465
val ScaldingJobArgsSerialized: String = "scalding.job.argsserialized"
453466
val ScaldingVersion: String = "scalding.version"
467+
val SkipNullCounters: String = "scalding.counters.skipnull"
454468
val HRavenHistoryUserName: String = "hraven.history.user.name"
455469
val ScaldingRequireOrderedSerialization: String = "scalding.require.orderedserialization"
456470
val FlowListeners: String = "scalding.observability.flowlisteners"

scalding-core/src/main/scala/com/twitter/scalding/Stats.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ private[scalding] final case class HadoopFlowPCounterImpl(fp: HadoopFlowProcess,
7171
c <- Option(r.getCounter(statKey.group, statKey.counter))
7272
} yield c).orNull
7373

74+
def skipNull: Boolean =
75+
fp.getProperty(Config.SkipNullCounters) match {
76+
case null => false // by default don't skip
77+
case isset => isset.toString.toBoolean
78+
}
79+
80+
require((counter != null) || skipNull, s"counter for $statKey is null and ${Config.SkipNullCounters} is not set to true")
81+
7482
override def increment(amount: Long): Unit =
7583
if (counter != null) counter.increment(amount) else ()
7684
}

0 commit comments

Comments
 (0)