Description
Hi,
We are trying to read a Salesforce object using the com.springml.spark.salesforce package, and we get the error below while saving the DataFrame to an S3 bucket. We have been facing this issue for a long time and are not sure where the bug is.
The SOQL is:

select Id, RecordTypeId, WhoId, WhatId, WhoCount, WhatCount, Subject, ActivityDate, Status, Priority, IsHighPriority, OwnerId, Description, CurrencyIsoCode, Type, IsDeleted, AccountId, IsClosed, CreatedDate, CreatedById, LastModifiedDate, LastModifiedById, SystemModstamp, IsArchived, CallDurationInSeconds, CallType, CallDisposition, CallObject, ReminderDateTime, IsReminderSet, RecurrenceActivityId, IsRecurrence, RecurrenceStartDateOnly, RecurrenceEndDateOnly, RecurrenceTimeZoneSidKey, RecurrenceType, RecurrenceInterval, RecurrenceDayOfWeekMask, RecurrenceDayOfMonth, RecurrenceInstance, RecurrenceMonthOfYear, RecurrenceRegeneratedType, TaskSubtype, CompletedDateTime, Power_of_One__c, Hour__c, Interaction_Duration__c, Connect__c, Conversation__c, TimeTrade_SF1__Invitation__c, TimeTrade_SF1__Meeting__c, DNIS__c, Created_Date_Time__c, Genesys_Call_Details__c, Unassociated__c, G4S__Glance_Duration__c, G4S__Glance_End_Time__c, G4S__Glance_Guests__c, G4S__Glance_Location__c, G4S__Glance_Session_Key__c, G4S__Glance_Session_Type__c, G4S__Glance_Start_Time__c, Start_Date_Time__c, Brand__c, Disposition__c from TASK where Id='XXXXXXXXXXX'
val sfdcObjectDF = spark.read.format("com.springml.spark.salesforce")
  .option("username", userName)
  .option("password", s"$sfdcPassword$sfdcToken")
  .option("soql", retrievingSOQL)
  .option("version", JavaUtils.getConfigProps(runtimeEnvironment).getProperty("sfdc.api.version"))
  .option("sfObject", sfdcObject)
  .option("bulk", "true")
  .option("pkChunking", pkChunking)
  .option("chunkSize", checkingSize)
  .option("timeout", bulkTimeoutMillis.toString)
  .option("maxCharsPerColumn", "-1")
  .schema(sfdcObjectSchema)
  .load()

sfdcObjectDF.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save(s"${JavaUtils.getConfigProps(runtimeEnvironment).getProperty("etl.dataset.root")}/$accountName/$sfdcObject")
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.apply(Row.scala:163)
at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(rows.scala:166)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)