diff --git a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift index 85f2d86ed6..a35054fe65 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift @@ -132,14 +132,19 @@ final class AWSCloudWatchLoggingSessionController { } self.batchSubscription = producer.logBatchPublisher.sink { [weak self] batch in guard self?.networkMonitor.isOnline == true else { return } + + // Capture strong references to consumer and batch before the async task + let strongConsumer = consumer + let strongBatch = batch + Task { do { - try await consumer.consume(batch: batch) + try await strongConsumer.consume(batch: strongBatch) } catch { Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)") let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription) Amplify.Hub.dispatch(to: HubChannel.logging, payload: payload) - try batch.complete() + try strongBatch.complete() } } } @@ -178,8 +183,15 @@ final class AWSCloudWatchLoggingSessionController { } private func consumeLogBatch(_ batch: LogBatch) async throws { + // Check if consumer exists before trying to use it + guard let consumer = self.consumer else { + // If consumer is nil, still mark the batch as completed to prevent memory leaks + try batch.complete() + return + } + do { - try await consumer?.consume(batch: batch) + try await consumer.consume(batch: batch) } catch { Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)") let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription) diff --git 
a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift index 3ff0d65068..bdcc17c458 100644 --- a/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift +++ b/AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift @@ -18,7 +18,12 @@ class CloudWatchLoggingConsumer { private let logGroupName: String private var logStreamName: String? private var ensureLogStreamExistsComplete: Bool = false - private let encoder = JSONEncoder() + private let encoderLock = NSLock() + private let encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.dateEncodingStrategy = .millisecondsSince1970 + return encoder + }() init( client: CloudWatchLogsClientProtocol, @@ -29,6 +34,12 @@ class CloudWatchLoggingConsumer { self.formatter = CloudWatchLoggingStreamNameFormatter(userIdentifier: userIdentifier) self.logGroupName = logGroupName } + + private func safeEncode<T: Encodable>(_ value: T) throws -> Data { + return try encoderLock.withLock { + return try encoder.encode(value) + } + } } extension CloudWatchLoggingConsumer: LogBatchConsumer { @@ -40,28 +51,54 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { } await ensureLogStreamExists() - let batchByteSize = try encoder.encode(entries).count - if entries.count > AWSCloudWatchConstants.maxLogEvents { - let smallerEntries = entries.chunked(into: AWSCloudWatchConstants.maxLogEvents) + // Add safety check for nil logStreamName + guard let _ = self.logStreamName else { + Amplify.Logging.error("Log stream name is nil, cannot send logs") + try batch.complete() + return + } + + // Create a strong reference to entries to prevent deallocation during encoding + let entriesCopy = entries + + var batchByteSize: Int + do { + batchByteSize = try safeEncode(entriesCopy).count + } catch { + Amplify.Logging.error("Failed to 
encode log entries: \(error)") + try batch.complete() + return + } + + if entriesCopy.count > AWSCloudWatchConstants.maxLogEvents { + let smallerEntries = entriesCopy.chunked(into: AWSCloudWatchConstants.maxLogEvents) for entries in smallerEntries { - let entrySize = try encoder.encode(entries).count - if entrySize > AWSCloudWatchConstants.maxBatchByteSize { - let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) - for chunk in chunks { - try await sendLogEvents(chunk) + do { + let entrySize = try safeEncode(entries).count + if entrySize > AWSCloudWatchConstants.maxBatchByteSize { + let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) + for chunk in chunks { + try await sendLogEvents(chunk) + } + } else { + try await sendLogEvents(entries) } - } else { - try await sendLogEvents(entries) + } catch { + Amplify.Logging.error("Error processing log batch: \(error)") + continue } } - } else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize { - let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize) - for entries in smallerEntries { - try await sendLogEvents(entries) + } else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize { do { + let smallerEntries = try chunk(entriesCopy, into: AWSCloudWatchConstants.maxBatchByteSize) + for entries in smallerEntries { + try await sendLogEvents(entries) + } + } catch { + Amplify.Logging.error("Error chunking log entries: \(error)") } } else { - try await sendLogEvents(entries) + try await sendLogEvents(entriesCopy) } try batch.complete() @@ -72,47 +109,103 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { return } - defer { - ensureLogStreamExistsComplete = true - } - + // Only mark as complete after everything has finished successfully + // to avoid potential race conditions with incomplete state + if logStreamName == nil { - self.logStreamName = await formatter.formattedStreamName() + do { + // Explicitly capture self to avoid potential memory issues + let streamName = await 
self.formatter.formattedStreamName() + // Check if self is still valid and streamName is not nil before assigning + if !streamName.isEmpty { + self.logStreamName = streamName + } else { + // Fallback to a default if the stream name couldn't be determined + self.logStreamName = "default.\(UUID().uuidString)" + } + } catch { + // Handle any potential errors from async call + Amplify.Logging.error("Failed to get formatted stream name: \(error)") + // Fallback to a default + self.logStreamName = "default.\(UUID().uuidString)" + } } - let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput( - logGroupName: self.logGroupName, - logStreamNamePrefix: self.logStreamName - )).logStreams?.first(where: { stream in - return stream.logStreamName == self.logStreamName - }) - if stream != nil { + // Safety check - ensure we have a valid stream name before proceeding + guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else { + Amplify.Logging.error("Invalid log stream name") + ensureLogStreamExistsComplete = true return } - - _ = try? await self.client.createLogStream(input: CreateLogStreamInput( - logGroupName: self.logGroupName, - logStreamName: self.logStreamName - )) + + do { + let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput( + logGroupName: self.logGroupName, + logStreamNamePrefix: logStreamName + )).logStreams?.first(where: { stream in + return stream.logStreamName == logStreamName + }) + + if stream == nil { + _ = try? 
await self.client.createLogStream(input: CreateLogStreamInput( + logGroupName: self.logGroupName, + logStreamName: logStreamName + )) + } + + // Mark as complete only after all operations finished + ensureLogStreamExistsComplete = true + } catch { + Amplify.Logging.error("Error ensuring log stream exists: \(error)") + // Still mark as complete to avoid getting stuck in a failed state + ensureLogStreamExistsComplete = true + } } private func sendLogEvents(_ entries: [LogEntry]) async throws { + // Safety check for empty entries + if entries.isEmpty { + return + } + + // Safety check for logStreamName + guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else { + Amplify.Logging.error("Cannot send log events: Log stream name is nil or empty") + return + } + let events = convertToCloudWatchInputLogEvents(for: entries) - let response = try await self.client.putLogEvents(input: PutLogEventsInput( - logEvents: events, - logGroupName: self.logGroupName, - logStreamName: self.logStreamName, - sequenceToken: nil - )) - let retriableEntries = retriable(entries: entries, in: response) - if !retriableEntries.isEmpty { - let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries) - _ = try await self.client.putLogEvents(input: PutLogEventsInput( - logEvents: retriableEvents, + + // Safety check for empty events + if events.isEmpty { + Amplify.Logging.warn("No valid events to send to CloudWatch") + return + } + + do { + let response = try await self.client.putLogEvents(input: PutLogEventsInput( + logEvents: events, logGroupName: self.logGroupName, - logStreamName: self.logStreamName, + logStreamName: logStreamName, sequenceToken: nil )) + + // Handle retriable entries + let retriableEntries = retriable(entries: entries, in: response) + if !retriableEntries.isEmpty { + let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries) + if !retriableEvents.isEmpty { + _ = try await self.client.putLogEvents(input: PutLogEventsInput( 
+ logEvents: retriableEvents, + logGroupName: self.logGroupName, + logStreamName: logStreamName, + sequenceToken: nil + )) + } + } + } catch { + Amplify.Logging.error("Failed to send log events: \(error)") + throw error } } @@ -147,19 +240,48 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer { var chunks: [[LogEntry]] = [] var chunk: [LogEntry] = [] var currentChunkSize = 0 + for entry in entries { - let entrySize = try encoder.encode(entry).count + // Wrap the encoding in a do-catch to handle potential errors + var entrySize: Int + do { + entrySize = try safeEncode(entry).count + } catch { + Amplify.Logging.error("Failed to encode log entry: \(error)") + // Skip this entry and continue with the next one + continue + } + if currentChunkSize + entrySize < maxByteSize { chunk.append(entry) currentChunkSize = currentChunkSize + entrySize } else { - chunks.append(chunk) + // Only add non-empty chunks + if !chunk.isEmpty { + chunks.append(chunk) + } chunk = [entry] - currentChunkSize = currentChunkSize + entrySize + currentChunkSize = entrySize } } - + + // Add the last chunk if it's not empty + if !chunk.isEmpty { + chunks.append(chunk) + } + + // Return even if chunks is empty to avoid null pointer issues return chunks } // swiftlint:enable shorthand_operator } + +private extension NSLock { + func withLock<T>(_ block: () throws -> T) throws -> T { + lock() + defer { unlock() } + return try block() + } +} + +