@@ -740,3 +740,126 @@ async def test_get_legacy_paste_returns_none_for_nonexistent_file(
740740 result = await paste_service .get_legacy_paste_by_name ("nonexistent" )
741741
742742 assert result is None
743+
744+
745+ @pytest .mark .integration
746+ class TestPasteServiceCompression :
747+ """Tests for paste compression functionality."""
748+
749+ async def test_create_paste_compresses_large_content (
750+ self ,
751+ paste_service : PasteService ,
752+ sample_user_metadata : UserMetaData ,
753+ temp_file_storage : Path ,
754+ ):
755+ """Create paste should compress content above threshold."""
756+ # Create content above 512-byte threshold
757+ large_content = "This is test content that will be compressed. " * 50
758+ paste_dto = CreatePaste (
759+ title = "Large Paste" ,
760+ content = large_content ,
761+ content_language = PasteContentLanguage .plain_text ,
762+ )
763+
764+ result = await paste_service .create_paste (paste_dto , sample_user_metadata )
765+
766+ # Verify database metadata
767+ async with paste_service .session_maker () as session :
768+ stmt = select (PasteEntity ).where (PasteEntity .id == result .id )
769+ db_paste = (await session .execute (stmt )).scalar_one ()
770+
771+ assert db_paste .is_compressed is True
772+ assert db_paste .original_size is not None
773+ assert db_paste .original_size > db_paste .content_size
774+
775+ # Verify file on disk is gzip-compressed
776+ file_path = temp_file_storage / db_paste .content_path
777+ assert file_path .exists ()
778+ file_content = file_path .read_bytes ()
779+ # Check for gzip magic number
780+ assert file_content .startswith (b"\x1f \x8b " )
781+
782+ async def test_create_paste_does_not_compress_small_content (
783+ self ,
784+ paste_service : PasteService ,
785+ sample_user_metadata : UserMetaData ,
786+ ):
787+ """Create paste should not compress content below threshold."""
788+ small_content = "Small paste"
789+ paste_dto = CreatePaste (
790+ title = "Small Paste" ,
791+ content = small_content ,
792+ content_language = PasteContentLanguage .plain_text ,
793+ )
794+
795+ result = await paste_service .create_paste (paste_dto , sample_user_metadata )
796+
797+ # Verify database metadata
798+ async with paste_service .session_maker () as session :
799+ stmt = select (PasteEntity ).where (PasteEntity .id == result .id )
800+ db_paste = (await session .execute (stmt )).scalar_one ()
801+
802+ assert db_paste .is_compressed is False
803+ assert db_paste .original_size is None
804+ assert db_paste .content_size == len (small_content .encode ('utf-8' ))
805+
806+ async def test_get_compressed_paste_returns_original_content (
807+ self ,
808+ paste_service : PasteService ,
809+ sample_user_metadata : UserMetaData ,
810+ ):
811+ """Get paste should decompress compressed content correctly."""
812+ # Create compressed paste with large content
813+ large_content = "This is test content that will be compressed. " * 50
814+ paste_dto = CreatePaste (
815+ title = "Compressed Paste" ,
816+ content = large_content ,
817+ content_language = PasteContentLanguage .plain_text ,
818+ )
819+
820+ created_paste = await paste_service .create_paste (paste_dto , sample_user_metadata )
821+
822+ # Read it back
823+ retrieved_paste = await paste_service .get_paste_by_id (created_paste .id )
824+
825+ assert retrieved_paste is not None
826+ assert retrieved_paste .content == large_content
827+
828+ async def test_backward_compatibility_with_uncompressed_pastes (
829+ self ,
830+ paste_service : PasteService ,
831+ temp_file_storage : Path ,
832+ ):
833+ """Get paste should handle legacy uncompressed pastes correctly."""
834+ # Manually create an uncompressed paste (simulating legacy data)
835+ paste_id = uuid .uuid4 ()
836+ paste_path = f"pastes/{ paste_id } .txt"
837+ full_path = temp_file_storage / paste_path
838+ full_path .parent .mkdir (parents = True , exist_ok = True )
839+
840+ legacy_content = "This is a legacy uncompressed paste"
841+ full_path .write_text (legacy_content )
842+
843+ # Insert database record with is_compressed=False, original_size=None
844+ async with paste_service .session_maker () as session :
845+ entity = PasteEntity (
846+ id = paste_id ,
847+ title = "Legacy Paste" ,
848+ content_path = paste_path ,
849+ content_language = "plain_text" ,
850+ creator_ip = "127.0.0.1" ,
851+ creator_user_agent = "Test" ,
852+ content_size = len (legacy_content .encode ('utf-8' )),
853+ is_compressed = False ,
854+ original_size = None ,
855+ edit_token = hash_token ("edit123" ),
856+ delete_token = hash_token ("delete123" ),
857+ )
858+ session .add (entity )
859+ await session .commit ()
860+
861+ # Read with get_paste_by_id
862+ result = await paste_service .get_paste_by_id (paste_id )
863+
864+ assert result is not None
865+ assert result .content == legacy_content
0 commit comments