diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 7378af14..6c0b7dd2 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -26,6 +26,11 @@ jobs: with: submodules: 'recursive' + - name: Setup cmake + uses: jwlawson/actions-setup-cmake@v2 + with: + cmake-version: '3.x' + - name: Configure CMake # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type diff --git a/.github/workflows/kvsappcli.yaml b/.github/workflows/kvsappcli.yaml new file mode 100644 index 00000000..e67bb94f --- /dev/null +++ b/.github/workflows/kvsappcli.yaml @@ -0,0 +1,75 @@ +name: kvsappcli test + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + kvsappcli-on-mac: + runs-on: macos-15 + + permissions: + id-token: write + contents: read + + env: + STREAM_NAME: kvs-producer-embedded-c-kvsappcli-macos + AWS_KVS_HOST: "kinesisvideo.us-west-2.amazonaws.com" + + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + + - name: Install dependencies + run: | + brew update + brew install coreutils # for gtimeout + brew install --cask mkvtoolnix + + - name: Setup cmake + uses: jwlawson/actions-setup-cmake@v2 + with: + cmake-version: '3.x' + + - name: Fix the const issue + working-directory: ./libraries/amazon/amazon-kinesis-video-streams-media-interface + run: | + git apply ../../../patches/amazon-kinesis-video-streams-media-interface/const-fix.patch + + - name: Build project + run: | + mkdir build + cd build + cmake .. -DCMAKE_BUILD_TYPE="Debug" -DENABLE_MKV_DUMP=ON + make -j + + - name: Setup AWS Credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-region: us-west-2 + role-to-assume: ${{ secrets.KVS_GITHUB_ACTIONS_ROLE_ARN }} + + - name: Run the sample + working-directory: ./build + run: | + gtimeout --signal=SIGINT --kill-after=15s --preserve-status 30s ./bin/kvsappcli "$STREAM_NAME" + + - name: Check MKV file + working-directory: ./build + run: | + if [ ! -f ./dumped_output.mkv ]; then + echo "❌ MKV file was not created!" + exit 1 + fi + + output="$(mkvinfo -v ./dumped_output.mkv)" + echo "$output" + + if ! echo "$output" | grep "String: test_value_9"; then + echo "❌ No metadata found!" + exit 1 + fi + shell: bash diff --git a/.gitignore b/.gitignore index dd285725..2850c400 100755 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build +cmake-build-debug samples/kvs-esp32/sdkconfig.old samples/kvs-esp32/components/llhttp/* diff --git a/patches/amazon-kinesis-video-streams-media-interface/const-fix.patch b/patches/amazon-kinesis-video-streams-media-interface/const-fix.patch new file mode 100644 index 00000000..69593638 --- /dev/null +++ b/patches/amazon-kinesis-video-streams-media-interface/const-fix.patch @@ -0,0 +1,183 @@ +diff --git a/include/com/amazonaws/kinesis/video/capturer/AudioCapturer.h b/include/com/amazonaws/kinesis/video/capturer/AudioCapturer.h +index e0747f4..63d0c79 100644 +--- a/include/com/amazonaws/kinesis/video/capturer/AudioCapturer.h ++++ b/include/com/amazonaws/kinesis/video/capturer/AudioCapturer.h +@@ -46,7 +46,7 @@ AudioCapturerHandle audioCapturerCreate(void); + * @param[in] handle Handle of AudioCapturer. + * @return AudioCapturerStatus + */ +-AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle const handle); ++AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle handle); + + /** + * @brief Get capturer capability. +@@ -55,7 +55,7 @@ AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle const handl + * @param[out] pCapability Capturer capability. + * @return int 0 or error code. + */ +-int audioCapturerGetCapability(const AudioCapturerHandle const handle, AudioCapability* pCapability); ++int audioCapturerGetCapability(const AudioCapturerHandle handle, AudioCapability* pCapability); + + /** + * @brief Set capturer format, channel number, bit depth and sample rate. +@@ -80,7 +80,7 @@ int audioCapturerSetFormat(AudioCapturerHandle handle, const AudioFormat format, + * @param[in] pBitDepth Frame bit depth. + * @return int 0 or error code. + */ +-int audioCapturerGetFormat(const AudioCapturerHandle const handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, ++int audioCapturerGetFormat(const AudioCapturerHandle handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, + AudioBitDepth* pBitDepth); + + /** +diff --git a/include/com/amazonaws/kinesis/video/capturer/VideoCapturer.h b/include/com/amazonaws/kinesis/video/capturer/VideoCapturer.h +index 0b3671f..412625a 100644 +--- a/include/com/amazonaws/kinesis/video/capturer/VideoCapturer.h ++++ b/include/com/amazonaws/kinesis/video/capturer/VideoCapturer.h +@@ -46,7 +46,7 @@ VideoCapturerHandle videoCapturerCreate(void); + * @param[in] handle Handle of VideoCapturer. + * @return VideoCapturerStatus Status of VideoCapturer. + */ +-VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle const handle); ++VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle handle); + + /** + * @brief Get capturer capability. +@@ -55,7 +55,7 @@ VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle const handl + * @param[out] pCapability Capturer capability. + * @return int 0 or error code. + */ +-int videoCapturerGetCapability(const VideoCapturerHandle const handle, VideoCapability* pCapability); ++int videoCapturerGetCapability(const VideoCapturerHandle handle, VideoCapability* pCapability); + + /** + * @brief Set capturer format and resolution. +@@ -75,7 +75,7 @@ int videoCapturerSetFormat(VideoCapturerHandle handle, const VideoFormat format, + * @param[out] pResolution Frame resolution. + * @return int 0 or error code. + */ +-int videoCapturerGetFormat(const VideoCapturerHandle const handle, VideoFormat* pFormat, VideoResolution* pResolution); ++int videoCapturerGetFormat(const VideoCapturerHandle handle, VideoFormat* pFormat, VideoResolution* pResolution); + + /** + * @brief Acquire and turn on video stream. +diff --git a/include/com/amazonaws/kinesis/video/player/AudioPlayer.h b/include/com/amazonaws/kinesis/video/player/AudioPlayer.h +index db15477..2bf1c51 100644 +--- a/include/com/amazonaws/kinesis/video/player/AudioPlayer.h ++++ b/include/com/amazonaws/kinesis/video/player/AudioPlayer.h +@@ -46,7 +46,7 @@ AudioPlayerHandle audioPlayerCreate(void); + * @param[in] handle Handle of AudioPlayer. + * @return AudioPlayerStatus Status of AudioPlayer. + */ +-AudioPlayerStatus audioPlayerGetStatus(const AudioPlayerHandle const handle); ++AudioPlayerStatus audioPlayerGetStatus(const AudioPlayerHandle handle); + + /** + * @brief Get player capability. +@@ -55,7 +55,7 @@ AudioPlayerStatus audioPlayerGetStatus(const AudioPlayerHandle const handle); + * @param[out] pCapability Player capability. + * @return int 0 or error code. + */ +-int audioPlayerGetCapability(const AudioPlayerHandle const handle, AudioCapability* pCapability); ++int audioPlayerGetCapability(const AudioPlayerHandle handle, AudioCapability* pCapability); + + /** + * @brief Set player format, channel number, bit depth and sample rate. +@@ -80,7 +80,7 @@ int audioPlayerSetFormat(AudioPlayerHandle handle, const AudioFormat format, con + * @param[in] pBitDepth Frame bit depth. + * @return int 0 or error code. + */ +-int audioPlayerGetFormat(const AudioPlayerHandle const handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, ++int audioPlayerGetFormat(const AudioPlayerHandle handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, + AudioBitDepth* pBitDepth); + + /** +diff --git a/source/FILE/FILEAudioCapturer.c b/source/FILE/FILEAudioCapturer.c +index 8fb3e69..17d4ca2 100644 +--- a/source/FILE/FILEAudioCapturer.c ++++ b/source/FILE/FILEAudioCapturer.c +@@ -84,7 +84,7 @@ AudioCapturerHandle audioCapturerCreate(void) + return (AudioCapturerHandle) fileHandle; + } + +-AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle const handle) ++AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle handle) + { + if (!handle) { + return AUD_CAP_STATUS_NOT_READY; +@@ -94,7 +94,7 @@ AudioCapturerStatus audioCapturerGetStatus(const AudioCapturerHandle const handl + return fileHandle->status; + } + +-int audioCapturerGetCapability(const AudioCapturerHandle const handle, AudioCapability* pCapability) ++int audioCapturerGetCapability(const AudioCapturerHandle handle, AudioCapability* pCapability) + { + FILE_HANDLE_NULL_CHECK(handle); + FILE_HANDLE_GET(handle); +@@ -170,7 +170,7 @@ int audioCapturerSetFormat(AudioCapturerHandle handle, const AudioFormat format, + return 0; + } + +-int audioCapturerGetFormat(const AudioCapturerHandle const handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, ++int audioCapturerGetFormat(const AudioCapturerHandle handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, + AudioBitDepth* pBitDepth) + { + FILE_HANDLE_NULL_CHECK(handle); +diff --git a/source/FILE/FILEAudioPlayer.c b/source/FILE/FILEAudioPlayer.c +index 8f35381..2f18b77 100644 +--- a/source/FILE/FILEAudioPlayer.c ++++ b/source/FILE/FILEAudioPlayer.c +@@ -33,12 +33,12 @@ AudioPlayerHandle audioPlayerCreate(void) + return NULL; + } + +-AudioPlayerStatus audioPlayerGetStatus(const AudioPlayerHandle const handle) ++AudioPlayerStatus audioPlayerGetStatus(const AudioPlayerHandle handle) + { + return AUD_PLY_STATUS_NOT_READY; + } + +-int audioPlayerGetCapability(const AudioPlayerHandle const handle, AudioCapability* pCapability) ++int audioPlayerGetCapability(const AudioPlayerHandle handle, AudioCapability* pCapability) + { + return -EAGAIN; + } +@@ -49,7 +49,7 @@ int audioPlayerSetFormat(AudioPlayerHandle handle, const AudioFormat format, con + return -EAGAIN; + } + +-int audioPlayerGetFormat(const AudioPlayerHandle const handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, ++int audioPlayerGetFormat(const AudioPlayerHandle handle, AudioFormat* pFormat, AudioChannel* pChannel, AudioSampleRate* pSampleRate, + AudioBitDepth* pBitDepth) + { + return -EAGAIN; +diff --git a/source/FILE/FILEVideoCapturer.c b/source/FILE/FILEVideoCapturer.c +index 4054bb2..018c548 100644 +--- a/source/FILE/FILEVideoCapturer.c ++++ b/source/FILE/FILEVideoCapturer.c +@@ -74,7 +74,7 @@ VideoCapturerHandle videoCapturerCreate(void) + return (VideoCapturerHandle) fileHandle; + } + +-VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle const handle) ++VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle handle) + { + if (!handle) { + return VID_CAP_STATUS_NOT_READY; +@@ -84,7 +84,7 @@ VideoCapturerStatus videoCapturerGetStatus(const VideoCapturerHandle const handl + return fileHandle->status; + } + +-int videoCapturerGetCapability(const VideoCapturerHandle const handle, VideoCapability* pCapability) ++int videoCapturerGetCapability(const VideoCapturerHandle handle, VideoCapability* pCapability) + { + FILE_HANDLE_NULL_CHECK(handle); + FILE_HANDLE_GET(handle); +@@ -132,7 +132,7 @@ int videoCapturerSetFormat(VideoCapturerHandle handle, const VideoFormat format, + return 0; + } + +-int videoCapturerGetFormat(const VideoCapturerHandle const handle, VideoFormat* pFormat, VideoResolution* pResolution) ++int videoCapturerGetFormat(const VideoCapturerHandle handle, VideoFormat* pFormat, VideoResolution* pResolution) + { + FILE_HANDLE_NULL_CHECK(handle); + FILE_HANDLE_GET(handle); diff --git a/samples/kvsapp/kvsappcli.c b/samples/kvsapp/kvsappcli.c index 99c2b5d1..3bb1caa4 100644 --- a/samples/kvsapp/kvsappcli.c +++ b/samples/kvsapp/kvsappcli.c @@ -220,7 +220,7 @@ static void *audioThread(void *arg) } } - audioCapturerReleaseStream(videoCapturerHandle); + audioCapturerReleaseStream(audioCapturerHandle); printf("audio thread leaving, err:%d\n", res); return NULL; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bd7c09de..f9b22164 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,8 @@ set(CMAKE_C_FLAGS "-D_XOPEN_SOURCE=600 -D_POSIX_C_SOURCE=200112L ${CMAKE_C_FLAGS set(LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR}) +option(ENABLE_MKV_DUMP "Enable MKV dump to file" OFF) + set(LIB_SRC ${LIB_DIR}/include/kvs/kvsapp.h ${LIB_DIR}/include/kvs/kvsapp_options.h @@ -98,6 +100,11 @@ if(${USE_WEBRTC_MBEDTLS_LIB}) target_include_directories(${LIB_NAME} PUBLIC ${WEBRTC_INC_PATH}) endif() +if(${ENABLE_MKV_DUMP}) + message(STATUS "MKV dump enabled") + target_compile_definitions(${LIB_NAME} PUBLIC ENABLE_MKV_DUMP) +endif() + target_link_libraries(${LIB_NAME} PUBLIC ${LINK_LIBS} ) diff --git a/src/include/kvs/mkv_generator.h b/src/include/kvs/mkv_generator.h index ebaff131..144ef3eb 100755 --- a/src/include/kvs/mkv_generator.h +++ b/src/include/kvs/mkv_generator.h @@ -38,7 +38,7 @@ #define MKV_TRACK_SIZE ( 2 ) - +#define MAX_TAG_AMOUNT 10 #define MAX_TAG_NAME_LEN 128 #define MAX_TAG_VALUE_LEN 256 @@ -46,13 +46,14 @@ //https://github.com/awslabs/amazon-kinesis-video-streams-pic/blob/c98c2a256a7bb3dfc4db41ae26d45e28ac7eec56/src/mkvgen/include/com/amazonaws/kinesis/video/mkvgen/Include.h#L104C31-L104C34 typedef struct { - char key[MAX_TAG_NAME_LEN]; char value[MAX_TAG_VALUE_LEN]; - } MkvTag_t; - +typedef struct { + uint8_t* buffer; + size_t size; +} MkvTagsBuffer_t; typedef enum TrackType { @@ -229,12 +230,16 @@ int Mkv_generateAacCodecPrivateData(Mpeg4AudioObjectTypes_t objectType, uint32_t */ int Mkv_generatePcmCodecPrivateData(PcmFormatCode_t format, uint32_t uSamplingRate, uint16_t channels, uint8_t **ppCodecPrivateData, size_t *puCodecPrivateDataLen); - - -int Mkv_initializeTagsHdr(uint8_t *pTagHdr, - size_t uTagHdrLen, - uint32_t numTags, - const MkvTag_t *tags); - +/** + * @brief Allocates and writes MKV tags with their headers to the buffer. The caller is responsible for freeing out->buffer using free(). + * + * @param tagsList[in] the tags to write + * @param tagsListLen[in] length of tagsList + * @param out[out] buffer containing the bytes written and its length + * @return 0 on success, non-zero value otherwise + * + * @see MkvTag_t https://www.matroska.org/technical/elements.html + */ +int Mkv_generateTags(const MkvTag_t tagsList[], size_t tagsListLen, MkvTagsBuffer_t* out); #endif /* KVS_MKV_GENERATOR_H */ \ No newline at end of file diff --git a/src/include/kvs/stream.h b/src/include/kvs/stream.h index 96cd0624..4d023a2e 100755 --- a/src/include/kvs/stream.h +++ b/src/include/kvs/stream.h @@ -132,6 +132,21 @@ int Kvs_streamMemStatTotal(StreamHandle xStreamHandle, size_t *puMemTotal); */ int Kvs_dataFrameGetContent(DataFrameHandle xDataFrameHandle, uint8_t **ppMkvHeader, size_t *puMkvHeaderLen, uint8_t **ppData, size_t *puDataLen); +/** + * @brief Add MKV tags to the data frame + * + * @param xDataFrameHandle[in] The data frame handle + * @param tagsList[in] List of tags to add to this data frame + * @param tagsListLen[in] Length of the tagsList + * @param endOfStream[in] Whether to add the end of fragment tag (EOFR) + * @param ppMkvHeader[out] The MKV header + * @param puMkvHeaderLen[out] THe MKV header length + * @param ppData[out] The data pointer + * @param puDataLen[out] The data length + * @return 0 on success, non-zero value otherwise + */ +int Kvs_dataFrameAddTags(DataFrameHandle xDataFrameHandle, MkvTag_t* tagsList, size_t tagsListLen, bool endOfStream, uint8_t **ppMkvHeader, size_t *puMkvHeaderLen, uint8_t **ppData, size_t *puDataLen); + /** * @brief Terminate a data frame handle * diff --git a/src/source/app/kvsapp.c b/src/source/app/kvsapp.c index 55ea9b0f..0ea31a78 100644 --- a/src/source/app/kvsapp.c +++ b/src/source/app/kvsapp.c @@ -114,6 +114,9 @@ typedef struct KvsApp bool isAudioTrackPresent; AudioTrackInfo_t *pAudioTrackInfo; + MkvTag_t* tagsList; + size_t tagsListLen; + /* Session scope callbacks */ OnMkvSentCallbackInfo_t onMkvSentCallbackInfo; } KvsApp_t; @@ -780,7 +783,13 @@ static int prvPutMediaSendData(KvsApp_t *pKvs, int *pxSendCnt, bool bForceSend) LogError("Failed to get data and mkv header to send"); /* Propagate the res error */ } - else if ((res = Kvs_putMediaUpdate(pKvs->xPutMediaHandle, pMkvHeader, uMkvHeaderLen, pData, uDataLen)) != KVS_ERRNO_NONE) + else if ( + (((DataFrameIn_t *)xDataFrameHandle)->xClusterType == MKV_CLUSTER) && pKvs->tagsListLen > 0 && + (res = Kvs_dataFrameAddTags(xDataFrameHandle, pKvs->tagsList, pKvs->tagsListLen, false, &pMkvHeader, &uMkvHeaderLen, &pData, &uDataLen)) != KVS_ERRNO_NONE) { + LogError("Failed to add tags"); + /* Propagate the res error */ + } + else if (Kvs_putMediaUpdate(pKvs->xPutMediaHandle, pMkvHeader, uMkvHeaderLen, pData, uDataLen) != KVS_ERRNO_NONE) { LogError("Failed to update"); /* Propagate the res error */ @@ -893,6 +902,33 @@ static int prvPutMediaDoWorkSendEndOfFrames(KvsApp_t *pKvs) return res; } +static int setupTagsForSession(KvsApp_t *pKvs) +{ + int res = KVS_ERRNO_NONE; + const size_t tagsListLen = MAX_TAG_AMOUNT - 1; // Note: end of fragment tag counts towards the 10 limit + MkvTag_t *tags = kvsMalloc(tagsListLen * sizeof(MkvTag_t)); + if (tags == NULL) + { + res = KVS_ERROR_OUT_OF_MEMORY; + LogError("OOM: tagsList"); + } + else + { + for (int i = 1; i <= tagsListLen; i++) + { + snprintf((char *)tags[i - 1].key, MAX_TAG_NAME_LEN, "test_key_%d", i); + snprintf((char *)tags[i - 1].value, MAX_TAG_VALUE_LEN, "test_value_%d", i); + } + + pKvs->tagsList = tags; + pKvs->tagsListLen = tagsListLen; + + LogInfo("Setup tags for session"); + } + + return res; +} + KvsAppHandle KvsApp_create(const char *pcHost, const char *pcRegion, const char *pcService, const char *pcStreamName) { int res = KVS_ERRNO_NONE; @@ -950,6 +986,12 @@ KvsAppHandle KvsApp_create(const char *pcHost, const char *pcRegion, const char pKvs->pVideoTrackInfo = NULL; pKvs->isAudioTrackPresent = false; pKvs->pAudioTrackInfo = NULL; + + if ((res = setupTagsForSession(pKvs)) != KVS_ERRNO_NONE) + { + LogError("Failed to setup tags"); + /* Propagate the res error */ + } } } @@ -1062,6 +1104,11 @@ void KvsApp_terminate(KvsAppHandle handle) kvsFree(pKvs->pPps); pKvs->pPps = NULL; } + if (pKvs->tagsList != NULL) + { + kvsFree(pKvs->tagsList); + pKvs->tagsList = NULL; + } Unlock(pKvs->xLock); diff --git a/src/source/mkv/mkv_generator.c b/src/source/mkv/mkv_generator.c index 81a81605..87eecffa 100644 --- a/src/source/mkv/mkv_generator.c +++ b/src/source/mkv/mkv_generator.c @@ -998,35 +998,85 @@ int Mkv_generatePcmCodecPrivateData(PcmFormatCode_t format, uint32_t uSamplingRa return res; } +/*-----------------------------------------------------------*/ -/** - * @brief Initializes the MKV (Matroska) Tags header - * - * This function sets up the header for MKV Tags, which are used to add metadata - * to the MKV container. It populates the provided buffer with the necessary - * header information based on the given tags. - * - * @param pTagHdr Pointer to the buffer where the tag header will be written - * @param uTagHdrLen Size of the buffer pointed to by pTagHdr - * @param numTags Number of tags to be added - * @param tags Pointer to an array of MkvTag_t structures containing the tag data - * - * @return int Returns 0 on success, or a negative error code on failure - * - * @note The caller is responsible for ensuring that pTagHdr is large enough to - * hold the entire tag header. - * - * @see MkvTag_t https://www.matroska.org/technical/elements.html - */ -int Mkv_initializeTagsHdr(uint8_t *pTagHdr, size_t uTagHdrLen, uint32_t numTags, const MkvTag_t *tags) { +// No access to strnlen +size_t safe_strlen(const char* str, const size_t max_len) +{ + const char *end = (const char *)memchr(str, '\0', max_len); + return end ? (size_t)(end - str) : max_len; +} + +int Mkv_generateTags(const MkvTag_t tagsList[], const size_t tagsListLen, MkvTagsBuffer_t* out) +{ + if (tagsList == NULL) + { + LogError("Null pointer provided for tags"); + return KVS_ERROR_INVALID_ARGUMENT; + } + + if (out == NULL) + { + LogError("Null pointer provided for out"); + return KVS_ERROR_INVALID_ARGUMENT; + } - if (pTagHdr == NULL || tags == NULL) { - LogError("Null pointer provided for Tag Header and Tag Initialization"); + if (tagsListLen > MAX_TAG_AMOUNT) + { + LogError("Adding too many tags. Adding: %zu, Max: %d", tagsListLen, MAX_TAG_AMOUNT); return KVS_ERROR_INVALID_ARGUMENT; } - uint8_t *pIdx = pTagHdr; - uint8_t *pEndBuffer = pTagHdr + uTagHdrLen; + out->buffer = NULL; + out->size = 0; + + // Calculate the total size needed for the Tags element and all its children + // Size Marker Size Field Length Max Encodable Size (Bytes) + // 0b10000000 (0x80) 1 byte 127 + // 0b01000000 (0x40) 2 bytes 16,383 + // 0b00100000 (0x20) 3 bytes 2,097,151 + // 0b00010000 (0x10) 4 bytes 268,435,455 + + size_t totalSize = 4 + 2; // Tags element (ID + 2-byte size) + for (size_t i = 0; i < tagsListLen; i++) + { + const size_t keyLen = safe_strlen(tagsList[i].key, MAX_TAG_NAME_LEN + 1); + const size_t valueLen = safe_strlen(tagsList[i].value, MAX_TAG_VALUE_LEN + 1); + + if (keyLen > MAX_TAG_NAME_LEN || valueLen > MAX_TAG_VALUE_LEN) + { + LogError("Tag key or value exceeds maximum length"); + return KVS_ERROR_INVALID_ARGUMENT; + } + + // Tags element (above) + totalSize += 2 + 2 + // └─Tag element (ID + 2-byte size) + 2 + 2 + // └─SimpleTag element (ID + 2-byte size) + 2 + 2 + keyLen + // ├─TagName element (ID + 2-byte size + content) + 2 + 2 + valueLen; // └─TagString element (ID + 2-byte size + content) + } + + // Allocate buffer + uint8_t *buffer = (uint8_t *)malloc(totalSize); + if (buffer == NULL) + { + LogError("Failed to allocate memory for tags"); + return KVS_ERROR_OUT_OF_MEMORY; + } + + uint8_t *pIdx = buffer; + + /** + * +-----------------+-------------+------------------+------------------------+ + * | Element | EBML ID | Size Field | Description | + * +-----------------+-------------+------------------+------------------------+ + * | Tags | 0x1254C367 | 2 bytes | Master element | + * | └─Tag | 0x7373 | 2 bytes | Single tag container | + * | └─SimpleTag | 0x67C8 | 2 bytes | Tag content container | + * | ├─Name | 0x45A3 | 2 bytes | Tag name/key | + * | └─String | 0x4487 | 2 bytes | Tag value | + * +-----------------+-------------+------------------+------------------------+ + */ // Write Tags master element (ID: 0x1254C367) *(pIdx++) = 0x12; @@ -1034,111 +1084,70 @@ int Mkv_initializeTagsHdr(uint8_t *pTagHdr, size_t uTagHdrLen, uint32_t numTags, *(pIdx++) = 0xC3; *(pIdx++) = 0x67; - // Write Tags size field (8 bytes, placeholder for now) - *(pIdx++) = 0x01; // Size length indicator - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; + // Write Tags size field (2 bytes, placeholder for now) + *(pIdx++) = 0x40; // 2-byte size marker + *(pIdx++) = 0x00; // Actual size, will update later // Process each tag - for (uint32_t i = 0; i < numTags; i++) { - size_t tagNameLen = strlen(tags[i].key); - size_t tagValueLen = strlen(tags[i].value); + for (size_t i = 0; i < tagsListLen; i++) + { + const size_t tagNameLen = safe_strlen(tagsList[i].key, MAX_TAG_NAME_LEN + 1); + const size_t tagValueLen = safe_strlen(tagsList[i].value, MAX_TAG_VALUE_LEN + 1); // Compute dynamic sizes: - // Name element: 2 (ID) + 8 (size) + tagNameLen - // String element: 2 (ID) + 8 (size) + tagValueLen - // SimpleTag payload = Name element + String element = 20 + tagNameLen + tagValueLen - // SimpleTag total size = 10 (header) + (20 + tagNameLen + tagValueLen) - // Tag payload = SimpleTag total size = 30 + tagNameLen + tagValueLen - size_t simpleTagPayloadSize = 20 + tagNameLen + tagValueLen; - size_t simpleTagTotalSize = 30 + tagNameLen + tagValueLen; - size_t tagPayloadSize = simpleTagTotalSize; + // Name element: 2 (ID) + 2 (size) + tagNameLen + // String element: 2 (ID) + 2 (size) + tagValueLen + // SimpleTag payload = Name element + String element = 8 + tagNameLen + tagValueLen + const size_t simpleTagPayloadSize = 8 + tagNameLen + tagValueLen; // Write Tag element (ID: 0x7373) *(pIdx++) = 0x73; *(pIdx++) = 0x73; - // Write Tag size (8 bytes) using computed payload size - *(pIdx++) = 0x01; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = (uint8_t)tagPayloadSize; + // Write Tag size (2 bytes) - payload size plus SimpleTag header (4 bytes) + size_t tagSize = simpleTagPayloadSize + 4; + *(pIdx++) = 0x40 | (uint8_t)(tagSize >> 8); + *(pIdx++) = (uint8_t)(tagSize & 0xFF); // Write SimpleTag element (ID: 0x67C8) *(pIdx++) = 0x67; *(pIdx++) = 0xC8; - // Write SimpleTag size (8 bytes) using computed payload size (Name+String only) - *(pIdx++) = 0x01; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = (uint8_t)simpleTagPayloadSize; - - // Write Tag Name element (ID: 0x45A3) + // Write SimpleTag size (2 bytes) + *(pIdx++) = 0x40 | (uint8_t)(simpleTagPayloadSize >> 8); + *(pIdx++) = (uint8_t)(simpleTagPayloadSize & 0xFF); + + // Write Name element (ID: 0x45A3) *(pIdx++) = 0x45; *(pIdx++) = 0xA3; - // Write Name size (8 bytes) with tagNameLen - *(pIdx++) = 0x01; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = (uint8_t)tagNameLen; - - // Write Tag Name aka Key - memcpy(pIdx, tags[i].key, tagNameLen); + // Write Name size (2 byte) with tagNameLen + *(pIdx++) = 0x40 | (uint8_t)(tagNameLen >> 8); // 2-byte size field + *(pIdx++) = (uint8_t)(tagNameLen & 0xFF); + + // Write Name content + memcpy(pIdx, tagsList[i].key, tagNameLen); pIdx += tagNameLen; // Write String element (ID: 0x4487) *(pIdx++) = 0x44; *(pIdx++) = 0x87; - // Write Tag v alue String size (8 bytes) with tagValueLen - *(pIdx++) = 0x01; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = 0x00; - *(pIdx++) = (uint8_t)tagValueLen; - - // Write Tag value String content - memcpy(pIdx, tags[i].value, tagValueLen); + // Write String size (2 bytes) with tagValueLen + *(pIdx++) = 0x40 | (uint8_t)(tagValueLen >> 8); + *(pIdx++) = (uint8_t)(tagValueLen & 0xFF); + + // Write String content + memcpy(pIdx, tagsList[i].value, tagValueLen); pIdx += tagValueLen; } - // Calculate and update the total size for the Tags master element. - // The size field should cover everything after the initial 12 bytes (ID + size field) - size_t totalSize = pIdx - pTagHdr - 12; - pTagHdr[11] = (uint8_t)totalSize; // Update the size field - - // Debug print for verification - - // for (size_t i = 0; i < 64 && i < (pIdx - pTagHdr); i++) { - // printf("%02x ", pTagHdr[i]); - // if ((i + 1) % 16 == 0) { - // printf("\n"); - // } - // } - // printf("\n"); + // Update the total size + const size_t payloadSize = pIdx - buffer - 6; + buffer[4] = 0x40 | (uint8_t)(payloadSize >> 8); + buffer[5] = (uint8_t)(payloadSize & 0xFF); + out->buffer = buffer; + out->size = totalSize; return KVS_ERRNO_NONE; } - diff --git a/src/source/restful/kvs/restapi_kvs.c b/src/source/restful/kvs/restapi_kvs.c index 347a3a17..4655fe0a 100644 --- a/src/source/restful/kvs/restapi_kvs.c +++ b/src/source/restful/kvs/restapi_kvs.c @@ -499,12 +499,27 @@ static int prvParseFragmentAck(char *pcSrc, size_t uLen, FragmentAck_t *pxFragAc else if ((xStFragMsg = STRING_construct_n(pcSrc + uBytesRead, uMsgLen)) == NULL || parseFragmentMsg(STRING_c_str(xStFragMsg), pxFragAck) != KVS_ERRNO_NONE) { - res = KVS_ERROR_FAIL_TO_PARSE_FRAGMENT_ACK_MSG; - LogInfo("Failed to parse fragment ack"); + res = KVS_ERROR_INVALID_ARGUMENT; + LogError("Fail to parse fragment ack. Invalid uMsgLen: %zu (exceeds buffer size uLen: %zu)", uMsgLen, uLen); } else { - *puFragAckLen = uBytesRead + uMsgLen + 2; + // Allocate a temporary buffer with a null terminator + char tempBuffer[uMsgLen + 1]; + memcpy(tempBuffer, pcSrc + uBytesRead, uMsgLen); + tempBuffer[uMsgLen] = '\0'; // Ensure null termination + + // STRING_construct_n uses strlen, so the input MUST be null terminated + if ((xStFragMsg = STRING_construct_n(tempBuffer, uMsgLen)) == NULL || + parseFragmentMsg(STRING_c_str(xStFragMsg), pxFragAck) != KVS_ERRNO_NONE) + { + res = KVS_ERROR_FAIL_TO_PARSE_FRAGMENT_ACK_MSG; + LogInfo("Failed to parse fragment ack"); + } + else + { + *puFragAckLen = uBytesRead + uMsgLen + 2; + } } STRING_delete(xStFragMsg); @@ -705,83 +720,87 @@ int Kvs_describeStream(KvsServiceParameter_t *pServPara, KvsDescribeStreamParame LogError("Invalid argument"); /* Propagate the res error */ } - else if ((res = getTimeInIso8601(pcXAmzDate, sizeof(pcXAmzDate))) != KVS_ERRNO_NONE) - { - LogError("Failed to get time"); - /* Propagate the res error */ - } - else if ( - (xStHttpBody = STRING_construct_sprintf(DESCRIBE_STREAM_HTTP_BODY_TEMPLATE, pDescPara->pcStreamName)) == NULL || - (xStContentLength = STRING_construct_sprintf("%u", STRING_length(xStHttpBody))) == NULL) - { - res = KVS_ERROR_UNABLE_TO_ALLOCATE_HTTP_BODY; - LogError("Failed to allocate HTTP body"); - } - else if ( - (xHttpReqHeaders = HTTPHeaders_Alloc()) == NULL || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_HOST, pServPara->pcHost) != HTTP_HEADERS_OK || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_ACCEPT, VAL_ACCEPT_ANY) != HTTP_HEADERS_OK || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_LENGTH, STRING_c_str(xStContentLength)) != HTTP_HEADERS_OK || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_TYPE, VAL_CONTENT_TYPE_APPLICATION_jSON) != HTTP_HEADERS_OK || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_USER_AGENT, VAL_USER_AGENT) != HTTP_HEADERS_OK || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_DATE, pcXAmzDate) != HTTP_HEADERS_OK || - (pServPara->pcToken != NULL && (HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_SECURITY_TOKEN, pServPara->pcToken) != HTTP_HEADERS_OK))) - { - res = KVS_ERROR_UNABLE_TO_GENERATE_HTTP_HEADER; - LogError("Failed to generate HTTP headers"); - } - else if ( - (xAwsSigV4Handle = prvSign(pServPara, KVS_URI_DESCRIBE_STREAM, URI_QUERY_EMPTY, xHttpReqHeaders, STRING_c_str(xStHttpBody))) == NULL || - HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_AUTHORIZATION, AwsSigV4_GetAuthorization(xAwsSigV4Handle)) != HTTP_HEADERS_OK) - { - res = KVS_ERROR_FAIL_TO_SIGN_HTTP_REQ; - LogError("Failed to sign"); - } - else if ((xNetIoHandle = NetIo_create()) == NULL) - { - res = KVS_ERROR_FAIL_TO_CREATE_NETIO_HANDLE; - LogError("Failed to create NetIo handle"); - } - else if ( - (res = NetIo_setRecvTimeout(xNetIoHandle, pServPara->uRecvTimeoutMs)) != KVS_ERRNO_NONE || - (res = NetIo_setSendTimeout(xNetIoHandle, pServPara->uSendTimeoutMs)) != KVS_ERRNO_NONE || - (res = NetIo_connect(xNetIoHandle, pServPara->pcHost, PORT_HTTPS)) != KVS_ERRNO_NONE) - { - LogError("Failed to connect to %s", pServPara->pcHost); - /* Propagate the res error */ - } - else if ((res = Http_executeHttpReq(xNetIoHandle, HTTP_METHOD_POST, KVS_URI_DESCRIBE_STREAM, xHttpReqHeaders, STRING_c_str(xStHttpBody))) != KVS_ERRNO_NONE) - { - LogError("Failed send http request to %s", pServPara->pcHost); - /* Propagate the res error */ - } - else if ((res = Http_recvHttpRsp(xNetIoHandle, &uHttpStatusCode, &pRspBody, &uRspBodyLen)) != KVS_ERRNO_NONE) - { - LogError("Failed recv http response from %s", pServPara->pcHost); - /* Propagate the res error */ - } else { - if (puHttpStatusCode != NULL) + LogInfo("Making describe request to: %s", pServPara->pcHost); + LogInfo("Stream name: %s", pDescPara->pcStreamName); + if ((res = getTimeInIso8601(pcXAmzDate, sizeof(pcXAmzDate))) != KVS_ERRNO_NONE) { - *puHttpStatusCode = uHttpStatusCode; + LogError("Failed to get time"); + /* Propagate the res error */ } - - if (uHttpStatusCode != 200) + else if ( + (xStHttpBody = STRING_construct_sprintf(DESCRIBE_STREAM_HTTP_BODY_TEMPLATE, pDescPara->pcStreamName)) == NULL || + (xStContentLength = STRING_construct_sprintf("%u", STRING_length(xStHttpBody))) == NULL) { - LogInfo("Describe Stream failed, HTTP status code: %u", uHttpStatusCode); - LogInfo("HTTP response message:%.*s", (int)uRspBodyLen, pRspBody); + res = KVS_ERROR_UNABLE_TO_ALLOCATE_HTTP_BODY; + LogError("Failed to allocate HTTP body"); } - } + else if ( + (xHttpReqHeaders = HTTPHeaders_Alloc()) == NULL || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_HOST, pServPara->pcHost) != HTTP_HEADERS_OK || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_ACCEPT, VAL_ACCEPT_ANY) != HTTP_HEADERS_OK || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_LENGTH, STRING_c_str(xStContentLength)) != HTTP_HEADERS_OK || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_TYPE, VAL_CONTENT_TYPE_APPLICATION_jSON) != HTTP_HEADERS_OK || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_USER_AGENT, VAL_USER_AGENT) != HTTP_HEADERS_OK || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_DATE, pcXAmzDate) != HTTP_HEADERS_OK || + (pServPara->pcToken != NULL && (HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_SECURITY_TOKEN, pServPara->pcToken) != HTTP_HEADERS_OK))) + { + res = KVS_ERROR_UNABLE_TO_GENERATE_HTTP_HEADER; + LogError("Failed to generate HTTP headers"); + } + else if ( + (xAwsSigV4Handle = prvSign(pServPara, KVS_URI_DESCRIBE_STREAM, URI_QUERY_EMPTY, xHttpReqHeaders, STRING_c_str(xStHttpBody))) == NULL || + HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_AUTHORIZATION, AwsSigV4_GetAuthorization(xAwsSigV4Handle)) != HTTP_HEADERS_OK) + { + res = KVS_ERROR_FAIL_TO_SIGN_HTTP_REQ; + LogError("Failed to sign"); + } + else if ((xNetIoHandle = NetIo_create()) == NULL) + { + res = KVS_ERROR_FAIL_TO_CREATE_NETIO_HANDLE; + LogError("Failed to create NetIo handle"); + } + else if ( + (res = NetIo_setRecvTimeout(xNetIoHandle, pServPara->uRecvTimeoutMs)) != KVS_ERRNO_NONE || + (res = NetIo_setSendTimeout(xNetIoHandle, pServPara->uSendTimeoutMs)) != KVS_ERRNO_NONE || + (res = NetIo_connect(xNetIoHandle, pServPara->pcHost, PORT_HTTPS)) != KVS_ERRNO_NONE) + { + LogError("Failed to connect to %s", pServPara->pcHost); + /* Propagate the res error */ + } + else if ((res = Http_executeHttpReq(xNetIoHandle, HTTP_METHOD_POST, KVS_URI_DESCRIBE_STREAM, xHttpReqHeaders, STRING_c_str(xStHttpBody))) != KVS_ERRNO_NONE) + { + LogError("Failed send http request to %s", pServPara->pcHost); + /* Propagate the res error */ + } + else if ((res = Http_recvHttpRsp(xNetIoHandle, &uHttpStatusCode, &pRspBody, &uRspBodyLen)) != KVS_ERRNO_NONE) + { + LogError("Failed recv http response from %s", pServPara->pcHost); + /* Propagate the res error */ + } + else + { + if (puHttpStatusCode != NULL) + { + *puHttpStatusCode = uHttpStatusCode; + } - NetIo_disconnect(xNetIoHandle); - NetIo_terminate(xNetIoHandle); - SAFE_FREE(pRspBody); - HTTPHeaders_Free(xHttpReqHeaders); - AwsSigV4_Terminate(xAwsSigV4Handle); - STRING_delete(xStContentLength); - STRING_delete(xStHttpBody); + if (uHttpStatusCode != 200) + { + LogInfo("Describe Stream failed, HTTP status code: %u", uHttpStatusCode); + LogInfo("HTTP response message:%.*s", (int)uRspBodyLen, pRspBody); + } + } + NetIo_disconnect(xNetIoHandle); + NetIo_terminate(xNetIoHandle); + SAFE_FREE(pRspBody); + HTTPHeaders_Free(xHttpReqHeaders); + AwsSigV4_Terminate(xAwsSigV4Handle); + STRING_delete(xStContentLength); + STRING_delete(xStHttpBody); + } return res; } @@ -1148,6 +1167,10 @@ int Kvs_putMediaStart(KvsServiceParameter_t *pServPara, KvsPutMediaParameter_t * return res; } +#ifdef ENABLE_MKV_DUMP +static int g_mkvDumpCounter = 0; +#endif + int Kvs_putMediaUpdate(PutMediaHandle xPutMediaHandle, uint8_t *pMkvHeader, size_t uMkvHeaderLen, uint8_t *pData, size_t uDataLen) { int res = KVS_ERRNO_NONE; @@ -1187,6 +1210,26 @@ int Kvs_putMediaUpdate(PutMediaHandle xPutMediaHandle, uint8_t *pMkvHeader, size else { /* nop */ + +#ifdef ENABLE_MKV_DUMP + char filename[256]; + snprintf(filename, sizeof(filename), "dumped_output.mkv"); + + // Open in append mode + FILE *fpMkvDump = fopen(filename, "ab"); + if (!fpMkvDump) { + printf("Failed to open MKV dump file.\n"); + } else { + if (pMkvHeader && uMkvHeaderLen > 0) { + fwrite(pMkvHeader, 1, uMkvHeaderLen, fpMkvDump); + } + if (pData && uDataLen > 0) { + fwrite(pData, 1, uDataLen, fpMkvDump); + } + fclose(fpMkvDump); + printf("MKV data dumped to %s\n", filename); + } +#endif } } } @@ -1227,6 +1270,33 @@ int Kvs_putMediaUpdateRaw(PutMediaHandle xPutMediaHandle, uint8_t *pBuf, size_t else { /* nop */ + +#ifdef ENABLE_MKV_DUMP + char filename[256]; + snprintf(filename, sizeof(filename), "dumped_output.mkv"); + + // Reset the file if it's the first frame + if (g_mkvDumpCounter == 0) { + FILE *fpReset = fopen(filename, "wb"); + if (!fpReset) { + printf("Failed to reset MKV dump file.\n"); + } else { + printf("MKV dump file reset.\n"); + fclose(fpReset); + } + } + g_mkvDumpCounter++; + + // Open file in append mode for writing raw data + FILE *fpMkvDump = fopen(filename, "ab"); + if (!fpMkvDump) { + printf("Failed to open MKV dump file.\n"); + } else { + fwrite(pBuf, 1, uLen, fpMkvDump); + fclose(fpMkvDump); + printf("Raw MKV data dumped to dumped_output.mkv\n"); + } +#endif } } } diff --git a/src/source/stream/stream.c b/src/source/stream/stream.c index 0e2012ec..4647c014 100644 --- a/src/source/stream/stream.c +++ b/src/source/stream/stream.c @@ -453,6 +453,114 @@ int Kvs_dataFrameGetContent(DataFrameHandle xDataFrameHandle, uint8_t **ppMkvHea return res; } +int Kvs_dataFrameAddTags(DataFrameHandle xDataFrameHandle, MkvTag_t* tagsList, size_t tagsListLen, bool endOfStream, uint8_t **ppMkvHeader, size_t *puMkvHeaderLen, uint8_t **ppData, size_t *puDataLen) +{ + int res = KVS_ERRNO_NONE; + DataFrame_t *pxDataFrame = xDataFrameHandle; + + static bool firstClusterSeen = false; + + if (pxDataFrame->xDataFrameIn.xClusterType != MKV_CLUSTER) + { + LogInfo("Not a cluster"); + return KVS_ERRNO_NONE; + } + + // Input validation + if (pxDataFrame == NULL || ppMkvHeader == NULL || + puMkvHeaderLen == NULL || ppData == NULL || puDataLen == NULL) + { + LogError("Invalid argument in Kvs_dataFrameAddTags"); + return KVS_ERROR_INVALID_ARGUMENT; + } + + size_t effectiveTagsLen = tagsListLen + (endOfStream ? 1 : 0); + if (effectiveTagsLen >= MAX_TAG_AMOUNT) { + LogError("Trying to add too many tags - Adding: %zu tags, Max: %d", effectiveTagsLen, MAX_TAG_AMOUNT); + return KVS_ERROR_INVALID_ARGUMENT; + } + + // Track cluster transitions + if (!firstClusterSeen) + { + firstClusterSeen = true; + LogInfo("First cluster detected - skipping tags"); + } + else + { + LogInfo("Adding tags before cluster"); + + // Get the original header information + uint8_t *originalHeader = (uint8_t *)pxDataFrame->pMkvHdr; + size_t originalHeaderLen = pxDataFrame->uMkvHdrLen; + + MkvTag_t tagsToAddBuffer[MAX_TAG_AMOUNT]; + MkvTag_t* tagsToAdd; + size_t tagsToAddLen; + + if (endOfStream) + { + memcpy(tagsToAddBuffer, tagsList, tagsListLen * sizeof(MkvTag_t)); + + MkvTag_t endOfFragmentTag; + snprintf(endOfFragmentTag.key, MAX_TAG_NAME_LEN, "AWS_KINESISVIDEO_END_OF_FRAGMENT"); + endOfFragmentTag.value[0] = '\0'; + + tagsToAddBuffer[tagsListLen] = endOfFragmentTag; + + tagsToAdd = tagsToAddBuffer; + tagsToAddLen = tagsListLen + 1; + } + else + { + tagsToAdd = tagsList; + tagsToAddLen = tagsListLen; + } + + for (size_t i = 0; i < tagsToAddLen; i++) + { + LogInfo("Adding tag: key=%s, val=%s", tagsToAdd[i].key, tagsToAdd[i].value); + } + + // Generate tags header + MkvTagsBuffer_t tagsBuffer; + if (Mkv_generateTags(tagsToAdd, tagsToAddLen, &tagsBuffer) != KVS_ERRNO_NONE || tagsBuffer.buffer == NULL) + { + LogError("Failed to create MKV tags header"); + return KVS_ERROR_OUT_OF_MEMORY; + } + + // Allocate and create new combined header + size_t newHeaderLen = tagsBuffer.size + originalHeaderLen; + uint8_t *newHeader = (uint8_t *)malloc(newHeaderLen); + if (newHeader == NULL) + { + free(tagsBuffer.buffer); + LogError("Failed to allocate memory for new header"); + return KVS_ERROR_OUT_OF_MEMORY; + } + + // Combine headers + memcpy(newHeader, tagsBuffer.buffer, tagsBuffer.size); + memcpy(newHeader + tagsBuffer.size, originalHeader, originalHeaderLen); + + // Clean up tags header buffer + free(tagsBuffer.buffer); + + // Update the frame's header information + pxDataFrame->pMkvHdr = (char *)newHeader; + pxDataFrame->uMkvHdrLen = newHeaderLen; + } + + // Assign frame content + *ppMkvHeader = (uint8_t *)(pxDataFrame->pMkvHdr); + *puMkvHeaderLen = pxDataFrame->uMkvHdrLen; + *ppData = (uint8_t *)(pxDataFrame->xDataFrameIn.pData); + *puDataLen = pxDataFrame->xDataFrameIn.uDataLen; + + return res; +} + void Kvs_dataFrameTerminate(DataFrameHandle xDataFrameHandle) { DataFrame_t *pxDataFrame = xDataFrameHandle;