Skip to content

Commit b6a721c

Browse files
author
Michael Christensen
committed
Increase code coverage
1 parent 32b8c2a commit b6a721c

File tree

4 files changed

+322
-2
lines changed

4 files changed

+322
-2
lines changed

src/amazon-keyspaces-mcp-server/awslabs/amazon_keyspaces_mcp_server/llm_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def build_list_tables_context(_keyspace_name: str, _tables: List[TableInfo]) ->
8383
"entity they represent."
8484
),
8585
}
86-
86+
8787
# Add guidance for empty results
8888
if not _tables:
8989
tables_guidance["empty_result_interpretation"] = (
@@ -92,7 +92,7 @@ def build_list_tables_context(_keyspace_name: str, _tables: List[TableInfo]) ->
9292
"the keyspace might not exist, use the listKeyspaces tool to verify the "
9393
"keyspace name is correct before concluding it's empty."
9494
)
95-
95+
9696
context['tables_guidance'] = tables_guidance
9797

9898
return dict_to_markdown(context)

src/amazon-keyspaces-mcp-server/tests/test_client.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,251 @@ async def test_get_session_connection_failure(self):
613613

614614
self.assertIn('Failed to connect to Cassandra cluster', str(context.exception))
615615

616+
async def test_list_keyspaces_with_invalid_replication_factor_value_error(self):
617+
"""Test listing keyspaces with invalid replication_factor (ValueError)."""
618+
mock_row = Mock()
619+
mock_row.keyspace_name = 'test_ks'
620+
mock_row.replication = {'class': 'SimpleStrategy', 'replication_factor': 'invalid'}
621+
622+
self.mock_session.execute.return_value = [mock_row]
623+
624+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
625+
mock_cluster_instance = mock_cluster_class.return_value
626+
mock_cluster_instance.connect.return_value = self.mock_session
627+
628+
client = UnifiedCassandraClient(self.cassandra_config)
629+
keyspaces = await client.list_keyspaces()
630+
631+
self.assertEqual(len(keyspaces), 1)
632+
self.assertEqual(keyspaces[0].replication_factor, 0)
633+
634+
async def test_list_keyspaces_with_none_replication_factor_type_error(self):
635+
"""Test listing keyspaces with None replication_factor (TypeError)."""
636+
mock_row = Mock()
637+
mock_row.keyspace_name = 'test_ks'
638+
mock_row.replication = {'class': 'SimpleStrategy', 'replication_factor': None}
639+
640+
self.mock_session.execute.return_value = [mock_row]
641+
642+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
643+
mock_cluster_instance = mock_cluster_class.return_value
644+
mock_cluster_instance.connect.return_value = self.mock_session
645+
646+
client = UnifiedCassandraClient(self.cassandra_config)
647+
keyspaces = await client.list_keyspaces()
648+
649+
self.assertEqual(len(keyspaces), 1)
650+
self.assertEqual(keyspaces[0].replication_factor, 0)
651+
652+
def test_build_service_characteristics(self):
653+
"""Test building service characteristics for Keyspaces."""
654+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster'):
655+
client = UnifiedCassandraClient(self.keyspaces_config)
656+
characteristics = client._build_service_characteristics() # pylint: disable=protected-access
657+
658+
self.assertIn('write_throughput_limitation', characteristics)
659+
self.assertIn('implementation_notes', characteristics)
660+
self.assertIn('response_guidance', characteristics)
661+
self.assertIn('do_not_mention', characteristics['response_guidance'])
662+
self.assertIn('preferred_terminology', characteristics['response_guidance'])
663+
self.assertEqual(len(characteristics['response_guidance']['do_not_mention']), 3)
664+
self.assertEqual(len(characteristics['response_guidance']['preferred_terminology']), 3)
665+
666+
def test_close_without_session(self):
667+
"""Test closing the client when no session exists."""
668+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster'):
669+
client = UnifiedCassandraClient(self.cassandra_config)
670+
client.close() # Should not raise an exception
671+
672+
async def test_close_without_cluster(self):
673+
"""Test closing the client when session exists but cluster is None."""
674+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
675+
mock_cluster_instance = mock_cluster_class.return_value
676+
mock_cluster_instance.connect.return_value = self.mock_session
677+
self.mock_session.cluster = None
678+
679+
client = UnifiedCassandraClient(self.cassandra_config)
680+
await client.get_session()
681+
client.close()
682+
683+
self.mock_session.shutdown.assert_called_once()
684+
685+
async def test_describe_table_with_capacity_mode(self):
686+
"""Test describing a Keyspaces table with capacity mode information."""
687+
mock_table_row = Mock()
688+
mock_table_row.table_name = 'users'
689+
mock_table_row.keyspace_name = 'mykeyspace'
690+
691+
mock_capacity_row = Mock()
692+
mock_capacity_row.custom_properties = {
693+
'capacity_mode': 'PROVISIONED',
694+
'read_capacity_units': '100',
695+
'write_capacity_units': '50'
696+
}
697+
698+
def mock_execute(query, _params=None):
699+
if 'tables' in query and 'system_schema_mcs' not in query:
700+
result = Mock()
701+
result.one.return_value = mock_table_row
702+
return result
703+
elif 'columns' in query:
704+
return []
705+
elif 'indexes' in query:
706+
return []
707+
elif 'system_schema_mcs' in query:
708+
result = Mock()
709+
result.one.return_value = mock_capacity_row
710+
return result
711+
return []
712+
713+
self.mock_session.execute = mock_execute
714+
715+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
716+
mock_cluster_instance = mock_cluster_class.return_value
717+
mock_cluster_instance.connect.return_value = self.mock_session
718+
719+
client = UnifiedCassandraClient(self.keyspaces_config)
720+
table_details = await client.describe_table('mykeyspace', 'users')
721+
722+
self.assertEqual(table_details['capacity_mode'], 'PROVISIONED')
723+
self.assertEqual(table_details['read_capacity_units'], 100)
724+
self.assertEqual(table_details['write_capacity_units'], 50)
725+
726+
async def test_execute_read_only_query_with_column_error(self):
727+
"""Test query execution when getting column value raises an error."""
728+
mock_column_names = ['id', 'bad_column']
729+
mock_row = Mock()
730+
mock_row.id = 1
731+
type(mock_row).bad_column = property(lambda self: (_ for _ in ()).throw(ValueError('Bad column')))
732+
733+
mock_result_set = Mock()
734+
mock_result_set.column_names = mock_column_names
735+
mock_result_set.__iter__ = lambda self: iter([mock_row])
736+
mock_result_set.response_future = None
737+
738+
self.mock_session.execute.return_value = mock_result_set
739+
740+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
741+
mock_cluster_instance = mock_cluster_class.return_value
742+
mock_cluster_instance.connect.return_value = self.mock_session
743+
744+
client = UnifiedCassandraClient(self.cassandra_config)
745+
result = await client.execute_read_only_query('SELECT * FROM users')
746+
747+
self.assertEqual(result['rows'][0]['id'], 1)
748+
self.assertIsNone(result['rows'][0]['bad_column'])
749+
750+
async def test_describe_table_without_capacity_mode(self):
751+
"""Test table without capacity_mode in custom_properties."""
752+
mock_table_row = Mock()
753+
mock_table_row.table_name = 'users'
754+
mock_table_row.keyspace_name = 'mykeyspace'
755+
756+
mock_capacity_row = Mock()
757+
mock_capacity_row.custom_properties = {'other_property': 'value'}
758+
759+
def mock_execute(query, _params=None):
760+
if 'tables' in query and 'system_schema_mcs' not in query:
761+
result = Mock()
762+
result.one.return_value = mock_table_row
763+
return result
764+
elif 'columns' in query:
765+
return []
766+
elif 'indexes' in query:
767+
return []
768+
elif 'system_schema_mcs' in query:
769+
result = Mock()
770+
result.one.return_value = mock_capacity_row
771+
return result
772+
return []
773+
774+
self.mock_session.execute = mock_execute
775+
776+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
777+
mock_cluster_instance = mock_cluster_class.return_value
778+
mock_cluster_instance.connect.return_value = self.mock_session
779+
780+
client = UnifiedCassandraClient(self.keyspaces_config)
781+
table_details = await client.describe_table('mykeyspace', 'users')
782+
783+
self.assertNotIn('capacity_mode', table_details)
784+
self.assertNotIn('read_capacity_units', table_details)
785+
self.assertNotIn('write_capacity_units', table_details)
786+
787+
788+
async def test_describe_table_with_on_demand_capacity(self):
789+
"""Test table with ON_DEMAND capacity mode."""
790+
mock_table_row = Mock()
791+
mock_table_row.table_name = 'users'
792+
mock_table_row.keyspace_name = 'mykeyspace'
793+
794+
mock_capacity_row = Mock()
795+
mock_capacity_row.custom_properties = {'capacity_mode': 'ON_DEMAND'}
796+
797+
def mock_execute(query, _params=None):
798+
if 'tables' in query and 'system_schema_mcs' not in query:
799+
result = Mock()
800+
result.one.return_value = mock_table_row
801+
return result
802+
elif 'columns' in query:
803+
return []
804+
elif 'indexes' in query:
805+
return []
806+
elif 'system_schema_mcs' in query:
807+
result = Mock()
808+
result.one.return_value = mock_capacity_row
809+
return result
810+
return []
811+
812+
self.mock_session.execute = mock_execute
813+
814+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
815+
mock_cluster_instance = mock_cluster_class.return_value
816+
mock_cluster_instance.connect.return_value = self.mock_session
817+
818+
client = UnifiedCassandraClient(self.keyspaces_config)
819+
table_details = await client.describe_table('mykeyspace', 'users')
820+
821+
self.assertEqual(table_details['capacity_mode'], 'ON_DEMAND')
822+
self.assertNotIn('read_capacity_units', table_details)
823+
self.assertNotIn('write_capacity_units', table_details)
824+
825+
async def test_describe_table_provisioned_missing_capacity_units(self):
826+
"""Test PROVISIONED mode without required capacity units raises error."""
827+
mock_table_row = Mock()
828+
mock_table_row.table_name = 'users'
829+
mock_table_row.keyspace_name = 'mykeyspace'
830+
831+
mock_capacity_row = Mock()
832+
mock_capacity_row.custom_properties = {'capacity_mode': 'PROVISIONED'}
833+
834+
def mock_execute(query, _params=None):
835+
if 'tables' in query and 'system_schema_mcs' not in query:
836+
result = Mock()
837+
result.one.return_value = mock_table_row
838+
return result
839+
elif 'columns' in query:
840+
return []
841+
elif 'indexes' in query:
842+
return []
843+
elif 'system_schema_mcs' in query:
844+
result = Mock()
845+
result.one.return_value = mock_capacity_row
846+
return result
847+
return []
848+
849+
self.mock_session.execute = mock_execute
850+
851+
with patch('awslabs.amazon_keyspaces_mcp_server.client.Cluster') as mock_cluster_class:
852+
mock_cluster_instance = mock_cluster_class.return_value
853+
mock_cluster_instance.connect.return_value = self.mock_session
854+
855+
client = UnifiedCassandraClient(self.keyspaces_config)
856+
table_details = await client.describe_table('mykeyspace', 'users')
857+
858+
self.assertNotIn('read_capacity_units', table_details)
859+
self.assertNotIn('write_capacity_units', table_details)
860+
616861

617862
if __name__ == '__main__':
618863
unittest.main()
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for llm_context module."""
16+
17+
import unittest
18+
19+
from awslabs.amazon_keyspaces_mcp_server.llm_context import (
20+
build_query_result_context,
21+
build_table_details_context,
22+
dict_to_markdown,
23+
)
24+
25+
26+
class TestLlmContext(unittest.TestCase):
27+
"""Test cases for llm_context functions."""
28+
29+
def test_build_table_details_context_with_keyspaces_context(self):
30+
"""Test building table context with Keyspaces-specific metadata."""
31+
table_details = {
32+
'keyspace_name': 'test_ks',
33+
'table_name': 'test_table',
34+
'_keyspaces_context': {
35+
'service_characteristics': 'serverless'
36+
}
37+
}
38+
result = build_table_details_context(table_details)
39+
self.assertIn('Service Characteristics', result)
40+
self.assertIn('serverless', result)
41+
42+
def test_build_query_result_context_large_result(self):
43+
"""Test query result context with large result set."""
44+
query_results = {
45+
'row_count': 150,
46+
'columns': ['id'],
47+
'rows': []
48+
}
49+
result = build_query_result_context(query_results)
50+
self.assertIn('Large Result', result)
51+
52+
def test_dict_to_markdown_with_list_of_dicts(self):
53+
"""Test markdown conversion with nested list of dicts."""
54+
data = {'items': [{'name': 'item1'}, {'name': 'item2'}]}
55+
result = dict_to_markdown(data)
56+
self.assertIn('Items', result)
57+
self.assertIn('Name', result)
58+
59+
def test_dict_to_markdown_with_list_of_strings(self):
60+
"""Test markdown conversion with list of strings (covers line 341)."""
61+
data = {'features': ['feature1', 'feature2', 'feature3']}
62+
result = dict_to_markdown(data)
63+
self.assertIn('Features', result)
64+
self.assertIn('- feature1', result)
65+
self.assertIn('- feature2', result)
66+
self.assertIn('- feature3', result)
67+
68+
69+
if __name__ == '__main__':
70+
unittest.main()

src/amazon-keyspaces-mcp-server/tests/test_services.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,8 @@ async def test_describe_table(self):
148148
self.assertEqual(len(result['columns']), 2)
149149
self.assertEqual(result['columns'][0]['name'], 'user_id')
150150
self.assertEqual(result['partition_key'], ['user_id'])
151+
152+
153+
if __name__ == '__main__':
154+
unittest.main()
155+

0 commit comments

Comments
 (0)