diff --git a/.github/scripts/clean_up_stream_table.sh b/.github/scripts/clean_up_stream_table.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/clean_up_stream_table.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Deletes the Kinesis stream and resets the lease row for shardId-000000000000
+# back to TRIM_HORIZON / AVAILABLE.
+# Required environment: STREAM_NAME, APP_NAME (used as the DynamoDB table name).
+
+# Best-effort: a failure to delete the stream is ignored on purpose.
+aws kinesis delete-stream --stream-name "$STREAM_NAME" || true
+
+# Reset the values of checkpoint, leaseCounter, ownerSwitchesSinceCheckpoint, and leaseOwner in DynamoDB table
+echo "Resetting DDB table"
+aws dynamodb update-item \
+  --table-name "$APP_NAME" \
+  --key '{"leaseKey": {"S": "shardId-000000000000"}}' \
+  --update-expression "SET checkpoint = :checkpoint, leaseCounter = :counter, ownerSwitchesSinceCheckpoint = :switches, leaseOwner = :owner" \
+  --expression-attribute-values '{
+    ":checkpoint": {"S": "TRIM_HORIZON"},
+    ":counter": {"N": "0"},
+    ":switches": {"N": "0"},
+    ":owner": {"S": "AVAILABLE"}
+  }' \
+  --return-values NONE
diff --git a/.github/scripts/create_stream.sh b/.github/scripts/create_stream.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/create_stream.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+#
+# Creates a single-shard Kinesis stream, retrying up to 10 times with a
+# linearly growing wait (3s, 6s, ...), then waits until the stream exists.
+# Required environment: STREAM_NAME.
+set -e
+
+for i in {1..10}; do
+  if aws kinesis create-stream --stream-name "$STREAM_NAME" --shard-count 1; then
+    break
+  else
+    echo "Stream creation failed, attempt $i/10. Waiting $((i * 3)) seconds..."
+    sleep $((i * 3))
+  fi
+done
+aws kinesis wait stream-exists --stream-name "$STREAM_NAME"
diff --git a/.github/scripts/manipulate_properties.sh b/.github/scripts/manipulate_properties.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/manipulate_properties.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+set -e
+
+# Manipulate sample.properties file that the KCL application pulls properties from (ex: streamName, applicationName)
+# Depending on the OS, different properties need to be changed
+# Required environment: RUNNER_OS (macOS, Linux or Windows), STREAM_NAME, APP_NAME
+if [[ "$RUNNER_OS" == "macOS" ]]; then
+  # BSD sed needs an explicit (empty) backup suffix after -i
+  sed -i "" "s/kclpysample/$STREAM_NAME/g" samples/sample.properties
+  sed -i "" "s/PythonKCLSample/$APP_NAME/g" samples/sample.properties
+  sed -i "" 's/us-east-5/us-east-1/g' samples/sample.properties
+  grep -v "idleTimeBetweenReadsInMillis" samples/sample.properties > samples/temp.properties
+  echo "idleTimeBetweenReadsInMillis = 250" >> samples/temp.properties
+  mv samples/temp.properties samples/sample.properties
+elif [[ "$RUNNER_OS" == "Linux" ]]; then
+  sed -i "s/kclpysample/$STREAM_NAME/g" samples/sample.properties
+  sed -i "s/PythonKCLSample/$APP_NAME/g" samples/sample.properties
+  sed -i 's/us-east-5/us-east-1/g' samples/sample.properties
+  sed -i "/idleTimeBetweenReadsInMillis/c\idleTimeBetweenReadsInMillis = 250" samples/sample.properties
+elif [[ "$RUNNER_OS" == "Windows" ]]; then
+  sed -i "s/kclpysample/$STREAM_NAME/g" samples/sample.properties
+  sed -i "s/PythonKCLSample/$APP_NAME/g" samples/sample.properties
+  sed -i 's/us-east-5/us-east-1/g' samples/sample.properties
+  sed -i "/idleTimeBetweenReadsInMillis/c\idleTimeBetweenReadsInMillis = 250" samples/sample.properties
+
+  # Launch the sample through a .bat wrapper and point executableName at it
+  echo '@echo off' > samples/run_script.bat
+  echo 'python %~dp0\sample_kclpy_app.py %*' >> samples/run_script.bat
+  sed -i 's/executableName = sample_kclpy_app.py/executableName = samples\/run_script.bat/' samples/sample.properties
+else
+  echo "Unknown OS: $RUNNER_OS"
+  exit 1
+fi
+
+cat samples/sample.properties
diff --git a/.github/scripts/put_words_to_stream.sh b/.github/scripts/put_words_to_stream.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/put_words_to_stream.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+#
+# Puts a fixed set of words onto the stream and fails if no records can be read back.
+# Required environment: STREAM_NAME.
+set -e
+
+sample_kinesis_wordputter.py --stream "$STREAM_NAME" -w cat -w dog -w bird -w lobster -w octopus
+
+# Get records from stream to verify they exist before continuing
+SHARD_ITERATOR=$(aws kinesis get-shard-iterator --stream-name "$STREAM_NAME" --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --query 'ShardIterator' --output text)
+INITIAL_RECORDS=$(aws kinesis get-records --shard-iterator "$SHARD_ITERATOR")
+RECORD_COUNT_BEFORE=$(echo "$INITIAL_RECORDS" | jq '.Records | length')
+
+if [ "$RECORD_COUNT_BEFORE" -eq 0 ]; then
+  echo "No records found in stream. Test cannot proceed."
+  exit 1
+fi
+echo "Found $RECORD_COUNT_BEFORE records in stream before KCL start"
diff --git a/.github/scripts/start_kcl.sh b/.github/scripts/start_kcl.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/start_kcl.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+#
+# Resets the lease row for shardId-000000000000, then runs the KCL command printed by
+# amazon_kclpy_helper.py under a timeout, teeing its output to kcl_output.log.
+# Required environment: RUNNER_OS (macOS, Linux or Windows), STREAM_NAME, APP_NAME.
+set -e
+set -o pipefail
+
+chmod +x samples/sample.properties
+chmod +x samples/sample_kclpy_app.py
+
+# Reset the values of checkpoint, leaseCounter, ownerSwitchesSinceCheckpoint, and leaseOwner in DynamoDB table
+echo "Resetting checkpoint for shardId-000000000000"
+aws dynamodb update-item \
+  --table-name "$APP_NAME" \
+  --key '{"leaseKey": {"S": "shardId-000000000000"}}' \
+  --update-expression "SET checkpoint = :checkpoint, leaseCounter = :counter, ownerSwitchesSinceCheckpoint = :switches, leaseOwner = :owner" \
+  --expression-attribute-values '{
+    ":checkpoint": {"S": "TRIM_HORIZON"},
+    ":counter": {"N": "0"},
+    ":switches": {"N": "0"},
+    ":owner": {"S": "AVAILABLE"}
+  }' \
+  --return-values NONE
+
+# Count the records currently readable from the stream (reported only, not enforced here)
+SHARD_ITERATOR=$(aws kinesis get-shard-iterator --stream-name "$STREAM_NAME" --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --query 'ShardIterator' --output text)
+INITIAL_RECORDS=$(aws kinesis get-records --shard-iterator "$SHARD_ITERATOR")
+RECORD_COUNT_BEFORE=$(echo "$INITIAL_RECORDS" | jq '.Records | length')
+
+echo "Found $RECORD_COUNT_BEFORE records in stream before KCL start"
+
+# Exit status 124 means the timeout fired; that is accepted as success, while any
+# other non-zero pipeline status fails the script (set -e with pipefail).
+# $KCL_COMMAND is deliberately unquoted so it splits into a command and its arguments.
+if [[ "$RUNNER_OS" == "macOS" ]]; then
+  brew install coreutils
+  KCL_COMMAND=$(amazon_kclpy_helper.py --print_command --java "$(which java)" --properties samples/sample.properties)
+  gtimeout 240 $KCL_COMMAND 2>&1 | tee kcl_output.log || [ $? -eq 124 ]
+elif [[ "$RUNNER_OS" == "Linux" ]]; then
+  KCL_COMMAND=$(amazon_kclpy_helper.py --print_command --java "$(which java)" --properties samples/sample.properties)
+  timeout 240 $KCL_COMMAND 2>&1 | tee kcl_output.log || [ $? -eq 124 ]
+elif [[ "$RUNNER_OS" == "Windows" ]]; then
+  KCL_COMMAND=$(amazon_kclpy_helper.py --print_command --java "$(which java)" --properties samples/sample.properties)
+  timeout 300 $KCL_COMMAND 2>&1 | tee kcl_output.log || [ $? -eq 124 ]
+else
+  echo "Unknown OS: $RUNNER_OS"
+  exit 1
+fi
+
+echo "---------ERROR LOGS HERE-------"
+grep -i error kcl_output.log || echo "No errors found in logs"
diff --git a/.github/scripts/verify_kcl.sh b/.github/scripts/verify_kcl.sh
new file mode 100644
--- /dev/null
+++ b/.github/scripts/verify_kcl.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Passes when the lease table contains at least one item and at least one item whose
+# checkpoint is something other than TRIM_HORIZON; exits 1 otherwise.
+# Required environment: APP_NAME (used as the DynamoDB table name).
+set -e
+
+LEASE_EXISTS=$(aws dynamodb scan --table-name "$APP_NAME" --select "COUNT" --query "Count" --output text || echo "0")
+CHECKPOINT_EXISTS=$(aws dynamodb scan --table-name "$APP_NAME" --select "COUNT" --filter-expression "attribute_exists(checkpoint) AND checkpoint <> :trim_horizon" --expression-attribute-values '{":trim_horizon": {"S": "TRIM_HORIZON"}}' --query "Count" --output text || echo "0")
+
+echo "Found $LEASE_EXISTS leases and $CHECKPOINT_EXISTS non-TRIM-HORIZON checkpoint in DynamoDB"
+
+echo "Printing checkpoint values"
+aws dynamodb scan --table-name "$APP_NAME" --projection-expression "leaseKey,checkpoint" --output json
+
+if [ "$LEASE_EXISTS" -gt 0 ] && [ "$CHECKPOINT_EXISTS" -gt 0 ]; then
+  echo "Test passed: Found both leases and non-TRIM_HORIZON checkpoints in DDB (KCL is fully functional)"
+  exit 0
+else
+  echo "Test failed: KCL not fully functional"
+  echo "Lease(s) found: $LEASE_EXISTS"
+  echo "non-TRIM_HORIZON checkpoint(s) found: $CHECKPOINT_EXISTS"
+  exit 1
+fi
diff --git a/.github/workflows/privileged-run.yml b/.github/workflows/privileged-run.yml
deleted file mode 100644
index 4431c3d..0000000
--- a/.github/workflows/privileged-run.yml
+++ /dev/null
@@ -1,112 +0,0 @@
-# This workflow will trigger on pushes, pull requests (to master branch), and manually from the GitHub Actions tab (when requested)
-# sample_run uses matrix to create 12 unique combinations of operating systems and python versions
-# each of the 12 runs download the jars needed to run the KCL, run the sample_kinesis_wordputter.py, and use a timeout command to run the sample_kclpy_app.py
-# auto_merge uses GitHub events to check if dependabot is the pull requester, and if the request fits the criteria the PR is automatically merged
-
-name: Sample Run and Dependabot Auto-merge
-on:
-  push:
-    branches: [ master ]
-
-permissions:
-  id-token: write
-  contents: write
-  pull-requests: write
-  statuses: write
-
-jobs:
-  sample-run:
-    timeout-minutes: 8
-    runs-on: ${{ matrix.os }}
-    defaults:
-      run:
-        shell: bash
-
-    strategy:
-      fail-fast: false
-      matrix:
-        python-version: [ "3.9", "3.10", "3.11" ]
-        jdk-version: [ "8", "11", "17", "21", "24" ]
-        os: [ ubuntu-latest, macOS-latest, windows-latest ]
-
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-east-1
-          role-to-assume: arn:aws:iam::751999266872:role/GitHubPython
-          role-session-name: myGitHubActionsPython
-
-      - name: Set up JDK ${{ matrix.jdk-version }}
-        uses: actions/setup-java@v4
-        with:
-          java-version: ${{ matrix.jdk-version }}
-          distribution: 'corretto'
-
-      - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v2
-        with:
-          python-version: ${{ matrix.python-version }}
-
-      - name: Install Python and required pips
-        run: |
-          python -m pip install --upgrade pip
-          pip install -r requirements.txt
-          pip install -r test_requirements.txt
-          pip install build
-
-      - name: Test with Pytest
-        run: |
-          python -m pytest
-
-      - name: Install .jar files
-        run: |
-          python -m build
-          python setup.py download_jars
-          python setup.py install
-        env:
-          KCL_MVN_REPO_SEARCH_URL: https://repo1.maven.org/maven2/
-
-      - name: Put words to sample stream
-        run: |
-          sample_kinesis_wordputter.py --stream kclpysample -w cat -w dog -w bird -w lobster -w octopus
-
-      - name: Start KCL application (windows or ubuntu)
-        if: matrix.os != 'macOS-latest'
-        run: |
-          timeout 45 $(amazon_kclpy_helper.py --print_command --java $(which java) --properties samples/sample.properties) || code=$?; if [[ $code -ne 124 && $code -ne 0 ]]; then exit $code; fi
-
-      - name: Start KCL application (macOS)
-        if: matrix.os == 'macOS-latest'
-        run: |
-          brew install coreutils
-          gtimeout 45 $(amazon_kclpy_helper.py --print_command --java $(which java) --properties samples/sample.properties) || code=$?; if [[ $code -ne 124 && $code -ne 0 ]]; then exit $code; fi
-
-  auto-merge-dependabot:
-    needs: [sample-run]
-    runs-on: ubuntu-latest
-    if: github.actor == 'dependabot[bot]' && github.event.pull_request.user.login == 'dependabot[bot]'
-    steps:
-      - name: Fetch Dependabot metadata
-        id: metadata
-        uses: dependabot/fetch-metadata@v2
-        with:
-          alert-lookup: true
-          github-token: "${{ secrets.GITHUB_TOKEN }}"
-
-#      - name: Approve PR
-#        if: steps.metadata.outputs.update-type != 'version-update:semver-major'
-#        run: gh pr review --approve "$PR_URL"
-#        env:
-#          PR_URL: ${{github.event.pull_request.html_url}}
-#          GH_TOKEN: ${{secrets.GITHUB_TOKEN}}
-
-#      - name: Enable auto-merge for Dependabot PRs
-#        if: steps.metadata.outputs.update-type != 'version-update:semver-major'
-#        run: gh pr merge --auto --merge "$PR_URL"
-#        env:
-#          PR_URL: ${{github.event.pull_request.html_url}}
-#          GH_TOKEN: ${{secrets.GITHUB_TOKEN}}
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
new file mode 100644
--- /dev/null
+++ b/.github/workflows/python.yml
@@ -0,0 +1,192 @@
+# This workflow will trigger on pushes and pull requests to master branch
+# sample_run uses matrix to create unique combinations of operating systems and python versions
+# each of the unique runs downloads the jars needed to run the KCL, runs the sample_kinesis_wordputter.py, and uses a timeout command to run the sample_kclpy_app.py
+# auto_merge uses GitHub events to check if dependabot is the pull requester, and if the request fits the criteria the PR is automatically merged
+
+name: Sample Run Tests and Dependabot
+on:
+  push:
+    branches: [ master ]
+  pull_request_target:
+    branches: [ master ]
+    types: [opened, synchronize, reopened, unlabeled]
+
+concurrency:
+  group: python.yml
+  cancel-in-progress: false
+
+jobs:
+  # Evaluates if the sample tests should run. If the workflow is triggered by a push OR
+  # is triggered by a pull request without the 'skip-sample-tests' label, the sample tests should run.
+  # Otherwise, the sample tests will be skipped
+  check-if-should-run:
+    runs-on: ubuntu-latest
+    if: ${{ github.event_name == 'push' || (github.event_name == 'pull_request_target' && !contains(github.event.pull_request.labels.*.name, 'skip-sample-tests')) }}
+    outputs:
+      should_run: 'true'
+      is_fork: ${{ github.event_name == 'pull_request_target' && (github.event.pull_request.head.repo.fork || github.event.pull_request.user.login == 'dependabot[bot]') }}
+    steps:
+      - run: echo "Evaluating workflow conditions"
+
+  # Workflow will pause and wait here if it is triggered by a fork PR. The workflow will continue to wait until
+  # an approved member of the environment 'manual-approval' allows the workflow to run
+  wait-for-approval:
+    needs: [ check-if-should-run ]
+    if: ${{ needs.check-if-should-run.outputs.is_fork == 'true' }}
+    runs-on: ubuntu-latest
+    environment: manual-approval
+    steps:
+      - run: echo "Fork PR approved by a team member."
+
+  # Sample run tests of the KCL
+  # Runs only if (check-if-should-run allows AND (the PR is not from a fork OR it has been approved to run))
+  sample-run:
+    needs: [ check-if-should-run, wait-for-approval ]
+    permissions:
+      id-token: write
+    if: ${{ always() && needs.check-if-should-run.outputs.should_run == 'true' && (needs.check-if-should-run.outputs.is_fork != 'true' || needs.wait-for-approval.result == 'success') }}
+    timeout-minutes: 20
+    runs-on: ${{ matrix.os }}
+    defaults:
+      run:
+        shell: bash
+
+    # Initialize matrix based on PR labels (more-tests label runs more tests)
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ${{ github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'more-tests') && fromJSON('["3.9", "3.11"]') || fromJSON('["3.9"]') }}
+        jdk-version: ${{ github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'more-tests') && fromJSON('["8", "11", "17", "21", "24"]') || fromJSON('["8", "11"]') }}
+        os: [ ubuntu-latest, macOS-latest, windows-latest ]
+
+    steps:
+      # For pull_request_target, checkout PR head instead of merge commit
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          ref: ${{ github.event_name == 'pull_request_target' && github.event.pull_request.head.sha || github.ref }}
+
+      - name: Validate os, python-version, and jdk-version
+        run: |
+          [[ "${{ matrix.os }}" =~ ^(ubuntu-latest|macOS-latest|windows-latest)$ ]] || exit 1
+          [[ "${{ matrix.python-version }}" =~ ^(3\.9|3\.11)$ ]] || exit 1
+          [[ "${{ matrix.jdk-version }}" =~ ^(8|11|17|21|24)$ ]] || exit 1
+
+      # Configure AWS Credentials. Role session name is unique to avoid OIDC errors when running multiple tests concurrently
+      - name: Configure AWS Credentials
+        uses: aws-actions/configure-aws-credentials@v4
+        with:
+          aws-region: us-east-1
+          role-to-assume: ${{ secrets.AWS_ARN_GHA }}
+          role-session-name: GHA-${{ github.run_id }}-${{ matrix.python-version }}-${{ matrix.jdk-version }}-${{ matrix.os }}
+
+      - name: Set up JDK ${{ matrix.jdk-version }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.jdk-version }}
+          distribution: 'corretto'
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Install pip and requirements
+        run: |
+          python -m pip install --upgrade pip --no-cache-dir
+          pip install --no-cache-dir -r requirements.txt -r test_requirements.txt
+
+      - name: Test with Pytest
+        run: |
+          python -m pytest
+
+      # Download .jar files required to run KCL app
+      - name: Install .jar files
+        run: |
+          python setup.py download_jars
+          python setup.py install
+        env:
+          KCL_MVN_REPO_SEARCH_URL: https://repo1.maven.org/maven2/
+
+      # Create unique identifiers for the stream name and application name
+      - name: Set up unique identifiers
+        run: |
+          STREAM_NAME="kclpysample-${{ matrix.os }}-py${{ matrix.python-version }}-jdk${{ matrix.jdk-version }}"
+          APP_NAME="PythonKCLSample-${{ matrix.os }}-py${{ matrix.python-version }}-jdk${{ matrix.jdk-version }}"
+          echo "STREAM_NAME=$STREAM_NAME" >> $GITHUB_ENV
+          echo "APP_NAME=$APP_NAME" >> $GITHUB_ENV
+
+      # Manipulate sample.properties file to use unique stream name, application name, and OS specific program changes
+      - name: Manipulate sample.properties file
+        run: |
+          chmod +x .github/scripts/manipulate_properties.sh
+          .github/scripts/manipulate_properties.sh
+        env:
+          RUNNER_OS: ${{ runner.os }}
+          STREAM_NAME: ${{ env.STREAM_NAME }}
+          APP_NAME: ${{ env.APP_NAME }}
+
+      # Create kinesis stream with unique name and wait for it to exist
+      - name: Create and wait Kinesis stream
+        run: |
+          chmod +x .github/scripts/create_stream.sh
+          .github/scripts/create_stream.sh
+        env:
+          STREAM_NAME: ${{ env.STREAM_NAME }}
+
+      # Put words to sample stream with unique name based on run ID
+      - name: Put words to sample stream
+        run: |
+          chmod +x .github/scripts/put_words_to_stream.sh
+          .github/scripts/put_words_to_stream.sh
+        env:
+          STREAM_NAME: ${{ env.STREAM_NAME }}
+
+      # Run sample KCL application
+      - name: Start KCL application
+        run: |
+          chmod +x .github/scripts/start_kcl.sh
+          .github/scripts/start_kcl.sh
+        env:
+          RUNNER_OS: ${{ runner.os }}
+          STREAM_NAME: ${{ env.STREAM_NAME }}
+
+      # Check and verify results of KCL test
+      - name: Verify KCL Functionality
+        run: |
+          chmod +x .github/scripts/verify_kcl.sh
+          .github/scripts/verify_kcl.sh
+        env:
+          APP_NAME: ${{ env.APP_NAME }}
+
+      # Clean up all existing Streams and DDB tables
+      - name: Clean up Kinesis Stream and DynamoDB table
+        if: always()
+        run: |
+          chmod +x .github/scripts/clean_up_stream_table.sh
+          .github/scripts/clean_up_stream_table.sh
+        env:
+          STREAM_NAME: ${{ env.STREAM_NAME }}
+          APP_NAME: ${{ env.APP_NAME }}
+
+  auto-merge-dependabot:
+    needs: [ sample-run ]
+    runs-on: ubuntu-latest
+    if: github.event.pull_request.user.login == 'dependabot[bot]'
+    permissions:
+      contents: read
+      pull-requests: write
+    steps:
+      - name: Fetch Dependabot metadata
+        id: metadata
+        uses: dependabot/fetch-metadata@v2
+        with:
+          alert-lookup: true
+          github-token: "${{ secrets.GITHUB_TOKEN }}"
+
+      - name: Approve PR
+        if: steps.metadata.outputs.update-type != 'version-update:semver-major'
+        run: gh pr review --approve "$PR_URL"
+        env:
+          PR_URL: ${{github.event.pull_request.html_url}}
+          GH_TOKEN: ${{secrets.GITHUB_TOKEN}}
diff --git a/.gitignore b/.gitignore
index ed56893..9628a93 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@
 
 # IntelliJ idea stuff
 .idea
+.DS_Store
+*/.DS_Store
diff --git a/samples/sample_kclpy_app.py b/samples/sample_kclpy_app.py
--- a/samples/sample_kclpy_app.py
+++ b/samples/sample_kclpy_app.py
@@ -24,7 +24,7 @@ class RecordProcessor(processor.RecordProcessorBase):
     def __init__(self):
         self._SLEEP_SECONDS = 5
         self._CHECKPOINT_RETRIES = 5
-        self._CHECKPOINT_FREQ_SECONDS = 60
+        self._CHECKPOINT_FREQ_SECONDS = 30
         self._largest_seq = (None, None)
         self._largest_sub_seq = None
         self._last_checkpoint_time = None
@@ -40,7 +40,7 @@ def initialize(self, initialize_input):
             processor has been assigned.
         """
         self._largest_seq = (None, None)
-        self._last_checkpoint_time = time.time()
+        self._last_checkpoint_time = 0
 
     def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
         """